## Import libraries and load raw NewHandPD Dataset

In [1]:
import os
import re
import pandas as pd
from collections import defaultdict
from itertools import product
import random
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as TF
import torch.nn as nn
import torchvision.models as models
from torchvision.transforms import ColorJitter
import torch.optim as optim


## Data pre-processing
### Get patient metadata from Signals files

In [2]:
def get_patient_data(folder_path, _type):
    if _type == 'healthy':
        filename_pattern = re.compile(r'^sigMea3-H(\d+)\.txt$')
    elif _type == 'parkinson':
        filename_pattern = re.compile(r'^sigMea3-P(\d+)\.txt$') 
        
    fields = ['Gender', 'Writing_Hand', 'Weight', 'Height', 'Smoker']
    
    defaults = {
        'Gender': -1,
        'Writing_Hand': -1,
        'Weight': -1,
        'Height': -1,
        'Smoker': -1
    }
    
    data = []
    for filename in os.listdir(folder_path):
        match = filename_pattern.match(filename)
        if not match:
            continue
    
        number = int(match.group(1))
        file_path = os.path.join(folder_path, filename)
    
        metadata = defaults.copy()
        with open(file_path, 'r') as f:
            for line in f:
                for field in fields:
                    if f"<{field}>" in line:
                        value = re.search(rf"<{field}>(.*?)</{field}>", line)
                        if value and value.group(1).isdigit():
                            metadata[field] = int(value.group(1))

        if _type == 'healthy':
            row = ['H' + str(number)] + [metadata[k] for k in fields]
        elif _type == 'parkinson':
            row = ['P' + str(number)] + [metadata[k] for k in fields]
        data.append(row)

    # one-hot encoding of categorical variables
    df = pd.DataFrame(data, columns=['Number'] + fields)
    df = pd.get_dummies(df, columns=['Gender', 'Writing_Hand', 'Smoker'], drop_first=True)
    df = df.applymap(lambda x: 1 if x is True else (0 if x is False else x))
    
    df['SortKey'] = df['Number'].str.extract(r'([0-9]+)').astype(int)
    df = df.sort_values('SortKey').drop(columns='SortKey').reset_index(drop=True)
    
    return df

In [3]:
healthy_df = get_patient_data('/kaggle/input/patient/Signal', 'healthy')
parkinson_df = get_patient_data('/kaggle/input/patient/Signal 2', 'parkinson')

print(len(healthy_df))
print(len(parkinson_df))

healthy_df = healthy_df[~(healthy_df == -1).any(axis=1)].reset_index(drop=True)
parkinson_df = parkinson_df[~(parkinson_df == -1).any(axis=1)].reset_index(drop=True)
parkinson_df['Smoker_2'] = 0

print(len(healthy_df))
print(len(parkinson_df))

  df = df.applymap(lambda x: 1 if x is True else (0 if x is False else x))


35
31
30
31


  df = df.applymap(lambda x: 1 if x is True else (0 if x is False else x))


In [4]:
# Numerical variables normalization after concatenation of healthy and PD metadata
data_df = pd.concat([healthy_df, parkinson_df], ignore_index=True)
data_df['Weight'] = (data_df['Weight'] - data_df['Weight'].min()) / (data_df['Weight'].max() - data_df['Weight'].min())
data_df['Height'] = (data_df['Height'] - data_df['Height'].min()) / (data_df['Height'].max() - data_df['Height'].min())

print(len(data_df))
print(data_df)

61
   Number    Weight    Height  Gender_2  Writing_Hand_2  Smoker_2
0      H1  0.578125  0.764706         0               0         0
1      H2  0.437500  1.000000         0               0         0
2      H3  0.421875  0.176471         1               0         0
3      H4  0.812500  0.676471         0               0         0
4      H5  0.312500  0.294118         0               0         1
..    ...       ...       ...       ...             ...       ...
56    P28  0.234375  0.147059         1               0         0
57    P29  0.687500  0.441176         1               0         0
58    P30  0.328125  0.441176         0               0         0
59    P31  0.437500  0.294118         1               0         0
60    P32  0.531250  0.588235         0               0         0

[61 rows x 6 columns]


### Image pairs formation, matching with metadata and splitting in training, validation, test datasets 

In [5]:
dataset_path = '/kaggle/input/newhandpd'

def extract_ids(filename):
    base = os.path.splitext(filename)[0]  
    parts = base.split('-')
    if len(parts) == 2:
        return parts[0], parts[1] 
    return None, None

def get_pairs_by_person(group_prefix):
    spiral_path = os.path.join(dataset_path, f"{group_prefix}Spiral")
    meander_path = os.path.join(dataset_path, f"{group_prefix}Meander")

    spiral_dict = defaultdict(list)
    meander_dict = defaultdict(list)

    for fname in os.listdir(spiral_path):
        _, person_id = extract_ids(fname)
        if person_id:
            spiral_dict[person_id].append(os.path.join(f"{group_prefix}Spiral", fname))

    for fname in os.listdir(meander_path):
        _, person_id = extract_ids(fname)
        if person_id:
            meander_dict[person_id].append(os.path.join(f"{group_prefix}Meander", fname))

    # Group all pairs per person
    person_to_pairs = defaultdict(list)
    for person_id in spiral_dict:
        if person_id in meander_dict:
            pairs = list(product(spiral_dict[person_id], meander_dict[person_id]))
            person_to_pairs[person_id].extend(pairs)

    return person_to_pairs


def enrich_and_filter_pairs(person_to_pairs, df):
    enriched = {}
    valid_ids = set(df['Number'])

    for person_id, pairs in person_to_pairs.items():
        if person_id in valid_ids:
            row = df[df['Number'] == person_id].iloc[0].to_dict()
            enriched[person_id] = {
                'pairs': pairs,
                'metadata': row
            }
    return enriched

def split_by_individuals(person_to_data, seed=42):
    person_ids = list(person_to_data.keys())
    random.seed(seed)
    random.shuffle(person_ids)

    total = len(person_ids)
    n_train = int(0.7 * total)
    n_val = int(0.15 * total)

    train_ids = person_ids[:n_train]
    val_ids = person_ids[n_train:n_train + n_val]
    test_ids = person_ids[n_train + n_val:]

    train_pairs = [(pair, dict(list(person_to_data[pid]['metadata'].items())[1:])) 
               for pid in train_ids for pair in person_to_data[pid]['pairs']]

    val_pairs = [(pair, dict(list(person_to_data[pid]['metadata'].items())[1:])) 
                 for pid in val_ids for pair in person_to_data[pid]['pairs']]
    
    test_pairs = [(pair, dict(list(person_to_data[pid]['metadata'].items())[1:])) 
                  for pid in test_ids for pair in person_to_data[pid]['pairs']]


    return train_pairs, val_pairs, test_pairs


healthy_pairs_by_person = get_pairs_by_person("Healthy")
patient_pairs_by_person = get_pairs_by_person("Patient")
    
healthy_data = enrich_and_filter_pairs(healthy_pairs_by_person, data_df)
parkinson_data = enrich_and_filter_pairs(patient_pairs_by_person, data_df)

healthy_train, healthy_val, healthy_test = split_by_individuals(healthy_data)
patient_train, patient_val, patient_test = split_by_individuals(parkinson_data)


print("Healthy:")
print(f"  Train: {len(healthy_train)} pairs")
print(f"  Val:   {len(healthy_val)} pairs")
print(f"  Test:  {len(healthy_test)} pairs")

print("\nPatient:")
print(f"  Train: {len(patient_train)} pairs")
print(f"  Val:   {len(patient_val)} pairs")
print(f"  Test:  {len(patient_test)} pairs")


Healthy:
  Train: 336 pairs
  Val:   64 pairs
  Test:  80 pairs

Patient:
  Train: 336 pairs
  Val:   64 pairs
  Test:  96 pairs


### Dataset building for original image pairs and for contrast augmentation (adapted to include images pairs and metadata)

In [6]:
class Dataset_no_data_aug(Dataset):
    def __init__(self, pairs, root_dir, label, img_size=(128, 128)):
        self.pairs = pairs  
        self.root_dir = root_dir
        self.label = label  
        self.img_size = img_size

        self.metadata_keys = ['Weight', 'Height', 'Gender_2', 'Writing_Hand_2', 'Smoker_2']

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        (spiral_path, meander_path), metadata = self.pairs[idx]

        spiral_img = Image.open(os.path.join(self.root_dir, spiral_path)).convert("RGB")
        meander_img = Image.open(os.path.join(self.root_dir, meander_path)).convert("RGB")

        # Resize to 128x128 dimensions
        spiral_img = TF.resize(spiral_img, self.img_size)
        meander_img = TF.resize(meander_img, self.img_size)

        # Save in tensor
        spiral_img = TF.to_tensor(spiral_img)
        meander_img = TF.to_tensor(meander_img)

        # Normalization
        mean = [0.485, 0.456, 0.406]  
        std = [0.229, 0.224, 0.225]  
        spiral_img = TF.normalize(spiral_img, mean, std)
        meander_img = TF.normalize(meander_img, mean, std)

        meta_tensor = torch.tensor([metadata[k] for k in self.metadata_keys], dtype=torch.float)

        y = torch.tensor(self.label, dtype=torch.long)
        
        return spiral_img, meander_img, meta_tensor, y


class Dataset_contrast(Dataset):
    def __init__(self, pairs, root_dir, label, img_size=(128, 128), contrast_factor_range=(0.5, 1.5)):
        self.root_dir = root_dir
        self.label = label
        self.img_size = img_size
        self.contrast_factor_range = contrast_factor_range

        self.expanded_pairs = [((pair, metadata), False) for pair, metadata in pairs] + \
                              [((pair, metadata), True) for pair, metadata in pairs]

        self.metadata_keys = ['Weight', 'Height', 'Gender_2', 'Writing_Hand_2', 'Smoker_2']

    def __len__(self):
        return len(self.expanded_pairs)

    def __getitem__(self, idx):
        ((spiral_path, meander_path), metadata), apply_contrast = self.expanded_pairs[idx]

        spiral_img = Image.open(os.path.join(self.root_dir, spiral_path)).convert("RGB")
        meander_img = Image.open(os.path.join(self.root_dir, meander_path)).convert("RGB")

        spiral_img = TF.resize(spiral_img, self.img_size)
        meander_img = TF.resize(meander_img, self.img_size)

        # Apply contrast augmentation
        if apply_contrast:
            contrast_factor = random.uniform(*self.contrast_factor_range)
            spiral_img = TF.adjust_contrast(spiral_img, contrast_factor)
            meander_img = TF.adjust_contrast(meander_img, contrast_factor)

        spiral_img = TF.to_tensor(spiral_img)
        meander_img = TF.to_tensor(meander_img)

        mean = [0.485, 0.456, 0.406]
        std = [0.229, 0.224, 0.225]
        spiral_img = TF.normalize(spiral_img, mean, std)
        meander_img = TF.normalize(meander_img, mean, std)

        meta_tensor = torch.tensor([metadata[k] for k in self.metadata_keys], dtype=torch.float)
        y = torch.tensor(self.label, dtype=torch.long)

        return spiral_img, meander_img, meta_tensor, y


### Combine healthy and PD labeled image pair datasets

In [7]:
def create_dataset_no_data_aug(pairs_healthy, pairs_patient, root_dir, img_size=(128, 128)):
    dataset = (
        Dataset_no_data_aug(pairs_healthy, root_dir=root_dir, label=0,
                          img_size=img_size) +
        Dataset_no_data_aug(pairs_patient, root_dir=root_dir, label=1,
                          img_size=img_size)
    )
    return dataset

def create_dataset_contrast(pairs_healthy, pairs_patient, root_dir, img_size=(128, 128)):
    dataset = (
        Dataset_contrast(pairs_healthy, root_dir=root_dir, label=0,
                          img_size=img_size) +
        Dataset_contrast(pairs_patient, root_dir=root_dir, label=1,
                          img_size=img_size)
    )
    return dataset


train_dataset = create_dataset_no_data_aug(healthy_train, patient_train, dataset_path, img_size=(128,128))
val_dataset   = create_dataset_no_data_aug(healthy_val, patient_val, dataset_path, img_size=(128,128))
test_dataset  = create_dataset_no_data_aug(healthy_test, patient_test, dataset_path, img_size=(128,128))

train_dataset_contrast = create_dataset_contrast(healthy_train, patient_train, dataset_path, img_size=(128,128))

print(len(train_dataset))
print(len(val_dataset))
print(len(test_dataset))

print(len(train_dataset_contrast))

672
128
176
1344


In [8]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader  = DataLoader(test_dataset, batch_size=32, shuffle=False)

train_loader_contrast = DataLoader(train_dataset_contrast, batch_size=32, shuffle=True)

## Definition of the intermediate fusion multimodal model adapted for metadata modality

In [9]:
class Intermediate_metadata(nn.Module):
    def __init__(self, num_classes=2, metadata_dim=5):  
        super(Intermediate_metadata, self).__init__()

        # Load pretrained ResNet18s
        base_spiral = models.resnet18(pretrained=True)
        base_meander = models.resnet18(pretrained=True)
        base_shared = models.resnet18(pretrained=True)

        # Separate early layers up to layer3
        self.spiral_early = nn.Sequential(
            base_spiral.conv1,
            base_spiral.bn1,
            base_spiral.relu,
            base_spiral.maxpool,
            base_spiral.layer1,
            base_spiral.layer2,
            base_spiral.layer3
        )

        self.meander_early = nn.Sequential(
            base_meander.conv1,
            base_meander.bn1,
            base_meander.relu,
            base_meander.maxpool,
            base_meander.layer1,
            base_meander.layer2,
            base_meander.layer3
        )

        self.reduce_spiral = nn.Linear(256, 128)
        self.reduce_meander = nn.Linear(256, 128)

        # Late resnet18 after fusion
        self.shared_late = nn.Sequential(
            base_shared.layer4,
            base_shared.avgpool
        )

        # Metadata MLP
        self.meta_mlp = nn.Sequential(
            nn.Linear(metadata_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU()
        )

        # Classifier
        self.classifier = nn.Sequential(
            nn.Flatten(),  
            nn.Linear(512 + 32, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, spiral_img, meander_img, metadata):

        # Early feature extraction
        spiral_feat = self.spiral_early(spiral_img)
        meander_feat = self.meander_early(meander_img)

        # Global Average Pooling
        spiral_feat = torch.nn.functional.adaptive_avg_pool2d(spiral_feat, (1, 1)).squeeze(-1).squeeze(-1)
        meander_feat = torch.nn.functional.adaptive_avg_pool2d(meander_feat, (1, 1)).squeeze(-1).squeeze(-1)

        # Dimensionality reduction
        spiral_feat = self.reduce_spiral(spiral_feat)
        meander_feat = self.reduce_meander(meander_feat)

        # Intermediate fusion 
        fused_feat = torch.cat((spiral_feat, meander_feat), dim=1)  
        fused_feat = fused_feat.unsqueeze(-1).unsqueeze(-1) 

        # Late ResNet18 layers
        fused_feat = self.shared_late(fused_feat)           
        fused_feat = fused_feat.squeeze(-1).squeeze(-1)      

        # Process metadata
        meta_feat = self.meta_mlp(metadata)  

        # Final fusion
        combined_feat = torch.cat((fused_feat, meta_feat), dim=1)  

        # Classifier
        out = self.classifier(combined_feat)
        return out


In [10]:
print("CUDA available:", torch.cuda.is_available())
print("Device name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU found")

CUDA available: True
Device name: Tesla T4


## Define training and testing functions

In [12]:
def evaluate_model(model, val_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for spiral_img, meander_img, metadata, labels in val_loader:
            spiral_img = spiral_img.to(device)
            meander_img = meander_img.to(device)
            metadata = metadata.to(device).float()
            labels = labels.to(device)

            outputs = model(spiral_img, meander_img, metadata)

            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total
    return accuracy


def train_model2(model, train_loader, val_loader, criterion, optimizer, epochs=10, device=None):
    model.to(device)
    train_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(epochs):
        model.train()
        train_loss, correct, total = 0.0, 0, 0

        for spiral_img, meander_img, metadata, labels in train_loader:
            spiral_img = spiral_img.to(device)
            meander_img = meander_img.to(device)
            metadata = metadata.to(device).float()
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(spiral_img, meander_img, metadata)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * labels.size(0)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

        train_accuracy = correct / total
        val_accuracy = evaluate_model(model, val_loader, device)

        train_losses.append(train_loss / total)
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

        print(f"Epoch [{epoch+1}/{epochs}] - Loss: {train_loss/total:.4f}, Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}")

    return train_losses, train_accuracies, val_accuracies


In [13]:
def test_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for spiral_img, meander_img, metadata, labels in test_loader:
            spiral_img = spiral_img.to(device)
            meander_img = meander_img.to(device)
            metadata = metadata.to(device).float()
            labels = labels.to(device)

            outputs = model(spiral_img, meander_img, metadata)
            _, predicted = torch.max(outputs, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = correct / total
    print(f"Test Accuracy: {accuracy:.4f}")
    return accuracy, all_predictions, all_labels


### training and testing while using hyperpamaters values of models 1 and 2

In [16]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_loader = train_loader_contrast

lr = 0.0005
wd = 0.0

model = Intermediate_metadata(num_classes=2).to(device)

optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=wd, momentum=0.9)
criterion = nn.CrossEntropyLoss()

train_losses, train_accuracies, val_accuracies = train_model2(model, train_loader, val_loader, criterion, optimizer, epochs=10, device=device)
acc, _, _ = test_model(model, test_loader, device)

Epoch [1/10] - Loss: 0.5020, Train Acc: 0.7619, Val Acc: 0.7188
Epoch [2/10] - Loss: 0.1900, Train Acc: 0.9561, Val Acc: 0.8125
Epoch [3/10] - Loss: 0.0758, Train Acc: 0.9866, Val Acc: 0.7891
Epoch [4/10] - Loss: 0.0398, Train Acc: 0.9963, Val Acc: 0.8281
Epoch [5/10] - Loss: 0.0267, Train Acc: 0.9963, Val Acc: 0.8203
Epoch [6/10] - Loss: 0.0262, Train Acc: 0.9970, Val Acc: 0.8203
Epoch [7/10] - Loss: 0.0161, Train Acc: 0.9985, Val Acc: 0.8359
Epoch [8/10] - Loss: 0.0129, Train Acc: 0.9985, Val Acc: 0.8438
Epoch [9/10] - Loss: 0.0203, Train Acc: 0.9948, Val Acc: 0.8047
Epoch [10/10] - Loss: 0.0073, Train Acc: 1.0000, Val Acc: 0.8203
Test Accuracy: 0.8864


In [17]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_loader = train_loader_contrast

lr = 0.0001
wd = 0.0001

model = Intermediate_metadata(num_classes=2).to(device)

optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=wd)
criterion = nn.CrossEntropyLoss()

train_losses, train_accuracies, val_accuracies = train_model2(model, train_loader, val_loader, criterion, optimizer, epochs=10, device=device)
acc, _, _ = test_model(model, test_loader, device)

Epoch [1/10] - Loss: 0.3363, Train Acc: 0.8631, Val Acc: 0.7500
Epoch [2/10] - Loss: 0.0371, Train Acc: 0.9963, Val Acc: 0.8047
Epoch [3/10] - Loss: 0.0160, Train Acc: 0.9978, Val Acc: 0.7500
Epoch [4/10] - Loss: 0.0177, Train Acc: 0.9948, Val Acc: 0.7500
Epoch [5/10] - Loss: 0.0199, Train Acc: 0.9948, Val Acc: 0.8750
Epoch [6/10] - Loss: 0.0041, Train Acc: 1.0000, Val Acc: 0.8750
Epoch [7/10] - Loss: 0.0021, Train Acc: 1.0000, Val Acc: 0.8750
Epoch [8/10] - Loss: 0.0020, Train Acc: 1.0000, Val Acc: 0.8516
Epoch [9/10] - Loss: 0.0014, Train Acc: 1.0000, Val Acc: 0.8203
Epoch [10/10] - Loss: 0.0023, Train Acc: 0.9993, Val Acc: 0.7969
Test Accuracy: 0.9091
