In [None]:
import tensorflow as tf
import numpy as np
import os
import cv2
from sklearn.model_selection import train_test_split
import time
from tqdm import tqdm
import pickle
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

### Load Data

This section loads each video from the directory. The loaded data is then pickled and stored in files named `X` and `y`. If you want to use the already loaded data, skip ahead to the cell that loads the pickled files. If you need the data again later, load the pickled files instead of re-running this section: re-running it reshuffles the data, so some of the validation and test samples might become part of the training data, and the model would then be overfit on the test/validation data.

In [None]:
# Frame size handed to cv2.resize as `dsize` when loading the videos; also used
# to build the model's per-frame input shape. Square, so the width/height order
# makes no difference here.
image_dims = (120, 120)

In [None]:
# Accumulators filled by the two video-loading cells that follow.
X = [] # (no. of videos, no. of frames, frame height, frame width, no. of channels of frame)
y = [] # one integer label per video: 1 = cheating, 0 = not cheating

In [None]:
cheat_folder = os.path.relpath('<Relative path to Cheat folder>')

# Upper bound on the number of files read from each visited directory.
# To load all files replace with None.
max_videos = 4500

for root, dirs, files in os.walk(cheat_folder):
    # Never index past the end of the listing: a directory with fewer than
    # max_videos files is simply loaded in full.
    n_to_load = len(files) if max_videos is None else min(max_videos, len(files))

    for counter in tqdm(range(0, n_to_load)):
        filename = files[counter]

        # Open the video. The path is built from `root` because os.walk also
        # visits sub-directories, whose files do not live directly in cheat_folder.
        cap = cv2.VideoCapture(os.path.join(root, filename))
        frames = []
        try:
            while cap.isOpened():
                # Capture each frame
                ret, frame = cap.read()

                # ret is False once no further frame could be read
                if not ret:
                    break

                # Resize the frame to image_dims x 3
                frame = cv2.resize(frame, image_dims, interpolation=cv2.INTER_CUBIC)
                # Store the frame in a list
                frames.append(np.array(frame))
        finally:
            # Release the video capture object even if decoding/resizing failed
            cap.release()

        # Only clips with exactly 16 frames are kept; anything else is skipped
        if len(frames) == 16:
            # Append video to X
            X.append(np.array(frames))
            # Add label 1 (cheating)
            y.append(1)

In [None]:
n_cheat_folder = os.path.relpath('<Relative path to N-Cheat folder>')

# Upper bound on the number of files read from each visited directory.
# To load all files replace with None.
max_videos = 4500

for root, dirs, files in os.walk(n_cheat_folder):
    # Never index past the end of the listing: a directory with fewer than
    # max_videos files is simply loaded in full.
    n_to_load = len(files) if max_videos is None else min(max_videos, len(files))

    for counter in tqdm(range(0, n_to_load)):
        filename = files[counter]

        # Open the video. The path is built from `root` because os.walk also
        # visits sub-directories, whose files do not live directly in n_cheat_folder.
        cap = cv2.VideoCapture(os.path.join(root, filename))
        frames = []
        try:
            while cap.isOpened():
                # Capture each frame
                ret, frame = cap.read()

                # ret is False once no further frame could be read
                if not ret:
                    break

                # Resize the frame to image_dims x 3
                frame = cv2.resize(frame, image_dims, interpolation=cv2.INTER_CUBIC)
                # Store the frame in a list
                frames.append(np.array(frame))
        finally:
            # Release the video capture object even if decoding/resizing failed
            cap.release()

        # Only clips with exactly 16 frames are kept; anything else is skipped
        if len(frames) == 16:
            # Append video to X
            X.append(np.array(frames))
            # Add label 0 (not cheating)
            y.append(0)

In [None]:
# Shuffle videos and labels together so each video keeps its own label:
# pair them up, shuffle the pairs in place, then split back into two tuples.
video_label_pairs = list(zip(X, y))
np.random.shuffle(video_label_pairs)
X, y = zip(*video_label_pairs)
del video_label_pairs

In [None]:
# Persist the shuffled videos and labels to the files 'X' and 'y' in the
# current working directory, so the same ordering can be reloaded later.
for name, obj in (('X', X), ('y', y)):
    with open(name, 'wb') as fh:
        pickle.dump(obj, fh)

In [None]:
# Load pickled data (the files 'X' and 'y' written by the previous cell).
# NOTE: pickle.load can execute arbitrary code; only load files you created yourself.
with open('X', 'rb') as f:
    X = pickle.load(f)

with open('y', 'rb') as f:
    y = pickle.load(f)

In [None]:
# Stack the per-video arrays into a single float32 array.
X = np.array(X, dtype=np.float32)
# Pin the label dtype instead of inheriting the platform's default integer
# width: the accuracy computations later compare these labels against
# predictions requested as tf.int32.
y = np.array(y, dtype=np.int32)

In [None]:
# Sanity check: report the dimensions of the stacked videos and of the labels
print(f'Input shape: {X.shape}')
print(f'Output shape: {y.shape}')

### Normalize Data

In [None]:
# Scale pixel values by 1/255. Done in place: `X = X / 255.0` would allocate a
# second array as large as X before the old one could be released.
X /= 255.0

In [None]:
# The first 7500 samples form the training set; everything after them is held
# out (and divided into validation and test sets further down).
train_count = 7500
x_train, y_train = X[:train_count], y[:train_count]
x_val, y_val = X[train_count:], y[train_count:]

In [None]:
# Drop the names of the full arrays.
# NOTE(review): x_train/x_val/y_train/y_val come from basic slicing, which for
# NumPy arrays yields views onto the same buffer, so this presumably frees no
# memory on its own — confirm.
del X, y

In [None]:
# Divide the held-out samples: the first 750 stay as the validation set, the
# remainder becomes the test set. The test slices are taken first because
# x_val / y_val are overwritten on the following line.
val_count = 750
x_test, y_test = x_val[val_count:], y_val[val_count:]
x_val, y_val = x_val[:val_count], y_val[:val_count]

In [None]:
# Report the size of each split. The split is made by fixed counts, not by
# fraction: the first 7500 samples train, the next 750 validate, the rest test.
# (An earlier note here claimed 0.7 / 0.15 / 0.15, which those counts cannot produce.)
print('Training set:', x_train.shape, y_train.shape, '\n')
print('Validation set:', x_val.shape, y_val.shape, '\n')
print('Test set:', x_test.shape, y_test.shape)

In [None]:
# Number of samples in each split
train_set_size = len(x_train)
val_set_size = len(x_val)
test_set_size = len(x_test)

# Class names indexed by label value (0 -> 'N-Cheat', 1 -> 'Cheat')
classes = ['N-Cheat', 'Cheat']
num_of_classes = len(classes)

# One sample is 16 frames of image_dims with 3 colour channels
input_shape = (16, *image_dims, 3)

# Batching parameters; a trailing partial batch is not counted as a step
BATCH_SIZE = 64
BUFFER_SIZE = train_set_size
steps_per_epoch = train_set_size // BATCH_SIZE

In [None]:
# Size of the LSTM hidden and cell state vectors used by the model
lstm_units = 256

In [None]:
# Training data stays a plain Python list of (video, label) pairs; it is
# shuffled and sliced into batches by hand inside the training loop.
train_dataset = [(video, label) for video, label in zip(x_train, y_train)]

# Validation data becomes a tf.data pipeline that shuffles the samples and
# yields them as a single batch containing the whole validation set.
val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .shuffle(val_set_size)
    .batch(val_set_size, drop_remainder=True)
)

In [None]:
# Drop the split names that are no longer referenced directly; the data stays
# reachable through train_dataset and val_dataset.
# NOTE(review): the entries of train_dataset presumably still reference the
# original array buffer, so little memory is likely released here — confirm.
del x_train, y_train
del x_val, y_val

In [None]:
# Pull one batch from the validation pipeline and show its input/target shapes
# as a sanity check before building the model.
example_input_batch, example_target_batch = next(iter(val_dataset))
example_input_batch.shape, example_target_batch.shape

In [None]:
class CNN_LSTM(tf.keras.Model):
    """Video classifier: frozen InceptionV3 per frame, one LSTM across frames.

    Every frame of a clip is passed through an ImageNet-initialised
    InceptionV3 (no top, global average pooling, not trainable). The pooled
    features are fed to the LSTM one frame at a time while the hidden and cell
    states are carried forward. The hidden state left after the final frame is
    mapped by a dense layer to one unnormalised score per class.
    """

    def __init__(self, lstm_units, num_of_classes):
        """Create the sub-layers.

        Args:
            lstm_units: size of the LSTM hidden and cell states.
            num_of_classes: number of outputs of the final dense layer.
        """
        super().__init__()
        self.lstm_units = lstm_units

        # Per-frame feature extractor; its weights are frozen
        frame_shape = (image_dims[0], image_dims[1], 3)
        self.inceptionV3 = tf.keras.applications.InceptionV3(
            input_shape=frame_shape,
            include_top=False,
            weights='imagenet',
            pooling='avg')
        self.inceptionV3.trainable = False

        # Recurrent layer; return_state=True exposes the hidden and cell
        # states so that call() can thread them from frame to frame
        self.lstm_1 = tf.keras.layers.LSTM(
            lstm_units,
            return_state=True,
            recurrent_initializer='glorot_uniform')

        # Classification head (no activation, so it outputs raw scores)
        self.fc = tf.keras.layers.Dense(num_of_classes)

    def call(self, x, hidden_state, cell_state):
        """Run a batch of clips through the network.

        Args:
            x: (batch size, 16, image_dims[0], image_dims[1], 3)
            hidden_state: (batch size, lstm_units) initial hidden state
            cell_state: (batch size, lstm_units) initial cell state

        Returns:
            (batch size, num_of_classes) class scores.
        """
        num_frames = x.shape[1]
        for frame_idx in range(num_frames):
            # (batch size, image_dims[0], image_dims[1], 3)
            frame_batch = x[:, frame_idx, ...]
            # Pooled features (batch size, 2048), given a length-1 time axis
            # -> (batch size, 1, 2048)
            features = tf.expand_dims(self.inceptionV3(frame_batch), 1)
            # Advance the LSTM one step, continuing from the previous states
            _, hidden_state, cell_state = self.lstm_1(
                features, initial_state=[hidden_state, cell_state])

        # Classify from the hidden state left after the last frame
        return self.fc(hidden_state)

    def initialize_state(self, batch_size):
        """Return all-zero (hidden_state, cell_state), each (batch_size, lstm_units)."""
        state_shape = (batch_size, self.lstm_units)
        return tf.zeros(state_shape), tf.zeros(state_shape)

In [None]:
# Build the model instance that the training, validation and evaluation cells all use
cnn_lstm = CNN_LSTM(lstm_units, num_of_classes)

In [None]:
# Load previously saved weights, if any. A TensorFlow-format checkpoint always
# writes a '<prefix>.index' file, so its presence is used as the guard; this
# lets the notebook run top to bottom before any weights have been saved.
weights_prefix = 'cnn_lstm weights 6/cnn_lstm_weights_6'
if os.path.exists(weights_prefix + '.index'):
    cnn_lstm.load_weights(weights_prefix)
else:
    print('No saved weights found at', repr(weights_prefix),
          '- continuing with freshly initialised weights.')

In [None]:
# Optimiser used for every weight update
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Cross-entropy on raw scores with integer labels; reduction='none' keeps one
# loss value per sample so the caller decides how to aggregate
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')


def loss_function(real, pred):
    """Mean cross-entropy over a batch.

    Args:
        real: (batch size,) integer class labels.
        pred: (batch size, num of classes) unnormalised class scores.

    Returns:
        Scalar tensor: the per-sample losses averaged over the batch.
    """
    per_sample_loss = loss_object(real, pred)
    return tf.reduce_mean(per_sample_loss)

In [None]:
# Strength of the L2 weight penalty
lamda = 0.01


def reg_loss_function(real, pred, weights):
    """Cross-entropy loss with and without an L2 penalty on the weights.

    Args:
        real: (batch size,) integer class labels.
        pred: (batch size, num of classes) unnormalised class scores.
        weights: iterable of variables to penalise (the trainable variables).

    Returns:
        (loss, reg_loss): `loss` is the plain mean cross-entropy of the batch;
        `reg_loss` is (sum of per-sample losses + lamda * sum of squared
        weights) divided by the batch size.
    """
    per_sample_loss = loss_object(real, pred)

    # Sum of squares of every entry of every variable
    l2_penalty = sum(tf.reduce_sum(tf.square(w)) for w in weights)

    batch_size = per_sample_loss.shape[0]
    penalised_loss = tf.math.divide(
        tf.reduce_sum(per_sample_loss) + lamda * l2_penalty, batch_size)

    return tf.reduce_mean(per_sample_loss), penalised_loss

In [None]:
def train_step(inp, targ):
    """Run one optimisation step on a single batch.

    Args:
        inp: batch of clips; its first dimension is the batch size.
        targ: integer class labels for the batch.

    Returns:
        (loss, predictions): the unregularised mean loss of the batch and the
        predicted class index of every sample as an int32 tensor.
    """
    # Each batch starts from all-zero LSTM states
    initial_hidden, initial_cell = cnn_lstm.initialize_state(inp.shape[0])

    with tf.GradientTape() as tape:
        # Forward pass, recorded on the tape
        scores = cnn_lstm(inp, initial_hidden, initial_cell)

        # Plain loss (for reporting) and L2-penalised loss (for optimisation)
        loss, reg_loss = reg_loss_function(targ, scores, cnn_lstm.trainable_variables)

    # The variable list is read after the forward pass, as before
    # (presumably because layer weights may only exist once the model has been called).
    variables = cnn_lstm.trainable_variables

    # Differentiate the penalised loss and apply the update
    optimizer.apply_gradients(zip(tape.gradient(reg_loss, variables), variables))

    predicted_classes = tf.argmax(scores, axis=1, output_type=tf.int32)
    return loss, predicted_classes

In [None]:
# Per-epoch history, appended to by the training loop and plotted afterwards
train_loss_plot = []
val_loss_plot = []
train_acc_plot = []
val_acc_plot = []

# NOTE(review): not written to anywhere in the visible notebook — possibly a leftover
reg_train_loss_plot = []

In [None]:
EPOCHS = 20

# Directory that receives the weights and the history files; create it up
# front so the plain open() calls at the end of each epoch cannot fail on it.
os.makedirs('cnn_lstm weights 6', exist_ok=True)

for epoch in range(0, EPOCHS):
    start = time.time()

    training_loss = 0
    val_loss = 0

    training_accuracy = 0
    val_accuracy = 0

    # Shuffle dataset
    np.random.shuffle(train_dataset)

    # Iterate over batches, not over samples: looping len(train_dataset) times
    # ran off the end of the list, produced an empty slice and failed while
    # unpacking zip(*[]). Only full batches are used, which matches the
    # division by steps_per_epoch below; because of the per-epoch reshuffle the
    # few left-over samples differ from epoch to epoch.
    for batch in range(0, steps_per_epoch):
        batch_start = batch * BATCH_SIZE
        inp, labels = zip(*train_dataset[batch_start: batch_start + BATCH_SIZE])
        inp = np.array(inp)
        labels = np.array(labels)

        batch_loss, batch_predictions = train_step(inp, labels)

        # Calculate training batch accuracy. The labels are cast explicitly so
        # the comparison does not depend on the platform's default integer width.
        batch_accuracy = tf.reduce_mean(
            tf.cast(tf.equal(batch_predictions, tf.cast(labels, tf.int32)), dtype=tf.float32))

        training_loss += batch_loss
        training_accuracy += batch_accuracy

        if (batch + 1) % 10 == 0:
            print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
                                            batch + 1,
                                            batch_loss.numpy()),
                 'Accuracy {:.4f}'.format(batch_accuracy.numpy()))

    # val_dataset yields the whole validation set as one batch, so the values
    # computed in this loop are the metrics of the full validation set.
    for (x_val, y_val) in val_dataset:
        # Find validation loss
        hidden_state, cell_state = cnn_lstm.initialize_state(x_val.shape[0])
        val_prediction_score = cnn_lstm(x_val, hidden_state, cell_state)
        val_loss = loss_function(y_val, val_prediction_score)

        # Validation accuracy (same explicit label cast as for training)
        val_predictions = tf.argmax(val_prediction_score, axis=1, output_type=tf.int32)
        val_accuracy = tf.reduce_mean(
            tf.cast(tf.equal(val_predictions, tf.cast(y_val, tf.int32)), dtype=tf.float32))

    # Epoch averages as plain Python floats, so the histories have one uniform
    # type and can be pickled, loaded and plotted without TensorFlow objects.
    epoch_train_loss = float(training_loss / steps_per_epoch)
    epoch_train_acc = float(training_accuracy / steps_per_epoch)
    epoch_val_loss = float(val_loss)
    epoch_val_acc = float(val_accuracy)

    train_loss_plot.append(epoch_train_loss)
    val_loss_plot.append(epoch_val_loss)

    train_acc_plot.append(epoch_train_acc)
    val_acc_plot.append(epoch_val_acc)

    print('Training: Epoch {} Loss {:.4f}'.format(epoch + 1, epoch_train_loss),
         'Accuracy {:.4f}'.format(epoch_train_acc))

    print('Validation: Epoch {} Loss {:.4f}'.format(epoch + 1, epoch_val_loss),
         'Accuracy {:.4f}'.format(epoch_val_acc))

    print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

    # Save weights after every epoch
    cnn_lstm.save_weights('cnn_lstm weights 6/cnn_lstm_weights_6', save_format='tf')

    # Save loss and accuracy after every epoch
    for history_name, history in (('train_loss_6', train_loss_plot),
                                  ('val_loss_6', val_loss_plot),
                                  ('train_acc_6', train_acc_plot),
                                  ('val_acc_6', val_acc_plot)):
        with open('cnn_lstm weights 6/' + history_name, 'wb') as f:
            pickle.dump(history, f)

In [None]:
# Load accuracy and loss histories written by the training loop. This replaces
# whatever the in-memory history lists currently hold.
# NOTE: pickle.load can execute arbitrary code; only load files you created yourself.
with open('cnn_lstm weights 6/train_loss_6', 'rb') as f:
    train_loss_plot = pickle.load(f)
    
with open('cnn_lstm weights 6/val_loss_6', 'rb') as f:
    val_loss_plot = pickle.load(f)
    
with open('cnn_lstm weights 6/train_acc_6', 'rb') as f:
    train_acc_plot =  pickle.load(f)
    
with open('cnn_lstm weights 6/val_acc_6', 'rb') as f:
    val_acc_plot = pickle.load(f)

In [None]:
# Training vs. validation loss per epoch
plt.plot(train_loss_plot)
plt.plot(val_loss_plot)

plt.title('Loss plot')
plt.xlabel('Epochs')
plt.ylabel('Loss')

# Legend entries follow the order in which the curves were drawn
plt.legend(['Training', 'Validation'])

In [None]:
# Training vs. validation accuracy per epoch
plt.plot(train_acc_plot)
plt.plot(val_acc_plot)

plt.title('Accuracy plot')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')

# Legend entries follow the order in which the curves were drawn
plt.legend(['Training', 'Validation'])

In [None]:
def evaluate(x):
    """Return the model's class scores for a batch of clips.

    Args:
        x: batch of clips; its first dimension is the batch size.

    Returns:
        The output of cnn_lstm for `x`, started from all-zero LSTM states.
    """
    initial_hidden, initial_cell = cnn_lstm.initialize_state(x.shape[0])
    return cnn_lstm(x, initial_hidden, initial_cell)

In [None]:
# Score the whole test split in a single forward pass; each row holds the
# unnormalised class scores of one test video.
test_predictions = evaluate(x_test) 

In [None]:
# Predicted class per test video, requested as int32 like the training and
# validation accuracy computations.
test_predicted_labels = tf.argmax(test_predictions, axis=1, output_type=tf.int32)

# Fraction of correct predictions. The labels are cast explicitly so the
# comparison does not depend on the platform's default integer width.
test_accuracy = tf.reduce_mean(
    tf.cast(tf.equal(test_predicted_labels, tf.cast(y_test, tf.int32)), dtype=tf.float32))

# Mean cross-entropy over the test set
test_loss = loss_function(y_test, test_predictions)
print('Test Accuracy:', test_accuracy)
print('Test Loss:', test_loss)

In [None]:
# Confusion matrix of the test set: rows are true labels, columns are predictions
predicted_labels = tf.argmax(test_predictions, axis=1, output_type=tf.int32).numpy()
cm = confusion_matrix(y_test, predicted_labels)

In [None]:
def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    """
    Given a sklearn confusion matrix (cm), make a nice plot.

    The heat map is always drawn from the raw counts; only the numbers written
    into the cells switch between counts and per-row proportions.

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix

    target_names: given classification classes such as [0, 1, 2]
                  the class names, for example: ['high', 'medium', 'low'].
                  Pass None to leave the tick labels untouched.

    title:        the text to display at the top of the matrix

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues
                  Defaults to 'Blues' when None.

    normalize:    If False, plot the raw numbers
                  If True, plot the proportions (each row divided by its sum;
                  a row that sums to zero is shown as all zeros)

    Returns
    -------
    None. The figure is displayed with plt.show().

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph

    Citation
    --------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    # Local imports keep the function self-contained (itertools is not
    # imported at the top of the notebook).
    import matplotlib.pyplot as plt
    import numpy as np
    import itertools

    # Accept anything array-like, not only ndarrays
    cm = np.asarray(cm)

    accuracy = np.trace(cm) / np.sum(cm).astype('float')
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    if normalize:
        # Divide each row by its total. A class with no true samples has a row
        # total of zero; dividing only where the total is non-zero leaves that
        # row at 0 instead of turning it into NaN.
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)

    # Cells above the threshold get white text, the others black
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.show()

In [None]:
# Draw the test-set confusion matrix with per-row proportions in the cells
plot_confusion_matrix(cm=cm, target_names=['N-Cheat', 'Cheat'], normalize=True)