#**DigitalPolice Train Model**

---



This notebook contains the steps to build the machine learning model that detects anomalies in surveillance camera videos.

The steps are:

1. Split the extracted Anomaly dataset into training and testing sets. The Normal dataset from [UCF101-Crime](https://www.crcv.ucf.edu/research/real-world-anomaly-detection-in-surveillance-videos/) already comes split into training and testing sets.
2. Create the directories for the training and testing datasets.
3. Load the dataset into Colab to train the model.
4. Build the model: a custom generator, a custom loss function, an accuracy callback, and the network described in the [paper](https://openaccess.thecvf.com/content_cvpr_2018/papers/Sultani_Real-World_Anomaly_Detection_CVPR_2018_paper.pdf).
5. Analyze and visualize the training results.


##**Connect Colab to the GCP Bucket to obtain the extracted dataset**

To train the model, we use a dataset that has already been preprocessed: the features were extracted from the open video dataset with the pre-trained C3D model. The extracted features are stored in our GCP bucket `ucf-fcrime` under the `/out/` directory, and we mount the bucket in Colab for training. In Colab, the data appears under the `folderOnColab/out/` directory.

In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
!echo "deb http://packages.cloud.google.com/apt gcsfuse-bionic main" > /etc/apt/sources.list.d/gcsfuse.list
!curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
!apt -qq update
!apt -qq install gcsfuse

In [None]:
!mkdir folderOnColab
!gcsfuse --implicit-dirs ucf-fcrime /content/folderOnColab

##**Create directory for the training and testing dataset**

Before splitting the anomaly dataset into training and testing sets, we define the directories that will hold the files.

In [None]:
# Define directories
import os

base_dir = '/content/folderOnColab/out'


train_anomaly_dir = os.path.join(base_dir, 'Train-Anomaly-2') # Directory with our training anomaly extracted features
train_normal_dir = os.path.join(base_dir, 'Train-Normal') # Directory with our training normal extracted features
validation_anomaly_dir = os.path.join(base_dir, 'Test-Anomaly-2') # Directory with our validation anomaly extracted features
validation_normal_dir = os.path.join(base_dir, 'Test_Normal')# Directory with our validation normal extracted features
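
The cell above only builds path strings; it does not create the folders. A minimal sketch of creating them before any file is copied, assuming a hypothetical helper `ensure_dirs` (not part of the original notebook) and using a temporary folder in place of the bucket path:

```python
import os
import tempfile

def ensure_dirs(paths):
    """Create each directory in `paths` if it does not exist yet (hypothetical helper)."""
    for path in paths:
        os.makedirs(path, exist_ok=True)
    return paths

# Demo on a temporary folder; in the notebook the base would be base_dir.
demo_base = tempfile.mkdtemp()
demo_dirs = ensure_dirs([
    os.path.join(demo_base, 'Train-Anomaly-2'),
    os.path.join(demo_base, 'Test-Anomaly-2'),
])
print(all(os.path.isdir(d) for d in demo_dirs))
```

Because `exist_ok=True` is set, calling the helper again on existing folders is harmless.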


##**Splitting the dataset**

The Normal dataset is already divided into training and testing sets, but the Anomaly dataset is not. Therefore, we split the Anomaly dataset into training and testing sets with an **80 : 20** ratio: we randomly select 80% of the files in the `Anomaly_data` directory for training and keep the rest for testing. The anomaly data covers specific crime categories, i.e. Robbery, Vandalism, Stealing, and Assault, because these are the types of anomaly we want to detect.

First, we define the function that splits the dataset into training and testing sets, with 80% of the files going to training.

In [None]:
#split dataset

import random

train_data_percentage = 0.8    # split the Anomaly dataset into 80:20, 80% = 0.8 for training
anomaly_files = os.listdir('/content/folderOnColab/out/Anomaly_data') # specify Anomaly data source directory in GCP Bucket
print(len(anomaly_files)) #output the total anomaly data

def split_train_test(filenames, percentage):

  total_files = len(filenames)                  #count all files from the dataset
  total_train = int(total_files * percentage)   #count files from the dataset for training

  #pick distinct random indices for the training set
  train_set = set(random.sample(range(total_files), total_train))

  train_files = []                              #create list for the training data
  test_files = []                               #create list for the testing data
  for i, name in enumerate(filenames):
    if i in train_set:
      train_files.append(name)                  #file was selected for training
    else:
      test_files.append(name)                   #every remaining file goes to the testing set

  return (train_files, test_files)

Then we split the anomaly data. Files selected for training are skipped when the testing set is built, so the two sets never share a file.

In [None]:
anomaly_train_test = (split_train_test (anomaly_files , train_data_percentage)) # splitting the dataset
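
Because the split is random, rerunning the notebook yields a different training set each time. A minimal sketch of a reproducible variant, assuming a hypothetical helper `seeded_split` and a fixed seed of 42 (neither is part of the original notebook):

```python
import random

def seeded_split(filenames, percentage, seed=42):
    """Shuffle a sorted copy of `filenames` with a fixed seed, then cut it at `percentage`."""
    rng = random.Random(seed)          # private generator, does not touch the global state
    shuffled = sorted(filenames)       # sort first so os.listdir order does not matter
    rng.shuffle(shuffled)
    cut = int(len(shuffled) * percentage)
    return shuffled[:cut], shuffled[cut:]

# Demo with made-up file names.
demo_files = ['video_{:03d}.txt'.format(i) for i in range(10)]
train_demo, test_demo = seeded_split(demo_files, 0.8)
print(len(train_demo), len(test_demo))
```

Sorting before shuffling is a design choice: the result then depends only on the seed and the set of file names, not on the order in which the file system lists them.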


After separating the dataset, we copy the two splits into their own directories for the training and testing anomaly data.

In [None]:
# Copy Anomaly files from source directory to train directory
from shutil import copyfile    

base_dir = "/content/folderOnColab/out/" #specify directory path

# Copy Anomaly train files from source directory to train directory
length = len (anomaly_train_test[0])
for i in range(length):
  source = ("/content/folderOnColab/out/Anomaly_data/" + anomaly_train_test[0][i])
  destination = (train_anomaly_dir + "/" + anomaly_train_test[0][i])
  copyfile(source, destination)
  print ("Copying anomaly train files")
  
# Copy Anomaly test files from source directory to test directory
length = len (anomaly_train_test[1])
for i in range(length):
  source = ("/content/folderOnColab/out/Anomaly_data/" + anomaly_train_test[1][i])
  destination = (validation_anomaly_dir + "/" + anomaly_train_test[1][i])
  copyfile(source, destination)
  print ("Copying anomaly test files")

train_anomaly = os.listdir(train_anomaly_dir) 
valid_anomaly = os.listdir(validation_anomaly_dir) 
print(f"Number of train files with anomaly copied: {len(train_anomaly)}")  
print(f"Number of test files with anomaly copied: {len(valid_anomaly)}")


##**Load the dataset**

Next, we load the dataset into Colab for the training step. First we import the necessary modules, then define the function that loads the dataset, and finally run it. To keep memory use in Colab low, we clear the Keras session afterwards.

In [None]:
#import necessary modules
import numpy as np
import random
import tensorflow as tf

In [None]:
# create function to load the dataset from GCP Bucket to Colab

def load_all_dataset(abnormal_path, normal_path):

  normal_video_take = len(os.listdir(normal_path))
  abnormal_video_take = len(os.listdir(abnormal_path))

  normal_folder = os.listdir(normal_path)
  abnormal_folder = os.listdir(abnormal_path)

  len_normal_folder = len(normal_folder)
  len_abnormal_folder = len(abnormal_folder)


  normal_list = normal_folder # np.random.choice(normal_folder, size=normal_video_take, replace=False)
  abnormal_list = abnormal_folder # np.random.choice(abnormal_folder, size=abnormal_video_take, replace=False)

  all_normal = []
  all_abnormal = []

  for normal_file in normal_list:
    try:
      with open(os.path.join(normal_path, normal_file), 'r') as normal_f:
        print("reading " + normal_file)
        normal_words = normal_f.read().split()
        normal_num_feat = len(normal_words) / 4096
        btch_normal = []
        for normal_feat in range(0, int(normal_num_feat)):
          normal_feat_row = np.float32(normal_words[normal_feat * 4096:normal_feat * 4096 + 4096])
          btch_normal.append(normal_feat_row)
        all_normal.append(np.array(btch_normal))

    except IsADirectoryError:
      print("continue because " + normal_file + " is a directory")
      pass

  for abnormal_file in abnormal_list:
    try:
      with open(os.path.join(abnormal_path, abnormal_file), 'r') as abnormal_f:
        print("reading " + abnormal_file)
        abnormal_words = abnormal_f.read().split()
        abnormal_num_feat = len(abnormal_words) / 4096
        btch_abnormal = []
        for abnormal_feat in range(0, int(abnormal_num_feat)):
          abnormal_feat_row = np.float32(abnormal_words[abnormal_feat * 4096:abnormal_feat * 4096 + 4096])
          btch_abnormal.append(abnormal_feat_row)

        all_abnormal.append(np.array(btch_abnormal))

    except IsADirectoryError:
      print("continue because " + abnormal_file + " is a directory")
      pass

  return np.array(all_normal), np.array(all_abnormal)
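
The inner loops above slice the whitespace-separated values into rows of 4096 features. A minimal sketch of the same parsing done with a single reshape, assuming a hypothetical helper `parse_features` and a toy feature size of 3 for the demo:

```python
import numpy as np

def parse_features(text, feat_dim=4096):
    """Turn whitespace-separated numbers into an array of shape (segments, feat_dim)."""
    values = np.array(text.split(), dtype=np.float32)
    # Drop a trailing incomplete row, matching the int(...) truncation in the loop above.
    usable = (values.size // feat_dim) * feat_dim
    return values[:usable].reshape(-1, feat_dim)

# Demo with a tiny feature size; the real files use 4096 values per segment.
demo_text = "0.1 0.2 0.3 0.4 0.5 0.6 0.7"
demo = parse_features(demo_text, feat_dim=3)
print(demo.shape)
```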
      


In [None]:
#load the dataset for training

train_normal_x, train_abnormal_x = load_all_dataset(train_anomaly_dir, train_normal_dir)

In [None]:
#load the dataset for testing
test_normal_x, test_abnormal_x = load_all_dataset(validation_anomaly_dir, validation_normal_dir)

In [None]:
tf.keras.backend.clear_session()

##**Building the model**

Once the dataset is ready, we build the model. It uses a custom generator, a custom loss function, and an accuracy callback, following the reference paper.

First, we create the custom generator and instantiate it for training and for testing.

In [None]:
#create custom generator

def generator(normal_x, abnormal_x, batchsize):
  n_exp= int(batchsize/2)
  
  
  while True:
    normal_x_indices = np.arange(normal_x.shape[0])
    abnormal_x_indices = np.arange(abnormal_x.shape[0])

    normal_get = np.random.choice(normal_x_indices, n_exp, replace=False)
    abnormal_get = np.random.choice(abnormal_x_indices, n_exp, replace=False)


    Abnor_list_iter = abnormal_x[abnormal_get]
    Norm_list_iter = normal_x[normal_get]
    

    AllFeatures = []
    # print("Loading Normal videos Features...")

    Video_count=-1
    for normal_video in Norm_list_iter:
      Video_count=Video_count+1
      count = -1;
      VideoFeatures = []
      for feat in normal_video:
        count = count + 1
        if count == 0:
          VideoFeatures = feat
        if count > 0:
          VideoFeatures = np.vstack((VideoFeatures, feat))

      if Video_count == 0:
        AllFeatures = VideoFeatures
      if Video_count > 0:
        AllFeatures = np.vstack((AllFeatures, VideoFeatures))

    # print(" Normal Features  loaded")
    
    # print("Loading Anomaly videos...")
    # print(AllFeatures.shape)
    for abnormal_video in Abnor_list_iter:
      Video_count=Video_count+1
      # print("[{}] loading vide {}".format(Video_count, VideoPath))
      
      count = -1;
      VideoFeatures = []
      for feat in abnormal_video:
        count = count + 1
        if count == 0:
          VideoFeatures = feat
        if count > 0:
          VideoFeatures = np.vstack((VideoFeatures, feat))

      AllFeatures = np.vstack((AllFeatures, VideoFeatures))

    # print("Features  loaded")

    AllLabels = np.zeros(32*batchsize, dtype='float32')
    th_loop1=n_exp*32
    th_loop2=n_exp*32-1

    for iv in range(0, 32*batchsize):
      if iv< th_loop1:
        AllLabels[iv] = 0.0
      if iv > th_loop2:
        AllLabels[iv] = 1.0

    # print(np.array(AllFeatures).shape)
    # print(np.array(AllLabels).shape)

    yield  np.array(AllFeatures), np.array(AllLabels)


In [None]:
#specify the generator for training and testing with batch size = 10

train_generator = generator(train_normal_x, train_abnormal_x, 10)
validation_generator = generator(test_normal_x, test_abnormal_x, 10)
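
Each batch from the generator stacks the segments of the normal videos first and the abnormal videos second, with labels 0 and 1 per segment. A minimal sketch of that layout built with `np.concatenate`, assuming a hypothetical helper `assemble_batch` and toy 8-dimensional features:

```python
import numpy as np

def assemble_batch(normal_videos, abnormal_videos, n_seg=32):
    """Stack segment features (normal first) and build one label per segment."""
    features = np.concatenate(list(normal_videos) + list(abnormal_videos), axis=0)
    labels = np.concatenate([
        np.zeros(len(normal_videos) * n_seg, dtype='float32'),   # normal segments -> 0
        np.ones(len(abnormal_videos) * n_seg, dtype='float32'),  # abnormal segments -> 1
    ])
    return features, labels

# Demo: 5 normal and 5 abnormal videos, 32 segments each, 8 features per segment.
demo_normal = np.zeros((5, 32, 8), dtype='float32')
demo_abnormal = np.ones((5, 32, 8), dtype='float32')
demo_x, demo_y = assemble_batch(demo_normal, demo_abnormal)
print(demo_x.shape, demo_y.shape)
```

With a batch size of 10 and the real 4096-dimensional features, the same layout gives 320 rows per batch.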

Then, create the accuracy (ACC) callback function.

In [None]:
# create accuracy callback function
from tensorflow.keras.callbacks import Callback

class AccCallback(Callback):
  def __init__(self,normal_x, abnormal_x):
    super().__init__()
    self.normal_x = normal_x
    self.abnormal_x = abnormal_x


  def on_train_begin(self, logs={}):
    return

  def on_train_end(self, logs={}):
    return

  def on_epoch_begin(self, epoch, logs={}):
    return

  def on_epoch_end(self, epoch, logs={}):
    if epoch % 100 == 0:
      tp = 0
      tn = 0
      fp = 0
      fn = 0

      for vid in self.abnormal_x:
        pr = self.model.predict(vid)

        pr = pr > 0.5

        out = True in pr

        if out:
          tp += 1
        else:
          fn += 1

      for vid in self.normal_x:
        pr = self.model.predict(vid)

        pr = pr > 0.5

        out = True in pr

        if out:
          fp += 1
        else:
          tn += 1

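      precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0  # no video predicted as anomaly
      recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0     # no abnormal videos given
      print('tp = {}, tn = {}, fp = {}, fn = {}, precision = {}, recall = {}, acc = {}'.format(tp, tn, fp, fn, precision, recall, (tp + tn) / (tp + tn + fp + fn)))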
    return

  def on_batch_begin(self, batch, logs={}):
    return

  def on_batch_end(self, batch, logs={}):
    return

acc = AccCallback(test_normal_x, test_abnormal_x)
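
The callback treats a whole video as anomalous as soon as any of its segments scores above 0.5. A minimal sketch of that rule on plain lists of scores, without a model, assuming hypothetical helpers `video_is_anomalous` and `count_outcomes`:

```python
def video_is_anomalous(segment_scores, threshold=0.5):
    """A video is flagged when at least one segment score exceeds the threshold."""
    return any(score > threshold for score in segment_scores)

def count_outcomes(normal_videos, abnormal_videos, threshold=0.5):
    """Count video-level true/false positives and negatives."""
    tp = sum(video_is_anomalous(v, threshold) for v in abnormal_videos)
    fn = len(abnormal_videos) - tp
    fp = sum(video_is_anomalous(v, threshold) for v in normal_videos)
    tn = len(normal_videos) - fp
    return tp, tn, fp, fn

# Demo with made-up segment scores.
demo_normal_scores = [[0.1, 0.2], [0.4, 0.7]]
demo_abnormal_scores = [[0.9, 0.1], [0.2, 0.3], [0.6, 0.6]]
print(count_outcomes(demo_normal_scores, demo_abnormal_scores))
```

One consequence of this rule is that a single noisy segment is enough to flag a normal video, so longer videos are more likely to produce false alarms.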

After that, create the custom loss function.

In [None]:
#create custom loss function

@tf.function
def custom_objective(y_true, y_pred):
    
  y_true = tf.reshape(y_true, [-1])
  y_pred = tf.reshape(y_pred, [-1])

  # print(y_true)
  # print(y_pred)
  
  n_seg = 32
  nvid = 10
  n_exp = nvid / 2
  n_exp = int(n_exp)
  Num_d=32*nvid

  sub_max = tf.ones_like(y_pred)
  sub_sum_labels = tf.ones_like(y_true)
  sub_sum_l1= tf.ones_like(y_true) 
  sub_l2 = tf.ones_like(y_true)

  for ii in range(0, nvid, 1):
    
    mm = y_true[ii * n_seg:ii * n_seg + n_seg]
    # print(tf.stack([tf.math.reduce_sum(mm)]))
    sub_sum_labels = tf.concat([sub_sum_labels, tf.stack([tf.math.reduce_sum(mm)])], 0)

    Feat_Score = y_pred[ii * n_seg:ii * n_seg + n_seg]
    sub_max = tf.concat([sub_max, tf.stack([tf.math.reduce_max(Feat_Score)])], 0)  
    sub_sum_l1 = tf.concat([sub_sum_l1, tf.stack([tf.math.reduce_sum(Feat_Score)])], 0)

    z1 = tf.ones_like(Feat_Score)
    z2 = tf.concat([z1, Feat_Score], 0)
    z3 = tf.concat([Feat_Score, z1], 0)
    z_22 = z2[31:]
    z_44 = z3[:33]
    z = z_22 - z_44
    z = z[1:32]
    z = tf.math.reduce_sum(tf.math.square(z))
    sub_l2 = tf.concat([sub_l2, tf.stack([z])], 0)


  sub_score = sub_max[Num_d:]
  F_labels = sub_sum_labels[Num_d:]
  

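  # The paper applies the sparsity and smoothness terms to the anomalous videos.
  # The generator places the normal videos first, so the anomalous ones are the last n_exp.
  sub_sum_l1 = sub_sum_l1[Num_d:]
  sub_sum_l1 = sub_sum_l1[n_exp:]
  sub_l2 = sub_l2[Num_d:]
  sub_l2 = sub_l2[n_exp:]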

  indx_nor = tf.where(tf.math.equal(F_labels, 0))# tf.experimental.numpy.nonzero(tf.cond(tf.math.equal(F_labels, 32), lambda: 1, lambda: 0))[0]
  indx_abn = tf.where(tf.math.equal(F_labels, 32))# tf.experimental.numpy.nonzero(tf.cond(tf.math.equal(F_labels, 0), lambda: 1, lambda: 0))[0]

  n_Nor=n_exp

  Sub_Nor = tf.gather_nd(sub_score, indx_nor)
  Sub_Abn = tf.gather_nd(sub_score, indx_abn)

  z = tf.ones_like(y_true)
  for ii in range(0, n_Nor, 1):
    # print(Sub_Abn)
    # print(Sub_Nor)
    sub_z = tf.math.maximum(1 - Sub_Abn[ii] + Sub_Nor[ii], 0)
    z = tf.concat([z, tf.stack([tf.math.reduce_sum(sub_z)])], 0)

  z = z[Num_d:]
  z = tf.math.reduce_mean(z, axis=-1) +  0.00008*tf.math.reduce_sum(sub_sum_l1) + 0.00008*tf.math.reduce_sum(sub_l2)
  # print(z)
  return z
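
The TensorFlow code above is hard to read because of the tensor bookkeeping. As a rough reference, here is a plain-Python sketch of the ranking loss for a single pair of videos, as formulated in the referenced paper: a hinge term on the maximum scores, plus sparsity and temporal smoothness terms on the anomalous video. The function name `mil_ranking_loss` and its parameter names are illustrative assumptions; the weight 0.00008 is taken from the code above.

```python
def mil_ranking_loss(abnormal_scores, normal_scores,
                     lambda_sparse=8e-5, lambda_smooth=8e-5):
    """Ranking loss for one (abnormal, normal) pair of per-segment score lists."""
    # Hinge: the top abnormal segment should score at least 1 above the top normal one.
    hinge = max(0.0, 1.0 - max(abnormal_scores) + max(normal_scores))
    # Sparsity: only a few segments of an anomalous video should score high.
    sparsity = sum(abnormal_scores)
    # Smoothness: neighbouring segments should have similar scores.
    smoothness = sum((a - b) ** 2
                     for a, b in zip(abnormal_scores, abnormal_scores[1:]))
    return hinge + lambda_sparse * sparsity + lambda_smooth * smoothness

# Demo with 4 segments per video instead of 32.
demo_loss = mil_ranking_loss([0.0, 1.0, 0.0, 0.0], [0.2, 0.1, 0.0, 0.0])
print(demo_loss)
```

In the demo the hinge term is 0.2, the sparsity sum is 1, and the smoothness sum is 2, so the regularizers add only 0.00024. This shows how small their weight is relative to the ranking term.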

Lastly, build the network described in the reference paper. It is a small fully connected network that scores the features extracted by the C3D CNN.

In [None]:
#build the fully connected model; the input shape is 4096, the size of the features extracted using C3D
#use 0.6 dropout between the layers
#output one to determine anomaly (1) or normal (0)

model = tf.keras.Sequential([
                             tf.keras.layers.Input(shape=(4096,)),
                             tf.keras.layers.Dropout(0.6),
                             tf.keras.layers.Dense(512, kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.L2(0.001), activation='relu'),
                             tf.keras.layers.Dropout(0.6),
                             tf.keras.layers.Dense(32, kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.L2(0.001)),
                             tf.keras.layers.Dropout(0.6),
                             tf.keras.layers.Dense(1, kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.L2(0.001), activation='sigmoid'),
])

model.summary()
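
As a sanity check on what `model.summary()` should report, the parameter count of each dense layer is inputs × outputs plus one bias per output. A minimal sketch of that arithmetic for the 4096 → 512 → 32 → 1 stack (dropout layers have no parameters); the helper `dense_params` is hypothetical:

```python
def dense_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out

layer_sizes = [4096, 512, 32, 1]
per_layer = [dense_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:])]
total_params = sum(per_layer)
print(per_layer, total_params)
```

Almost all of the roughly 2.1 million parameters sit in the first dense layer.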

To compile the model, we use the Adagrad optimizer with a learning rate of 0.01 and the custom loss function.

In [None]:
#compile the model with Adagrad optimizer and custom loss function
model.compile(optimizer=tf.keras.optimizers.Adagrad(0.01, epsilon=1e-08), loss=custom_objective)

We train the model for 500 epochs and stop there to avoid overfitting. Since `steps_per_epoch=1`, each epoch processes a single batch. For analysis purposes, we keep the result of `fit` in the `history` variable so that we can plot it later.

In [None]:
history = model.fit(train_generator, validation_data=validation_generator, epochs=500, steps_per_epoch=1, validation_steps=1, callbacks=[acc])

###**Save the model**

After the model is done training, we save the model for deployment purpose. 

Download it from the colab to store it in cloud for further deployment.

In [None]:
model.save('model-v2.h5')

In [None]:
tf.saved_model.save(model, 'wuoot')

In [None]:
!zip -r /content/file.zip /content/wuoot

##**Analyzing the results**

To make sure the model is adequate, we analyze the results through the loss graph and through the confusion matrix, precision, recall, F1, and False Alarm Rate values, and use them to decide whether the model is good enough to be used.

###**Loss graph**
First, we plot `loss` and `val_loss` from the training history to choose a number of epochs that avoids overfitting.

In [None]:
#plot the loss and val_loss per epochs to a graph to know the proper epochs to avoid overfitting

import matplotlib.pyplot as plt

loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(loss))

plt.figure()   # open the figure before plotting so no empty figure is created afterwards
plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend(loc=0)


plt.show()
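
Reading the best epoch off the plot can be made more systematic. A minimal sketch that smooths the validation loss with a trailing moving average and returns the epoch with the lowest smoothed value; the helpers `moving_average` and `best_epoch` and the window size are assumptions for illustration, and the list stands in for `history.history['val_loss']`:

```python
def moving_average(values, window):
    """Trailing moving average; early entries average over the values seen so far."""
    out = []
    for i in range(len(values)):
        start = max(0, i - window + 1)
        chunk = values[start:i + 1]
        out.append(sum(chunk) / len(chunk))
    return out

def best_epoch(val_losses, window=5):
    """Index of the epoch with the lowest smoothed validation loss."""
    smoothed = moving_average(val_losses, window)
    return min(range(len(smoothed)), key=smoothed.__getitem__)

# Demo with made-up validation losses.
demo_val_loss = [1.0, 0.8, 0.6, 0.5, 0.55, 0.7, 0.9]
print(best_epoch(demo_val_loss, window=1), best_epoch(demo_val_loss, window=3))
```

Smoothing matters here because each epoch is a single random batch, so the raw per-epoch loss is noisy.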

Load the weights from the saved model for further analysis of the results.

In [None]:
#load model weights from the saved model

model.load_weights('model-v2.h5')

###**Confusion matrix**
Analyze the confusion matrix

In [None]:
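#analyze the confusion matrix from the model, only count the prediction score that have > 0.5 values as the threshold

from sklearn.metrics import confusion_matrix

# ASSUMPTION: `yy` and `prediction` are not defined earlier in this notebook.
# Here they are taken to be the per-video labels (0 = normal, 1 = anomaly)
# and the highest segment score of each test video.
test_videos = list(test_normal_x) + list(test_abnormal_x)
yy = np.array([0] * len(test_normal_x) + [1] * len(test_abnormal_x))
prediction = np.array([model.predict(vid).max() for vid in test_videos])

new_pred = (prediction > 0.5).astype(int)

a = confusion_matrix(yy, new_pred, labels=[0, 1])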

tn, fp, fn, tp = a.ravel()
print(a)
print("tn = {}, fp = {}, fn = {}, tp = {}".format(tn, fp, fn, tp))


###**Precision, Recall, F1, and False Alarm Rate**
Compute the precision, False Alarm Rate (the false positive rate), recall, and F1 from the confusion matrix.

In [None]:
precision = tp / (tp + fp)
FAR = fp / (fp + tn)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

print("precision = {}, FAR (False Alarm Rate) = {}, recall = {}, f1 = {}".format(precision, FAR, recall, f1))

In [None]:
#analyze the predicted result from test abnormal or anomaly videos, indicates True for the anomaly with >0.5 values as it is nearer to the anomaly values (1)

true = 0
false = 0

for vid in test_abnormal_x:
  pr = model.predict(vid)

  pr = pr > 0.5

  out = True in pr

  if out:
    true += 1
  else:
    false += 1

  print(out)

print(true / (true + false))


In [None]:
#analyze the predicted result from test normal videos, indicates True for the anomaly with >0.5 values as it is nearer to the anomaly values (1)

true = 0
false = 0

for vid in test_normal_x:
  pr = model.predict(vid)

  pr = pr > 0.5

  out = True in pr

  if out:
    false += 1
  else:
    true += 1

  print(out)

print(true / (true + false))
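
The metric cells above divide by counts that can be zero, for example when no video is predicted as an anomaly. A minimal sketch that collects the same formulas in one place with a zero guard; the helpers `safe_div` and `summarize` are hypothetical, and returning 0.0 for an undefined ratio is a convention chosen here, not something the original notebook specifies:

```python
def safe_div(num, den):
    """Return num / den, or 0.0 when the denominator is zero."""
    return num / den if den else 0.0

def summarize(tp, tn, fp, fn):
    """Precision, recall, F1, False Alarm Rate, and accuracy from confusion counts."""
    precision = safe_div(tp, tp + fp)
    recall = safe_div(tp, tp + fn)
    return {
        'precision': precision,
        'recall': recall,
        'f1': safe_div(2 * precision * recall, precision + recall),
        'far': safe_div(fp, fp + tn),          # false positive rate
        'accuracy': safe_div(tp + tn, tp + tn + fp + fn),
    }

# Demo with made-up counts.
demo_metrics = summarize(tp=8, tn=6, fp=2, fn=4)
print(demo_metrics)
```

The two ratios printed by the last two cells correspond to `recall` (abnormal videos) and `1 - far` (normal videos).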

##**Test Ground**

This section tests the model with the test dataset.

In [None]:
#import necessary module

import re
import os

To do the testing, we use the pretrained C3D model, based on the implementation by `adamcasson`; our copy is on [GitHub](https://github.com/ardanto14/digital-police/blob/main/C3D/C3D.py).

In [None]:
# -*- coding: utf-8 -*-
"""C3D model for Keras
# Reference:
- [Learning Spatiotemporal Features with 3D Convolutional Networks](https://arxiv.org/abs/1412.0767)
Based on code from @albertomontesg
"""

import skvideo.io
import tensorflow.keras.backend as K
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import get_file
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv3D, MaxPooling3D, ZeroPadding3D

WEIGHTS_PATH = 'https://github.com/adamcasson/c3d/releases/download/v0.1/sports1M_weights_tf.h5'

def C3D(weights='sports1M'):
    """Instantiates a C3D Keras model
    
    Keyword arguments:
    weights -- weights to load into model. (default is sports1M)
    
    Returns:
    A Keras model.
    
    """
    
    if weights not in {'sports1M', None}:
        raise ValueError('weights should be either sports1M or None')
    
    if K.image_data_format() == 'channels_last':
        shape = (16,112,112,3)
    else:
        shape = (3,16,112,112)
        
    model = Sequential()
    model.add(Conv3D(64, 3, activation='relu', padding='same', name='conv1', input_shape=shape))
    model.add(MaxPooling3D(pool_size=(1,2,2), strides=(1,2,2), padding='same', name='pool1'))
    
    model.add(Conv3D(128, 3, activation='relu', padding='same', name='conv2'))
    model.add(MaxPooling3D(pool_size=(2,2,2), strides=(2,2,2), padding='valid', name='pool2'))
    
    model.add(Conv3D(256, 3, activation='relu', padding='same', name='conv3a'))
    model.add(Conv3D(256, 3, activation='relu', padding='same', name='conv3b'))
    model.add(MaxPooling3D(pool_size=(2,2,2), strides=(2,2,2), padding='valid', name='pool3'))
    
    model.add(Conv3D(512, 3, activation='relu', padding='same', name='conv4a'))
    model.add(Conv3D(512, 3, activation='relu', padding='same', name='conv4b'))
    model.add(MaxPooling3D(pool_size=(2,2,2), strides=(2,2,2), padding='valid', name='pool4'))
    
    model.add(Conv3D(512, 3, activation='relu', padding='same', name='conv5a'))
    model.add(Conv3D(512, 3, activation='relu', padding='same', name='conv5b'))
    model.add(ZeroPadding3D(padding=(0,1,1)))
    model.add(MaxPooling3D(pool_size=(2,2,2), strides=(2,2,2), padding='valid', name='pool5'))
    
    model.add(Flatten())
    
    model.add(Dense(4096, activation='relu', name='fc6'))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu', name='fc7'))
    model.add(Dropout(0.5))
    model.add(Dense(487, activation='softmax', name='fc8'))

    if weights == 'sports1M':
        weights_path = get_file('sports1M_weights_tf.h5',
                                WEIGHTS_PATH,
                                cache_subdir='models',
                                md5_hash='b7a93b2f9156ccbebe3ca24b41fc5402')
        
        model.load_weights(weights_path)
    
    return model

In [None]:
#copy model from colab
!cp /content/model-v2.h5 classify_weights_tf.h5

Next comes `utils.py`, which preprocesses the input for C3D; the code is also available on [GitHub](https://github.com/ardanto14/digital-police/blob/main/C3D/utils.py).

In [None]:
import numpy as np
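import cv2

def imresize(img, size, interp='bicubic'):
    """Stand-in for scipy.misc.imresize, which was removed in SciPy 1.3.

    `size` is (rows, columns); `interp` is ignored and bicubic is always used.
    """
    return cv2.resize(img, (size[1], size[0]), interpolation=cv2.INTER_CUBIC)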
from tensorflow.keras.utils import get_file


C3D_MEAN_PATH = 'https://github.com/adamcasson/c3d/releases/download/v0.1/c3d_mean.npy'
SPORTS1M_CLASSES_PATH = 'https://github.com/adamcasson/c3d/releases/download/v0.1/sports1M_classes.txt'

def preprocess_input(video):
    """Resize and subtract mean from video input
    
    Keyword arguments:
    video -- video frames to preprocess. Expected shape 
        (frames, rows, columns, channels). If the input has more than 16 frames
        then only 16 evenly sampled frames will be selected to process.
    
    Returns:
    A numpy array.
    
    """
    intervals = np.ceil(np.linspace(0, video.shape[0]-1, 16)).astype(int)
    frames = video[intervals]
    
    # Reshape to 128x171
    reshape_frames = np.zeros((frames.shape[0], 128, 171, frames.shape[3]))
    for i, img in enumerate(frames):
        img = imresize(img, (128,171), 'bicubic')
        reshape_frames[i,:,:,:] = img
        
    mean_path = get_file('c3d_mean.npy',
                         C3D_MEAN_PATH,
                         cache_subdir='models',
                         md5_hash='08a07d9761e76097985124d9e8b2fe34')
    
    # Subtract mean
    mean = np.load(mean_path)
    reshape_frames -= mean
    # Crop to 112x112
    reshape_frames = reshape_frames[:,8:120,30:142,:]
    # Add extra dimension for samples
    reshape_frames = np.expand_dims(reshape_frames, axis=0)
    
    return reshape_frames

def decode_predictions(preds):
    """Returns class label and confidence of top predicted answer
    
    Keyword arguments:
    preds -- numpy array of class probability
    
    Returns:
    A list of tuples.
    
    """
    class_pred = []
    for x in range(preds.shape[0]):
        class_pred.append(np.argmax(preds[x]))
    
    labels_path = get_file('sports1M_classes.txt',
                           SPORTS1M_CLASSES_PATH,
                           cache_subdir='models',
                           md5_hash='c102dd9508f3aa8e360494a8a0468ad9')
    
    with open(labels_path, 'r') as f:
        labels = [lines.strip() for lines in f]
        
    decoded = [(labels[x],preds[i,x]) for i,x in enumerate(class_pred)]
    
    return decoded
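
`preprocess_input` above picks 16 evenly spaced frames with `np.ceil(np.linspace(...))` and then crops the 128x171 frames to 112x112. A minimal pure-Python sketch of the index selection, assuming a hypothetical helper `sample_indices` that mirrors that expression:

```python
import math

def sample_indices(num_frames, num_samples=16):
    """Evenly spaced frame indices from 0 to num_frames - 1, rounded up."""
    step = (num_frames - 1) / (num_samples - 1)
    return [math.ceil(i * step) for i in range(num_samples)]

# With exactly 16 frames every frame is kept; with 31 frames every second one is.
print(sample_indices(16))
print(sample_indices(31))

# The crop [8:120, 30:142] keeps a centred 112x112 window of the 128x171 frame.
crop_rows, crop_cols = 120 - 8, 142 - 30
print(crop_rows, crop_cols)
```

In the test loop of this notebook each clip already has exactly 16 frames, so the sampling keeps all of them.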

In [None]:
endfile = []

for file in os.listdir('/content/folderOnColab/out/Test-Anomaly'):
  result = re.search(r"([a-zA-Z]*)(\d*)_", file)
  path = "/content/folderOnColab/" + result.groups()[0] + '/'
  filename = file.split('.')[0] + '.mp4'
  endfile.append(path + filename)

print(endfile)

In [None]:
endfile2 = []

for file in os.listdir('/content/folderOnColab/out/Test_Normal'):
  result = re.search(r"([a-zA-Z]*)(\d*)_", file)
  path = "/content/folderOnColab/" + 'Testing-Normal-Videos' + '/'
  
  filename = file.split('.')[0] + '.mp4'

  
  endfile2.append(path + filename)

print(endfile2)
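
The two cells above map each feature file back to its source video: the letters before the digits give the category folder, and the extension is swapped for `.mp4`. A minimal sketch of that mapping as a function; the helper `feature_to_video_path` and the example file name `Robbery048_x264.txt` are assumptions for illustration:

```python
import re

def feature_to_video_path(feature_file, root='/content/folderOnColab/'):
    """Build the video path for a feature file named like '<Category><digits>_....txt'."""
    match = re.search(r"([a-zA-Z]*)(\d*)_", feature_file)
    if match is None:
        raise ValueError('unexpected file name: ' + feature_file)
    category = match.groups()[0]              # e.g. 'Robbery'
    stem = feature_file.split('.')[0]         # file name without extension
    return root + category + '/' + stem + '.mp4'

print(feature_to_video_path('Robbery048_x264.txt'))
```

The explicit check for a failed match gives a clearer error than the `AttributeError` the original loop would raise on a file name without an underscore.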

In [None]:
from tensorflow.keras import Model
import tensorflow as tf
import cv2

base_model = C3D(weights='sports1M')
feature_extractor = Model(inputs=base_model.input, outputs=base_model.get_layer('fc6').output)




prediction_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(4096,)),
    tf.keras.layers.Dropout(0.6),
    tf.keras.layers.Dense(512, kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.L2(0.001), activation='relu'),
    tf.keras.layers.Dropout(0.6),
    tf.keras.layers.Dense(32, kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.L2(0.001)),
    tf.keras.layers.Dropout(0.6),
    tf.keras.layers.Dense(1, kernel_initializer='glorot_normal', kernel_regularizer=tf.keras.regularizers.L2(0.001), activation='sigmoid'),
])

prediction_model.load_weights('classify_weights_tf.h5')

model = tf.keras.Sequential([
    feature_extractor,
    tf.keras.layers.Flatten(),
    prediction_model
])

After importing the pretrained model, we can run the test on the anomaly and normal testing datasets with our model.

In [None]:
tp = 0
tn = 0
fn = 0
fp = 0

for fil in endfile:
  print('processing video {}'.format(fil))
  cap = cv2.VideoCapture(fil)

  batch_size = 32
  ln = 0
  ln_btch = 0
  all_batch = []
  all_frame = []
  is_anomaly = False
  while True:
      
      ret, frame = cap.read()
      if not ret:
          break

      all_frame.append(frame)
      ln += 1

      if ln == 16:
          x = preprocess_input(np.array(all_frame))
          x = np.squeeze(x)
          
          all_batch.append(x)
          ln_btch += 1

          if ln_btch == batch_size:
              prediction = model.predict(np.array(all_batch))

              prediction = prediction > 0.5

              if prediction.any():
                  is_anomaly = True

              all_batch = []
              ln_btch = 0
          
          
          
          all_frame = []
          ln = 0

  if ln_btch > 0:
      prediction = model.predict(np.array(all_batch))

      prediction = prediction > 0.5

      if prediction.any():
          is_anomaly = True

      all_batch = []
      ln_btch = 0

  if is_anomaly:
    tp += 1
  else:
    fn += 1

  cap.release()

tn = 0
fp = 0

for fil in endfile2:
  print('processing video {}'.format(fil))
  cap = cv2.VideoCapture(fil)

  batch_size = 32
  ln = 0
  ln_btch = 0
  all_batch = []
  all_frame = []
  is_anomaly = False
  while True:
      
      ret, frame = cap.read()
      if not ret:
          break

      all_frame.append(frame)
      ln += 1

      if ln == 16:
          x = preprocess_input(np.array(all_frame))
          x = np.squeeze(x)
          
          all_batch.append(x)
          ln_btch += 1

          if ln_btch == batch_size:
              prediction = model.predict(np.array(all_batch))

              prediction = prediction > 0.5

              if prediction.any():
                  is_anomaly = True

              all_batch = []
              ln_btch = 0
          
          
          
          all_frame = []
          ln = 0

  if ln_btch > 0:
      prediction = model.predict(np.array(all_batch))

      prediction = prediction > 0.5

      if prediction.any():
          is_anomaly = True

      all_batch = []
      ln_btch = 0

  if is_anomaly:
    fp += 1
  else:
    tn += 1

  cap.release()

precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / (tp + tn + fp + fn)
FAR = fp / (fp + tn)
print("tp = {}, tn = {}, fp = {}, fn = {}, precision = {}, recall = {}, accuracy = {}, FAR = {}".format(tp, tn, fp, fn, precision, recall, accuracy, FAR))
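
Both loops above group frames into 16-frame clips, drop a trailing incomplete clip, and then group clips into batches of 32, with the last partial batch still predicted. A minimal sketch of that grouping as a reusable generator, assuming a hypothetical helper `chunked` and integers standing in for frames:

```python
def chunked(items, size, keep_partial):
    """Yield lists of `size` items; optionally yield a final shorter list."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk and keep_partial:
        yield chunk

# Demo: 70 full clips plus 5 leftover frames.
frames = range(16 * 70 + 5)
clips = list(chunked(frames, 16, keep_partial=False))   # incomplete clip is dropped
batches = list(chunked(clips, 32, keep_partial=True))   # last partial batch is kept
print(len(clips), [len(b) for b in batches])
```

A helper like this would let the anomaly and normal loops share one code path instead of duplicating the counters.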