<a href="https://colab.research.google.com/github/bafanaS/dim-reduction-with-cnn-lstm/blob/main/CNN_or_CNN_LSTM_for_Real_%26_Imagery_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CNN / CNN LSTM for Real and Imagery

This notebook was used to do all DL testing in regards to real and imagery classification

# Data Processing

In [1]:
import numpy as np
import pandas as pd

In [2]:


from matplotlib import rcParams # rcParams is a file that specifies Matplotlib style defaults
from matplotlib import pyplot as plt

rcParams['figure.figsize'] = [20, 4]
rcParams['font.size'] = 15
rcParams['axes.spines.top'] = False
rcParams['axes.spines.right'] = False
rcParams['figure.autolayout'] = True

In [3]:
import os, requests

fname = 'motor_imagery.npz'
url = "https://osf.io/ksqv8/download"

if not os.path.isfile(fname):
  try:
    r = requests.get(url)
  except requests.ConnectionError:
    print("!!! Failed to download data !!!")
  else:
    if r.status_code != requests.codes.ok:
      print("!!! Failed to download data !!!")
    else:
      with open(fname, "wb") as fid:
        fid.write(r.content)

In [4]:
alldat = np.load(fname, allow_pickle=True)['dat']

In [7]:
participant=2

In [8]:
# trained initial sets on participant 1 and now attempting to fine tune on participant 2 with only the dense layers
from scipy import signal
import numpy as np

# Find resting state intervals and add to tongue/hand interval list.
intervals = [(on, off) for on, off in zip(alldat[participant][0]['t_on'], alldat[participant][0]['t_off'])]
resting = [(intervals[i][1], intervals[i+1][0]) for i in range(0, len(intervals)-1)]
intervals = np.append(np.array(intervals), np.array(resting), axis = 0)

classes = np.append(np.array([0 if i < 12 else 1 for i in alldat[participant][0]['stim_id']]), np.repeat(2, len(resting)))

channels = len(alldat[participant][0]['locs'])
stims = len(intervals)

X_real = np.empty((stims, 3000, channels))
y_real = np.empty((stims))

for i in range(stims):

    t_on = intervals[i][0]
    label = classes[i]

    # Real data only

    V = alldat[participant][0]['V'].astype('float32')#[:, :48]
    b, a = signal.butter(3, [50], btype='high', fs=1000)
    V = signal.filtfilt(b, a, V, 0)

    V = np.abs(V)**2
    b, a = signal.butter(3, [10], btype='low', fs=1000)
    V = signal.filtfilt(b, a, V, 0)

    V_real = V/V.mean(0)
    V_real = V_real[t_on : t_on + 3000]

    X_real[i] = V_real
    y_real[i] = label

In [9]:
from scipy import signal
import numpy as np

# Find resting state intervals and add to tongue/hand interval list.
intervals = [(on, off) for on, off in zip(alldat[participant][1]['t_on'], alldat[participant][1]['t_off'])]
resting = [(intervals[i][1], intervals[i+1][0]) for i in range(0, len(intervals)-1)]
intervals = np.append(np.array(intervals), np.array(resting), axis = 0)

classes = np.append(np.array([0 if i < 12 else 1 for i in alldat[participant][1]['stim_id']]), np.repeat(2, len(resting)))

channels = len(alldat[participant][1]['locs'])
stims = len(intervals)

X_imag = np.empty((stims, 3000, channels))
y_imag = np.empty((stims))

for i in range(stims):

    t_on = intervals[i][0]
    label = classes[i]

    # Real data only

    V = alldat[participant][1]['V'].astype('float32')#[:, :48]
    b, a = signal.butter(3, [50], btype='high', fs=1000)
    V = signal.filtfilt(b, a, V, 0)

    V = np.abs(V)**2
    b, a = signal.butter(3, [10], btype='low', fs=1000)
    V = signal.filtfilt(b, a, V, 0)

    V_real = V/V.mean(0)
    V_real = V_real[t_on : t_on + 3000]

    X_imag[i] = V_real
    y_imag[i] = label

In [10]:
concat = np.squeeze(np.concatenate((X_real, X_imag), axis=0))
labels = np.concatenate((np.zeros((X_real.shape[0])), np.ones((X_imag.shape[0]))), axis = 0)
idx = np.random.permutation(labels.shape[0])
X, y = concat[idx], labels[idx]

In [11]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)

In [12]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [13]:
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

In [14]:
X_train = np.expand_dims(X_train, axis=3)

In [15]:
X_test = np.expand_dims(X_test, axis=3)

In [16]:
y_train = to_categorical(y_train)

In [17]:
y_test = to_categorical(y_test)

# Keras Tuner CNN / CNN-LSTM Attempts

In [None]:
!pip install keras-tuner --upgrade



In [None]:
# Limiting to the smallest number of channels
channels = 46

### CNN LSTM Tuning

In [None]:
from tensorflow.keras.layers import LSTM, TimeDistributed, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from kerastuner.engine.hyperparameters import HyperParameters

def build_model(hp: HyperParameters):
    model = Sequential()

    # CNN layers
    model.add(Conv2D(filters=hp.Int('conv_1_filter', min_value=32, max_value=128, step=16),
                     kernel_size=hp.Choice('conv_1_kernel', values=[3, 5]),
                     activation='relu',
                     input_shape=(3000, channels, 1)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(filters=hp.Int('conv_2_filter', min_value=16, max_value=64, step=16),
                     kernel_size=hp.Choice('conv_2_kernel', values=[3, 5]),
                     activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # LSTM layers
    model.add(TimeDistributed(Flatten()))
    model.add(LSTM(units=hp.Int('lstm_1_units', min_value=50, max_value=200, step=50),
                   activation='tanh', return_sequences=True))  # Return sequences for the next LSTM layer
    model.add(Dropout(hp.Float('lstm_1_dropout', min_value=0.0, max_value=0.5, step=0.1)))
    model.add(LSTM(units=hp.Int('lstm_2_units', min_value=50, max_value=200, step=50),
                   activation='tanh'))
    model.add(Dropout(hp.Float('lstm_2_dropout', min_value=0.0, max_value=0.5, step=0.1)))

    # Dense layers
    model.add(Flatten())
    model.add(Dense(units=hp.Int('dense_1_units', min_value=16, max_value=128, step=16),
                    activation='relu'))
    model.add(Dense(units=hp.Int('dense_2_units', min_value=8, max_value=64, step=8),
                    activation='relu'))
    model.add(Dense(2, activation='softmax'))  # 2 classes: real or imaginary

    model.compile(optimizer=Adam(learning_rate=hp.Float('learning_rate', min_value=1e-5, max_value=1e-3, sampling='LOG')),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


### CNN Tuning

In [None]:
from tensorflow.keras.layers import LSTM, TimeDistributed, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from kerastuner.engine.hyperparameters import HyperParameters

def build_model(hp: HyperParameters):
    model = Sequential()

    # CNN layers
    model.add(Conv2D(filters=hp.Int('conv_1_filter', min_value=32, max_value=128, step=16),
                     kernel_size=hp.Choice('conv_1_kernel', values=[3, 5]),
                     activation='relu',
                     input_shape=(3000, channels, 1)))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(filters=hp.Int('conv_2_filter', min_value=16, max_value=64, step=16),
                     kernel_size=hp.Choice('conv_2_kernel', values=[3, 5]),
                     activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # LSTM layers
    # model.add(TimeDistributed(Flatten()))
    # model.add(LSTM(units=hp.Int('lstm_1_units', min_value=50, max_value=200, step=50),
    #                activation='tanh', return_sequences=True))  # Return sequences for the next LSTM layer
    # model.add(Dropout(hp.Float('lstm_1_dropout', min_value=0.0, max_value=0.5, step=0.1)))
    # model.add(LSTM(units=hp.Int('lstm_2_units', min_value=50, max_value=200, step=50),
    #                activation='tanh'))
    # model.add(Dropout(hp.Float('lstm_2_dropout', min_value=0.0, max_value=0.5, step=0.1)))

    # Dense layers
    model.add(Flatten())
    model.add(Dense(units=hp.Int('dense_1_units', min_value=16, max_value=128, step=16),
                    activation='relu'))
    model.add(Dense(units=hp.Int('dense_2_units', min_value=8, max_value=64, step=8),
                    activation='relu'))
    model.add(Dense(2, activation='softmax'))  # 2 classes: real or imaginary

    model.compile(optimizer=Adam(learning_rate=hp.Float('learning_rate', min_value=1e-5, max_value=1e-3, sampling='LOG')),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


In [None]:
from kerastuner import RandomSearch

tunerr = RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=25, # Number of different hyperparameter combinations to try
    directory='output',
    project_name='Brain_Activity_Classification'
)

In [None]:
from sklearn.utils.class_weight import compute_class_weight

y_train_integers = np.argmax(y_train, axis=1)
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_integers), y=y_train_integers)
class_weights_dict = {i: weight for i, weight in enumerate(class_weights)}

# Pass class weights to the search method
tunerr.search(X_train, y_train, batch_size=25, epochs=50, validation_data=(X_test, y_test), class_weight=class_weights_dict)

best_model = tunerr.get_best_models(num_models=1)[0]
best_hyperparameters = tunerr.get_best_hyperparameters(num_trials=1)[0]

Trial 5 Complete [00h 00m 43s]
val_accuracy: 0.800000011920929

Best val_accuracy So Far: 0.8500000238418579
Total elapsed time: 00h 03m 55s


In [None]:
loss, accuracy = best_model.evaluate(X_test, y_test, verbose=1)

print(f"Test loss: {loss}")
print(f"Test accuracy: {accuracy}")

Test loss: 0.4827667772769928
Test accuracy: 0.8500000238418579


#### Saving CNN LSTM

In [None]:
best_model.save(f'cnn-lstm-ri-p{participant}.keras')

#### Saving CNN

In [None]:
best_model.save(f'cnn-ri-p{participant}.keras')

# LSTM CNN BEST VALUES

In [18]:
from tensorflow.keras.models import load_model
model = load_model('cnn-lstm-ri-p2.keras')


In [19]:
# model = create_model()
# model.load_weights('cnn_lstm-ri.h5')

loss, accuracy = model.evaluate(X_test, y_test, verbose=1)

print(f"Test loss: {loss}")
print(f"Test accuracy: {accuracy}")


Test loss: 0.41180092096328735
Test accuracy: 0.8999999761581421


# CNN BEST VALUES

In [20]:
from tensorflow.keras.models import load_model
model = load_model('cnn-ri-p2.keras')


In [21]:
loss, accuracy = model.evaluate(X_test, y_test, verbose=1)

print(f"Test loss: {loss}")
print(f"Test accuracy: {accuracy}")


Test loss: 0.5352226495742798
Test accuracy: 0.8999999761581421


# Transfer Learning

### CNN LSTM

In [22]:
from tensorflow.keras.models import load_model
model = load_model('cnn-lstm-ri-p4.keras')

In [24]:
for layer in list(model.layers)[0:10]:
  layer.trainable = False

In [25]:
for layer in list(model.layers):
  print(layer.name, layer.trainable)

conv2d False
max_pooling2d False
conv2d_1 False
max_pooling2d_1 False
time_distributed False
lstm False
dropout False
lstm_1 False
dropout_1 False
flatten_1 False
dense True
dense_1 True
dense_2 True


In [26]:
from sklearn.utils.class_weight import compute_class_weight


y_train_integers = np.argmax(y_train, axis=1)
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_integers), y=y_train_integers)
class_weights_dict = {i: weight for i, weight in enumerate(class_weights)}

# Pass class weights to the search method
history = model.fit(X_train, y_train, batch_size=25, epochs=50, validation_data=(X_test, y_test), class_weight=class_weights_dict)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [27]:
loss, accuracy = model.evaluate(X_test, y_test, verbose=1)

print(f"Test loss: {loss}")
print(f"Test accuracy: {accuracy}")


Test loss: 2.002915620803833
Test accuracy: 0.4333333373069763


### CNN

In [28]:
from tensorflow.keras.models import load_model
model = load_model('cnn-ri-p4.keras')

In [29]:
for layer in list(model.layers)[0:5]:
  layer.trainable = False

In [30]:
for layer in list(model.layers):
  print(layer.name, layer.trainable)

conv2d False
max_pooling2d False
conv2d_1 False
max_pooling2d_1 False
flatten False
dense True
dense_1 True
dense_2 True


In [31]:
from sklearn.utils.class_weight import compute_class_weight


y_train_integers = np.argmax(y_train, axis=1)
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_integers), y=y_train_integers)
class_weights_dict = {i: weight for i, weight in enumerate(class_weights)}

# Pass class weights to the search method
history = model.fit(X_train, y_train, batch_size=25, epochs=50, validation_data=(X_test, y_test), class_weight=class_weights_dict)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [32]:
loss, accuracy = model.evaluate(X_test, y_test, verbose=1)

print(f"Test loss: {loss}")
print(f"Test accuracy: {accuracy}")


Test loss: 1.8779959678649902
Test accuracy: 0.4166666567325592
