In [1]:
import scipy.io as sio
import numpy as np
import pandas as pd
import os

# Define the directory containing the MATLAB files
# directory = '/Users/junaeidshoaib/Desktop/Dessertation/Dataset/Matlab_49/BCICIV_1_mat'
directory = '/Users/junaeidshoaib/Desktop/Dessertation/Dataset/Matlab/BCICIV_1calib_1000Hz_mat'

# List all MATLAB files in the directory.
# os.listdir() returns entries in arbitrary order, so the list is sorted: the row
# order of the concatenated frame (and which file is left in `data` after the
# loop) must be the same on every run.
file_paths = sorted(
    os.path.join(directory, file) for file in os.listdir(directory) if file.endswith('.mat')
)
if not file_paths:
    # Fail with a clear message instead of pd.concat's "No objects to concatenate"
    raise FileNotFoundError(f'No .mat files found in {directory}')

# One DataFrame per file, kept in the same order as file_paths
dfs = []

# Load each file and extract the EEG signals
for file_path in file_paths:
    data = sio.loadmat(file_path)
    cnt = 0.1 * np.array(data['cnt'], dtype=np.float64)  # Convert to microvolts
    df = pd.DataFrame(cnt)
    dfs.append(df)

# Stack all recordings row-wise; rows of file k start at sum(len(dfs[j]) for j < k)
full_df = pd.concat(dfs, ignore_index=True)

print(f'Combined DataFrame shape: {full_df.shape}')


Combined DataFrame shape: (13337509, 59)


In [2]:
# Preview the top five rows of the combined frame as plain text
print(full_df.head(n=5))


     0     1     2     3     4     5     6     7     8     9   ...    49  \
0 -83.2 -35.7 -38.1 -58.0 -86.1 -27.1 -64.4 -60.8 -31.2 -36.0  ... -73.8   
1 -77.0 -28.6 -34.5 -53.3 -80.9 -20.7 -58.3 -56.3 -27.0 -34.4  ... -69.4   
2 -74.2 -23.5 -31.7 -50.9 -77.3 -16.5 -54.2 -52.6 -24.1 -34.3  ... -67.3   
3 -75.3 -22.6 -33.5 -52.1 -78.4 -16.6 -54.7 -52.5 -25.4 -37.3  ... -69.9   
4 -76.9 -24.3 -36.7 -54.4 -81.2 -18.8 -57.5 -55.2 -29.6 -39.3  ... -74.1   

     50    51    52    53    54    55    56    57    58  
0 -28.6 -64.7 -22.5 -23.0 -65.4 -83.6 -40.4 -41.3 -38.1  
1 -24.4 -60.5 -19.1 -17.5 -60.1 -78.6 -35.6 -37.9 -32.7  
2 -22.6 -57.5 -17.1 -15.1 -56.5 -77.0 -33.9 -36.4 -30.5  
3 -24.4 -59.8 -18.5 -17.9 -58.8 -81.1 -37.8 -39.0 -34.3  
4 -28.1 -64.3 -20.9 -21.8 -63.1 -84.8 -42.6 -43.2 -39.1  

[5 rows x 59 columns]


In [None]:
from scipy.signal import butter, filtfilt

# Design helper: Butterworth band-pass coefficients in transfer-function form
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a digital Butterworth band-pass filter.

    Parameters
    ----------
    lowcut, highcut : float
        Lower and upper band edges, in the same frequency unit as ``fs``.
    fs : float
        Sampling rate of the signal the filter will be applied to.
    order : int, optional
        Order passed to ``scipy.signal.butter`` (default 5).

    Returns
    -------
    tuple
        ``(b, a)`` numerator and denominator coefficients.
    """
    nyquist = 0.5 * fs
    # scipy expects band edges as fractions of the Nyquist frequency
    normalized_band = [edge / nyquist for edge in (lowcut, highcut)]
    return butter(order, normalized_band, btype='band')

# Function to apply the Butterworth filter to the data
def butter_bandpass_filter(data, lowcut, highcut, fs, order=4):
    """Band-pass filter ``data`` along axis 0 with a forward-backward Butterworth filter.

    The filter is designed as second-order sections and applied with
    ``sosfiltfilt`` rather than as (b, a) polynomials with ``filtfilt``: for a
    band that is narrow relative to the sampling rate (e.g. 8-30 Hz at 1000 Hz)
    the polynomial form is prone to numerical round-off problems, while the
    cascaded second-order form is the one SciPy recommends for filtering.

    Parameters
    ----------
    data : array_like
        Signal(s) to filter; filtering runs along axis 0 (one column per signal
        for 2-D input).
    lowcut, highcut : float
        Lower and upper band edges, in the same frequency unit as ``fs``.
    fs : float
        Sampling rate of ``data``.
    order : int, optional
        Butterworth design order (default 4). The filter is run forward and
        backward, so the result has no phase shift.

    Returns
    -------
    numpy.ndarray
        Filtered data with the same shape as ``data``.
    """
    # Local import: sosfiltfilt is not among the names imported at cell level.
    from scipy.signal import butter, sosfiltfilt

    nyquist = 0.5 * fs
    sos = butter(order, [lowcut / nyquist, highcut / nyquist], btype='band', output='sos')
    return sosfiltfilt(sos, data, axis=0)

# Band-pass settings
lowcut = 8.0  # Lower pass-band edge in Hz (alpha band lower edge)
highcut = 30.0  # Upper pass-band edge in Hz (beta band upper edge)
fs = 1000  # Sampling rate in Hz of the loaded 1000 Hz recordings (no downsampling is done above)

# Filter every column of the combined recording along the sample axis
filtered_data = butter_bandpass_filter(full_df.to_numpy(), lowcut, highcut, fs)

# Wrap the result in a frame that keeps the original column labels
filtered_df = pd.DataFrame(filtered_data, columns=full_df.columns)

# Report the size of the filtered frame and preview its first rows
print(f'Filtered DataFrame shape: {filtered_df.shape}')
print(filtered_df.head())


In [None]:
import matplotlib.pyplot as plt

# Compare raw and band-pass-filtered signals for a few channels:
# one row per channel, raw on the left, filtered on the right.
channels = [0, 1, 2, 3]  # Column indices of the channels to plot
# Named `channel_labels` rather than `labels`: the trial-segmentation cell later
# binds `labels` to the class-label array, and re-running this cell afterwards
# would otherwise pick up those values as plot titles.
channel_labels = ['Channel 1', 'Channel 2', 'Channel 3', 'Channel 4']
n_plot_samples = 1300000  # Number of leading samples shown in each panel

# squeeze=False keeps `axes` two-dimensional even if only one channel is listed
fig, axes = plt.subplots(len(channels), 2, figsize=(16, 12), sharex=True, squeeze=False)

for i, ch in enumerate(channels):
    # Plot original EEG signal
    axes[i, 0].plot(full_df.iloc[:n_plot_samples, ch], label=channel_labels[i], color='b')
    axes[i, 0].set_title(f'Original EEG Signal - {channel_labels[i]}')
    axes[i, 0].set_ylabel('Amplitude (µV)')
    axes[i, 0].grid(True)

    # Plot filtered EEG signal
    axes[i, 1].plot(filtered_df.iloc[:n_plot_samples, ch], label=channel_labels[i], color='r')
    axes[i, 1].set_title(f'Filtered EEG Signal - {channel_labels[i]}')
    axes[i, 1].set_ylabel('Amplitude (µV)')
    axes[i, 1].grid(True)

# Set common x-axis label (bottom row only, since the x-axis is shared)
for ax in axes[-1, :]:
    ax.set_xlabel('Samples')

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
import pandas as pd


# Cut the filtered recording into fixed-length trials, one per cue marker.
#
# filtered_df holds all files from `file_paths` stacked row-wise (same order as
# `dfs`), but each file's `mrk['pos']` counts samples from the start of *that*
# file. Using only the leftover `data` from the loading loop would apply the
# last file's markers to rows that belong to the first file, so every file's
# markers are loaded here and shifted by that file's row offset.
# NOTE(review): the positions come from MATLAB and may be 1-based — confirm
# whether a -1 shift is needed.

# Trial window: 4 seconds of the 1000 Hz recordings = 4000 samples
sampling_rate_hz = 1000
trial_duration_s = 4
trial_length = trial_duration_s * sampling_rate_hz  # Only the task period

# Initialize lists to store trials and labels
trials = []
labels = []
mrk_pos_parts = []
mrk_y_parts = []

file_offset = 0  # Row in filtered_df where the current file starts
for file_path, file_df in zip(file_paths, dfs):
    # Only the marker struct is needed, so skip re-reading the large 'cnt' array
    mrk = sio.loadmat(file_path, variable_names=['mrk'])['mrk']
    file_pos = np.array(mrk['pos'][0][0], dtype=np.int64).flatten()  # Positions of the cues
    file_y = np.array(mrk['y'][0][0], dtype=np.int32).flatten()  # Target classes (-1 for class one, 1 for class two)

    file_end = file_offset + len(file_df)
    global_pos = file_pos + file_offset

    for pos, label in zip(global_pos, file_y):
        # Keep the trial only if it ends inside the file it belongs to
        if pos + trial_length <= file_end:
            trials.append(filtered_df.iloc[pos:pos + trial_length].values)
            labels.append(label)

    mrk_pos_parts.append(global_pos)
    mrk_y_parts.append(file_y)
    file_offset = file_end

# Marker positions (as row indices into filtered_df) and classes across all files
mrk_pos = np.concatenate(mrk_pos_parts)
mrk_y = np.concatenate(mrk_y_parts)

# Convert to numpy arrays for easier manipulation
trials = np.array(trials)  # (n_trials, n_samples, n_channels)
labels = np.array(labels)

# Display the shape of the segmented trials
print(f'Number of trials: {trials.shape[0]}')
print(f'Trial shape (samples x channels): {trials.shape[1:]}')

# Optional: Visualize the first trial of the first channel
import matplotlib.pyplot as plt

plt.plot(trials[0][:, 0])
plt.title(f'First Trial - Channel 1 - Label: {labels[0]}')
plt.show()


In [None]:
# Sampling rate of the recordings that mrk_pos indexes into.
# The data were loaded from the 1000 Hz files and band-pass filtered with
# fs = 1000, so sample indices must be converted to seconds with that same
# rate; using 100 here would report times ten times too large.
fs = 1000  # Hz

# Display start and end times for the first 5 trials
for i in range(min(5, len(mrk_pos))):
    start_sample = mrk_pos[i]
    end_sample = start_sample + trial_length
    start_time = start_sample / fs
    end_time = end_sample / fs

    print(f"Trial {i+1}: Start time = {start_time:.2f}s, End time = {end_time:.2f}s")


In [None]:
# Show the first ten cue positions as raw sample indices
first_positions = mrk_pos[:10]
print("First few marker positions (in samples):", first_positions)

# Same markers expressed in seconds (sample index divided by the sampling rate)
marker_times = mrk_pos / fs
print("First few marker times (in seconds):", marker_times[:10])


In [None]:
# Total number of cue markers available
n_markers = len(mrk_pos)
print(f'Number of markers: {n_markers}')


In [None]:
from sklearn.model_selection import train_test_split

# 70/30 train/test split, stratified on the class labels so both parts keep
# the class proportions of `labels`; fixed seed for a repeatable split
X_train, X_test, y_train, y_test = train_test_split(
    trials,
    labels,
    test_size=0.3,
    stratify=labels,
    random_state=42,
)

# Count how many trials of each class ended up in each part
unique_train, counts_train = np.unique(y_train, return_counts=True)
unique_test, counts_test = np.unique(y_test, return_counts=True)

train_distribution = dict(zip(unique_train, counts_train))
test_distribution = dict(zip(unique_test, counts_test))

print(f'Stratified Training set class distribution: {train_distribution}')
print(f'Stratified Test set class distribution: {test_distribution}')


In [None]:
import numpy as np
from mne.decoding import CSP

# Initialize CSP object with desired parameters
csp = CSP(n_components=59, reg='ledoit_wolf', log=True, norm_trace=False)

# The trials were cut from the frame as (n_trials, n_samples, n_channels), but
# mne's CSP expects (n_epochs, n_channels, n_times). Without swapping the last
# two axes CSP would treat every time sample as a "channel" and compute spatial
# filters over time points instead of over electrodes.
X_train_epochs = np.transpose(X_train, (0, 2, 1))
X_test_epochs = np.transpose(X_test, (0, 2, 1))

# Fit CSP on the training data only, so no test information leaks into the filters
X_train_csp = csp.fit_transform(X_train_epochs, y_train)

# Apply the already-fitted CSP to the test data
X_test_csp = csp.transform(X_test_epochs)

# Output the shape of the CSP-transformed data
print(f'CSP-transformed training set shape: {X_train_csp.shape}')
print(f'CSP-transformed test set shape: {X_test_csp.shape}')


In [None]:
from pyswarm import pso
import random
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
import numpy as np

# Seed Python's and NumPy's global random generators so repeated runs are comparable
random_seed = 71
random.seed(random_seed)
np.random.seed(random_seed)


def svm_evaluate(features):
    """PSO objective: negative 5-fold CV accuracy of an RBF SVM on a feature subset.

    ``features`` is a particle position with one value per CSP feature; a
    feature is kept when its value is above 0.5. Returns 1.0 (worse than any
    achievable score, since real scores are <= 0) when nothing is selected,
    otherwise the negated mean cross-validated accuracy, because PSO minimizes.
    """
    mask = features > 0.5

    # An empty selection cannot be scored, so hand back a penalty value
    if not mask.any():
        return 1.0

    candidate = X_train_csp[:, mask]
    classifier = SVC(kernel='rbf', C=100, gamma='auto', random_state=random_seed)
    cv_accuracy = cross_val_score(classifier, candidate, y_train, cv=5, scoring='accuracy')

    # Negate: higher accuracy must look like a lower cost to the optimizer
    return -cv_accuracy.mean()


# Search space: every feature weight lives in [0, 1]
n_features = X_train_csp.shape[1]
lb = [0] * n_features  # Lower bounds
ub = [1] * n_features  # Upper bounds

# Let the swarm search for the best feature weights
best_position, _ = pso(svm_evaluate, lb, ub, swarmsize=50, maxiter=1000)

# Threshold the winning position into a boolean feature mask
best_features = best_position > 0.5
selected_features_train = X_train_csp[:, best_features]
selected_features_test = X_test_csp[:, best_features]

# Fit the final SVM on the chosen features using the whole training set
final_clf = SVC(kernel='rbf', C=100, gamma='auto', random_state=random_seed)
final_clf.fit(selected_features_train, y_train)

# Score the fitted model on the held-out test set
test_accuracy = final_clf.score(selected_features_test, y_test)
print(f'Test accuracy after PSO feature selection: {test_accuracy}')
