In [None]:
import random
import numpy as np
import tensorflow as tf
import os
import keras_tuner as kt
# Set seeds
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)
os.environ['TF_DETERMINISTIC_OPS'] = '1'

In [None]:
import pandas as pd
import numpy as np
import os

meta = pd.read_csv('Train_Test_Split.csv')
meta = meta[meta['label'].isin(['AD', 'Healthy'])]

train_meta = meta[meta['split'] == 'train']
test_meta = meta[meta['split'] == 'test']

def load_and_segment(subject_id, data_dir='Data_sampled_128HZ', segment_len=1024):
    file_path = os.path.join(data_dir, f"{subject_id}_data.npy")
    data = np.load(file_path)
    _, time_steps = data.shape
    num_segments = time_steps // segment_len
    if num_segments == 0:
        return np.empty((0, 19, segment_len))
    data = data[:, :num_segments * segment_len]
    segments = data.reshape(19, num_segments, segment_len).transpose(1, 0, 2)
    return segments

def process_data(meta_df, data_dir='Data_sampled_128HZ'):
    X = []
    y = []
    label_map = {'AD': 1, 'Healthy': 0}
    for _, row in meta_df.iterrows():
        segments = load_and_segment(row['subject_id'], data_dir)
        if segments.shape[0] == 0:
            continue
        X.append(segments)
        label = label_map[row['label']]
        one_hot = np.eye(2)[label]
        y.extend([one_hot] * segments.shape[0])
    X = np.concatenate(X, axis=0)
    y = np.array(y)
    return X, y
X_train, y_train = process_data(train_meta)
X_test, y_test = process_data(test_meta)
X_train = (X_train * 1e6) - np.mean(X_train * 1e6, axis=2, keepdims=True)
X_test = (X_test * 1e6) - np.mean(X_test * 1e6, axis=2, keepdims=True)

In [None]:
from tensorflow.keras.models import load_model, Model

def load_and_trim_model(path):
    model = load_model(path)
    model.pop() 
    model.trainable = False  # freeze
    return model
bilstm_model = load_and_trim_model('Models\Final_Bilstm_model.keras')
cnn_time_model = load_and_trim_model('Models\Final_CNNSpatial_model.keras')
cnn_freq_model = load_and_trim_model('Models\Final_CNNSpectral_model.keras')

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LayerNormalization, Dropout, MultiHeadAttention, GlobalAveragePooling1D, Concatenate, Lambda, Embedding
from tensorflow.keras.models import Model

def build_fusion_model_with_transformer(bilstm_model, cnn_time_model, cnn_freq_model, common_dim=128, num_heads=4, ff_dim=256, dropout_rate=0.3):
    bilstm_input = Input(shape=bilstm_model.output_shape[1:])
    cnn_time_input = Input(shape=cnn_time_model.output_shape[1:])
    cnn_freq_input = Input(shape=cnn_freq_model.output_shape[1:])

    bilstm_proj = Dense(common_dim)(bilstm_input)
    cnn_time_proj = Dense(common_dim)(cnn_time_input)
    cnn_freq_proj = Dense(common_dim)(cnn_freq_input)

    x = Lambda(lambda t: tf.stack(t, axis=1))([bilstm_proj, cnn_time_proj, cnn_freq_proj])

    for _ in range(2):
        attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=common_dim)(x, x)
        x = LayerNormalization(epsilon=1e-6)(x + attn_output)

        ffn_output = Dense(ff_dim, activation='relu')(x)
        ffn_output = Dense(common_dim)(ffn_output)
        x = LayerNormalization(epsilon=1e-6)(x + ffn_output)

    x = GlobalAveragePooling1D()(x)
    x = Dropout(dropout_rate)(x)
    out = Dense(2, activation='softmax')(x)

    model = Model(inputs=[bilstm_input, cnn_time_input, cnn_freq_input], outputs=out)
    return model


In [None]:
fusion_model = build_fusion_model_with_transformer(
    bilstm_model, cnn_time_model, cnn_freq_model,
    common_dim=128, num_heads=4, ff_dim=256, dropout_rate=0.3
)

from tensorflow.keras.optimizers import Adam

optimizer = Adam(learning_rate=1e-3) 

fusion_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy','AUC'])


In [None]:
#BiLSTM Inputs
top_channels = [14, 2, 0, 18, 4]
X_train_selected = X_train[:, top_channels, :].transpose(0, 2, 1)  # (samples, 1024, 5)
X_test_selected = X_test[:, top_channels, :].transpose(0, 2, 1)    # (samples, 1024, 5)
X_bilstm_input = X_train_selected
# CNNSPatial Inputs
X_train_cnn = X_train[..., np.newaxis]  # shape: (N, 19, 1024, 1)
X_test_cnn = X_test[..., np.newaxis]
X_cnn_time_input = X_train_cnn
#CNNSpectral Inputs
from scipy.signal import welch

def compute_spectral_features(X, fs=128, nperseg=256):
    num_segments, num_channels, num_samples = X.shape
    psd_all = []

    for seg in X:
        seg_psd = []
        for ch in seg:
            freqs, psd = welch(ch, fs=fs, nperseg=nperseg)
            seg_psd.append(psd)
        psd_all.append(seg_psd)

    psd_all = np.array(psd_all)  
    psd_all = np.log1p(psd_all)  
    return psd_all, freqs

X_train_spec, freqs = compute_spectral_features(X_train)
X_test_spec, _ = compute_spectral_features(X_test)

X_train_spec = X_train_spec[..., np.newaxis]  # shape: (N, 19, freq_bins, 1)
X_test_spec = X_test_spec[..., np.newaxis]

X_cnn_freq_input = X_train_spec

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
callbacks = [
    EarlyStopping(monitor='val_loss', patience=30, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4)
]

In [None]:
X_bilstm_feat = bilstm_model.predict(X_bilstm_input)
X_cnn_time_feat = cnn_time_model.predict(X_cnn_time_input)
X_cnn_freq_feat = cnn_freq_model.predict(X_cnn_freq_input)

In [None]:
import keras_tuner as kt
from tensorflow.keras.layers import Input, Dense, LayerNormalization, Dropout, MultiHeadAttention, GlobalAveragePooling1D, Concatenate, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import tensorflow as tf

# This function defines the searchable model. 
# Note the Input shapes match the features you already extracted.
def build_hypermodel(hp):
    # --- Define hyperparameters for the second, focused search ---
    
    # Best was 256 (the max). Let's explore higher values.
    hp_common_dim = hp.Int('common_dim', min_value=192, max_value=512, step=64)
    
    # Best was 8 (the max). Let's explore around and slightly above it.
    hp_num_heads = hp.Choice('num_heads', values=[6, 8, 10, 12])
    
    # Best was 256 (in the middle). Let's zoom in with smaller steps.
    hp_ff_dim = hp.Int('ff_dim', min_value=192, max_value=384, step=64)
    
    # Best was 0.2 (low end). Let's search a tighter range around it.
    hp_dropout = hp.Float('dropout_rate', min_value=0.1, max_value=0.3, step=0.05)
    
    # Best was 0.00068. Let's center the log-scale search around that magnitude.
    hp_lr = hp.Float('learning_rate', min_value=1e-4, max_value=9e-4, sampling='log')
    
    # Best was 2 (in the middle). Let's see if 2, 3, or 4 is optimal.
    hp_num_blocks = hp.Int('num_transformer_blocks', min_value=2, max_value=4, step=1)

    # --- Your existing model-building code ---
    # (No changes needed to the section below)
    
    bilstm_input = Input(shape=X_bilstm_feat.shape[1:])
    cnn_time_input = Input(shape=X_cnn_time_feat.shape[1:])
    cnn_freq_input = Input(shape=X_cnn_freq_feat.shape[1:])

    bilstm_proj = Dense(hp_common_dim)(bilstm_input)
    cnn_time_proj = Dense(hp_common_dim)(cnn_time_input)
    cnn_freq_proj = Dense(hp_common_dim)(cnn_freq_input)

    x = Lambda(lambda t: tf.stack(t, axis=1))([bilstm_proj, cnn_time_proj, cnn_freq_proj])

    for _ in range(hp_num_blocks):
        attn_output = MultiHeadAttention(num_heads=hp_num_heads, key_dim=hp_common_dim)(x, x)
        x = LayerNormalization(epsilon=1e-6)(x + attn_output)

        ffn_output = Dense(hp_ff_dim, activation='relu')(x)
        ffn_output = Dense(hp_common_dim)(ffn_output)
        x = LayerNormalization(epsilon=1e-6)(x + ffn_output)

    x = GlobalAveragePooling1D()(x)
    x = Dropout(hp_dropout)(x)
    out = Dense(2, activation='softmax')(x)

    model = Model(inputs=[bilstm_input, cnn_time_input, cnn_freq_input], outputs=out)
    
    model.compile(
        optimizer=Adam(learning_rate=hp_lr),
        loss='categorical_crossentropy', 
        metrics=['accuracy', 'AUC']
    )
    return model
# --- This section replaces your original .fit() call ---

import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Define your callbacks
callbacks = [
    EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)
]

# --- New Step: Create a stable validation set ---
# We will split your pre-computed features and labels one time.

# Get the total number of samples
num_samples = X_bilstm_feat.shape[0]
# Create an array of indices [0, 1, 2, ..., num_samples-1]
indices = np.arange(num_samples)

# Split the *indices* into training and validation sets
# We use random_state=42 to ensure this split is the same every time you run it
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

# Create the training sets using the train_indices
X_bilstm_train = X_bilstm_feat[train_indices]
X_cnn_time_train = X_cnn_time_feat[train_indices]
X_cnn_freq_train = X_cnn_freq_feat[train_indices]
y_train_main = y_train[train_indices]

# Create the validation sets using the val_indices
X_bilstm_val = X_bilstm_feat[val_indices]
X_cnn_time_val = X_cnn_time_feat[val_indices]
X_cnn_freq_val = X_cnn_freq_feat[val_indices]
y_val_main = y_train[val_indices]


# --- Now, run the tuner using this stable validation set ---

# Initialize the tuner (using your refined build_hypermodel function)
tuner = kt.RandomSearch(
    build_hypermodel, 
    objective=kt.Objective("val_AUC", direction="max"),
    max_trials=20, 
    executions_per_trial=1,
    directory='my_tuner_dir_run2', # Use a new directory for this new run
    project_name='eeg_fusion_stable'
)

# Run the hyperparameter search
print("--- Starting Stable Hyperparameter Search ---")
tuner.search(
    # Use the new training sets
    [X_bilstm_train, X_cnn_time_train, X_cnn_freq_train],
    y_train_main,
    
    # Use the new validation_data argument instead of validation_split
    validation_data=(
        [X_bilstm_val, X_cnn_time_val, X_cnn_freq_val],
        y_val_main
    ),
    epochs=50,
    batch_size=32,
    callbacks=callbacks
)
print("--- Hyperparameter Search Finished ---")


# --- Get results and train the final model ---

# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print("\nOptimal hyperparameters found:")
for hp, value in best_hps.values.items():
    print(f"- {hp}: {value}")

# Build the final model with the best hyperparameters
final_model = tuner.hypermodel.build(best_hps)

# Train the final model on the *entire training set*
# (X_bilstm_feat, X_cnn_time_feat, etc. contain 100% of the training data)
history = final_model.fit(
    [X_bilstm_feat, X_cnn_time_feat, X_cnn_freq_feat],
    y_train,
    epochs=100, # Train for longer
    batch_size=32,
    # We use the same stable validation set again for monitoring
    validation_data=(
        [X_bilstm_val, X_cnn_time_val, X_cnn_freq_val],
        y_val_main
    ),
    callbacks=callbacks 
)

# --- Finally, evaluate the best model on the unseen test data ---
print("\n--- Evaluating best model on the test set ---")
X_test_bilstm_feat = bilstm_model.predict(X_test_selected)
X_test_cnn_time_feat = cnn_time_model.predict(X_test_cnn)
X_test_cnn_freq_feat = cnn_freq_model.predict(X_test_spec)

test_loss, test_acc, test_auc = final_model.evaluate(
    [X_test_bilstm_feat, X_test_cnn_time_feat, X_test_cnn_freq_feat], 
    y_test
)
print(f"\nFinal Test Accuracy: {test_acc:.4f}, Final Test AUC: {test_auc:.4f}")

In [None]:
X_bilstm_test_feat = bilstm_model.predict(X_test_selected)
X_cnn_time_test_feat = cnn_time_model.predict(X_test_cnn)
X_cnn_freq_test_feat = cnn_freq_model.predict(X_test_spec)

In [None]:
from sklearn.metrics import classification_report, roc_auc_score, average_precision_score

y_pred = fusion_model.predict([X_bilstm_test_feat, X_cnn_time_test_feat, X_cnn_freq_test_feat])
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

print("ROC AUC:", roc_auc_score(y_true, y_pred[:, 1]))
print("Average Precision:", average_precision_score(y_true, y_pred[:, 1]))
print(classification_report(y_true, y_pred_classes))

In [None]:
output_folder = "Models"
os.makedirs(output_folder,exist_ok=True)
model_path = os.path.join(output_folder,"Final_End_to_End_model.keras")
model.save(model_path)

In [None]:
from scipy.stats import mode

def patient_level_ensemble_2(fusion_model, bilstm_model, cnn_time_model, cnn_freq_model, meta_df, voting='soft'):
    y_true = []
    y_pred = []
    y_prob = [] 

    for _, row in meta_df.iterrows():
        subject_id = row['subject_id']
        label_str = row['label']
        true_label = 1 if label_str == 'AD' else 0

        segments = load_and_segment(subject_id)
        if segments.shape[0] == 0:
            continue
        segments = (segments * 1e6) - np.mean(segments * 1e6, axis=2, keepdims=True)
        top_channels = [14, 2, 0, 18, 4]
        bilstm_input = segments[:, top_channels, :].transpose(0, 2, 1)
        cnn_time_input = segments[..., np.newaxis]
        spec_feats, _ = compute_spectral_features(segments)
        cnn_freq_input = spec_feats[..., np.newaxis]

        bilstm_feat = bilstm_model.predict(bilstm_input, verbose=0)
        cnn_time_feat = cnn_time_model.predict(cnn_time_input, verbose=0)
        cnn_freq_feat = cnn_freq_model.predict(cnn_freq_input, verbose=0)

        preds = fusion_model.predict([bilstm_feat, cnn_time_feat, cnn_freq_feat], verbose=0)

        if voting == 'soft':
            avg_prob = np.mean(preds, axis=0)
            y_pred.append(np.argmax(avg_prob))
            y_prob.append(avg_prob[1])
        elif voting == 'hard':
            pred_classes = np.argmax(preds, axis=1)
            voted_class = mode(pred_classes, keepdims=True).mode[0]
            y_pred.append(voted_class)
            y_prob.append(np.mean(preds[:, 1]))

        y_true.append(true_label)

    return np.array(y_true), np.array(y_pred), np.array(y_prob)


In [None]:
yt_hard, yp_hard, prob_hard = patient_level_ensemble_2(fusion_model, bilstm_model, cnn_time_model, cnn_freq_model, test_meta, voting='hard')

In [None]:
from sklearn.metrics import classification_report, roc_auc_score, average_precision_score

def evaluate_predictions(y_true, y_pred, y_prob, voting_type="Soft"):
    print(f"\n=== {voting_type} Voting Results ===")
    print(classification_report(y_true, y_pred))

evaluate_predictions(yt_hard, yp_hard, prob_hard, voting_type="Hard")