In [None]:
import scipy.io
import numpy as np
import matplotlib.pyplot as plt
from keras.utils.np_utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import minmax_scale
import tensorflow as tf
from tensorflow.keras.layers import Flatten, Dense, Conv1D, MaxPool1D, Dropout,BatchNormalization,SpatialDropout1D

# Load EEG signal data
(The data are filtered to 20 channels and the labels are already one-hot encoded, following the MIN2Net paper.)

In [None]:
# Read the stored trials and labels, both from .npy files in the working directory.
x_all, y_all = (np.load(path) for path in ('X_all.npy', 'y_all.npy'))

# Hold out 20% of the trials for validation; the fixed seed keeps the split repeatable.
x_train, x_val, y_train, y_val = train_test_split(
    x_all,
    y_all,
    test_size=0.20,
    random_state=69,
)

# Yuchen-Wang-SH

In [None]:
# from src.models.PAMI import CNN_P300_PAMI
# input_ch = len([7,8, 9, 10, 12, 13,14 ,17,18,19,20, 32,33 ,34,35,36,37,38,39,40])
# model = CNN_P300_PAMI((400, input_ch,1))
# model.summary()
import tensorflow as tf
from keras import layers, models
from keras import backend as K
from keras.utils.generic_utils import get_custom_objects
import numpy as np
def sigmoid_pami_p300(x):
    """Scaled hyperbolic-tangent activation: ``1.7159 * tanh(2 * x / 3)``.

    Args:
        x: Value passed to the Keras backend ``tanh`` after scaling by 2/3.

    Returns:
        The backend result of ``1.7159 * tanh(2 * x / 3)``.
    """
    squashed = K.tanh(2 * x / 3)
    return 1.7159 * squashed
def CNN_P300_PAMI(input_shape):
    """Build the two-convolution classifier used for the EEG trials.

    Layer order: a convolution spanning all electrodes, a strided convolution
    along the points axis, a 100-unit dense layer, and a 4-way softmax output.
    Both convolutions are linear and followed by ``sigmoid_pami_p300``.

    Args:
        input_shape: ``(points_to_slice, num_electrodes, 1)``; index 0 is the
            number of points per trial and index 1 the number of electrodes.

    Returns:
        An uncompiled Keras ``Model`` mapping ``input_shape`` to 4 class
        probabilities.
    """
    # Registers an Activation layer wrapping the custom function in Keras'
    # global custom-object table (kept exactly as before for compatibility).
    get_custom_objects().update({'custom_activation': layers.Activation(sigmoid_pami_p300)})

    num_electrodes = input_shape[1]
    num_first_filter = 10
    num_second_filter = 5 * num_first_filter

    input_layer = layers.Input(input_shape)

    # Conv2D kernels are ordered like the input axes (points, electrodes), so a
    # kernel covering every electrode at one point is [1, num_electrodes].
    # The previous [num_electrodes, 1] kernel slid over 20 consecutive points
    # of a single electrode instead.
    conv_1 = layers.Conv2D(filters=num_first_filter, kernel_size=[1, num_electrodes],
                           strides=[1, 1], activation='linear')(input_layer)
    conv_1_acti = layers.Activation(sigmoid_pami_p300)(conv_1)

    # Non-overlapping windows of 13 points (stride equals kernel length); the
    # electrode axis has already been collapsed to width 1 by conv_1.
    conv_2 = layers.Conv2D(filters=num_second_filter, kernel_size=[13, 1],
                           strides=[13, 1], activation='linear')(conv_1_acti)
    conv_2_acti = layers.Activation(sigmoid_pami_p300)(conv_2)

    flatten = layers.Flatten()(conv_2_acti)
    fc = layers.Dense(100, activation='sigmoid')(flatten)
    # Softmax yields a proper distribution over the 4 one-hot classes, which is
    # what the categorical cross-entropy loss used for training expects.
    output_layer = layers.Dense(4, activation='softmax')(fc)
    return models.Model(inputs=input_layer, outputs=output_layer)

# One model input channel per selected channel index (20 indices in total).
selected_channels = [7, 8, 9, 10, 12, 13, 14, 17, 18, 19, 20,
                     32, 33, 34, 35, 36, 37, 38, 39, 40]
input_ch = len(selected_channels)

# Each trial is fed as 400 points x input_ch channels x 1 feature map.
pami_input_shape = (400, input_ch, 1)
model = CNN_P300_PAMI(pami_input_shape)
model.summary()


In [None]:
# Train the CNN_P300_PAMI model built in the previous cell. The function is
# defined in this notebook, so nothing is imported from src.models here.
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping

# Conv2D needs a trailing feature axis: (trials, d1, d2) -> (trials, d1, d2, 1).
x_train_4d = x_train.reshape(x_train.shape[0], x_train.shape[1], x_train.shape[2], 1)
x_val_4d = x_val.reshape(x_val.shape[0], x_val.shape[1], x_val.shape[2], 1)

model.compile(optimizer='adam', loss=tf.keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])

# Stop once validation accuracy has not improved for 3 consecutive epochs.
earlystopping = EarlyStopping(monitor="val_accuracy", patience=3)

# The validation arrays must be passed by keyword: the 3rd and 4th positional
# parameters of fit() are batch_size and epochs.
model.fit(
    x_train_4d,
    y_train,
    epochs=100,  # upper bound only (same cap as the 1-D CNN run); early stopping ends training sooner
    validation_data=(x_val_4d, y_val),
    callbacks=[earlystopping],
)

In [None]:
# Save the model to an HDF5 file at the hard-coded Drive path below.
saved_model_path = '/content/drive/MyDrive/Colab_Notebooks/Temporary_dataset/my_model.h5'
model.save(saved_model_path)

# CNN 1D model

In [None]:
#CNN
import tensorflow as tf
from tensorflow.keras.layers import Flatten, Dense, Conv1D, MaxPool1D, Dropout,BatchNormalization,SpatialDropout1D
def _leaky_relu():
    """Return a new LeakyReLU activation layer with alpha=0.001."""
    return tf.keras.layers.LeakyReLU(alpha=0.001)


# 1-D CNN over inputs of shape (400, 20); layers are listed in forward order.
cnn_model = tf.keras.models.Sequential([
    # Two kernel-20 convolutions (32 filters each), each followed by batch normalisation.
    Conv1D(filters=32, kernel_size=(20,), padding='same', activation=_leaky_relu(), input_shape=(400, 20)),
    BatchNormalization(),
    Conv1D(filters=32, kernel_size=(20,), padding='same', activation=_leaky_relu()),
    BatchNormalization(),
    SpatialDropout1D(0.5),
    # Kernel-6 convolution, still 32 filters.
    Conv1D(filters=32, kernel_size=(6,), padding='same', activation=_leaky_relu()),
    BatchNormalization(),
    # Pool with window 2 / stride 2, then a final kernel-3 convolution.
    MaxPool1D(pool_size=(2,), strides=2, padding='same'),
    Conv1D(filters=32, kernel_size=(3,), padding='same', activation=_leaky_relu()),
    SpatialDropout1D(0.5),
    Flatten(),
    # Dense head: 296 -> 148 -> 74 units.
    Dense(units=296, activation=_leaky_relu()),
    Dense(units=148, activation=_leaky_relu()),
    Dense(units=74, activation=_leaky_relu()),
    # Softmax output over the 4 classes.
    Dense(units=4, activation='softmax'),
])

loss = tf.keras.losses.categorical_crossentropy
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)

cnn_model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
cnn_model.summary()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import os
# Checkpoint file for the best model, written to the current working directory.
modelPath = os.path.join(os.getcwd(), 'bestModel.h5')

checkpoint = ModelCheckpoint(
    modelPath,                # destination file
    monitor='val_loss',       # quantity that decides which epoch is "best"
    verbose=1,                # log a message whenever a checkpoint is written
    save_best_only=True,      # overwrite only when the monitored value improves
    save_weights_only=False,  # store the full model, not just its weights
    mode='auto',              # infer min/max direction from the monitored name
    save_freq='epoch',        # check once per epoch; replaces the deprecated period=1
)

earlystopping = EarlyStopping(
    monitor='val_loss',         # quantity watched for improvement
    min_delta=0.001,            # smaller changes do not count as improvement
    patience=4,                 # epochs without improvement before stopping
    restore_best_weights=True,  # roll back to the best epoch's weights on stop
)
callbacksList = [checkpoint, earlystopping]

cnn_model_history = cnn_model.fit(
    x_train,
    y_train,
    epochs=100,
    batch_size=64,
    validation_data=(x_val, y_val),
    callbacks=callbacksList,
)

# Kaggle submission

In [None]:
# Keep the same 20 channel indices whose count sizes the model input (input_ch).
# NOTE(review): xp_test is not defined in any visible cell — confirm where it is loaded.
test_channel_idx = [7, 8, 9, 10, 12, 13, 14, 17, 18, 19, 20,
                    32, 33, 34, 35, 36, 37, 38, 39, 40]
xpred = xp_test[:, :, test_channel_idx]
xpred.shape

In [None]:
# The model was built for inputs shaped (400, input_ch, 1), so the singleton
# feature axis must go last, matching the reshape used for training.
# Placing it at position 1 would feed (trials, 1, 400, input_ch) instead.
predict = model.predict(xpred.reshape(xpred.shape[0], xpred.shape[1], xpred.shape[2], 1))

In [None]:
# Display the shape of the prediction array.
predict.shape

In [None]:
# Display the model output for the first test item.
predict[0]

In [None]:
# Predicted class per item = index of the largest model output.
sub = [np.argmax(scores) for scores in predict]

In [None]:
import pandas as pd
# Load the sample submission file and display it to inspect the expected layout.
a=pd.read_csv('/content/sample_submission.csv')
a

In [None]:
# Submission ids: the integers 1..400 followed by the strings 'f1'..'f400'.
num = [*range(1, 401), *(f'f{k}' for k in range(1, 401))]

In [None]:
# Append 400 empty predictions so `sub` lines up with the 400 extra 'f…' ids.
# Running this cell more than once keeps growing `sub`.
sub.extend([None] * 400)

In [None]:
import pandas as pd
# Pair every id with its prediction and write the table to CSV.
# NOTE(review): to_csv writes the DataFrame index as an extra unnamed first
# column by default — confirm this matches the expected submission layout.
submission_columns = {'sample_id': num, 'prediction': sub}
df = pd.DataFrame(submission_columns)
df.to_csv('/content/submission.csv')

In [None]:
# Replace the index with a fresh default one (the old index is dropped), then display the frame.
df = df.reset_index(drop=True)
df

# Preprocessing experiment: fast Fourier transform (not used)

In [None]:
!pip install scipy

In [None]:
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
# Amplitude spectrum of the first trial, one curve per channel.
# Assumes x_all is (trials, 400, 20), the layout fed to the 1-D CNN above.
samplerate = 100        # sampling rate in Hz
N = x_all.shape[1]      # points per trial along the time axis
xf = fftfreq(N, 1 / samplerate)

# scipy's fft transforms the LAST axis by default, which holds the channels
# here; the frequency bins in xf describe the time axis, so select it explicitly.
xx = fft(x_all[0], axis=0)

# Plot magnitudes (the FFT output is complex) for non-negative frequencies only.
half = N // 2
plt.plot(xf[:half], np.abs(xx[:half]))
plt.xlabel('Frequency (Hz)')
plt.ylabel('Amplitude')
plt.title('FFT amplitude spectrum, first trial')
plt.show()

In [None]:
# 41600,400,400,20
from scipy.fft import fft, fftfreq
samplerate = 100  # Hz
duration = 4      # seconds

# Complex spectrum of every trial, computed in one vectorised call.
# Assumes x_all is (trials, 400, 20): axis 1 is the time axis, whereas scipy's
# default (last axis) would transform across the 20 channels instead.
x_fft = fft(x_all, axis=1)
