In [None]:
import os
import time
import math
import scipy.io
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from matplotlib.pyplot import imshow
import tensorflow as tf
from tensorflow.keras import datasets, layers, models
from sklearn.model_selection import train_test_split
from keras import layers
from keras.layers import Input, Add, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Conv2D, AveragePooling2D, MaxPooling2D, Conv2DTranspose
from keras.models import Model, load_model
from keras.initializers import glorot_uniform
from IPython.display import SVG
from tensorflow.keras.utils import model_to_dot
import keras.backend as K
from keras.callbacks import EarlyStopping
from google.colab import output
from google.colab import drive
# Mount Google Drive into the Colab filesystem so the dataset path below resolves.
drive.mount('/content/drive')

# Load the MATLAB file; later cells read its 'data', 'channel_labels' and
# 'time_points' entries.
mat = scipy.io.loadmat('/content/drive/My Drive/FYP/data_EEG_AI.mat')

In [None]:
# Flatten the .mat contents into one row per (sample, channel) pair:
# 7800 samples x 24 channels, with the channel index varying fastest.
colname = ['sample_no','channel_labels','label','time_points','data']
records = []
for flat_idx in range(7800*24):
    sample_idx, channel_idx = divmod(flat_idx, 24)
    records.append([
        sample_idx,
        mat["channel_labels"][channel_idx][0][0],
        # Every consecutive run of 300 samples shares one label.
        flat_idx//(300*24),
        # 801 time points from -200 to 3000 in steps of 4 (units not stated here).
        list(range(-200,3001,4)),
        mat['data'][channel_idx][:,sample_idx],
    ])

df = pd.DataFrame(records,columns=colname)
#df.to_csv("EEG_sorted.csv",index=False)
df.head()

In [None]:
#source: https://www.mdpi.com/1424-8220/22/5/1750
# Shape of one model input; passed below as input_shape=(ROWS, COLS, CHANNELS).
# 24 matches the number of channel rows per sample used when building `df`.
ROWS = 24
# 801 matches len(range(-200, 3001, 4)), the time_points list stored per row.
COLS = 801
CHANNELS = 1
# Forwarded as `classes` to Generator/Discriminator; neither function body reads it.
CLASSES = 26

def gen_block(X, param1, param2, param3):
    """Residual block of the generator: two stacked convolutions plus a projected shortcut.

    Every ``param`` is a list ``[kernel_size, strides, filters]``,
    e.g. ``[(3, 10), (2, 3), 16]``. All convolutions use ``padding='same'``.

    Args:
        X: 4-D Keras tensor. Batch normalization is applied on axis 3, so a
            channels-last layout is assumed.
        param1: settings of the first main-path convolution.
        param2: settings of the second main-path convolution.
        param3: settings of the shortcut convolution. Because both paths are
            summed, it must yield the same output shape as param1 followed by
            param2 (same overall stride, same filter count as param2).

    Returns:
        The Keras tensor ``relu(main_path + shortcut)``.
    """
    # Main path: conv -> BN -> ReLU -> conv -> BN.
    # Layers are created in the same order as before so checkpoints saved
    # from earlier runs keep matching the model structure.
    main = Conv2D(filters=param1[2], kernel_size=param1[0], strides=param1[1],
                  kernel_initializer=glorot_uniform(seed=0), padding='same')(X)
    main = BatchNormalization(axis=3)(main)
    main = Activation('relu')(main)
    main = Conv2D(filters=param2[2], kernel_size=param2[0], strides=param2[1],
                  kernel_initializer=glorot_uniform(seed=0), padding='same')(main)
    main = BatchNormalization(axis=3)(main)

    # Shortcut path: a single conv + BN applied to the block input.
    shortcut = Conv2D(filters=param3[2], kernel_size=param3[0], strides=param3[1],
                      kernel_initializer=glorot_uniform(seed=0), padding='same')(X)
    shortcut = BatchNormalization(axis=3)(shortcut)

    # Merge both paths and apply the final ReLU.
    merged = Add()([main, shortcut])
    return Activation('relu')(merged)

def Generator(input_shape = (64, 64, 3), classes = 2):
    """Build the generator: three strided residual blocks, then three transposed convolutions.

    With the (24, 801, 1) input used in this notebook, the 'same'-padded strides
    shrink the map 24x801 -> 12x267 -> 4x89 -> 2x89 and the transposed
    convolutions expand it back 4x89 -> 12x267 -> 24x801 with one output channel.

    Args:
        input_shape: (height, width, channels) of one input example.
        classes: kept for interface compatibility; not used by the model.

    Returns:
        A Keras ``Model`` named 'Generator'.
    """
    # Define the input as a tensor with shape input_shape
    X_input = Input(input_shape)

    # One entry per residual block: (param1, param2, param3), each being
    # [kernel_size, strides, filters] as expected by gen_block.
    encoder_specs = [
        ([(3,10), (2,3), 16], [(1,1), (1,1), 32], [(1,1), (2,3), 32]),
        ([(3,10), (3,3), 32], [(1,1), (1,1), 64], [(1,1), (3,3), 64]),
        ([(2,3), (2,1), 64], [(1,1), (1,1), 128], [(1,1), (2,1), 128]),
    ]
    X = X_input
    for param1, param2, param3 in encoder_specs:
        X = gen_block(X, param1, param2, param3)

    # Upsampling path: (filters, kernel_size, strides); the strides mirror the
    # encoder strides in reverse order.
    decoder_specs = [
        (128, (2,3), (2,1)),
        (64, (3,5), (3,3)),
        (1, (3,10), (2,3)),
    ]
    for filters, kernel_size, strides in decoder_specs:
        X = Conv2DTranspose(filters, kernel_size=kernel_size, strides=strides, padding = 'same')(X)
        # NOTE(review): the last ReLU restricts the generated signal to
        # non-negative values - confirm that this is intended for the data.
        X = Activation('relu')(X)

    # Create model
    model = Model(inputs = X_input, outputs = X, name='Generator')

    return model

def _size_preserving_padding(kernel_size, strides, h, w):
    """Return ZeroPadding2D amounts so an unpadded strided conv maps (h, w) back to (h, w)."""
    def _split(total):
        # For odd totals the extra row/column goes to the bottom/right side.
        return (math.floor(total / 2), math.ceil(total / 2))

    # Padded size is stride*size + kernel - stride, so the unpadded convolution
    # yields (padded - kernel) // stride + 1 == size positions again.
    total_h = (strides[0] - 1) * h + kernel_size[0] - strides[0]
    total_w = (strides[1] - 1) * w + kernel_size[1] - strides[1]
    return (_split(total_h), _split(total_w))


def _padded_conv(X, param, h, w):
    """Zero-pad X for an (h, w) map, then apply the convolution described by param."""
    X = ZeroPadding2D(_size_preserving_padding(param[0], param[1], h, w))(X)
    return Conv2D(filters = param[2], kernel_size = param[0], strides = param[1], kernel_initializer = glorot_uniform(seed=0))(X)


def disc_block(X, param1, param2, param3):
    """Residual block of the discriminator that keeps the spatial size of its input.

    Every ``param`` is a list ``[kernel_size, strides, filters]``,
    e.g. ``[(2, 10), (2, 4), 16]``. Each convolution is preceded by explicit
    zero padding sized from the block input's height and width, so the strided
    convolutions do not shrink the feature map.

    Args:
        X: 4-D Keras tensor with static height and width. Batch normalization
            is applied on axis 3, so a channels-last layout is assumed.
        param1: settings of the first main-path convolution.
        param2: settings of the second main-path convolution.
        param3: settings of the shortcut convolution; its filter count must
            equal param2's because both paths are summed.

    Returns:
        The Keras tensor ``leaky_relu(main_path + shortcut)``.
    """
    _, h, w, _ = X.shape
    shortcut = X

    # Main path: pad -> conv -> BN -> leaky ReLU -> pad -> conv -> BN.
    # Layers are created in the same order as before so checkpoints saved
    # from earlier runs keep matching the model structure.
    X = _padded_conv(X, param1, h, w)
    X = BatchNormalization(axis = 3)(X)
    X = Activation('leaky_relu')(X)
    X = _padded_conv(X, param2, h, w)
    X = BatchNormalization(axis = 3)(X)

    # Shortcut path: pad -> conv -> BN on the block input.
    shortcut = _padded_conv(shortcut, param3, h, w)
    shortcut = BatchNormalization(axis = 3)(shortcut)

    # Merge both paths and apply the final leaky ReLU.
    X = Add()([X, shortcut])
    return Activation('leaky_relu')(X)

def Discriminator(input_shape = (24, 801, 1), classes = 26):
    """Build the discriminator: four residual blocks, two convolutions and a one-unit head.

    The model returns one un-activated score (logit) per example. This matches
    the ``BinaryCrossentropy(from_logits=True)`` loss used for training in this
    file. A softmax over a single unit would always output exactly 1.0 and
    give the network no usable gradient, so the head has no activation.

    Args:
        input_shape: (height, width, channels) of one input example.
        classes: kept for interface compatibility; not used by the model.

    Returns:
        A Keras ``Model`` named 'Discriminator' with output shape (batch, 1).
    """
    # Define the input as a tensor with shape input_shape
    X_input = Input(input_shape)

    # One entry per residual block: (param1, param2, param3), each being
    # [kernel_size, strides, filters] as expected by disc_block.
    block_specs = [
        ([(2,10), (2,4), 16], [(1,1), (1,1), 32], [(1,1), (2,4), 32]),
        ([(2,5), (2,2), 32], [(1,1), (1,1), 64], [(1,1), (2,2), 64]),
        ([(2,3), (2,2), 64], [(1,1), (1,1), 128], [(1,1), (2,2), 128]),
        ([(2,3), (2,2), 128], [(1,1), (1,1), 256], [(1,1), (2,2), 256]),
    ]
    X = X_input
    for param1, param2, param3 in block_specs:
        X = disc_block(X, param1, param2, param3)

    # Unpadded strided convolution followed by a single-filter convolution.
    X = Conv2D(512, kernel_size=(2,2), strides=(2,2))(X)
    X = BatchNormalization(axis = 3)(X)
    X = Activation('leaky_relu')(X)
    X = Conv2D(1, kernel_size=(1,4))(X)
    X = Flatten()(X)
    # Linear head: emit a raw logit, the loss applies the sigmoid itself.
    X = Dense(1, kernel_initializer = glorot_uniform(seed=0))(X)

    # Create model
    model = Model(inputs = X_input, outputs = X, name='Discriminator')

    return model

# Instantiate both networks on the same (ROWS, COLS, CHANNELS) input shape.
gen = Generator(input_shape = (ROWS, COLS, CHANNELS), classes = CLASSES)
dis = Discriminator(input_shape = (ROWS, COLS, CHANNELS), classes = CLASSES)

# Print the layer-by-layer summaries of both models.
gen.summary()
dis.summary()


# Binary cross-entropy shared by both GAN losses. from_logits=True means the
# predictions handed to it are interpreted as raw logits, not probabilities.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
def discriminator_loss(real_output, fake_output):
    """Discriminator loss: real outputs are scored against 1s, fake outputs against 0s.

    Args:
        real_output: discriminator output for real examples.
        fake_output: discriminator output for generated examples.

    Returns:
        Sum of the two cross-entropy terms.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake
def generator_loss(fake_output):
    """Generator loss: cross-entropy of the discriminator's fake outputs against all-ones targets."""
    wanted_labels = tf.ones_like(fake_output)
    return cross_entropy(wanted_labels, fake_output)
# Separate Adam optimizers (learning rate 1e-4) for the two networks.
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)
# Checkpoints are written to Google Drive with the file prefix "ckpt".
checkpoint_dir = '/content/drive/My Drive/FYP/GAN_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
# Track both models together with their optimizers in a single checkpoint object.
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=gen,
                                 discriminator=dis)

In [None]:
# source: https://www.tensorflow.org/tutorials/generative/dcgan
# Number of passes over the training data; passed to train() below.
EPOCHS = 50
# Examples per training step.
BATCH_SIZE = 8
# (rows, cols) of the noise map fed to the generator in train_step.
noise_dim = (24, 801)
# NOTE(review): not referenced anywhere else in the visible notebook - confirm it is still needed.
num_examples_to_generate = 8

# Notice the use of `tf.function`
# This annotation causes the function to be "compiled".
@tf.function
def train_step(images):
    """Run one GAN update (generator and discriminator) on a batch of real examples.

    Args:
        images: batch of real examples accepted by the discriminator ``dis``.

    Side effects:
        Applies one optimizer step to ``gen`` and one to ``dis`` and prints
        both losses.
    """
    # Draw as many noise maps as there are real examples in this batch, so a
    # batch of any size is matched by an equally sized fake batch.
    batch_size = tf.shape(images)[0]
    noise = tf.random.normal([batch_size, noise_dim[0]*noise_dim[1]], mean=0.0, stddev=1.0)
    noise = tf.reshape(noise, [batch_size, noise_dim[0], noise_dim[1], 1])

    # One tape per network so each loss is differentiated only with respect
    # to the variables of its own model.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = gen(noise, training=True)

        real_output = dis(images, training=True)
        fake_output = dis(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    # Python's print() inside a tf.function runs only while the function is
    # traced and shows symbolic tensors; tf.print is part of the graph and
    # reports the actual loss values on every step.
    tf.print("gen_loss: ", gen_loss, "; disc_loss = ", disc_loss, "\n", sep="")

    gradients_of_generator = gen_tape.gradient(gen_loss, gen.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, dis.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, gen.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, dis.trainable_variables))

def train(dataset, epochs):
  """Train the GAN for ``epochs`` passes over ``dataset``.

  Args:
      dataset: array-like of examples stacked along axis 0.
      epochs: number of passes over the data.

  Side effects:
      Calls ``train_step`` for every batch, saves a checkpoint every 15 epochs
      and after the last epoch, and prints the wall-clock time of each epoch.
  """
  # Slice into batches of BATCH_SIZE; the last batch may be smaller. An
  # integer split count passed to tf.split fails unless the data divides evenly.
  dataset = tf.convert_to_tensor(dataset)
  num_examples = dataset.shape[0]
  batches = [dataset[first:first + BATCH_SIZE]
             for first in range(0, num_examples, BATCH_SIZE)]

  for epoch in range(epochs):
    start = time.time()
    for image_batch in batches:
      train_step(image_batch)

    # Save the model every 15 epochs, and also after the final epoch so that
    # restoring the latest checkpoint does not discard the last epochs.
    epochs_done = epoch + 1
    if epochs_done % 15 == 0 or epochs_done == epochs:
      checkpoint.save(file_prefix = checkpoint_prefix)

    print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

# Stack the per-(sample, channel) traces of `df` and regroup them as
# (sample, channel, time point).
data = np.vstack(df['data'].to_numpy()).reshape((7800, 24, 801))
# Labels follow the sample order: every run of 300 samples shares one label.
x_train, x_test, y_train, y_test = train_test_split(
    data,
    np.arange(7800) // 300,
    test_size=0.2,
    random_state=0,
)
train(x_train, EPOCHS)
# Play a beep in the browser once training has finished.
output.eval_js('new Audio("https://upload.wikimedia.org/wikipedia/commons/0/05/Beep-09.ogg").play()')

In [None]:
# Reload the most recent checkpoint found in checkpoint_dir into the objects
# tracked by `checkpoint` (both models and both optimizers).
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
# Plot the generator output for the first 10 recordings, one figure each
# with one line per channel.
# NOTE(review): the generator is fed recorded samples here, whereas train_step
# feeds it random noise - confirm this is the intended way to sample it.
for i in range(10):
    out = gen(np.reshape(data[i], (1, 24, 801, 1)))
    out = np.reshape(out, (24,801))
    fig, ax = plt.subplots(figsize=(16,9))
    ax.set_ylabel("potential")
    ax.set_xlabel("time after stimulus")
    time_axis = mat['time_points'][:,0]
    for channel, trace in enumerate(out):
        ax.plot(time_axis, trace, label=mat['channel_labels'][channel][0][0])
    ax.legend(loc='best')
    plt.show()

In [None]:
# Run every recording through the generator and collect the result as one
# row per (sample, channel), keyed by sample_index*24 + channel_index.
di = {}
for sample_idx, sample in enumerate(data):
    out_2d = np.reshape(gen(np.reshape(sample, (1, 24, 801, 1))), (24,801))
    for channel_idx, trace in enumerate(out_2d):
        di[sample_idx*24 + channel_idx] = trace
output_df = pd.DataFrame.from_dict(di, orient='index')
print(len(output_df))
output_df.head()

In [None]:
# Write the generated traces to Google Drive; the row index is not written.
output_df.to_csv('/content/drive/My Drive/FYP/out.csv', index=False)