# Cognitive Decline Prediction Using EEG Data and Machine Learning

## 1. Project Overview

This project develops a machine learning classifier to distinguish between Alzheimer's Disease, Frontotemporal Dementia, and healthy controls using EEG signal features. The analysis follows a complete pipeline from data loading through model optimization.

In [None]:
import mne
from bids import BIDSLayout
import os
import numpy as np
import pandas as pd

## 2. Data Preprocessing

### EEG Signal Preprocessing Pipeline
The EEG recordings were exported in .eeg format and transformed to the BIDS-accepted .set format, with a preprocessing pipeline that included applying a Butterworth band-pass filter (0.5-45 Hz), re-referencing, and using Artifact Subspace Reconstruction (ASR) to remove bad data segments. Independent Component Analysis (ICA) was then performed to identify and reject components classified as eye or jaw artifacts, even though some eye movement artifacts remained in the resting state recordings.

In [None]:
data_path = "data/derivatives"

## 3. Data Loading and Integration

In [None]:
# Function to load subject data
def load_subject_data(subject_id):
    subject_path = os.path.join(data_path, f"sub-{subject_id:03d}", "eeg", f"sub-{subject_id:03d}_task-eyesclosed_eeg.set")
    # print(f"Trying to load: {subject_path}")  # Debugging line
    raw = mne.io.read_raw_eeglab(subject_path, preload=True)  # Load the .set file
    return raw

In [None]:
'''def load_all_subjects(data_path, num_subjects=88):
    raw_data_list = []
    for subject_id in range(1, num_subjects + 1):
        try:
            raw = load_subject_data(subject_id)  # Load your EEG data here
            raw_data_list.append(raw)
            print(f"Loaded data for subject {subject_id}")
        except FileNotFoundError:
            print(f"Data for subject {subject_id} not found.")
    return raw_data_list'''

In [None]:
'''raw_data = load_subject_data(1)
if raw_data is not None:
    print(raw_data.info)  # Access the info attribute directly
else:
    print("No data was loaded.")
    
raw_data.plot(n_channels=10, scalings='auto') # Test if data was loaded correctly'''

In [None]:
# Load participant data
participant_data_path = 'data/participants.tsv'
participant_data = pd.read_csv(participant_data_path, sep='\t')

# Create a list to hold EEG data
eeg_data_list = []

# Load EEG data for each subject and store it in a list
for subject_id in range(1, 89):  # Assuming 88 subjects
    try:
        raw = load_subject_data(subject_id)
        eeg_data_list.append(raw)
    except FileNotFoundError:
        print(f"Data for subject {subject_id} not found.")

# Create a DataFrame for EEG data
eeg_df = pd.DataFrame({
    'participant_id': [f'sub-{str(i).zfill(3)}' for i in range(1, 89)],
    'eeg_data': eeg_data_list
})

# Check if all EEG data has been loaded
print(f"Loaded EEG data for {len(eeg_data_list)} subjects.")

# Merge EEG data with participant information
combined_data = pd.merge(eeg_df, participant_data, on='participant_id')

# Display summary of combined data instead of the entire DataFrame
# For example, display the first few rows of participant information and EEG data shapes
print(combined_data[['participant_id'] + list(participant_data.columns)])  # Adjust according to actual participant data columns

# If you want to see the shape of the EEG data
for index, row in combined_data.iterrows():
    print(f"{row['participant_id']} EEG data shape: {row['eeg_data'].get_data().shape}")

### Encoding Labels

Encode the group labels into numerical format:
Alzheimer's Disease (A) = 0
Frontotemporal Dementia (F) = 1
Healthy Control (C) = 2

In [None]:
group_mapping = {'A': 0, 'F': 1, 'C': 2}
combined_data['Group'] = combined_data['Group'].map(group_mapping)

In [None]:
# Save the combined data to a CSV file
output_csv_path = 'data/combined_eeg_participant_data.csv'
combined_data.to_csv(output_csv_path, index=False)

print(f"Combined data saved to {output_csv_path}")

## Define Feature Extraction Functions

Let's define functions to extract relevant features from your EEG data.

### Bandpower Extraction 

Analyze how different frequency bands (delta, theta, alpha, beta, gamma) change across different brain regions (channels).

In [None]:
def extract_bandpower(raw, freq_bands, n_fft=2048):
    """Extract bandpower features from EEG data."""
    # Compute the Power Spectral Density (PSD)
    spectrum = raw.compute_psd(method='welch', n_fft=n_fft)
    psd_data, freqs = spectrum.get_data(return_freqs=True)

    bandpowers = {}
    for band, (fmin, fmax) in freq_bands.items():
        # Find the indices of frequencies within the specified band
        band_idx = np.where((freqs >= fmin) & (freqs <= fmax))[0]
        # Average the PSD values over the band
        bandpower = np.mean(psd_data[:, band_idx], axis=1)  # Average across all frequencies in the band
        bandpowers[band] = bandpower

    return bandpowers


### Set Indices of Frequencies and Initialize Feature Dictionary

In [None]:
freq_bands = {
    'delta': (0.5, 4),
    'theta': (4, 8),
    'alpha': (8, 13),
    'beta': (12, 25),
    'gamma': (25, 45)
}

# Create a list to hold the feature dictionaries
features_list = []

### Power Spectral Density (PSD)

Using the Welch method is a technique for estimating the power of a signal's frequency components. It provides a way to analyze the distribution of power across different frequencies in a signal, making it useful for identifying dominant frequency components in various applications such as EEG analysis

In [None]:
# Function to compute band power for different frequency bands per channel
def compute_bandpower_per_channel(raw, freq_range):
    # Compute the power spectral density using Welch's method
    spectrum = raw.compute_psd(method="welch")
    # Retrieve the power spectral density data and corresponding frequencies
    data, freqs = spectrum.get_data(return_freqs=True)

    # Find indices of the frequency range
    freq_indices = np.logical_and(freqs >= freq_range[0], freqs <= freq_range[1])
    # Compute the average power in the desired frequency band for each channel
    band_power = np.mean(data[:, freq_indices], axis=1)  # Average power for each channel
    
    return band_power

### Connectivity Measures

Connectivity measures are useful for assessing interactions between different EEG channels or brain regions. Correlation measures linear dependency between signals from different channels.

In [None]:
# Function to compute connectivity between EEG channels
def compute_connectivity(raw):
    """
    Computes the functional connectivity using correlation between channels.
    
    Parameters:
        raw: MNE Raw object containing the EEG data.
        
    Returns:
        Connectivity matrix (correlation coefficients).
    """
    data = raw.get_data()  # Get all channels, shape (n_channels, n_samples)
    connectivity_matrix = np.corrcoef(data)  # Calculate correlation coefficients
    return connectivity_matrix

###  Extract Features and Append to Feature Dictionary

In [None]:
features_list.clear() #Ensure we reset our feeatures_list
# Iterate over the rows in combined_data
for index, row in combined_data.iterrows():
    raw = row['eeg_data']  # Get the raw EEG data
    bandpower_results = {}

    # Calculate band power for each frequency band for each channel
    for band, freq_range in freq_bands.items():
        bandpower_results[band] = compute_bandpower_per_channel(raw, freq_range)

    # Create a feature dictionary
    features = {
        'participant_id': row['participant_id'],
        'Gender': row['Gender'],
        'Age': row['Age'],
        'Group': row['Group'],
        'MMSE': row['MMSE']
    }

    # Add bandpower features for each channel
    for band, power in bandpower_results.items():
        # Assuming you have 'n_channels' number of channels
        for i, channel_power in enumerate(power):
            features[f'bandpower_{band}_channel_{i+1}'] = channel_power  # +1 for human-readable indexing
        features[f'average_bandpower_{band}'] = np.mean(power)

    # Calculate and add connectivity features
    connectivity_matrix = compute_connectivity(raw)
    
    # For simplicity, you can extract average connectivity or specific values
    features['average_connectivity'] = np.mean(connectivity_matrix)  # Average connectivity across channels

    # Example: Add specific channel correlations if desired
    # features['connectivity_1_2'] = connectivity_matrix[0, 1]  # Correlation between channel 1 and channel 2

    features_list.append(features)


In [None]:
# Create a new DataFrame for features
features_df = pd.DataFrame(features_list)

# Display the features DataFrame
print(features_df.head())

In [None]:
# Save features to a CSV file
features_df.to_csv('data/extracted_features.csv', index=False)
print("Extracted features saved to extracted_features.csv")

In [None]:
# Display descriptive statistics
print(features_df.describe())

## Data Visualization 

Visually compare the distribution of various bandpower features among different groups of participants using a correlation matrix. We will use the plot to determine which channel recordings and frequency types correlate with our patient groupings.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
numeric_features_df = features_df.select_dtypes(include=[np.number])

In [None]:
correlation_matrix = numeric_features_df.corr()

correlation_matrix.to_csv('data/correlation_matrix_df.csv', index=False)

In [None]:
# Step 2: Extract correlations with "Group" and drop self-correlation
group_correlation = correlation_matrix['Group'].drop('Group')  # Drop self-correlation

group_correlation .to_csv('data/group_correlation _df.csv', index=False)
# Step 3: Get absolute values of the correlations
absolute_group_correlation = group_correlation.abs()

# Step 4: Sort the absolute correlations in descending order
sorted_group_correlation = absolute_group_correlation.sort_values(ascending=False)

# Display the sorted correlations
print(sorted_group_correlation[:21])

In [None]:
# Define the columns to drop (for both bandpower_gamma and bandpower_beta, plus Age and MMSE)
columns_to_drop = ['Age', 'MMSE']

# Drop the specified columns and rows from the correlation matrix
filtered_corr_matrix = correlation_matrix.drop(columns=columns_to_drop)

# Drop the same rows to keep the matrix symmetric
filtered_corr_matrix = filtered_corr_matrix.drop(index=columns_to_drop)

# Plot the filtered correlation matrix
plt.figure(figsize=(75, 75))
sns.heatmap(filtered_corr_matrix, annot=True, fmt=".2f", cmap='coolwarm', square=True)
plt.title('Correlation Matrix of Features')
plt.show()


### We care about 'Group' correlation in our matrix

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Assuming correlation_matrix is already defined
# Define the columns to drop (for both bandpower_gamma and bandpower_beta, plus Age and MMSE)
columns_to_drop = ['Age', 'MMSE', 'Group']

# Drop the specified columns to create a filtered correlation matrix
filtered_corr_matrix = correlation_matrix.drop(columns=columns_to_drop)

# Select only the "Group" row from the filtered correlation matrix
group_corr_matrix = filtered_corr_matrix.loc[['Group']]

# Plot the correlation matrix for "Group" only
plt.figure(figsize=(75, 50))  # Adjust the figure size as needed
sns.heatmap(group_corr_matrix, annot=True, fmt=".2f", cmap='coolwarm', square=True, cbar=False)
plt.title('Correlation of Features with Group')
plt.show()


## Relevant Feature Extraction

This way, we will only train our model using the top features that provide the highest accuracy.

According to the literature, AD patients exhibit changes in the RBP such as reduced alpha power and increased theta power.

In [None]:
# Step 1: Extract the top 19 feature names, excluding MMSE
top_features = sorted_group_correlation.index[:21].tolist()

# Step 2: Remove MMSE if it's in the top features
top_features = [feature for feature in top_features if feature != 'MMSE'] # We only want to use EEG

# Step 3: Format the list to match your desired output
feature_columns = [f"{feature}" for feature in top_features]

# Print the feature columns in the desired format
formatted_feature_columns = ',\n    '.join(feature_columns)
formatted_output = f"feature_columns = [\n    {formatted_feature_columns}\n]"

print(formatted_output)


## Split  dataset into training and testing subsets

In [None]:
from sklearn.model_selection import train_test_split

# Select features and target variable
X = features_df[feature_columns]  # Features with selected columns
y = features_df['Group']           # Target variable (you can change this based on your analysis)

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.35, random_state=42)

# Optional: Print the shapes of the training and testing sets to confirm
print(f"X_train shape: {X_train.shape}, X_test shape: {X_test.shape}")
print(f"y_train shape: {y_train.shape}, y_test shape: {y_test.shape}")


In [None]:
print("\n📁 Saving train/test splits to CSV files...")

# Create a directory for the splits (optional)
output_dir = "train_test_splits"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created directory: {output_dir}")

# Save training features
X_train.to_csv(f"{output_dir}/X_train.csv", index=False)
print(f"✅ Saved X_train to {output_dir}/X_train.csv")

# Save test features  
X_test.to_csv(f"{output_dir}/X_test.csv", index=False)
print(f"✅ Saved X_test to {output_dir}/X_test.csv")

# Save training labels
pd.Series(y_train, name='Group').to_csv(f"{output_dir}/y_train.csv", index=False)
print(f"✅ Saved y_train to {output_dir}/y_train.csv")

# Save test labels
pd.Series(y_test, name='Group').to_csv(f"{output_dir}/y_test.csv", index=False)
print(f"✅ Saved y_test to {output_dir}/y_test.csv")

In [None]:
# Load the saved data
X_train = pd.read_csv("train_test_splits/X_train.csv")
X_test = pd.read_csv("train_test_splits/X_test.csv")  # Fixed: was X_train.csv
y_train = pd.read_csv("train_test_splits/y_train.csv").iloc[:, 0]  # Fixed: was X_train.csv
y_test = pd.read_csv("train_test_splits/y_test.csv").iloc[:, 0]    # Fixed: was X_train.csv

print("Data loaded successfully!")
print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
print(f"Original class distribution:")
print(y_train.value_counts().sort_index())

## Resampling Evaluation

In [None]:
from imblearn.over_sampling import SMOTE, ADASYN, BorderlineSMOTE, SVMSMOTE
from imblearn.under_sampling import RandomUnderSampler, TomekLinks
from imblearn.combine import SMOTEENN, SMOTETomek

# Dictionary of resampling techniques to test
resampling_techniques = {
    'Original (No Resampling)': None,
    'SMOTE': SMOTE(random_state=42),
    'ADASYN': ADASYN(random_state=42),
    'Borderline SMOTE': BorderlineSMOTE(random_state=42, kind='borderline-1'),
    'SVM SMOTE': SVMSMOTE(random_state=42),
    'Random Undersampling': RandomUnderSampler(random_state=42),
    'Tomek Links': TomekLinks(),
    'SMOTE + ENN': SMOTEENN(random_state=42),
    'SMOTE + Tomek': SMOTETomek(random_state=42)
}

print(f"Will test {len(resampling_techniques)} different techniques")

In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, classification_report

def quick_evaluate_resampling(X_train_res, y_train_res, technique_name):
    """Quick evaluation with basic XGBoost parameters"""
    model = XGBClassifier(
        learning_rate=0.1,
        max_depth=2,
        n_estimators=200,
        random_state=42,
        objective='multi:softmax',
        verbosity=0
    )
    
    model.fit(X_train_res, y_train_res)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    return accuracy

print("Quick evaluation function defined")

In [None]:
# Store results for comparison
resampling_results = {}
resampled_datasets = {}

print("Testing resampling techniques...\n" + "="*60)

for technique_name, resampler in resampling_techniques.items():
    print(f"\nTesting: {technique_name}")
    print("-" * 40)
    
    try:
        # Apply resampling
        if resampler is None:
            X_train_res, y_train_res = X_train.copy(), y_train.copy()
            print("Using original data distribution")
        else:
            X_train_res, y_train_res = resampler.fit_resample(X_train, y_train)
            print(f"Resampled data distribution:")
        
        # Show class distribution
        class_counts = pd.Series(y_train_res).value_counts().sort_index()
        print(class_counts)
        print(f"Total samples: {len(y_train_res)}")
        
        # Quick evaluation
        accuracy = quick_evaluate_resampling(X_train_res, y_train_res, technique_name)
        
        # Store results
        resampling_results[technique_name] = {
            'accuracy': accuracy,
            'total_samples': len(y_train_res),
            'class_distribution': class_counts.to_dict()
        }
        
        # Store best datasets for later use
        resampled_datasets[technique_name] = (X_train_res, y_train_res)
        
        print(f"Quick test accuracy: {accuracy:.3f}")
        
    except Exception as e:
        print(f"Error with {technique_name}: {str(e)}")
        continue
    
    print("-" * 40)

print(f"\nCompleted testing {len(resampling_results)} techniques")

In [None]:
# Create results summary
results_df = pd.DataFrame(resampling_results).T
results_df = results_df.sort_values('accuracy', ascending=False)

print("RESAMPLING TECHNIQUE COMPARISON")
print("=" * 50)
print(f"{'Technique':<25} {'Accuracy':<10} {'Samples':<10}")
print("-" * 50)

for technique, row in results_df.iterrows():
    print(f"{technique:<25} {row['accuracy']:<10.3f} {row['total_samples']:<10}")

print("\nTop 3 techniques:")
top_3 = results_df.head(3).index.tolist()
for i, technique in enumerate(top_3, 1):
    accuracy = results_df.loc[technique, 'accuracy']
    print(f"{i}. {technique}: {accuracy:.3f}")

In [None]:
# Automatically select the best technique
best_technique = results_df.index[0]  # Top performer
best_accuracy = results_df.iloc[0]['accuracy']

print(f"Best resampling technique: {best_technique}")
print(f"Best quick test accuracy: {best_accuracy:.3f}")

# Get the best resampled dataset
X_train_resampled, y_train_resampled = resampled_datasets[best_technique]

print(f"\nFinal resampled training set:")
print(f"Shape: {X_train_resampled.shape}")
print(f"Class distribution:")
print(pd.Series(y_train_resampled).value_counts().sort_index())

# Save the best resampled data (optional)
# pd.DataFrame(X_train_resampled).to_csv("X_train_resampled_best.csv", index=False)
# pd.Series(y_train_resampled).to_csv("y_train_resampled_best.csv", index=False)

print(f"\nReady for hyperparameter tuning with {best_technique}!")

## Hyperparameter Tuning 

In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import pandas as pd
import numpy as np
import time
print("Libraries loaded for manual hyperparameter tuning")

In [None]:
manual_params = {
    'learning_rate': 0.1,        # Lower learning rate for better generalization
    'max_depth': 2,               # Shallow trees to prevent overfitting
    'n_estimators': 200,          # Moderate number of trees
    'subsample': 0.9,             # Use 90% of samples for each tree
    'colsample_bytree': 0.9,      # Use 90% of features for each tree
    'reg_alpha': 0,              # L1 regularization
    'reg_lambda': 5,             # L2 regularization
    'gamma': 1,                   # Minimum loss reduction required
    'min_child_weight': 1         # Minimum samples in leaf
}

In [None]:
print("Creating XGBoost model with manual parameters...")

# Create model with manual parameters
xgb_model = XGBClassifier(
    **manual_params,
    random_state=42,
    eval_metric='logloss',
    use_label_encoder=False
)

# Train the model
print("Training model...")
start_time = time.time()
xgb_model.fit(X_train_resampled, y_train_resampled)
end_time = time.time()

print(f"Training completed in {(end_time - start_time):.2f} seconds")

In [None]:
from sklearn.model_selection import cross_val_score, StratifiedKFold

# Evaluate with cross-validation
print("Evaluating with cross-validation...")
cv_strategy = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(xgb_model, X_train_resampled, y_train_resampled, 
                           cv=cv_strategy, scoring='accuracy')

print("CROSS-VALIDATION RESULTS")
print("=" * 30)
print(f"CV Scores: {cv_scores}")
print(f"Mean CV Score: {cv_scores.mean():.4f}")
print(f"Std CV Score: {cv_scores.std():.4f}")

In [None]:
# Make predictions on test set
y_pred = xgb_model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)

print("TEST SET EVALUATION")
print("=" * 30)
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"CV Score: {cv_scores.mean():.4f}")
print(f"Difference: {abs(test_accuracy - cv_scores.mean()):.4f}")

if abs(test_accuracy - cv_scores.mean()) > 0.05:
    print("Large difference between CV and test scores - possible overfitting")
else:
    print("Good generalization - CV and test scores are close")

In [None]:
print(f"\nDETAILED CLASSIFICATION REPORT")
print("=" * 50)
print(classification_report(y_test, y_pred))

# Confusion Matrix
print("CONFUSION MATRIX")
print("-" * 20)
cm = confusion_matrix(y_test, y_pred)
print(cm)

# Per-class accuracy
print("\nPER-CLASS ACCURACY")
print("-" * 25)
for i in range(len(np.unique(y_test))):
    if i < len(cm):
        class_accuracy = cm[i, i] / cm[i, :].sum() if cm[i, :].sum() > 0 else 0
        print(f"Class {i}: {class_accuracy:.3f}")

In [None]:

from sklearn.model_selection import learning_curve
# Plot learning curves to visualize overfitting
def plot_learning_curves(model, X, y, cv=5):
    """Plot learning curves to detect overfitting"""
    
    train_sizes, train_scores, val_scores = learning_curve(
        model, X, y, cv=cv, n_jobs=-1,
        train_sizes=np.linspace(0.1, 1.0, 10),
        scoring='accuracy'
    )
    
    train_mean = np.mean(train_scores, axis=1)
    train_std = np.std(train_scores, axis=1)
    val_mean = np.mean(val_scores, axis=1)
    val_std = np.std(val_scores, axis=1)
    
    plt.figure(figsize=(10, 6))
    plt.plot(train_sizes, train_mean, 'o-', color='blue', label='Training Accuracy')
    plt.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.1, color='blue')
    
    plt.plot(train_sizes, val_mean, 'o-', color='red', label='Validation Accuracy')
    plt.fill_between(train_sizes, val_mean - val_std, val_mean + val_std, alpha=0.1, color='red')
    
    plt.xlabel('Training Set Size')
    plt.ylabel('Accuracy')
    plt.title('Learning Curves')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    # Analysis
    final_gap = train_mean[-1] - val_mean[-1]
    print(f"Final gap between training and validation: {final_gap:.3f}")
    if final_gap > 0.1:
        print("Strong evidence of overfitting")
    elif final_gap > 0.05:
        print("Moderate overfitting")
    else:
        print("Good generalization")

# Run learning curve analysis
print("Generating learning curves...")
plot_learning_curves(xgb_model, X_train_resampled, y_train_resampled)

### Classification Report

In [None]:
import matplotlib.pyplot as plt

# Get feature importance
feature_importance = xgb_model.feature_importances_
feature_names = X_train.columns if hasattr(X_train, 'columns') else [f'Feature_{i}' for i in range(len(feature_importance))]

# Create feature importance DataFrame
importance_df = pd.DataFrame({
    'feature': feature_names,
    'importance': feature_importance
}).sort_values('importance', ascending=False)

print("TOP 20 MOST IMPORTANT FEATURES")
print("=" * 40)
print(importance_df.head(10))

# Plot feature importance
plt.figure(figsize=(12, 8))
top_features = importance_df.head(10)
plt.barh(range(len(top_features)), top_features['importance'])
plt.yticks(range(len(top_features)), top_features['feature'])
plt.xlabel('Feature Importance')
plt.title('Top 10 Feature Importances (XGBoost)')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()

### Binary Approach

In [None]:
# Create binary classification: Disease vs Healthy
print("Creating binary classification: Disease vs Healthy")

# Combine classes 0 and 1 as "Disease", class 2 as "Healthy"
y_binary = (y == 2).astype(int)  # 1 = Healthy, 0 = Disease

print("Binary class distribution:")
print(f"Disease (0): {sum(y_binary == 0)} samples")
print(f"Healthy (1): {sum(y_binary == 1)} samples")

# Split data
X_train_bin, X_test_bin, y_train_bin, y_test_bin = train_test_split(
    X, y_binary, test_size=0.3, stratify=y_binary, random_state=42
)

print(f"Training set: Disease={sum(y_train_bin == 0)}, Healthy={sum(y_train_bin == 1)}")
print(f"Test set: Disease={sum(y_test_bin == 0)}, Healthy={sum(y_test_bin == 1)}")

In [None]:
def create_eeg_discriminative_features(X_train, X_test, y_train):
    """Create EEG features specifically for class discrimination"""
    
    X_train_new = X_train.copy()
    X_test_new = X_test.copy()
    
    # 1. FREQUENCY BAND RATIOS (important for EEG classification)
    print("Creating frequency band ratios...")
    
    # Alpha/Beta ratio (cognitive load indicator)
    alpha_cols = [col for col in X_train.columns if 'alpha' in col and 'channel' in col]
    beta_cols = [col for col in X_train.columns if 'beta' in col and 'channel' in col]
    
    if alpha_cols and beta_cols:
        X_train_new['alpha_beta_ratio_mean'] = X_train[alpha_cols].mean(axis=1) / (X_train[beta_cols].mean(axis=1) + 1e-8)
        X_test_new['alpha_beta_ratio_mean'] = X_test[alpha_cols].mean(axis=1) / (X_test[beta_cols].mean(axis=1) + 1e-8)
        
        # Channel-specific alpha/beta ratios for important channels
        for i, (alpha_col, beta_col) in enumerate(zip(alpha_cols[:5], beta_cols[:5])):  # Top 5 channels
            ratio_name = f'alpha_beta_ratio_ch_{i+1}'
            X_train_new[ratio_name] = X_train[alpha_col] / (X_train[beta_col] + 1e-8)
            X_test_new[ratio_name] = X_test[alpha_col] / (X_test[beta_col] + 1e-8)
    
    # 2. HEMISPHERIC ASYMMETRY (left vs right brain activity)
    print("Creating hemispheric asymmetry features...")
    
    # Assuming standard EEG channel layout - adjust channel numbers based on your setup
    left_channels = [7, 9, 10]  # Example left hemisphere channels
    right_channels = [15, 16, 19]  # Example right hemisphere channels
    
    for band in ['alpha', 'beta']:
        left_cols = [col for col in X_train.columns if band in col and any(f'channel_{ch}' in col for ch in left_channels)]
        right_cols = [col for col in X_train.columns if band in col and any(f'channel_{ch}' in col for ch in right_channels)]
        
        if left_cols and right_cols:
            left_mean_train = X_train[left_cols].mean(axis=1)
            right_mean_train = X_train[right_cols].mean(axis=1)
            left_mean_test = X_test[left_cols].mean(axis=1)
            right_mean_test = X_test[right_cols].mean(axis=1)
            
            # Asymmetry index: (Left - Right) / (Left + Right)
            X_train_new[f'{band}_asymmetry'] = (left_mean_train - right_mean_train) / (left_mean_train + right_mean_train + 1e-8)
            X_test_new[f'{band}_asymmetry'] = (left_mean_test - right_mean_test) / (left_mean_test + right_mean_test + 1e-8)
    
    # 3. CLASS-SPECIFIC FEATURES (simplified)
    print("Creating class-discriminative features...")
    
    # Find features with highest between-class variance
    from sklearn.feature_selection import f_classif
    f_scores, _ = f_classif(X_train, y_train)
    top_discriminative_features = X_train.columns[np.argsort(f_scores)[-3:]]  # Top 3 to avoid too many features
    
    print(f"Most discriminative original features: {list(top_discriminative_features)}")
    
    # Create simple ratio features from top discriminative features
    for i, feat1 in enumerate(top_discriminative_features):
        for feat2 in top_discriminative_features[i+1:]:
            ratio_name = f'{feat1}_div_{feat2}'.replace('bandpower_', '').replace('channel_', 'ch')[:30]  # Shorten names
            X_train_new[ratio_name] = X_train[feat1] / (X_train[feat2] + 1e-8)
            X_test_new[ratio_name] = X_test[feat1] / (X_test[feat2] + 1e-8)
    
    return X_train_new, X_test_new

# Apply feature engineering to binary classification
print("Applying feature engineering to binary classification...")
X_train_bin_eng, X_test_bin_eng = create_eeg_discriminative_features(
    X_train_bin, X_test_bin, y_train_bin
)

print(f"Original features: {X_train_bin.shape[1]}")
print(f"After engineering: {X_train_bin_eng.shape[1]}")
print(f"New features added: {X_train_bin_eng.shape[1] - X_train_bin.shape[1]}")

# Apply feature selection
from sklearn.feature_selection import SelectKBest, f_classif
selector_binary = SelectKBest(score_func=f_classif, k=25)  # Select top 25 features
X_train_bin_selected = selector_binary.fit_transform(X_train_bin_eng, y_train_bin)
X_test_bin_selected = selector_binary.transform(X_test_bin_eng)  # Fixed: use X_test_bin_eng

# Get selected feature names
selected_feature_names = X_train_bin_eng.columns[selector_binary.get_support()]
print(f"Selected {len(selected_feature_names)} best features for binary classification")

# Create final DataFrames without index issues
X_train_bin_final = pd.DataFrame(X_train_bin_selected, columns=selected_feature_names)
X_test_bin_final = pd.DataFrame(X_test_bin_selected, columns=selected_feature_names)

print(f"Final binary feature set shapes:")
print(f"Training: {X_train_bin_final.shape}")
print(f"Test: {X_test_bin_final.shape}")
print("Feature engineering for binary classification completed!")

In [None]:
from imblearn.over_sampling import SMOTE

# Apply SMOTE for binary classification
print("Applying SMOTE to binary classification...")
smote_binary = SMOTE(random_state=42)
X_train_bin_resampled, y_train_bin_resampled = smote_binary.fit_resample(
    X_train_bin_final, y_train_bin
)

print("Binary class distribution after SMOTE:")
print(f"Disease (0): {sum(y_train_bin_resampled == 0)}")
print(f"Healthy (1): {sum(y_train_bin_resampled == 1)}")

# Train binary XGBoost
print("Training binary XGBoost model...")
xgb_binary = XGBClassifier(
    learning_rate=0.1,
    max_depth=3,
    n_estimators=200,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_alpha=10,
    reg_lambda=20,
    random_state=42,
    eval_metric='logloss',
    use_label_encoder=False
)

xgb_binary.fit(X_train_bin_resampled, y_train_bin_resampled)

# Evaluate binary model
y_pred_binary = xgb_binary.predict(X_test_bin_final)
accuracy_binary = accuracy_score(y_test_bin, y_pred_binary)

print(f"\nBINARY CLASSIFICATION RESULTS:")
print("=" * 40)
print(f"Accuracy: {accuracy_binary:.4f}")
print("\nClassification Report:")
print(classification_report(y_test_bin, y_pred_binary, 
                          target_names=['Disease', 'Healthy']))

# Confusion matrix
cm_binary = confusion_matrix(y_test_bin, y_pred_binary)
print("\nConfusion Matrix:")
print("Predicted:  Disease  Healthy")
print(f"Disease:      {cm_binary[0,0]:3d}      {cm_binary[0,1]:3d}")
print(f"Healthy:      {cm_binary[1,0]:3d}      {cm_binary[1,1]:3d}")

In [None]:
# Verify performance with CV
from sklearn.model_selection import cross_val_score
cv_scores_binary = cross_val_score(xgb_binary, X_train_bin_resampled, y_train_bin_resampled, cv=5)
print(f"Binary CV Scores: {cv_scores_binary}")
print(f"Mean CV Score: {cv_scores_binary.mean():.4f} (±{cv_scores_binary.std():.4f})")

## Conclusion

We can conclude from our hyperparameter tuning that despite potentially achieving higher F1 scores, this would lead to overfitting as our model's performance on the test data remains poor. 

If we perform cross-validation accross our original data set, we get an average CV score of 65%. 

Our model is able to determine whether or not our subject is healthy or not, but struggles when determining whether they have AD or Dementia.