In [None]:
# Mount Google Drive at /content/drive/ (requires the Google Colab runtime,
# which provides the google.colab package).
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


Model definition for all 3D CNNs built from scratch.

Each model requires an `input_shape` for its first layer, which is taken from the shape of the first sample in the training dataset.

There are two versions. The second model is simpler than the first one, but it is trained on two different versions of the dataset, with 2-second and 3-second bursts.

In [None]:
from tqdm.notebook import tqdm
import cv2
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import os
from tensorflow.keras.utils import to_categorical
import pickle
import sys
import random
random.seed(33)

def build_3d_first_model(input_shape):
  """Build the first 3D CNN (uncompiled).

  The network applies a convolution and pooling that only span the last two
  kernel axes (kernel depth 1 on the first axis), followed by a convolution
  and pooling that only span the first axis, then flattens into a single
  sigmoid unit for binary classification.

  Args:
    input_shape: Shape of one input sample, passed to the first Conv3D layer.
      # presumably (frames, height, width, channels) - confirm against the
      # dataset builder.

  Returns:
    The uncompiled Sequential model.
  """
  model = Sequential()
  # Kernel/pool size 1 on the first axis: only the last two axes are mixed.
  model.add(Conv3D(16, kernel_size=(1, 3, 3), activation='relu', input_shape=input_shape))
  model.add(MaxPooling3D(pool_size=(1, 2, 2)))
  # Kernel/pool size 1 on the last two axes: only the first axis is mixed.
  model.add(Conv3D(32, kernel_size=(3, 1, 1), activation='relu'))
  model.add(MaxPooling3D(pool_size=(2, 1, 1)))
  model.add(Flatten())
  model.add(Dense(1, activation='sigmoid'))
  return model


def build_3d_second_model(input_shape):
  """Build the second, simpler 3D CNN (uncompiled).

  A single 3x3x3 convolution and a 2x2x2 max pooling, flattened into a single
  sigmoid unit for binary classification.

  Args:
    input_shape: Shape of one input sample, passed to the Conv3D layer.

  Returns:
    The uncompiled Sequential model.
  """
  model = Sequential()
  model.add(Conv3D(16, kernel_size=(3, 3, 3), activation='relu', input_shape=input_shape))
  model.add(MaxPooling3D(pool_size=(2, 2, 2)))
  model.add(Flatten())
  model.add(Dense(1, activation='sigmoid'))
  return model


# Maps every accepted model name to the function that builds its architecture.
# The original builder functions were named after these strings, but Python
# identifiers cannot start with a digit, so the builders were renamed while
# the model-name strings (also used for checkpoint file names) are unchanged.
# Both the '3DFSecondModel...' and '3DSecondModel...' spellings are accepted
# because the usage cells of this notebook pass the latter.
_MODEL_BUILDERS = {
  '3DFirstModel': build_3d_first_model,
  '3DFSecondModel2Seconds': build_3d_second_model,
  '3DFSecondModel3Seconds': build_3d_second_model,
  '3DSecondModel2Seconds': build_3d_second_model,
  '3DSecondModel3Seconds': build_3d_second_model,
}


def create_cnn(name, input_shape):
  """Create the uncompiled CNN registered under `name`.

  Args:
    name: Model name, one of the keys of _MODEL_BUILDERS.
    input_shape: Shape of one input sample, forwarded to the builder.

  Returns:
    The uncompiled model, or None when `name` is not a known model name.
  """
  builder = _MODEL_BUILDERS.get(name)
  if builder is None:
    return None
  return builder(input_shape)


Methods used for training and testing the network.

For training, provide a model name and a dataset in the format returned by the load_video_dataset function. The train function returns the trained model, which is then passed to the test function. After fitting, it also shows a graph of the training history, containing loss and accuracy for both training and validation.

The test function computes statistics over the testing dataset and plots the corresponding confusion matrix and ROC curve.

In [None]:
def plot_history(history, method):
  """Plot training and validation loss/accuracy curves, one panel each.

  Args:
    history: Object whose `.history` mapping holds the per-epoch lists
      'loss', 'val_loss', 'accuracy' and 'val_accuracy' (as returned by
      model.fit in train()).
    method: Training configuration; only method['EPOCHS'] is read, to place
      one x tick per configured epoch.

  NOTE(review): relies on `plt` being in scope; no matplotlib import is
  visible in this file - confirm it is imported elsewhere.
  """
  recorded = history.history
  figure, (loss_axis, accuracy_axis) = plt.subplots(2)
  figure.set_size_inches(18.5, 10.5)
  # Integer ticks on the 'epochs' axis instead of the default fractional ones
  plt.setp((loss_axis, accuracy_axis), xticks=range(method['EPOCHS']))

  # Both panels share the same layout; only title and history keys differ
  panels = (
    (loss_axis, 'Loss', 'loss', 'val_loss'),
    (accuracy_axis, 'Accuracy', 'accuracy', 'val_accuracy'),
  )
  for axis, title, train_key, validation_key in panels:
    axis.set_title(title)
    axis.plot(recorded[train_key], label = 'train')
    axis.plot(recorded[validation_key], label = 'test')
    axis.set_ylabel(title)
    axis.set_xlabel('Epoch')
    axis.legend(['Train', 'Validation'])

  # Loss is unbounded: round the largest observed value up for the y limit
  highest_loss = max(recorded['loss'] + recorded['val_loss'])
  loss_axis.set_ylim([0, np.ceil(highest_loss)])
  # Accuracy always lies in [0, 1]
  accuracy_axis.set_ylim([0, 1])
  plt.show()


def train(X_train, Y_train, model_name, method):
  """Load the saved model for `model_name`, or build, fit and save a new one.

  The checkpoint is looked up at f'{ROOT_PATH}/Modelli/{model_name}.h5'. If it
  cannot be loaded, a new network is created with create_cnn, compiled with
  binary cross-entropy, fitted with early stopping on the validation loss,
  saved to the same path, and its training history is plotted.

  Args:
    X_train: Training samples; X_train[0].shape is used as the input shape.
    Y_train: Training labels.
    model_name: Name understood by create_cnn; also the checkpoint file name.
    method: Training configuration with the keys 'OPTIMIZER', 'PATIENCE',
      'EPOCHS', 'BATCH_SIZE' and 'VALIDATION_SPLIT'.
      # NOTE(review): the training config in this notebook also defines
      # 'START_FROM_EPOCH', which is not read here - confirm whether it
      # should be forwarded to EarlyStopping.

  Returns:
    The loaded or freshly trained model, or None if the network could not be
    created.

  Raises:
    Exception: 'cannot fit' (chained to the original error) if fitting fails.
  """
  # Imported locally: this file only imports names from tensorflow.keras
  # submodules and never binds `tf`, so `tf.keras...` raised a NameError that
  # the former bare `except:` swallowed, forcing a retrain on every call.
  from tensorflow.keras.models import load_model

  H5_FILENAME = f'{ROOT_PATH}/Modelli/{model_name}.h5'
  model = None
  try: # load the h5 checkpoint
    model = load_model(H5_FILENAME)
  except Exception as load_error:
    # Best effort: any load failure falls back to training from scratch,
    # but the reason is reported instead of being silently dropped.
    print("failed to load model. Attempting to recreate it")
    print(load_error)

  if model is None: # create and train the model if we don't have a saved h5 checkpoint
    try:
      model = create_cnn(model_name, X_train[0].shape)
    except Exception as creation_error:
      print(creation_error)
      model = None
    # create_cnn signals an unknown model name by returning None
    if model is None:
      print('Cannot create CNN')
      return None
    # Model compilation
    model.compile(optimizer=method['OPTIMIZER'], loss='binary_crossentropy', metrics=['accuracy'])
    print("compile completed")
    # Early stopping over validation loss uses restore_best_weights to save the best model
    early_stopping = EarlyStopping(monitor='val_loss', patience=method['PATIENCE'], restore_best_weights=True)
    try:
      # We save the history of loss and accuracy for both training and validation
      history = model.fit(X_train, Y_train, epochs=method['EPOCHS'], batch_size=method['BATCH_SIZE'], validation_split=method['VALIDATION_SPLIT'], callbacks=[early_stopping])
    except Exception as e:
      print(e)
      # Chain the cause so the original traceback is not lost
      raise Exception('cannot fit') from e
    # save the h5 checkpoint
    model.save(H5_FILENAME)
    plot_history(history, method)

  print(f"train completed {model_name}")
  # Return the trained model for it to be tested
  return model

def test(model, X_test, Y_test):
  """Evaluate `model` on the test set and display the results.

  Prints a classification report, plots the confusion matrix obtained with a
  0.5 decision threshold, and plots the ROC curve with its AUC computed from
  the raw predictions. Nothing is returned.

  Args:
    model: Object exposing predict(X_test).
    X_test: Test samples.
    Y_test: True labels of the test samples.

  NOTE(review): relies on `metrics`, `confusion_matrix`,
  `ConfusionMatrixDisplay` and `plt` being in scope; no import for them is
  visible in this file - confirm they are imported elsewhere.
  """
  scores = model.predict(X_test)

  # A 50% threshold turns the scores into class labels for the report and the
  # confusion matrix; other error-cost scenarios may call for another value
  decision_threshold = 0.5
  predicted_labels = np.where(scores > decision_threshold, 1, 0)

  # Per-class metrics such as precision and recall, plus overall accuracy
  print(metrics.classification_report(Y_test, predicted_labels, digits = 5))

  # Confusion matrix
  matrix_display = ConfusionMatrixDisplay(
    confusion_matrix=confusion_matrix(Y_test, predicted_labels),
    display_labels=['Non violence','Violence'])
  matrix_display.plot()

  # ROC curve, built from the unthresholded scores
  false_positive_rate, true_positive_rate, _ = metrics.roc_curve(Y_test, scores)
  area_under_curve = metrics.roc_auc_score(Y_test, scores)
  plt.figure()
  plt.plot(false_positive_rate, true_positive_rate, label='ROC curve (area = %0.2f)' % area_under_curve)
  # Diagonal reference line
  plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
  plt.xlim([0.0, 1.0])
  plt.ylim([0.0, 1.05])
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver operating characteristic')
  plt.legend(loc="lower right")
  plt.show()


Retrieves the labelled training and testing dataset to be used by the training function.

If the specified pickle files are not found in the `{ROOT_PATH}/pickles` directory, the function prints a message and returns `None` in place of each dataset element. To actually build the dataset, use the create_video_dataset function defined in the DatasetPreprocessing file.

In [None]:
def load_video_dataset(dataset_path, pkl_config, pickle_name):
  """Load a cached train/test split from four pickle files.

  The files are read from f'{ROOT_PATH}/pickles' and are named
  '<pickle_name>-train_data.pkl', '<pickle_name>-train_labels.pkl',
  '<pickle_name>-test_data.pkl' and '<pickle_name>-test_labels.pkl'.

  Args:
    dataset_path: Not used by this function; kept so existing calls still work.
    pkl_config: Not used by this function; kept so existing calls still work.
    pickle_name: Prefix shared by the four pickle files.

  Returns:
    The tuple (train_data, train_labels, test_data, test_labels). If any file
    is missing or cannot be loaded, a message is printed and all four
    elements are None, so a partially loaded dataset is never returned.
  """
  nothing_loaded = (None, None, None, None)
  pickles_dir = f'{ROOT_PATH}/pickles'

  train_data_pkl_filepath = f'{pickles_dir}/{pickle_name}-train_data.pkl'
  train_label_pkl_filepath = f'{pickles_dir}/{pickle_name}-train_labels.pkl'
  test_data_pkl_filepath = f'{pickles_dir}/{pickle_name}-test_data.pkl'
  test_label_pkl_filepath = f'{pickles_dir}/{pickle_name}-test_labels.pkl'

  try:
    # All four files are opened before anything is unpickled, so a missing
    # file is detected before any data is read.
    with open(train_data_pkl_filepath, 'rb') as trd, open(train_label_pkl_filepath, 'rb') as trl, open(test_data_pkl_filepath, 'rb') as ted, open(test_label_pkl_filepath, 'rb') as tel:
      # NOTE: pickle.load can execute arbitrary code; only load pickle files
      # that come from a trusted source.
      dataset = tuple(pickle.load(pkl_file) for pkl_file in (trd, trl, ted, tel))
  except FileNotFoundError:
    print(f"Cannot find {pickle_name}. Please create the pickle files using create_video_dataset()")
    return nothing_loaded
  except Exception as e:
    print(f"An unexpected error occurred: {e}")
    return nothing_loaded

  print("extracted from cached pickles")
  return dataset

Example usage of the load_video_dataset, train and test functions.

Due to the dependency between specific dataset versions (with 2-second and 3-second bursts) and the model used, we redefine `dataset_configs` multiple times with the correct parameters.

In [None]:
# Root folder of the project files; relative to the current working directory.
ROOT_PATH = 'drive/MyDrive/Piras_Quint_Volpi'
# Folder holding the original dataset.
dataset_path = f'{ROOT_PATH}/Dataset originale'

# Hyper-parameters handed to train() as its `method` argument.
trainmethod_configs = {
  'EPOCHS': 30,             # maximum number of epochs given to model.fit
  'BATCH_SIZE': 32,         # batch size given to model.fit
  'OPTIMIZER': 'adam',      # optimizer given to model.compile
  'VALIDATION_SPLIT': 0.2,  # validation_split given to model.fit
  'PATIENCE': 15,           # patience of the EarlyStopping callback
  'START_FROM_EPOCH': 5,    # NOTE(review): not read by train() - confirm intent
}


In [None]:
# Dataset parameters for the first model.
# NOTE(review): load_video_dataset does not read this dict; the pickle is
# selected only by the name 'default' - confirm that pickle was built with
# these values.
dataset_configs = {
  'SIZE':  224,
  'FRAMES':  5,
  'TRAIN_SPLIT':  0.8,
  'FPS':  5,
  'CROP':  True
}

X_train, Y_train, X_test, Y_test = load_video_dataset(dataset_path, dataset_configs, 'default')
# The stray trailing ':' after this call was a SyntaxError and has been removed.
model = train(X_train, Y_train, '3DFirstModel', trainmethod_configs)
results = test(model, X_test, Y_test)

In [None]:
# Dataset parameters for the second model with 2-second bursts
# (FRAMES / FPS = 10 / 5).
# NOTE(review): load_video_dataset does not read this dict; the pickle is
# selected only by the name 'default' - confirm that pickle was built with
# these values.
dataset_configs = {
  'SIZE':  224,
  'FRAMES':  10,
  'TRAIN_SPLIT':  0.8,
  'FPS':  5,
  'CROP':  True
}

X_train, Y_train, X_test, Y_test = load_video_dataset(dataset_path, dataset_configs, 'default')
# The stray trailing ':' after this call was a SyntaxError and has been removed.
model = train(X_train, Y_train, '3DSecondModel2Seconds', trainmethod_configs)
results = test(model, X_test, Y_test)

In [None]:
# Dataset parameters for the second model with 3-second bursts
# (FRAMES / FPS = 15 / 5).
# NOTE(review): load_video_dataset does not read this dict; the pickle is
# selected only by the name 'default' - confirm that pickle was built with
# these values.
dataset_configs = {
  'SIZE':  224,
  'FRAMES':  15,
  'TRAIN_SPLIT':  0.8,
  'FPS':  5,
  'CROP':  True
}

X_train, Y_train, X_test, Y_test = load_video_dataset(dataset_path, dataset_configs, 'default')
# The stray trailing ':' after this call was a SyntaxError and has been removed.
model = train(X_train, Y_train, '3DSecondModel3Seconds', trainmethod_configs)
results = test(model, X_test, Y_test)