In [6]:
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras import regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential,load_model
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.models import load_model
from sklearn.utils import class_weight
import tensorflow.keras.layers as layers
from data_generator import NpyDataGenerator

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import os
from tqdm import tqdm
from collections import Counter
import numpy as np
from tensorflow.keras.optimizers import RMSprop, SGD, Adam,Adagrad
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.utils.multiclass import unique_labels
from tensorflow.keras.layers import *
# import einops

In [10]:
train_path = "/media/kashraf/TOSHIBA EXT/Dissertation/stage2/EEG movie/Theta/train"
test_path = "/media/kashraf/TOSHIBA EXT/Dissertation/stage2/EEG movie/Theta/test"
train_gen = NpyDataGenerator(train_path,batch_size=4)
validation_gen = NpyDataGenerator(test_path,batch_size=1,shuffle=False)
print("Training samples: ",train_gen.num_samples)
print("Test samples: ",validation_gen.num_samples)

Training samples:  4200
Test samples:  1800


In [8]:
# sample = "/media/kashraf/TOSHIBA EXT/Dissertation/stage2/wavelet/train_real/cl8/ERP8_6.npy"
# data= np.load(sample)

### (2+1)D CNN
Unlike 3D CNN, in (2+1)D CNN we factorize the convolution. Instead of a single 3D convolution to process the time and space dimensions,a "(2+1)D" convolution processes the space and time dimensions separately. The figure below shows the factored spatial and temporal convolutions of a (2 + 1)D convolution.
![(2+1)D convolutions](https://www.tensorflow.org/images/tutorials/video/2plus1CNN.png)
figure source: https://www.tensorflow.org/images/. 
The main advantage of this approach is that it reduces the number of parameters. In the (2 + 1)D convolution the spatial convolution takes in data of the shape `(1, width, height)`, while the temporal convolution takes in data of the shape `(time, 1, 1)`. For example, a (2 + 1)D convolution with kernel size `(3 x 3 x 3)` would need weight matrices of size `(9 * channels**2) + (3 * channels**2)`, less than half as many as the full 3D convolution. 

### Residual block
A ResNet model is made from a sequence of residual blocks. A residual block has two branches. The main branch performs the calculation, but is difficult for gradients to flow through. The residual branch bypasses the main calculation and mostly just adds the input to the output of the main branch. Gradients flow easily through this branch. Therefore, an easy path from the loss function to any of the residual block's main branch will be present. This avoids the vanishing gradient problem.

Create the main branch of the residual block with the following class. In contrast to the standard ResNet structure this uses the custom Conv2Plus1D layer instead of

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

def Conv2Plus1D(filters, kernel_size, padding='same'):
    """A custom layer for 2D convolution followed by 1D convolution."""
    def layer(x):
        # First apply spatial convolution
        x = layers.Conv3D(filters, (1, kernel_size[1], kernel_size[2]), padding=padding)(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        # Followed by temporal convolution
        x = layers.Conv3D(filters, (kernel_size[0], 1, 1), padding=padding)(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        return x
    return layer

def add_residual_block(input_tensor, filters, kernel_size, stride=(1, 1, 1)):
    """Add a residual block with optional downsampling."""
    # Main path
    x = Conv2Plus1D(filters, kernel_size, padding='same')(input_tensor)
    x = layers.MaxPooling3D(pool_size=(1, 2, 2), strides=stride, padding='same')(x)

    # Residual path
    if input_tensor.shape[-1] != filters or stride != (1, 1, 1):
        # Adjust dimensions via 1x1 convolutions if number of filters or stride changes
        res = layers.Conv3D(filters, (1, 1, 1), strides=stride, padding='same')(input_tensor)
        res = layers.BatchNormalization()(res)
    else:
        res = input_tensor

    # Combine main path with residual path
    x = layers.add([x, res])
    x = layers.ReLU()(x)
    return x

# Input tensor for a 3D model
input_shape = (176, 64, 28, 1)  # (time, height, width, channels)
input_tensor = layers.Input(shape=input_shape)

# Initial convolution layer
x = Conv2Plus1D(16, (3, 7, 7))(input_tensor)
x = layers.MaxPooling3D(pool_size=(1, 2, 2), padding='same')(x)

# Adding residual blocks
x = add_residual_block(x, 32, (3, 3, 3))
x = add_residual_block(x, 64, (3, 3, 3), stride=(2, 2, 2))
x = add_residual_block(x, 128, (3, 3, 3), stride=(2, 2, 2))
x = add_residual_block(x, 256, (3, 3, 3), stride=(2, 2, 2))

# Final global pooling and dense layer
x = layers.GlobalAveragePooling3D()(x)
x = layers.Dense(512, activation='relu')(x)
x = layers.Dropout(0.5)(x)  # Dropout added for regularization
x = layers.Dense(4)(x)  # Assuming 4 is the number of output classes or features

model = models.Model(inputs=input_tensor, outputs=x)
model.summary()


### Train the model

In [None]:
# Define the mixed precision policy
mixed_precision = tf.keras.mixed_precision.experimental.Policy('mixed_float16')
tf.keras.mixed_precision.experimental.set_policy(mixed_precision)             
checkpoint = ModelCheckpoint("/media/kashraf/TOSHIBA EXT/Dissertation/stage2/modeling/weights/3D_CNN_WT.h5",
                             monitor="val_loss",
                             mode="min",
                             save_best_only = True,
                             verbose=1)

earlystop = EarlyStopping(monitor = 'val_loss', 
                          min_delta = 0, 
                          patience = 20,
                          verbose = 1,
                          restore_best_weights = True)

reduce_lr = ReduceLROnPlateau(monitor = 'val_loss',
                              factor = 0.01,
                              patience = 5,
                              verbose = 1,
                              min_delta = 0.000001)

# we put our call backs into a callback list
callbacks = [ checkpoint,reduce_lr,earlystop]

# We use a very small learning rate 
model.compile(loss = 'categorical_crossentropy',
              optimizer = Adam(learning_rate=0.0001),
              metrics = ['accuracy'])


epochs = 100

# history= model.fit(   
#     train_gen,
#     epochs = epochs,
#     callbacks =callbacks,
#     validation_data = validation_gen)

In [11]:
import tensorflow as tf
from tensorflow.keras import layers, models
from kerastuner import HyperModel, Hyperband, Tuner



class CNN3DHyperModel(HyperModel):
    def __init__(self, input_shape):
        self.input_shape = input_shape
    
    def build(self, hp):
        model = models.Sequential()
        model.add(layers.Input(shape=self.input_shape))
        
        # Hyperparameters for convolutional layers
        for i in range(hp.Int('num_layers', 1, 5)):
            kernel_size = hp.Choice(f'kernel_size_{i}', ['3x3x3', '3x5x5', '3x7x7'])
            kernel_size = tuple(map(int, kernel_size.split('x')))
            
            model.add(layers.Conv3D(
                filters=hp.Choice(f'filters_{i}', [16, 32, 64,128]),
                kernel_size=kernel_size,
                activation=hp.Choice('activation', ['relu']),
                padding='same'
            ))
            if hp.Boolean(f'batch_norm_{i}'):
                model.add(layers.BatchNormalization())

            # Pooling layer choice
            if hp.Choice(f'pooling_type_{i}', ['max', 'avg']) == 'max':
                model.add(layers.MaxPooling3D(pool_size=(2, 2, 2)))
            else:
                model.add(layers.AveragePooling3D(pool_size=(2, 2, 2)))

            # Dropout in convolutional blocks
            model.add(layers.Dropout(rate=hp.Float(f'dropout_conv_{i}', 0, 0.5, step=0.1)))
        
        # Global pooling before the dense layers
        model.add(layers.GlobalAveragePooling3D())

        # Dense layer
        model.add(layers.Dense(
            units=hp.Int('dense_units', 256, 1024, step=64),
            activation=hp.Choice('dense_activation', ['relu'])
        ))

        # Dropout after dense layer
        model.add(layers.Dropout(rate=hp.Float('dropout_dense', 0, 0.5, step=0.1)))

        # Output layer
        model.add(layers.Dense(4))  # Assuming 4 is the number of output classes
        
        # Compile model
        optimizer_choice = hp.Choice('optimizer', ['adam', 'sgd', 'rmsprop'])
        if optimizer_choice == 'adam':
            optimizer = tf.keras.optimizers.Adam(learning_rate=hp.Float('lr_adam', 1e-4, 1e-2, sampling='log'))
        elif optimizer_choice == 'sgd':
            optimizer = tf.keras.optimizers.SGD(learning_rate=hp.Float('lr_sgd', 1e-4, 1e-2, sampling='log'))
        else:
            optimizer = tf.keras.optimizers.RMSprop(learning_rate=hp.Float('lr_rmsprop', 1e-4, 1e-2, sampling='log'))
        
        model.compile(optimizer=optimizer,
                      loss='categorical_crossentropy',
                      metrics=['accuracy'])
        
        return model

input_shape = (64, 64, 176, 1)  # (time, height, width, channels)
hypermodel = CNN3DHyperModel(input_shape=input_shape)

# Hyperparameter tuner configuration using Hyperband
tuner = Hyperband(
    hypermodel,
    objective='val_accuracy',
    max_epochs=50,
    hyperband_iterations=4,
    directory='hyperband',
    project_name='3d_cnn_opt'
)

# Assuming you have loaded and preprocessed your dataset
# train_data, train_labels, val_data, val_labels = load_data()

# Hyperparameter search
tuner.search(train_gen,epochs=50, validation_data=validation_gen)

# Get the optimal hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"""
The best number of layers: {best_hps.get('num_layers')}
The best activation function: {best_hps.get('activation')}
The best pooling type: {best_hps.get('pooling_type_0')}  # Example for the first layer
The best number of filters: {best_hps.get('filters_0')}  # Example for the first layer
The best optimizer: {best_hps.get('optimizer')}
The best learning rate: {best_hps.get('lr_' + best_hps.get('optimizer'))}
The best number of dense units: {best_hps.get('dense_units')}
The best dropout rate in conv layers: {best_hps.get('dropout_conv_0')}  # Example for the first layer
The best dropout rate after dense layer: {best_hps.get('dropout_dense')}
""")


Trial 78 Complete [01h 40m 20s]
val_accuracy: 0.2688888907432556

Best val_accuracy So Far: 0.27388888597488403
Total elapsed time: 13h 06m 19s

Search: Running Trial #79

Hyperparameter    |Value             |Best Value So Far 
num_layers        |2                 |1                 
kernel_size_0     |3x7x7             |3x7x7             
filters_0         |128               |64                
activation        |relu              |relu              
batch_norm_0      |True              |True              
pooling_type_0    |avg               |avg               
dropout_conv_0    |0.4               |0                 
dense_units       |256               |512               
dense_activation  |relu              |relu              
dropout_dense     |0                 |0.5               
optimizer         |rmsprop           |rmsprop           
lr_adam           |0.00034109        |0.0056664         
kernel_size_1     |3x5x5             |3x5x5             
filters_1         |64         

In [None]:
results_path = "/media/kashraf/Elements/Dissertation/modelling/General/results/AE_denoised/figures/beta/2+1D CNN"


### Curves

In [None]:
import matplotlib.pyplot as plt
plt.style.use("ggplot")
plt.figure(figsize=(15,7))
plt.xticks(fontsize=13,weight="bold")
plt.yticks(fontsize=13,weight="bold")
loss_values = history.history['loss']
val_loss_values = history.history['val_loss']
epochs = range(1, len(loss_values) + 1)

line1 = plt.plot(epochs, val_loss_values, label='Validation Loss')
line2 = plt.plot(epochs, loss_values, label='Training Loss')
plt.setp(line1, linewidth=3.0,marker = '+', markersize=10.0)
plt.setp(line2, linewidth=3.0,marker = '4', markersize=10.0)
plt.xlabel('Epochs',fontsize=20,weight="bold") 
plt.ylabel('Loss',fontsize=20,weight="bold")
plt.grid(True)
plt.xticks(fontsize=15,weight="bold")
plt.yticks(fontsize=15,weight="bold")
plt.legend(fontsize=20,loc = "upper center")
plt.savefig(os.path.join(results_path,'Loss_3D_CNN_rms_V2.png'),bbox_inches ="tight",pad_inches =0)
plt.show()


In [None]:

plt.style.use("ggplot")
plt.figure(figsize=(15,7))
plt.xticks(fontsize=13,weight="bold")
plt.yticks(fontsize=13,weight="bold")
acc_values = history.history['accuracy']
val_acc_values = history.history['val_accuracy']
epochs = range(1, len(loss_values) + 1)

line1 = plt.plot(epochs, val_acc_values, label='Validation Accuracy')
line2 = plt.plot(epochs, acc_values , label='Training Accuracy')
plt.setp(line1, linewidth=3.0,marker = '+', markersize=10.0)
plt.setp(line2, linewidth=3.0,marker = '4', markersize=10.0)
plt.xlabel('Epochs',fontsize=20,weight="bold") 
plt.ylabel('Loss',fontsize=20,weight="bold")
plt.grid(True)
plt.xticks(fontsize=15,weight="bold")
plt.yticks(fontsize=15,weight="bold")
plt.legend(fontsize=20,loc= "lower center")
plt.savefig(os.path.join(results_path,'Accuracy_3D_CNN_rms_V2.png'),bbox_inches ="tight",pad_inches =0)
plt.show()


In [None]:
import seaborn as sr
import pandas as pd
# model= load_model("/media/kashraf/Elements/Dissertation/modelling/General/weights/3D_CNN.h5")
validation_gen = NpyDataGenerator(test_path,batch_size=4,shuffle=False)
_, y_test, _ = validation_gen[0]
y_pred= np.argmax(model.predict(validation_gen), axis=1)


plt.style.use("ggplot")
class_names=["CL_1","CL_2","CL_3","CL_4"]
# report=classi(y_test,y_pred,target_names=class_names)
np.save(os.path.join(results_path,'Report_beta_3D_CNN.npy'),report)

conf=confusion_matrix(y_test,y_pred,normalize="true")
conf_df=pd.DataFrame(conf, index=class_names, columns=class_names)
# print("\nFace  accuracy =",accuracy)
# print("\n Classification report: \n",report)
fig=plt.figure(figsize=(15,10))
sr.heatmap(conf_df,annot=True,cmap="Blues")
# plt.title("Confusion matrix")
plt.savefig(os.path.join(results_path,'Conf_Mat_beta_3D_CNN_V1.png'))
plt.show()

In [None]:
train_gen.npy_files