# 1. Import Libraries

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import RobustScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve, auc, classification_report, confusion_matrix
from sklearn.model_selection import StratifiedShuffleSplit

import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchsummary import summary
from torchviz import make_dot

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
SEED = 42

np.random.seed(SEED)
torch.manual_seed(SEED)
DATA_DIRECTORY = 'Data/hyperaktiv_with_controls/hyperaktiv_with_controls/'
VALID_IDs = [1, 3, 5, 7, 9, 11, 15, 19, 20, 21, 22, 23, 24, 27, 31, 32, 33, 34, 35, 36, 37, 39, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 55, 56, 57, 58, 59, 60, 61, 63, 64, 68, 71, 73, 75, 77, 78, 79, 81, 82, 83, 84, 85, 87, 88, 89, 90, 91, 93, 94, 95, 97, 98, 101, 104, 105]

# 2. Load and Visualize Demographic Data

In [None]:
demographic_data = pd.read_csv(f'{DATA_DIRECTORY}patient_info.csv', sep=';')

# Plot the balance of the ADHD class in the demographic_data for every record that has ID in VALID_IDs.
demographic_data_test_val = demographic_data[demographic_data['ID'].isin(VALID_IDs)]
labels = demographic_data_test_val['ADHD'].values
print(labels)

demographic_data_test_val['ADHD'].value_counts().plot(kind='bar', title='ADHD class balance in the dataset')
plt.xticks([0, 1], ['Control', 'ADHD'], rotation=0)
plt.ylabel('Number of records')
plt.show()

control_ids = demographic_data_test_val[demographic_data_test_val['ADHD'] == 0]['ID'].values
adhd_ids = demographic_data_test_val[demographic_data_test_val['ADHD'] == 1]['ID'].values

print(f'Number of control patients: {len(control_ids)}; IDS: {control_ids}')
print(f'Number of ADHD patients: {len(adhd_ids)}; IDS: {adhd_ids}')

# 3. Split Data Into Train, Validation and Test Set

In [None]:
def split_dataset(ids, labels, train_ratio=0.80, val_ratio=0.10, test_ratio=0.10, random_seed=42):
    splits = StratifiedShuffleSplit(n_splits=1, test_size=test_ratio, random_state=random_seed)
    remaining_ratio = 1.0 - test_ratio
    val_relative_ratio = val_ratio / remaining_ratio
    train_val_ids, test_ids = next(splits.split(ids, labels))
    train_val_split = StratifiedShuffleSplit(n_splits=1, test_size=val_relative_ratio, random_state=random_seed)
    train_ids, val_ids = next(train_val_split.split(ids[train_val_ids], labels[train_val_ids]))

    train_ids = ids[train_ids]
    val_ids = ids[val_ids]
    test_ids = ids[test_ids]

    return train_ids, val_ids, test_ids

train_ids, val_ids, test_ids = split_dataset(np.array(VALID_IDs), np.array(labels))

# 4. Visualize Class Balance in Each Split

In [None]:
demographic_data_train = demographic_data_test_val[demographic_data_test_val['ID'].isin(train_ids)]
demographic_data_train['ADHD'].value_counts().plot(kind='bar', title='ADHD class balance in the train dataset')
plt.xticks([0, 1], ['Control', 'ADHD'], rotation=0)
plt.ylabel('Number of records')
plt.show()

demographic_data_val = demographic_data_test_val[demographic_data_test_val['ID'].isin(val_ids)]
demographic_data_val['ADHD'].value_counts().plot(kind='bar', title='ADHD class balance in the validation dataset')
plt.xticks([0, 1], ['Control', 'ADHD'], rotation=0)
plt.ylabel('Number of records')
plt.show()

demographic_data_test = demographic_data_test_val[demographic_data_test_val['ID'].isin(test_ids)]
demographic_data_test['ADHD'].value_counts().plot(kind='bar', title='ADHD class balance in the test dataset')
plt.xticks([0, 1], ['Control', 'ADHD'], rotation=0)
plt.ylabel('Number of records')
plt.show()

# 5. Define Data Loading and Preprocessing Functions

In [None]:
def scale_data(data):
    scaler = RobustScaler()
    return scaler.fit_transform(data.reshape(-1, 1)).flatten()

def load_data(sample, demographic_data):
    patients_data = {}

    for patient_id in sample:
        hrv_data = pd.read_csv(f'{DATA_DIRECTORY}hrv_data/patient_hr_{patient_id}.csv', sep=';')
        activity_data = pd.read_csv(f'{DATA_DIRECTORY}activity_data/patient_activity_{patient_id}.csv', sep=';')
        labels = demographic_data[demographic_data['ID'] == patient_id]['ADHD'].values[0]

        hrv_data['TIMESTAMP'] = pd.to_datetime(hrv_data['TIMESTAMP'], errors='coerce')
        activity_data['TIMESTAMP'] = pd.to_datetime(activity_data['TIMESTAMP'], errors='coerce')

        df_hrv = pd.DataFrame(data=hrv_data).set_index('TIMESTAMP')
        df_activity = pd.DataFrame(data=activity_data).set_index('TIMESTAMP')

        df_hrv = df_hrv.resample('1S').mean()
        df_activity = df_activity.resample('1S').mean()

        df_hrv['HRV'] = df_hrv['HRV'].fillna(method='ffill')
        df_activity['ACTIVITY'] = df_activity['ACTIVITY'].fillna(method='ffill')

        hrv_series = scale_data(df_hrv['HRV'].values)
        activity_series = scale_data(df_activity['ACTIVITY'].values)

        patients_data[patient_id] = {
            'hrv': hrv_series,
            'activity': activity_series,
            'adhd': labels
        }

    return patients_data

train_data = load_data(train_ids, demographic_data=demographic_data_train)
val_data = load_data(val_ids, demographic_data=demographic_data_val)
test_data = load_data(test_ids, demographic_data=demographic_data_test)

all_data = {
    'train': train_data,
    'val': val_data,
    'test': test_data,
}

# 6. Truncate and Pad Sequences

In [None]:
def truncate_and_pad_sequences(sequence, max_length):
    sequence = sequence[:max_length]
    if len(sequence) < max_length:
        sequence = np.pad(sequence, (0, max_length - len(sequence)), 'constant', constant_values=0)
    return sequence

def get_global_max_length(data):
    lengths = [len(data[pid]['hrv']) for pid in data]
    return int(np.percentile(lengths, 95))

GLOBAL_MAX_LENGTH = max(get_global_max_length(train_data), get_global_max_length(val_data))

for dataset in [all_data['train'], all_data['val'], all_data['test']]:
    for patient_id, data in dataset.items():
        data['hrv'] = truncate_and_pad_sequences(data['hrv'], GLOBAL_MAX_LENGTH)
        data['activity'] = truncate_and_pad_sequences(data['activity'], GLOBAL_MAX_LENGTH)

# 7. Define Model Architecture

In [None]:
class DualBranch1DCNN(nn.Module):
    def __init__(self, use_dropout=False, dropout_rate=0.5, use_batch_norm=False):
        super(DualBranch1DCNN, self).__init__()
        kernel_size = 5
        stride = 2
        pool_size = 2
        self.hrv_branch = self.build_branch(use_dropout, dropout_rate, use_batch_norm, kernel_size, stride, pool_size)
        self.activity_branch = self.build_branch(use_dropout, dropout_rate, use_batch_norm, kernel_size, stride, pool_size)

        self.final_layers = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU()
        )
        if use_batch_norm:
            self.final_layers.add_module('final_batch_norm', nn.BatchNorm1d(128))
        if use_dropout:
            self.final_layers.add_module('final_dropout', nn.Dropout(dropout_rate))
        self.final_layers.add_module('final_output', nn.Linear(128, 1))

    def build_branch(self, use_dropout, dropout_rate, use_batch_norm, kernel_size, stride, pool_size):
        layers = [
            nn.Conv1d(1, 16, kernel_size, stride=stride),
            nn.ReLU(),
            nn.AvgPool1d(pool_size),
            nn.Conv1d(16, 32, kernel_size, stride=stride),
            nn.ReLU(),
            nn.AvgPool1d(pool_size),
            nn.Flatten(),
            nn.Linear(32 * self.calculate_output_length(GLOBAL_MAX_LENGTH, kernel_size, stride, pool_size), 128),
            nn.ReLU()
        ]
        if use_batch_norm:
            layers.append(nn.BatchNorm1d(128))
        if use_dropout:
            layers.append(nn.Dropout(dropout_rate))
        return nn.Sequential(*layers)

    def calculate_output_length(self, input_length, kernel_size, stride, pool_size):
        output_length = input_length
        for _ in range(2):
            output_length = (output_length - (kernel_size - 1) - 1) // stride + 1
            output_length = (output_length - (pool_size - 1) - 1) // pool_size + 1
        return output_length

    def forward(self, hrv_data, activity_data):
        hrv_features = self.hrv_branch(hrv_data)
        activity_features = self.activity_branch(activity_data)
        combined_features = torch.cat((hrv_features, activity_features), dim=1)
        output = self.final_layers(combined_features)
        return output

# 8. Create Dataset Class

In [None]:
class ADHDData(Dataset):
    def __init__(self, data):
        self.hrv_data = [torch.tensor(data[pid]['hrv'], dtype=torch.float32).unsqueeze(0) for pid in data]
        self.activity_data = [torch.tensor(data[pid]['activity'], dtype=torch.float32).unsqueeze(0) for pid in data]
        self.labels = [data[pid]['adhd'] for pid in data]

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        return self.hrv_data[index], self.activity_data[index], self.labels[index]

def pad_collate_fn(batch):
    hrv_data, activity_data, labels = zip(*batch)
    hrv_data_padded = pad_sequence(hrv_data, batch_first=True, padding_value=0)
    activity_data_padded = pad_sequence(activity_data, batch_first=True, padding_value=0)
    labels_tensor = torch.tensor(labels, dtype=torch.float32)
    return hrv_data_padded, activity_data_padded, labels_tensor

train_dataset = ADHDData(all_data['train'])
val_dataset = ADHDData(all_data['val'])
test_dataset = ADHDData(all_data['test'])

print(f'Train dataset size: {len(train_dataset)}')
print(f'Validation dataset size: {len(val_dataset)}')
print(f'Test dataset size: {len(test_dataset)}')

# 9. Define Training And Validaiton Functions

In [None]:
def train_model(model, train_loader, val_loader, device, config):
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=config['learning_rate']) if config['optimizer_name'] == 'adam' \
                else optim.SGD(model.parameters(), lr=config['learning_rate'], momentum=config.get('momentum', 0.9))

    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    best_val_accuracy = 0
    best_metrics = {}

    for epoch in range(config['num_epochs']):
        model.train()
        total = 0
        correct = 0
        train_loss = 0

        for hrv_data, activity_data, labels in train_loader:
            hrv_data, activity_data, labels = hrv_data.to(device), activity_data.to(device), labels.to(device).float()

            optimizer.zero_grad()
            outputs = torch.sigmoid(model(hrv_data, activity_data))
            loss = criterion(outputs, labels.unsqueeze(1))
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            predicted = outputs.round()
            total += labels.size(0)
            correct += (predicted == labels.unsqueeze(1)).sum().item()

        train_accuracy = 100 * correct / total
        train_losses.append(train_loss / len(train_loader))
        train_accuracies.append(train_accuracy)

        val_loss, val_accuracy = validate_model(model, val_loader, criterion, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            best_metrics = {
                'train_loss': train_losses,
                'val_loss': val_losses,
                'train_accuracy': train_accuracies,
                'val_accuracy': val_accuracies
            }
            torch.save(model.state_dict(), 'best_model.pth')
            print("Best model updated.")

    return best_metrics

def validate_model(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for hrv_data, activity_data, labels in val_loader:
            hrv_data, activity_data, labels = hrv_data.to(device), activity_data.to(device), labels.to(device).float()
            outputs = torch.sigmoid(model(hrv_data, activity_data))
            loss = criterion(outputs, labels.unsqueeze(1))

            val_loss += loss.item()
            predicted = outputs.round()
            correct += (predicted == labels.unsqueeze(1)).sum().item()
            total += labels.size(0)

    accuracy = 100 * correct / total
    return val_loss / len(val_loader), accuracy

# 10. Generate and Sample Configurations

In [None]:
import random
import itertools

def generate_configurations():
    optimizer_names = ['adam', 'sgd']
    batch_sizes = [16, 32, 64]
    learning_rates = [0.001, 0.01]
    num_epochs = [20, 30]
    use_dropout = [True, False]
    dropout_rates = [0.2, 0.5]
    use_batch_norm = [True, False]
    momenta = [0.8, 0.9]

    all_combinations = list(itertools.product(optimizer_names, batch_sizes, learning_rates, num_epochs,
                                              use_dropout, dropout_rates, use_batch_norm, momenta))

    configurations = [
        {'optimizer_name': combo[0], 'batch_size': combo[1], 'learning_rate': combo[2], 'num_epochs': combo[3],
         'use_dropout': combo[4], 'dropout_rate': combo[5], 'use_batch_norm': combo[6], 'momentum': combo[7]}
        for combo in all_combinations if combo[0] == 'sgd' or combo[7] == None
    ]

    return configurations

def sample_configurations(num_samples):
    configurations = generate_configurations()
    sampled_configurations = random.sample(configurations, min(num_samples, len(configurations)))
    return sampled_configurations

# 11. Train Models with Sampled Configurations

In [None]:
N = 10
sampled_configurations = sample_configurations(N)
best_model = None
best_metrics = None
best_accuracy = 0
best_config = None

results = []

for config in sampled_configurations:
    print(f"Training with configuration: {config}")
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True, collate_fn=pad_collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False, collate_fn=pad_collate_fn)

    model = DualBranch1DCNN(
        use_dropout=config['use_dropout'],
        dropout_rate=config['dropout_rate'],
        use_batch_norm=config['use_batch_norm']).to(device)

    metrics = train_model(model, train_loader, val_loader, device, config)

    current_val_accuracy = metrics['val_accuracy'][-1]
    if current_val_accuracy > best_accuracy:
        best_accuracy = current_val_accuracy
        best_model = model
        best_metrics = metrics
        best_config = config
        torch.save(model.state_dict(), f'best_model.pth')
        print(f"New best model saved with config ID {config} and validation accuracy {best_accuracy}")

    results.append({
        'config': config,
        'train_loss': metrics['train_loss'][-1],
        'val_loss': metrics['val_loss'][-1],
        'train_accuracy': metrics['train_accuracy'][-1],
        'val_accuracy': metrics['val_accuracy'][-1]
    })

print(f"Best Model Config: {best_config}, with Best Validation Accuracy: {best_accuracy}")

# 12. Plot Metrics

In [None]:
def plot_metrics(metrics):
    epochs = range(len(metrics['train_loss']))
    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, metrics['train_loss'], label='Train Loss')
    plt.plot(epochs, metrics['val_loss'], label='Validation Loss')
    plt.title('Loss over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, metrics['train_accuracy'], label='Train Accuracy')
    plt.plot(epochs, metrics['val_accuracy'], label='Validation Accuracy')
    plt.title('Accuracy over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

plot_metrics(best_metrics)

# 13. Test Model

In [None]:
test_loader = DataLoader(test_dataset, batch_size=best_config['batch_size'], shuffle=False, collate_fn=pad_collate_fn)

def test_model(model, test_loader, device):
    criterion = nn.BCELoss()
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    TP = 0
    FP = 0
    FN = 0

    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for hrv_data, activity_data, labels in test_loader:
            hrv_data, activity_data, labels = hrv_data.to(device), activity_data.to(device), labels.to(device).float()
            outputs = torch.sigmoid(model(hrv_data, activity_data))
            loss = criterion(outputs, labels.unsqueeze(1))
            test_loss += loss.item()
            predicted = outputs.round()

            correct += (predicted == labels.unsqueeze(1)).sum().item()
            total += labels.size(0)

            TP += (predicted * labels.unsqueeze(1)).sum().item()
            FP += (predicted * (1 - labels.unsqueeze(1))).sum().item()
            FN += ((1 - predicted) * labels.unsqueeze(1)).sum().item()

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = correct / total
    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    avg_loss = test_loss / len(test_loader)

    print(f'Test Loss: {avg_loss:.4f}, Test Accuracy: {accuracy * 100:.2f}%, Precision: {precision:.2f}, Recall: {recall:.2f}')
    return avg_loss, accuracy, precision, recall, all_labels, all_predictions

test_loss, test_accuracy, precision, recall, labels, predictions = test_model(best_model, test_loader, device)
print(test_loss, test_accuracy, precision, recall)

# 14. Plot ROC-AUC Curve

In [None]:
def plot_roc_auc(labels, predictions):
    fpr, tpr, _ = roc_curve(labels, predictions)
    roc_auc = auc(fpr, tpr)

    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.show()

plot_roc_auc(labels, predictions)

# 15. Visualize Model Architecture and Summary

In [None]:
x_hrv = torch.randn(1, 1, GLOBAL_MAX_LENGTH).to(device)
x_activity = torch.randn(1, 1, GLOBAL_MAX_LENGTH).to(device)
y = best_model(x_hrv, x_activity)
make_dot(y, params=dict(best_model.named_parameters())).render("model_architecture", format="png")

summary(best_model, [(1, GLOBAL_MAX_LENGTH), (1, GLOBAL_MAX_LENGTH)])

# 16. Classification Report and Confusion Matrix

In [None]:
predicted_labels = np.round(predictions)
report = classification_report(labels, predicted_labels, target_names=['Control', 'ADHD'])
print(report)

cm = confusion_matrix(labels, predicted_labels)
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues', xticklabels=['Control', 'ADHD'], yticklabels=['Control', 'ADHD'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# 17. Save Results

In [None]:
results_df = pd.DataFrame(results)
results_df.to_csv('1d_CNN_AVGPool_results.csv', index=False)

best_model_summary = {
    "Model Configuration": best_config,
    "Test Metrics": {
        "Test Accuracy": test_accuracy,
        "Test Precision": precision,
        "Test Recall": recall,
        "Test Loss": test_loss
    }
}

pd.set_option('display.max_columns', None)
print(pd.DataFrame(best_model_summary))