In [3]:
import numpy as np
import tensorflow as tf
from tqdm import tqdm  # tqdm for the progress bar
import time
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import Dense




In [4]:
with open('/Users/chrismata/Desktop/CS Brain Project/MindBigDataVisualMnist2021-Muse2v0.17.txt') as f:
    lines = f.read().splitlines()
    
print(len(lines))

36000


Text file processing:

This reads each line, extracts the label and 2048 EEG data points, converts them to floats, and stores them in lists. The lists are converted to NumPy arrays to make the data easy to work with.

In [5]:
NUM_EXAMPLES = 18000   # total unique label examples in the data file

eeg_data = []   # eeg_data initialized as empty list
labels = []   # labels initialized as empty list 

# Create tqdm progress bar for 18000 lines 
with tqdm(total=NUM_EXAMPLES) as pbar:

    for i in range(NUM_EXAMPLES):
        
        line = lines[i]
        
        # Split and extract label
        parts = line.split(',')  
        label = int(parts[2])   # extract label as 3rd element
        labels.append(label)    # add label to labels list
        
        # Extract EEG 
        eeg = parts[788:788+2048]   # 788 is after the png image that we don't need, +2048 brings us to the end of eeg
        eeg = [float(v) for v in eeg]   # convert to floats
        
        # Split EEG into the 4 channels
        TP9 = eeg[0:512] 
        AF7 = eeg[512:1024]
        AF8 = eeg[1024:1536] 
        TP10 = eeg[1536:2048]
        
        # Store channels 
        channels = [TP9, AF7, AF8, TP10]   
        eeg_data.append(channels)   # add channels to eeg_data

        # Update progress bar
        pbar.update(1)

# Convert labels to np array
labels = np.array(labels)

# Convert to eeg_data to np array
eeg_data = np.array(eeg_data) 

100%|██████████| 18000/18000 [00:05<00:00, 3048.63it/s]


Little testing:

In [19]:
print(len(labels))
print(labels[1])

neg_one = 0
zero = 0
one = 0
two = 0
three = 0
four = 0
five = 0
six = 0
seven = 0
eight = 0
nine = 0

for label in labels:
    if label == -1:
        neg_one += 1
    elif label == 0:
        zero += 1
    elif label == 1:
        one += 1
    elif label == 2:
        two += 1
    elif label == 3:
        three += 1
    elif label == 4:
        four += 1
    elif label == 5:
        five += 1
    elif label == 6:
        six += 1
    elif label == 7:
        seven += 1
    elif label == 8:
        eight += 1
    elif label == 9:
        nine += 1

print("Occurrences of -1:", neg_one)
print("Occurrences of 0:", zero)
print("Occurrences of 1:", one)
print("Occurrences of 2:", two)
print("Occurrences of 3:", three)
print("Occurrences of 4:", four)
print("Occurrences of 5:", five)
print("Occurrences of 6:", six)
print("Occurrences of 7:", seven)
print("Occurrences of 8:", eight)
print("Occurrences of 9:", nine)


18000
5
Occurrences of -1: 9000
Occurrences of 0: 893
Occurrences of 1: 1023
Occurrences of 2: 889
Occurrences of 3: 924
Occurrences of 4: 899
Occurrences of 5: 775
Occurrences of 6: 905
Occurrences of 7: 961
Occurrences of 8: 844
Occurrences of 9: 887


Preprocessesing: TODO

LSTM MODEL: FIXME

In [18]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Split data into train-test
X_train, X_test, y_train, y_test = train_test_split(eeg_data, labels, train_size=0.8, random_state=42)   # removing random state doesnt align with the first labels. #FIXME

# Parameters
num_classes = 10 # 0-9 digits   #may have to FIXME for -1
num_channels = 4 
seq_length = 512

print("X_train shape:", X_train.shape) 
print("X_test shape:", X_test.shape) 
print("y_train shape:", y_train.shape)
print("y_test shape:", y_test.shape)

# One-hot encode y  
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)

print("y_train shape after one-hot encoding:", y_train.shape)
print("y_test shape after one-hot encoding:", y_test.shape)

# Check first 5 labels
print("First 5 y_train labels:", y_train[:5])

# Check class distribution
class_counts = np.sum(y_train, axis=0)
print("Class distribution:", class_counts)

#############################################################

# Build model 
model = Sequential()

# LSTM layer
model.add(LSTM(64, return_sequences=True, 
               input_shape=(seq_length, num_channels)))

# Dropout for regularization
model.add(Dropout(0.2))

# LSTM layer 
model.add(LSTM(64))

# Dropout
model.add(Dropout(0.2))

# Fully connected output layer  
model.add(Dense(num_classes, activation='softmax'))

# Compile 
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
              
# Reshape data for channels last format   (samples, timestep, features)           
X_train = X_train.reshape((-1, seq_length, num_channels))
y_train = tf.keras.utils.to_categorical(y_train, num_classes)

X_test = X_test.reshape((-1, seq_length, num_channels))
y_test = tf.keras.utils.to_categorical(y_test, num_classes)

# Train model
model.fit(X_train, y_train, 
          validation_data=(X_test, y_test),
          epochs=10)

X_train shape: (14400, 4, 512)
X_test shape: (3600, 4, 512)
y_train shape: (14400,)
y_test shape: (3600,)
y_train shape after one-hot encoding: (14400, 10)
y_test shape after one-hot encoding: (3600, 10)
First 5 y_train labels: [[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
Class distribution: [ 709.  820.  709.  732.  726.  630.  731.  751.  674. 7918.]
Epoch 1/10


ValueError: in user code:

    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/engine/training.py", line 1338, in train_function  *
        return step_function(self, iterator)
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/engine/training.py", line 1322, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/engine/training.py", line 1303, in run_step  **
        outputs = model.train_step(data)
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/engine/training.py", line 1081, in train_step
        loss = self.compute_loss(x, y, y_pred, sample_weight)
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/engine/training.py", line 1139, in compute_loss
        return self.compiled_loss(
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/engine/compile_utils.py", line 265, in __call__
        loss_value = loss_obj(y_t, y_p, sample_weight=sw)
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/losses.py", line 142, in __call__
        losses = call_fn(y_true, y_pred)
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/losses.py", line 268, in call  **
        return ag_fn(y_true, y_pred, **self._fn_kwargs)
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/losses.py", line 2122, in categorical_crossentropy
        return backend.categorical_crossentropy(
    File "/Users/chrismata/Downloads/enter/envs/brain/lib/python3.11/site-packages/keras/src/backend.py", line 5560, in categorical_crossentropy
        target.shape.assert_is_compatible_with(output.shape)

    ValueError: Shapes (32, 10, 10) and (32, 10) are incompatible


In [None]:
print(X_test.shape)
print(X_train.shape)

(3600, 512, 4)
(14400, 512, 4)


In [None]:
def evaluate_and_print(model, X_test, y_test):
  test_loss, test_acc = model.evaluate(X_test, y_test)
  
  predictions = model.predict(X_test)

  for i in range(2000):
    print("Actual:", np.argmax(y_test[i]), "Predicted:", np.argmax(predictions[i]))

  print("Test loss:", test_loss)
  print("Test accuracy:", test_acc)

# Usage:  
evaluate_and_print(model, X_test, y_test)

Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 1 Predicted: 9
Actual: 6 Predicted: 9
Actual: 9 Predicted: 9
Actual: 3 Predicted: 9
Actual: 9 Predicted: 9
Actual: 7 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 8 Predicted: 9
Actual: 9 Predicted: 9
Actual: 7 Predicted: 9
Actual: 9 Predicted: 9
Actual: 1 Predicted: 9
Actual: 9 Predicted: 9
Actual: 6 Predicted: 9
Actual: 1 Predicted: 9
Actual: 9 Predicted: 9
Actual: 1 Predicted: 9
Actual: 6 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 2
Actual: 1 Predicted: 9
Actual: 2 Predicted: 9
Actual: 9 Predicted: 9
Actual: 7 Predicted: 9
Actual: 9 Predicted: 9
Actual: 4 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 7 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 Predicted: 9
Actual: 9 P

In [None]:
# Get class labels 
y_train_labels = np.argmax(y_train, axis=1)
y_test_labels = np.argmax(y_test, axis=1)

# Calculate class frequencies
train_class_counts = np.bincount(y_train_labels)
test_class_counts = np.bincount(y_test_labels)

print("X_train class distribution:")
print(train_class_counts) 

print("X_test class distribution:")   
print(test_class_counts)

# Check distribution difference
class_diff = np.abs(train_class_counts - test_class_counts)
print("Class distribution difference:")
print(class_diff)

X_train class distribution:
[ 722  818  710  734  728  620  715  759  657 7937]
X_test class distribution:
[ 171  205  179  190  171  155  190  202  187 1950]
Class distribution difference:
[ 551  613  531  544  557  465  525  557  470 5987]


To spectrogram:

The key steps are:
Perform STFT on EEG data with 128 sample window
Take absolute value to get amplitude spectrogram
Log scale and normalize spectrogram
Plot spectrogram for visualization
Stack along channels to create 3D input for CNN



In [None]:
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt

def to_spectrogram():
    # Already have our EEG_data

    # STFT parameters
    fs = 256 # Sampling frequency in hz
    f, t, Zxx = signal.stft(eeg_data, fs=fs, nperseg=128)

    # Generate spectrogram
    spectrogram = np.abs(Zxx)

    # Log scale spectrogram
    log_spectrogram = 10*np.log10(spectrogram)

    # Normalize
    norm_log_spectrogram = log_spectrogram / np.max(log_spectrogram)

    # Plot
    plt.imshow(norm_log_spectrogram, aspect='auto', origin='lower',
               extent=[t.min(), t.max(), f.min(), f.max()])
    plt.colorbar()
    plt.xlabel('Time')
    plt.ylabel('Frequency')

    # Use normalized log spectrogram as input
    inputs = np.stack([norm_log_spectrogram]*3, axis=-1) # Repeat along channels
    
to_spectrogram()