# Pattern Recognition and Deep Learning - Final Assignment

Group Members:
- Berber van Drunen (6396410)
- Dean Newar (2755858)
- Frederieke Blom (6433294)
- Jens van der Weide (2492520)
- Joeke Wolterbeek (6798942)
- Jos Kisjes (2675293)

Import modules

In [None]:
import os
import random
import h5py
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, confusion_matrix, classification_report
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.optimizers import Adam
from kerastuner.tuners import RandomSearch
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
import tensorflow as tf

Data Loading and Preprocessing class

In [14]:
class DataLoader:
    """Read ``.h5`` recordings from a folder and turn them into labelled arrays.

    Every recording is z-normalised per row, downsampled along axis 1 and
    paired with an integer class label derived from the task name that is
    part of its file name.
    """

    # Task name as it appears in the file name -> integer class label.
    LABEL_MAPPING = {
        'rest': 0,
        'task_motor': 1,
        'task_story_math': 2,
        'task_working_memory': 3
    }

    def __init__(self, base_directory=''):
        """Store the directory that ``folder`` arguments are resolved against."""
        self.base_directory = base_directory

    def get_dataset_name(self, file_name_with_dir):
        """Return the file name without directories and without its last ``_`` part.

        Example: ``'some/dir/rest_105923_1.h5'`` -> ``'rest_105923'``.
        """
        base_name = os.path.basename(file_name_with_dir)
        return "_".join(base_name.split('_')[:-1])

    def znorm(self, data):
        """Z-normalise every row of ``data`` along axis 1 (time-wise).

        Each row is shifted to zero mean and scaled to unit standard
        deviation. Rows without any spread (constant signal) are mapped to
        zeros instead of producing NaN/inf through a division by zero.
        """
        mean_rows = np.mean(data, axis=1, keepdims=True)
        std_rows = np.std(data, axis=1, keepdims=True)
        # Dividing by 1 for constant rows leaves (data - mean) == 0 untouched.
        safe_std = np.where(std_rows == 0, 1.0, std_rows)
        return (data - mean_rows) / safe_std

    def load_data_from_folder(self, folder, shuffle=True, downsample_factor=4, seed=None):
        """Load all labelled ``.h5`` files found in ``folder``.

        Args:
            folder: Directory, relative to ``base_directory``, with the files.
            shuffle: Shuffle the file order before loading.
            downsample_factor: Keep every n-th sample along axis 1.
            seed: Optional seed that makes the shuffle reproducible. With the
                default ``None`` the global ``random`` state is used.

        Returns:
            Tuple ``(data, labels)`` of NumPy arrays with one entry per file
            whose name contains a known task name.

        Raises:
            KeyError: If a file does not contain the dataset named after it.
        """
        data_directory = os.path.join(self.base_directory, folder)

        # os.listdir() gives no ordering guarantee; sorting first makes both
        # the seeded shuffle and shuffle=False reproducible.
        file_names = sorted(
            name for name in os.listdir(data_directory) if name.endswith(".h5")
        )
        if shuffle:
            if seed is None:
                random.shuffle(file_names)
            else:
                random.Random(seed).shuffle(file_names)

        data = []
        labels = []
        for file_name in file_names:
            # First task name contained in the file name decides the label.
            label = next(
                (value for task, value in self.LABEL_MAPPING.items() if task in file_name),
                None,
            )
            if label is None:
                # Skip before opening the file: it cannot be used without a label.
                print(f"Warning: No label found for file {file_name}")
                continue

            dataset_name = self.get_dataset_name(file_name)
            file_path = os.path.join(data_directory, file_name)
            with h5py.File(file_path, 'r') as f:
                dataset = f.get(dataset_name)
                if dataset is None:
                    raise KeyError(f"Dataset '{dataset_name}' not found in {file_path}")
                matrix = dataset[()]

            # Normalise on the full-resolution signal, then thin out the samples.
            matrix = self.znorm(matrix)
            data.append(matrix[:, ::downsample_factor])
            labels.append(label)

        return np.array(data), np.array(labels)

# Loader instance used by the data-loading cells of this notebook
# (default base directory, so folder paths are taken as given).
data_loader = DataLoader()


## Intra 

### Preprocessing Intra

Loading data

In [15]:
# Load the Intra training recordings together with their integer labels
data_train, labels_train = data_loader.load_data_from_folder('./Final Project data/Intra/train')

# Report the array shapes of the training split
print(f"Data Shape: {data_train.shape}")
print(f"Labels Shape: {labels_train.shape}")

# Load the Intra test recordings the same way
data_test, labels_test = data_loader.load_data_from_folder('./Final Project data/Intra/test')

# Report the array shapes of the test split
print(f"Data Shape: {data_test.shape}")
print(f"Labels Shape: {labels_test.shape}")

Data Shape: (32, 248, 8906)
Labels Shape: (32,)
Data Shape: (8, 248, 8906)
Labels Shape: (8,)


Reshape data

In [17]:
# Bring the data into the layout the LSTM layers expect:
# [nr samples, time steps, features]
X_train = data_train
X_test = data_test
y_train = labels_train
y_test = labels_test

# The loader returns one (rows, time) matrix per recording, so the last two
# axes have to be swapped. reshape() would keep the memory order and merely
# re-cut every row into chunks, which mixes up time steps and features.
X_train = np.transpose(X_train, (0, 2, 1))
X_test = np.transpose(X_test, (0, 2, 1))

Convert to categorical data

In [18]:
# One-hot encode the integer labels (4 classes); the models in this notebook
# are compiled with the categorical_crossentropy loss.
y_train_encoded = to_categorical(y_train, num_classes=4)
y_test_encoded = to_categorical(y_test, num_classes=4)

NameError: name 'to_categorical' is not defined

### Basic Intra Model

In [None]:
def build_baseline_model(hp):
    """Build and compile the baseline stacked-LSTM classifier for one tuner trial.

    Tuned hyperparameters: number of LSTM layers (1-4), units per layer,
    one dropout rate shared by all LSTM layers, and the Adam learning rate.

    Args:
        hp: Hyperparameter container handed in by the tuner.

    Returns:
        The compiled model with a 4-way softmax output.
    """
    network = Sequential()

    for layer_idx in range(hp.Int('num_lstm_layers', 1, 4)):
        n_units = hp.Choice('units_' + str(layer_idx), values=[16, 32, 64, 128])
        # Every layer but the last hands its full sequence to the next LSTM.
        is_last_layer = layer_idx == hp.get('num_lstm_layers') - 1
        dropout_rate = hp.Choice('lstm_dropout_', values=[0.1, 0.2, 0.3, 0.4, 0.5])
        network.add(LSTM(
            units=n_units,
            return_sequences=not is_last_layer,
            input_shape=(X_train.shape[1], X_train.shape[2]),
            dropout=dropout_rate))

    # Classification head: one probability per class
    network.add(Dense(4, activation='softmax'))

    optimizer = tf.keras.optimizers.Adam(
        hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4]))
    network.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return network

# Random search over the baseline search space: 10 trials, each trained
# 3 times, ranked by validation accuracy.
tuner = RandomSearch(
    build_baseline_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=3,
    directory='./',
    project_name='baseline_lstm_hparam_tuning2'
)

# Run the search.
# NOTE(review): the test split is passed as validation data, so hyperparameter
# selection (and the early stopping below) see the data that is later reported
# as "test" - confirm this is intended or carve a validation set out of the
# training data.
tuner.search(X_train, y_train_encoded, epochs=10, batch_size=8, validation_data=(X_test, y_test_encoded))

# Best hyperparameter set found by the search
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Stop when val_loss has not improved for 5 epochs and roll back to the best weights
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Re-build the model from the best hyperparameters and train it for up to 50 epochs
model = tuner.hypermodel.build(best_hps)
model.fit(X_train, y_train_encoded, epochs=50, batch_size=8, validation_data=(X_test, y_test_encoded), callbacks=[early_stopping_callback])

# Print the layer overview of the trained model
model.summary()

In [None]:
# Loss and accuracy on the training data (accuracy is printed as a percentage)
train_loss, train_accuracy = model.evaluate(X_train, y_train_encoded)
print(f"Training Accuracy: {train_accuracy*100:.2f}%")

# Loss and accuracy on the test data
test_loss, test_accuracy = model.evaluate(X_test, y_test_encoded)
print(f"Test Accuracy: {test_accuracy*100:.2f}%")

In [None]:
# Show every tuned hyperparameter together with its best value
print("Best Hyperparameters:")
for hyperparam in best_hps.values:
    print(f"{hyperparam}: {best_hps.get(hyperparam)}")

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Class probabilities -> predicted class index per test sample
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

# The one-hot targets are turned back into class indices once and reused
y_true_classes = np.argmax(y_test_encoded, axis=1)

# Confusion matrix as an annotated heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
heatmap_ax = sns.heatmap(cm, annot=True, fmt='d')
heatmap_ax.set_ylabel('Actual')
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_title('Confusion Matrix')
plt.show()

# Per-class precision, recall and F1
print(classification_report(y_true_classes, y_pred_classes))

### Improved Intra model

Hyperparameter tuning with LSTM model

In [None]:
def build_model(hp):
    """Build and compile the LSTM + Dense classifier for one tuner trial.

    Tuned hyperparameters: number of LSTM layers (1-4) and their units, one
    dropout rate shared by all LSTM layers, number of Dense layers (1-4) and
    their units, and the Adam learning rate.

    Args:
        hp: Hyperparameter container handed in by the tuner.

    Returns:
        The compiled model with a 4-way softmax output.
    """
    network = Sequential()

    # Recurrent stack
    lstm_depth = hp.Int('num_lstm_layers', 1, 4)
    for layer_idx in range(lstm_depth):
        lstm_units = hp.Choice('units_' + str(layer_idx), values=[16, 32, 64, 128])
        dropout_rate = hp.Choice('lstm_dropout_', values=[0.1, 0.2, 0.3, 0.4, 0.5])
        network.add(LSTM(
            units=lstm_units,
            # Every layer but the last hands its full sequence to the next LSTM.
            return_sequences=layer_idx < lstm_depth - 1,
            input_shape=(X_train.shape[1], X_train.shape[2]),
            dropout=dropout_rate))

    # Fully connected stack
    for layer_idx in range(hp.Int('num_dense_layers', 1, 4)):
        dense_units = hp.Choice('dense_units_' + str(layer_idx), values=[16, 32, 64, 128])
        network.add(Dense(units=dense_units, activation='relu'))

    # Classification head: one probability per class
    network.add(Dense(4, activation='softmax'))

    optimizer = tf.keras.optimizers.Adam(
        hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4]))
    network.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return network

# Random search over the improved search space: 10 trials, each trained
# 3 times, ranked by validation accuracy.
# NOTE(review): KerasTuner stores its trials under directory/project_name;
# make sure no other search uses the same project name, otherwise earlier
# results may be picked up instead of running a fresh search - confirm.
tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=3,
    directory='./',
    project_name='hparam_tuning'
)

# Print the hyperparameters that will be searched
tuner.search_space_summary()

# Run the search.
# NOTE(review): the test split doubles as validation data here, so it
# influences the hyperparameter choice - confirm this is intended.
tuner.search(X_train, y_train_encoded, epochs=10, batch_size=8, validation_data=(X_test, y_test_encoded))

# Best hyperparameter set found by the search
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

Build model with the best hyperparameters

In [None]:
# Stop when val_loss has not improved for 5 epochs and roll back to the best weights
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Upper bound on the number of training epochs (early stopping may end sooner)
epochs = 50

# Re-build the model from the best hyperparameters and train it
model = tuner.hypermodel.build(best_hps)
model.fit(X_train, y_train_encoded, epochs=epochs, batch_size=8, validation_data=(X_test, y_test_encoded), callbacks=[early_stopping_callback])

Model summary

In [None]:
# Print the layer overview of the trained model
model.summary()

Best hyperparameters

In [None]:
# Show every tuned hyperparameter together with its best value
print("Best Hyperparameters:")
for hyperparam in best_hps.values:
    print(f"{hyperparam}: {best_hps.get(hyperparam)}")

Predictions using model with best hyperparameters + confusion matrix

In [None]:
# Class probabilities -> predicted class index per test sample
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

# The one-hot targets are turned back into class indices once and reused
y_true_classes = np.argmax(y_test_encoded, axis=1)

# Confusion matrix as an annotated heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
heatmap_ax = sns.heatmap(cm, annot=True, fmt='d')
heatmap_ax.set_ylabel('Actual')
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_title('Confusion Matrix')
plt.show()

# Per-class precision, recall and F1
print(classification_report(y_true_classes, y_pred_classes))

Accuracy on the training and test data

In [None]:
# Loss and accuracy on the training data (accuracy is printed as a percentage)
train_loss, train_accuracy = model.evaluate(X_train, y_train_encoded)
print(f"Training Accuracy: {train_accuracy*100:.2f}%")

# Loss and accuracy on the test data
test_loss, test_accuracy = model.evaluate(X_test, y_test_encoded)
print(f"Test Accuracy: {test_accuracy*100:.2f}%")

Build and train the model 10 times with the best hyperparameters to get a distribution of accuracies

In [None]:
def build_model_from_best_hps(best_hps):
    """Rebuild and compile the tuned LSTM + Dense model from ``best_hps``.

    The architecture follows the hyperparameters exactly: ``num_lstm_layers``
    LSTM layers (all with the tuned dropout), then ``num_dense_layers`` ReLU
    Dense layers and a 4-way softmax output. Hard-coding two to three layers
    would raise for a single-layer result and silently drop a fourth layer.

    Args:
        best_hps: Mapping-like object that supports ``best_hps[name]`` for
            ``num_lstm_layers``, ``units_<i>``, ``lstm_dropout_``,
            ``num_dense_layers``, ``dense_units_<i>`` and ``learning_rate``.

    Returns:
        The compiled, untrained model.
    """
    num_lstm_layers = best_hps['num_lstm_layers']
    num_dense_layers = best_hps['num_dense_layers']

    model = Sequential()

    for i in range(num_lstm_layers):
        model.add(LSTM(
            units=best_hps[f'units_{i}'],
            # Every layer but the last hands its full sequence to the next LSTM.
            return_sequences=i < num_lstm_layers - 1,
            input_shape=(X_train.shape[1], X_train.shape[2]),
            dropout=best_hps['lstm_dropout_']))

    for i in range(num_dense_layers):
        model.add(Dense(units=best_hps[f'dense_units_{i}'], activation='relu'))

    model.add(Dense(4, activation='softmax'))  # 4 classes

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=best_hps['learning_rate']),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# Train the tuned architecture several times from scratch to see how much the
# accuracy varies between runs, and keep the run with the best test accuracy.
num_runs = 10
all_train_accuracies = []
all_test_accuracies = []
best_accuracy = 0
best_model = None

for run in range(num_runs):
    print(f"Training Run {run+1}/{num_runs}")

    model = build_model_from_best_hps(best_hps)
    # NOTE(review): the test split is also the early-stopping validation set,
    # so the reported test accuracy is optimistic - confirm this is intended.
    model.fit(
        X_train, y_train_encoded,
        epochs=50, batch_size=8,
        validation_data=(X_test, y_test_encoded),
        callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
        verbose=0)

    train_loss, train_accuracy = model.evaluate(X_train, y_train_encoded, verbose=0)
    test_loss, test_accuracy = model.evaluate(X_test, y_test_encoded, verbose=0)

    all_train_accuracies.append(train_accuracy)
    all_test_accuracies.append(test_accuracy)

    # `best_model is None` keeps the first run even if its accuracy is 0, so
    # the report below always has a model to work with.
    if best_model is None or test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        # Copy architecture and weights so later runs cannot overwrite them
        best_model = tf.keras.models.clone_model(model)
        best_model.set_weights(model.get_weights())

# evaluate() returns accuracy as a fraction; scale it to match the "%" sign.
print(f"Average Training Accuracy: {np.mean(all_train_accuracies) * 100:.2f}%")
print(f"Average Test Accuracy: {np.mean(all_test_accuracies) * 100:.2f}%")
print(f"Best Model Test Accuracy: {best_accuracy * 100:.2f}%")

# Confusion Matrix and Classification Report for the best model
y_pred = best_model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
cm = confusion_matrix(np.argmax(y_test_encoded, axis=1), y_pred_classes)
sns.heatmap(cm, annot=True, fmt='d')
plt.ylabel('Actual')
plt.xlabel('Predicted')
plt.title('Confusion Matrix')
plt.show()
print(classification_report(np.argmax(y_test_encoded, axis=1), y_pred_classes))

## Cross

### Preprocessing Cross

Loading data

In [None]:
# Load recordings and integer labels for the Cross training set and the
# three Cross test sets
data_train1, labels_train1 = data_loader.load_data_from_folder('./Final Project data/Cross/train')
data_test1, labels_test1 = data_loader.load_data_from_folder('./Final Project data/Cross/test1')
data_test2, labels_test2 = data_loader.load_data_from_folder('./Final Project data/Cross/test2')
data_test3, labels_test3 = data_loader.load_data_from_folder('./Final Project data/Cross/test3')

# Report the array shapes of every subset
print(f"Train Data Shape: {data_train1.shape}, Train Labels Shape: {labels_train1.shape}")
print(f"Test1 Data Shape: {data_test1.shape}, Test1 Labels Shape: {labels_test1.shape}")
print(f"Test2 Data Shape: {data_test2.shape}, Test2 Labels Shape: {labels_test2.shape}")
print(f"Test3 Data Shape: {data_test3.shape}, Test3 Labels Shape: {labels_test3.shape}")

Reshape data

In [None]:
# Bring the data into the layout the LSTM layers expect:
# [nr samples, time steps, features]
X_train = data_train1
X_test1, X_test2, X_test3 = data_test1, data_test2, data_test3
y_train = labels_train1
y_test1, y_test2, y_test3 = labels_test1, labels_test2, labels_test3

# The loader returns one (rows, time) matrix per recording, so the last two
# axes have to be swapped. reshape() would keep the memory order and merely
# re-cut every row into chunks, which mixes up time steps and features.
X_train = np.transpose(X_train, (0, 2, 1))
X_test1 = np.transpose(X_test1, (0, 2, 1))
X_test2 = np.transpose(X_test2, (0, 2, 1))
X_test3 = np.transpose(X_test3, (0, 2, 1))

Convert to categorical data

In [None]:
# One-hot encode the integer labels (4 classes) of the Cross training set
# and of each of the three Cross test sets
y_train_encoded = to_categorical(y_train, num_classes=4)
y_test1_encoded, y_test2_encoded, y_test3_encoded = to_categorical(y_test1, num_classes=4), to_categorical(y_test2, num_classes=4), to_categorical(y_test3, num_classes=4)

In [None]:
# Pool test sets 2 and 3 into one held-out evaluation set.
# The labels must be the one-hot encoded ones: the models are compiled with
# categorical_crossentropy and the reports call np.argmax(y_test_res, axis=1),
# both of which fail on plain integer labels.
X_test_res = np.concatenate((X_test2, X_test3), axis=0)
y_test_res = np.concatenate((y_test2_encoded, y_test3_encoded), axis=0)

### Basic Cross Model

In [None]:
def build_model(hp):
    """Build and compile the baseline stacked-LSTM classifier for one tuner trial.

    Tuned hyperparameters: number of LSTM layers (2-4), units per layer and
    the Adam learning rate.

    Args:
        hp: Hyperparameter container handed in by the tuner.

    Returns:
        The compiled model with a 4-way softmax output.
    """
    model = Sequential()

    # Tuning the number of LSTM layers and their units
    for i in range(hp.Int('num_lstm_layers', 2, 4)):
        model.add(LSTM(
            units=hp.Choice('units_' + str(i), values=[32, 64, 128, 256]),
            # Only the last layer should not return sequences
            return_sequences=i < hp.get('num_lstm_layers') - 1,
            input_shape=(X_train.shape[1], X_train.shape[2])
            ))

    model.add(Dense(4, activation='softmax'))  # 4 classes

    model.compile(
        optimizer=tf.keras.optimizers.Adam(
            hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4, 1e-5])),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# Random search over the baseline Cross search space: 10 trials, ranked by
# validation accuracy.
# KerasTuner keeps its trials under directory/project_name and resumes from
# them when the folder already exists, so this search needs a project name
# of its own instead of one shared with another search.
tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=1,
    directory='./',
    project_name='cross_baseline_lstm_hparam_tuning'
)

tuner.search_space_summary()

# Start the hyperparameter tuning; test set 1 serves as validation data.
# Epochs and batch size are kept constant for now (except for the number of
# epochs when fitting the final model).
tuner.search(X_train, y_train_encoded, epochs=10, batch_size=8, validation_data=(X_test1, y_test1_encoded))

# Get the optimal hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Define the early-stopping callback here so this cell does not depend on a
# callback object left over from an earlier cell.
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model with the best hyperparameters
model = tuner.hypermodel.build(best_hps)
model.fit(X_train, y_train_encoded, epochs=50, batch_size=8, validation_data=(X_test1, y_test1_encoded), callbacks=[early_stopping_callback])

# Model Summary
model.summary()

In [None]:
# Loss and accuracy on the training data (accuracy is printed as a percentage)
train_loss, train_accuracy = model.evaluate(X_train, y_train_encoded)
print(f"Training Accuracy: {train_accuracy*100:.2f}%")

# Loss and accuracy on the pooled held-out test data.
# NOTE(review): the model uses categorical_crossentropy, so y_test_res has to
# be one-hot encoded here - confirm.
test_loss, test_accuracy = model.evaluate(X_test_res, y_test_res)
print(f"Test Accuracy: {test_accuracy*100:.2f}%")

In [None]:
# Show every tuned hyperparameter together with its best value
print("Best Hyperparameters:")
for hyperparam in best_hps.values:
    print(f"{hyperparam}: {best_hps.get(hyperparam)}")

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Class probabilities -> predicted class index per held-out sample
y_pred = model.predict(X_test_res)
y_pred_classes = np.argmax(y_pred, axis=1)

# The targets are turned into class indices once and reused
y_true_classes = np.argmax(y_test_res, axis=1)

# Confusion matrix as an annotated heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
heatmap_ax = sns.heatmap(cm, annot=True, fmt='d')
heatmap_ax.set_ylabel('Actual')
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_title('Confusion Matrix')
plt.show()

# Per-class precision, recall and F1
print(classification_report(y_true_classes, y_pred_classes))

### Improved Cross Model

Hyperparameter tuning with LSTM model

In [None]:
def build_model(hp):
    """Build and compile the regularised LSTM + Dense classifier for one tuner trial.

    Tuned hyperparameters: number of LSTM layers (2-4) and their units, one
    dropout rate and one L2 factor shared by all LSTM layers, number of Dense
    layers (2-6) and their units, one L2 factor shared by all Dense layers,
    and the Adam learning rate.

    Args:
        hp: Hyperparameter container handed in by the tuner.

    Returns:
        The compiled model with a 4-way softmax output.
    """
    model = Sequential()

    # Tuning the number of LSTM layers and their units
    for i in range(hp.Int('num_lstm_layers', 2, 4)):
        model.add(LSTM(
            units=hp.Choice('units_' + str(i), values=[32, 64, 128, 256]),
            # Only the last layer should not return sequences
            return_sequences=i < hp.get('num_lstm_layers') - 1,
            input_shape=(X_train.shape[1], X_train.shape[2]),
            dropout=hp.Choice('lstm_dropout_', values=[0.2, 0.3, 0.4, 0.5]),
            kernel_regularizer=l2(hp.Choice('lstm_kernel_regularizer_', values=[1e-3, 1e-4, 1e-5]))
            ))

    # Tuning the number of Dense layers and their units
    for i in range(hp.Int('num_dense_layers', 2, 6)):
        model.add(Dense(
            units=hp.Choice('dense_units_' + str(i), values=[32, 64, 128, 256]),
            activation='relu',
            kernel_regularizer=l2(hp.Choice('dense_kernel_regularizer_', values=[1e-3, 1e-4, 1e-5]))))

    model.add(Dense(4, activation='softmax'))  # 4 classes

    model.compile(
        optimizer=tf.keras.optimizers.Adam(
            hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4, 1e-5])),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# Random search over the improved Cross search space: 10 trials, ranked by
# validation accuracy.
# KerasTuner keeps its trials under directory/project_name and resumes from
# them when the folder already exists, so this search needs a project name
# of its own instead of one shared with another search.
tuner = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=1,
    directory='./',
    project_name='cross_improved_lstm_hparam_tuning'
)

tuner.search_space_summary()

# Start the hyperparameter tuning; test set 1 serves as validation data
tuner.search(X_train, y_train_encoded, epochs=10, batch_size=8, validation_data=(X_test1, y_test1_encoded))

# Get the optimal hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]



Build model with the best hyperparameters

In [None]:
# Stop when val_loss has not improved for 8 epochs and roll back to the best weights
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)

# Upper bound on the number of training epochs (early stopping may end sooner)
epochs = 50


# Re-build the model from the best hyperparameters
model = tuner.hypermodel.build(best_hps)

# Train with test set 1 as validation data.
# NOTE(review): batch_size is 32 here while the tuner searched with
# batch_size=8 - confirm the difference is intended.
model.fit(X_train, y_train_encoded, epochs=epochs, batch_size=32, validation_data=(X_test1, y_test1_encoded), callbacks=[early_stopping_callback])


Model summary

In [None]:
# Print the layer overview of the trained model
model.summary()

Best Hyperparameters

In [None]:
# Show every tuned hyperparameter together with its best value
print("Best Hyperparameters:")
for hyperparam in best_hps.values:
    print(f"{hyperparam}: {best_hps.get(hyperparam)}")

Predictions using model with best hyperparameters + confusion matrix

In [None]:
# Class probabilities -> predicted class index per held-out sample
y_pred = model.predict(X_test_res)
y_pred_classes = np.argmax(y_pred, axis=1)

# The targets are turned into class indices once and reused
y_true_classes = np.argmax(y_test_res, axis=1)

# Confusion matrix as an annotated heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
heatmap_ax = sns.heatmap(cm, annot=True, fmt='d')
heatmap_ax.set_ylabel('Actual')
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_title('Confusion Matrix')
plt.show()

# Per-class precision, recall and F1
print(classification_report(y_true_classes, y_pred_classes))

Accuracy on the training and test data

In [None]:
# Loss and accuracy on the training data (accuracy is printed as a percentage)
train_loss, train_accuracy = model.evaluate(X_train, y_train_encoded)
print(f"Training Accuracy: {train_accuracy*100:.2f}%")

# Loss and accuracy on the held-out Cross data: test sets 2 and 3 pooled
# (test set 1 was used as validation data). X_test / y_test_encoded must not
# be used here, they still hold the Intra test split.
y_test_res_encoded = np.concatenate((y_test2_encoded, y_test3_encoded), axis=0)
test_loss, test_accuracy = model.evaluate(X_test_res, y_test_res_encoded)
print(f"Test Accuracy: {test_accuracy*100:.2f}%")

Build and train the model 10 times with the best hyperparameters to get a distribution of accuracies

In [None]:
def build_model_from_best_hps(loaded_hyperparameters):
    """Rebuild and compile the tuned regularised LSTM + Dense model.

    Args:
        loaded_hyperparameters: Either a plain ``dict`` of hyperparameter
            values or a tuner hyperparameter object exposing that dict as
            ``.values``. Required keys: ``num_lstm_layers``, ``units_<i>``,
            ``num_dense_layers``, ``dense_units_<i>`` and ``learning_rate``.
            Optional keys: ``lstm_dropout_`` (default 0),
            ``lstm_kernel_regularizer_`` and ``dense_kernel_regularizer_``
            (no regularisation when absent).

    Returns:
        The compiled, untrained model with a 4-way softmax output.
    """
    # The tuner object's own get() takes no default value, so work on the
    # underlying dict of values in both cases.
    if isinstance(loaded_hyperparameters, dict):
        hps = loaded_hyperparameters
    else:
        hps = loaded_hyperparameters.values

    def _l2_or_none(name):
        # Build an L2 regulariser only if a factor was actually stored.
        factor = hps.get(name)
        return l2(factor) if factor is not None else None

    num_lstm_layers = hps['num_lstm_layers']
    num_dense_layers = hps['num_dense_layers']

    model = Sequential()

    # Adding LSTM layers as per loaded hyperparameters
    for i in range(num_lstm_layers):
        model.add(LSTM(
            units=hps[f'units_{i}'],
            # Every layer but the last hands its full sequence to the next LSTM.
            return_sequences=i < num_lstm_layers - 1,
            input_shape=(X_train.shape[1], X_train.shape[2]),
            dropout=hps.get('lstm_dropout_', 0),
            kernel_regularizer=_l2_or_none('lstm_kernel_regularizer_')
            ))

    # Adding Dense layers as per loaded hyperparameters
    for i in range(num_dense_layers):
        model.add(Dense(
            units=hps[f'dense_units_{i}'],
            activation='relu',
            kernel_regularizer=_l2_or_none('dense_kernel_regularizer_')
            ))

    model.add(Dense(4, activation='softmax'))  # 4 classes

    model.compile(
        optimizer=tf.keras.optimizers.Adam(hps['learning_rate']),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

In [None]:
# Train the tuned architecture several times from scratch to see how much the
# accuracy varies between runs, and keep the run with the best test accuracy.
num_runs = 10
all_train_accuracies = []
all_test_accuracies = []
best_accuracy = 0
best_model = None

for run in range(num_runs):
    print(f"Training Run {run+1}/{num_runs}")

    model = build_model_from_best_hps(best_hps)
    # Test set 1 is the validation set that drives early stopping
    model.fit(
        X_train, y_train_encoded,
        epochs=50, batch_size=8,
        validation_data=(X_test1, y_test1_encoded),
        callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
        verbose=2)

    train_loss, train_accuracy = model.evaluate(X_train, y_train_encoded, verbose=0)
    test_loss, test_accuracy = model.evaluate(X_test_res, y_test_res, verbose=0)

    all_train_accuracies.append(train_accuracy)
    all_test_accuracies.append(test_accuracy)

    # `best_model is None` keeps the first run even if its accuracy is 0, so
    # a best model is always available after the loop.
    if best_model is None or test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        # Copy architecture and weights so later runs cannot overwrite them
        best_model = tf.keras.models.clone_model(model)
        best_model.set_weights(model.get_weights())

# evaluate() returns accuracy as a fraction; scale it to match the "%" sign.
print(f"Average Training Accuracy: {np.mean(all_train_accuracies) * 100:.2f}%")
print(f"Average Test Accuracy: {np.mean(all_test_accuracies) * 100:.2f}%")
print(f"Best Model Test Accuracy: {best_accuracy * 100:.2f}%")

In [None]:
# Evaluate the best of the repeated runs on the pooled held-out data:
# class probabilities -> predicted class index per sample
y_pred = best_model.predict(X_test_res)
y_pred_classes = np.argmax(y_pred, axis=1)

# The targets are turned into class indices once and reused
y_true_classes = np.argmax(y_test_res, axis=1)

# Confusion matrix as an annotated heatmap
cm = confusion_matrix(y_true_classes, y_pred_classes)
heatmap_ax = sns.heatmap(cm, annot=True, fmt='d')
heatmap_ax.set_ylabel('Actual')
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_title('Confusion Matrix')
plt.show()

# Per-class precision, recall and F1
print(classification_report(y_true_classes, y_pred_classes))