#**Motor Imagery EEG Signal Analysis for Finger Movement: An Approach to Brain-Computer Interfaces**

---
`In this study, we aimed to classify finger movements using brain signals captured through EEG. We focused on five distinct classes of finger movements: Thumb, Index, Middle, Ring, and Pinky. Utilizing RNN for feature extraction from EEG data, we reduced the complexity by using only 4 channels out of the original 64. This reduction was achieved before applying Continuous Wavelet Transform (CWT) analysis with the Morlet mother wavelet and a scale of 10
`



In [None]:
# Eng/Amr Mostafa Omar
# Nile University ,Cairo, Egypt
# Data 20/6/2024

In [2]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install PyWavelets

In [14]:
import scipy.io
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import welch, butter, filtfilt
from scipy.signal import butter, filtfilt
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split
import gc
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback
from sklearn.metrics import confusion_matrix, classification_report
import pywt
import psutil
import os

##**Step 1: Loading Data**

In [None]:
# Function to load data from a .mat file
def load_data_from_mat(mat_file_path):
    mat_data = scipy.io.loadmat(mat_file_path)
    data = mat_data['Data'][0, 0]
    X = data['trials']  # EEG data (256, 64, 900)
    y = data['Labels'].flatten()  # Target labels (900,)
    Fs = data['Fs'][0, 0]  # Sampling frequency
    Channels = [ch[0] for ch in data['Channels'][0]]  # EEG channels
    return X, y, Fs, Channels

# List of file paths
mat_file_paths = [
    '/content/drive/MyDrive/Folder 1/Power_Spectrum_trials_subject_1.mat',
    '/content/drive/MyDrive/Folder 1/Power_Spectrum_trials_subject_2.mat',
    '/content/drive/MyDrive/Folder 1/Power_Spectrum_trials_subject_3.mat',
    '/content/drive/MyDrive/Folder 1/Power_Spectrum_trials_subject_4.mat',
    '/content/drive/MyDrive/Folder 1/Power_Spectrum_trials_subject_5.mat'
]


# Initialize lists to hold all data
X_all = []
Y_all = []

# Load all data
for mat_file_path in mat_file_paths:
    X, y, Fs, Channels = load_data_from_mat(mat_file_path)
    X_all.append(X)
    Y_all.append(y)

# Convert lists to arrays
X = np.concatenate(X_all, axis=2)  # Concatenate along the trials axis
y = np.concatenate(Y_all)  # Concatenate labels

# Print shapes to verify
print("Combined EEG Data Shape (Trials):", X.shape)
print("Combined Labels Shape:", y.shape)
print("Sampling Frequency:", Fs)
print("Channels:", Channels)


##**Step 2: Preprocessing**

In [None]:
# Define band-pass filter parameters
lowcut = 8.0
highcut = 30.0
order = 5

# Function to apply band-pass filter
def bandpass_filter(data, lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    filtered_data = filtfilt(b, a, data, axis=0)
    return filtered_data

# Apply band-pass filter to all trials and channels
filtered_data = np.zeros_like(X)
for trial in range(X.shape[2]):
    for channel in range(X.shape[1]):
        filtered_data[:, channel, trial] = bandpass_filter(X[:, channel, trial], lowcut, highcut, Fs, order=order)

# Common Average Referencing (CAR)
mean_signal = np.mean(filtered_data, axis=1, keepdims=True)
filtered_data -= mean_signal

# Define the list of channels to keep
selected_channels = ['T7',  'TP7','PO3',  'PO7']


# Get indices of selected channels
channel_indices = [Channels.index(ch) for ch in selected_channels]

# Select only the specified channels
filtered_data_selected = filtered_data[:, channel_indices, :]

# Print shape to verify
print("Filtered and CAR Processed Data Shape (Selected Channels):", filtered_data_selected.shape)

# Reshape 3D data to 2D
reshaped_data = filtered_data_selected.reshape(-1, filtered_data_selected.shape[1])

# Apply RobustScaler
scaler = RobustScaler()
scaled_data = scaler.fit_transform(reshaped_data)

# Reshape back to 3D if necessary
scaled_data_3d = scaled_data.reshape(filtered_data_selected.shape)

print("Scaled Data Shape (Selected Channels):", scaled_data_3d.shape)


##**Step 3 :Split Data into Classes**

In [None]:
# Initialize arrays for different classes (Thumb, Index, Middle, Ring, Pinky)
X_thumb, Y_thumb = None, None
X_index, Y_index = None, None
X_middle, Y_middle = None, None
X_ring, Y_ring = None, None
X_pinky, Y_pinky = None, None

# Function to split and assign data
def assign_data_to_class(class_label, scaled_data_3d, y, percentage=0.25):
    # Select the target trials for the current class
    target_trials = scaled_data_3d[:, :, y == class_label]
    num_target_trials = target_trials.shape[2]

    # Determine the number of non-target trials to select from each other class
    num_non_target_trials_per_class = int(percentage * num_target_trials)

    # Select the non-target trials from other classes (non-randomly, using the first 25%)
    non_target_trials = []
    for other_class_label in range(1, 6):
        if other_class_label != class_label:
            other_class_trials = scaled_data_3d[:, :, y == other_class_label]
            non_target_trials.append(other_class_trials[:, :, :num_non_target_trials_per_class])
    non_target_trials = np.concatenate(non_target_trials, axis=2)

    # Combine target and non-target trials
    combined_trials = np.concatenate((target_trials, non_target_trials), axis=2)
    combined_labels = np.array([1] * num_target_trials + [0] * non_target_trials.shape[2])  # 1 for target, 0 for non-target

    return combined_trials, combined_labels

# Assign data to each class
X_thumb, Y_thumb = assign_data_to_class(1, scaled_data_3d, y)
X_index, Y_index = assign_data_to_class(2, scaled_data_3d, y)
X_middle, Y_middle = assign_data_to_class(3, scaled_data_3d, y)
X_ring, Y_ring = assign_data_to_class(4, scaled_data_3d, y)
X_pinky, Y_pinky = assign_data_to_class(5, scaled_data_3d, y)

# Print shapes to verify
print("Thumb Data Shape (Trials):", X_thumb.shape, "Labels Shape:", Y_thumb.shape)
print("Index Data Shape (Trials):", X_index.shape, "Labels Shape:", Y_index.shape)
print("Middle Data Shape (Trials):", X_middle.shape, "Labels Shape:", Y_middle.shape)
print("Ring Data Shape (Trials):", X_ring.shape, "Labels Shape:", Y_ring.shape)
print("Pinky Data Shape (Trials):", X_pinky.shape, "Labels Shape:", Y_pinky.shape)


##**Step 4: Split Data into Training and Testing Sets**

In [None]:
# Function to split data into train and test sets
def split_data(trials, labels):
    # Split into train and test sets (80:20 ratio)
    train_trials, test_trials, train_labels, test_labels = train_test_split(
        trials.transpose(2, 0, 1), labels, test_size=0.2, random_state=42, stratify=labels)

    # Transpose back to (256, 64, num_trials) shape
    train_trials = train_trials.transpose(1, 2, 0)
    test_trials = test_trials.transpose(1, 2, 0)

    return train_trials, test_trials, train_labels, test_labels

# Split each part into train and test sets
X_train_thumb, X_test_thumb, Y_train_thumb, Y_test_thumb = split_data(X_thumb, Y_thumb)
X_train_index, X_test_index, Y_train_index, Y_test_index = split_data(X_index, Y_index)
X_train_middle, X_test_middle, Y_train_middle, Y_test_middle = split_data(X_middle, Y_middle)
X_train_ring, X_test_ring, Y_train_ring, Y_test_ring = split_data(X_ring, Y_ring)
X_train_pinky, X_test_pinky, Y_train_pinky, Y_test_pinky = split_data(X_pinky, Y_pinky)

# Print shapes to verify
print("Thumb Train Data Shape (Trials):", X_train_thumb.shape, "Train Labels Shape:", Y_train_thumb.shape)
print("Thumb Test Data Shape (Trials):", X_test_thumb.shape, "Test Labels Shape:", Y_test_thumb.shape)

print("Index Train Data Shape (Trials):", X_train_index.shape, "Train Labels Shape:", Y_train_index.shape)
print("Index Test Data Shape (Trials):", X_test_index.shape, "Test Labels Shape:", Y_test_index.shape)

print("Middle Train Data Shape (Trials):", X_train_middle.shape, "Train Labels Shape:", Y_train_middle.shape)
print("Middle Test Data Shape (Trials):", X_test_middle.shape, "Test Labels Shape:", Y_test_middle.shape)

print("Ring Train Data Shape (Trials):", X_train_ring.shape, "Train Labels Shape:", Y_train_ring.shape)
print("Ring Test Data Shape (Trials):", X_test_ring.shape, "Test Labels Shape:", Y_test_ring.shape)

print("Pinky Train Data Shape (Trials):", X_train_pinky.shape, "Train Labels Shape:", Y_train_pinky.shape)
print("Pinky Test Data Shape (Trials):", X_test_pinky.shape, "Test Labels Shape:", Y_test_pinky.shape)


#**Step 5: Apply Continuous Wavelet Transform (CWT)**

In [9]:
def apply_cwt(data):
    scales = np.arange(1, 101)  #  scales from 1 to 100
    cwt_coeffs = []
    for trial in data.transpose(2, 1, 0):  # Transpose to (trials, channels, samples)
        trial_coeffs = []
        for channel in trial:
            coeffs, freqs = pywt.cwt(channel, scales, 'morl')  # wavelet 'morl'
            trial_coeffs.append(coeffs)
        cwt_coeffs.append(trial_coeffs)
    cwt_coeffs = np.array(cwt_coeffs)
    # Transpose to get shape (trials, samples, coefficients, channels)
    cwt_coeffs = cwt_coeffs.transpose(0, 2, 3, 1)
    return cwt_coeffs

##**Step 6 :Train and Evaluate RNN model**

In [10]:
# Memory management functions
def print_memory_usage():
    process = psutil.Process(os.getpid())
    print(f"Memory usage: {process.memory_info().rss / 1024 ** 2:.2f} MB")

def clear_memory():
    gc.collect()
    print_memory_usage()

class ClearMemory(Callback):
    def on_epoch_end(self, epoch, logs=None):
        clear_memory()

# Transformer block for numerical data
class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim),]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# Define datasets and labels (as per your current setup)
datasets = {
    'thumb': (X_train_thumb, X_test_thumb, Y_train_thumb, Y_test_thumb),
    'index': (X_train_index, X_test_index, Y_train_index, Y_test_index),
    'middle': (X_train_middle, X_test_middle, Y_train_middle, Y_test_middle),
    'ring': (X_train_ring, X_test_ring, Y_train_ring, Y_test_ring),
    'pinky': (X_train_pinky, X_test_pinky, Y_train_pinky, Y_test_pinky),
}

results = {}

In [None]:
# Loop over each dataset
for finger, (X_train, X_test, Y_train, Y_test) in datasets.items():

    # Step 3: Organize Data into RNN-Ready Format using CWT
    X_train_cwt = apply_cwt(X_train)
    X_test_cwt = apply_cwt(X_test)

    # Concatenate CWT coefficients across channels
    X_train_concat = np.concatenate(X_train_cwt, axis=-1)
    X_test_concat = np.concatenate(X_test_cwt, axis=-1)

    # Clear memory after concatenating CWT coefficients
    del X_train_cwt, X_test_cwt
    gc.collect()

    # Reshape data to match RNN input shape (timesteps, features)
    X_train_concat = X_train_concat.reshape(-1, 100, 256 * 4)
    X_test_concat = X_test_concat.reshape(-1, 100, 256 * 4)

    # Build RNN model
    model = Sequential()
    model.add(LSTM(128, input_shape=(100, 256 * 4), return_sequences=True))
    model.add(LSTM(64, return_sequences=False))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))

    # Compile model
    model.compile(optimizer=Adam(lr=1e-5), loss='binary_crossentropy', metrics=['accuracy'])
    model.summary()

    # Train model with custom callback to clear memory after each epoch
    history = model.fit(X_train_concat, Y_train, epochs=15, batch_size=32, validation_data=(X_test_concat, Y_test), callbacks=[ClearMemory()])

    # Evaluate model
    loss, accuracy = model.evaluate(X_test_concat, Y_test)
    results[finger] = accuracy
    print(f"Accuracy for {finger}: {accuracy:.4f}")

    # Plot training & validation accuracy values
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title(f'{finger.capitalize()} Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')

    # Plot training & validation loss values
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title(f'{finger.capitalize()} Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')

    plt.show()

    # Predict the values from the test set
    Y_pred = (model.predict(X_test_concat) > 0.5).astype("int32")

    # Confusion matrix
    cm = confusion_matrix(Y_test, Y_pred)
    plt.figure(figsize=(6, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Not Target', 'Target'], yticklabels=['Not Target', 'Target'])
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.title(f'{finger.capitalize()} Confusion Matrix')
    plt.show()

    # Classification report
    print(f'{finger.capitalize()} Classification Report:')
    print(classification_report(Y_test, Y_pred, target_names=['Not Target', 'Target']))

# Print final accuracies
print("Final accuracies:")
for finger, accuracy in results.items():
    print(f"{finger}: {accuracy:.4f}")
