In [1]:
import pickle
import os
import numpy as np

## Data splitting

In [2]:
with open('dataset.pkl','rb') as f:
    dataset = pickle.load(f)

In [3]:
x = dataset['features']
np.array(x).shape

(105, 12, 64, 64, 3)

In [4]:
y = dataset['labels']
y = []
for label in dataset['labels']:
    if(label == 'correct'):
        y.append(1)
    if(label == 'incorrect'):
        y.append(0)

In [5]:
dataset['labels'] = y

In [6]:
from sklearn.model_selection import train_test_split
xtrain, xtest, ytrain, ytest = train_test_split(np.array(dataset['features']),np.array(dataset['labels']), test_size=0.25, random_state=100)

## Training

In [7]:
from keras.layers import Dense, ConvLSTM2D, MaxPooling3D, TimeDistributed, Dropout, Flatten
from keras.models import Sequential
from keras.callbacks import EarlyStopping

In [8]:
def create_model():
    model = Sequential()

    model.add(ConvLSTM2D(filters=4, kernel_size=(3,3), activation='tanh', data_format='channels_last', recurrent_dropout=0.2, return_sequences=True,
                         input_shape = (12,64,64,3)))
    model.add(MaxPooling3D(pool_size=(1,2,2),padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))

    model.add(ConvLSTM2D(filters=8, kernel_size=(3,3), activation='tanh', data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2),padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))

    model.add(ConvLSTM2D(filters=14, kernel_size=(3,3), activation='tanh', data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2),padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))

    model.add(ConvLSTM2D(filters=14, kernel_size=(3,3), activation='tanh', data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2),padding='same', data_format='channels_last'))
    model.add(TimeDistributed(Dropout(0.2)))

    model.add(ConvLSTM2D(filters=16, kernel_size=(3,3), activation='tanh', data_format='channels_last', recurrent_dropout=0.2, return_sequences=True))
    model.add(MaxPooling3D(pool_size=(1,2,2),padding='same', data_format='channels_last'))
    
    model.add(Flatten())

    model.add(Dense(1, activation='sigmoid'))

    model.summary()
    return model

In [9]:
model = create_model()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_lstm2d (ConvLSTM2D)    (None, 12, 62, 62, 4)     1024      
                                                                 
 max_pooling3d (MaxPooling3D  (None, 12, 31, 31, 4)    0         
 )                                                               
                                                                 
 time_distributed (TimeDistr  (None, 12, 31, 31, 4)    0         
 ibuted)                                                         
                                                                 
 conv_lstm2d_1 (ConvLSTM2D)  (None, 12, 29, 29, 8)     3488      
                                                                 
 max_pooling3d_1 (MaxPooling  (None, 12, 15, 15, 8)    0         
 3D)                                                             
                                                        

In [10]:
# early_stopping_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min', restore_best_weights=True)

# model.compile(loss='binary_crossentropy', optimizer='Adam', metrics=['accuracy'])

# model.fit(xtrain, ytrain, epochs=50, batch_size=4, shuffle=True, callbacks=[early_stopping_callback], validation_split=0.2)

In [11]:
from keras.callbacks import EarlyStopping, ModelCheckpoint


# Define Early Stopping Callback
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=10, mode='min', restore_best_weights=True)

# Define ModelCheckpoint Callback to save the model at the end of training
model_checkpoint_callback = ModelCheckpoint(filepath='best_model.h5', save_best_only=True, save_weights_only=False, monitor='val_loss', mode='min')

# Compile the model
model.compile(loss='binary_crossentropy', optimizer='Adam', metrics=['accuracy'])

# Train the model with validation data and callbacks
model.fit(xtrain, ytrain, epochs=50, batch_size=4, shuffle=True, callbacks=[early_stopping_callback, model_checkpoint_callback], validation_split=0.2)




Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50

: 

In [12]:
from keras.models import load_model

# Load the best model saved during training
best_model = load_model('best_model.h5')

In [13]:
ypred = best_model.predict(xtest)



In [16]:
# Define the threshold
threshold = 0.5

# Convert sigmoid outputs to binary values
ypred = (ypred >= threshold).astype(int)
ypred

array([[1],
       [1],
       [1],
       [1],
       [1],
       [0],
       [0],
       [0],
       [1],
       [0],
       [0],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [0],
       [0],
       [0],
       [1],
       [0],
       [1],
       [1],
       [0],
       [0],
       [1]])

In [17]:
from sklearn.metrics import *

confusion_matrix(ytest,ypred)

array([[14,  0],
       [ 0, 13]], dtype=int64)