In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seqborn as sns
import pandas as pd
import seaborn as sns
import cv2
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import f1_score
import statistics
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow import keras
from tensorflow.keras.models import load_model
from keras import Sequential
from keras import layers
from keras.callbacks import EarlyStopping
from keras import callbacks
from collections import deque
import json

In [None]:
# Hyperparameters and dataset configuration

IMAGE_HEIGHT = 224
IMAGE_WIDTH = 224
NUM_FEATURES = 1024   # length of the feature vector stored per frame when a pre-trained extractor is used
SEQUENCE_LENGTH = 10  # number of frames kept per video
DATASET_DIR = "Real Life Violence Dataset"
path = f'/datasets/{DATASET_DIR}'
# os.listdir() returns entries in arbitrary order. Sorting makes the class
# order -- and every index derived from enumerate(classes_labels) -- identical
# on every run and every machine.
classes_labels = sorted(os.listdir(path))
EXAMPLES = 2000       # number of rows pre-allocated for the feature and label arrays

In [None]:
# Report how many entries each class directory contains

for class_dir in classes_labels:
    n_examples = len(os.listdir(f'{path}/{class_dir}'))
    print(f"Class {class_dir} has {n_examples} examples")

In [None]:
# Build the label vector: 1 for every entry of the 'Violence' directory, 0 otherwise.
# NOTE(review): one label is written per directory entry, whereas get_features() and
# prepare_all_videos() skip videos with fewer than SEQUENCE_LENGTH frames -- confirm
# that `labels` and `features` stay row-aligned for this dataset.

labels = np.zeros(shape=(EXAMPLES))
v = 0  # next free row of `labels`
for val in classes_labels:
    video_paths = os.listdir(f'{path}/{val}')
    print(len(video_paths))
    class_label = 1 if val == 'Violence' else 0
    for _ in video_paths:
        labels[v] = class_label
        v += 1

In [None]:
# Extract and pre-process SEQUENCE_LENGTH frames of one video example.

def frames_extraction(video_path, method):
    """Read up to SEQUENCE_LENGTH frames from a video, resized and divided by 255.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.
        method: ``'sparse'`` seeks to evenly spaced positions (one frame every
            ``frame_count // SEQUENCE_LENGTH`` frames, at least 1);
            ``'sequential'`` reads the first SEQUENCE_LENGTH frames.
            Any other value yields an empty list.

    Returns:
        A list with one array (``frame / 255``) per frame read. The list is
        empty when the video reports fewer than SEQUENCE_LENGTH frames, and
        shorter than SEQUENCE_LENGTH when a read fails part-way through.

    No colour-space conversion is applied, so channels stay in the order
    OpenCV decodes them (BGR by default).
    NOTE(review): the original comment described the result as RGB -- confirm
    which channel order the downstream models expect.
    """
    frames_list = []
    video_reader = cv2.VideoCapture(video_path)
    try:
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        if video_frames_count < SEQUENCE_LENGTH:
            return frames_list

        if method == 'sparse':
            # Distance between two sampled frames.
            skip_frames_window = max(video_frames_count // SEQUENCE_LENGTH, 1)
        elif method == 'sequential':
            skip_frames_window = None  # no seeking: just read consecutive frames
        else:
            return frames_list

        for frame_counter in range(SEQUENCE_LENGTH):
            if skip_frames_window is not None:
                video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            ok, frame = video_reader.read()
            if not ok:
                break
            # cv2.resize takes dsize as (width, height).
            frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT))
            frames_list.append(frame / 255)
    finally:
        # Release the capture on every exit path, including the early returns.
        video_reader.release()
    return frames_list

In [None]:
# Features are gathered this way when NO pre-trained model is used:
# the scaled frames of every video in the dataset are loaded into one array.

def get_features(return_labels=False):
    """Load SEQUENCE_LENGTH sparse frames of every dataset video into one array.

    Videos for which ``frames_extraction`` returns fewer than SEQUENCE_LENGTH
    frames are skipped, so fewer than EXAMPLES rows may actually be filled.

    Args:
        return_labels: when False (default) only the pre-allocated feature
            array is returned, exactly as before -- rows past the last kept
            video stay all-zero. When True, the array is trimmed to the kept
            videos and returned together with their labels, which keeps
            features and labels aligned even if some videos were skipped.

    Returns:
        ``features`` of shape (EXAMPLES, SEQUENCE_LENGTH, IMAGE_HEIGHT,
        IMAGE_WIDTH, 3), or ``(features[:kept], labels)`` when
        ``return_labels`` is True. Labels are 1 for the 'Violence' class and
        0 otherwise.
    """
    features = np.zeros(
        shape=(EXAMPLES, SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3),
        dtype=np.float32,  # float32 to limit RAM usage
    )
    kept_labels = []
    counter = 0  # next free row of `features`

    for class_name in classes_labels:
        print(class_name)
        class_label = 1 if class_name == 'Violence' else 0
        for video_name in os.listdir(f'{path}/{class_name}'):
            frames = frames_extraction(f'{path}/{class_name}/{video_name}', 'sparse')
            if len(frames) < SEQUENCE_LENGTH:
                continue
            features[counter] = np.asarray(frames)
            kept_labels.append(class_label)
            counter += 1

    if return_labels:
        return features[:counter], np.array(kept_labels, dtype=float)
    return features

In [None]:
# Raw-frame features, for the models that do NOT use a pre-trained extractor

features = get_features()
print(np.shape(features))  # should be (2000,10,224,224,3)

In [None]:
# Run this cell ONLY when a pre-trained model is used.
# Builds the pre-trained CNN that turns one frame into one feature vector.

def build_feature_extractor():
    """Return a Keras model named "feature_extractor" wrapping ImageNet MobileNet.

    The model takes an (IMAGE_HEIGHT, IMAGE_WIDTH, 3) input, applies
    MobileNet's own ``preprocess_input`` and then the headless MobileNet
    backbone with global average pooling.
    """
    frame_shape = (IMAGE_HEIGHT, IMAGE_WIDTH, 3)
    backbone = keras.applications.MobileNet(
        input_shape=frame_shape,
        include_top=False,
        pooling='avg',
        weights="imagenet",
    )
    frame_input = keras.Input(frame_shape)
    scaled_input = keras.applications.mobilenet.preprocess_input(frame_input)
    return keras.Model(frame_input, backbone(scaled_input), name="feature_extractor")

extractor = build_feature_extractor()
# Freeze every top-level layer of the extractor.
for frozen_layer in extractor.layers:
    frozen_layer.trainable = False

In [None]:
# Run this cell ONLY when a pre-trained model is used.
# Stores, for every video, the feature vectors the pre-trained extractor produces.
# Keeping only those features (instead of the raw frames) avoids holding the
# whole decoded dataset in RAM.

def prepare_all_videos(return_labels=False):
    """Run every dataset video through ``extractor`` and collect the features.

    Videos for which ``frames_extraction`` returns fewer than SEQUENCE_LENGTH
    frames are skipped, so fewer than EXAMPLES rows may actually be filled.

    Args:
        return_labels: when False (default) only the pre-allocated feature
            array is returned -- rows past the last kept video stay all-zero.
            When True, the array is trimmed to the kept videos and returned
            together with their labels (1 for 'Violence', 0 otherwise), which
            keeps features and labels aligned even if videos were skipped.

    Returns:
        ``features`` of shape (EXAMPLES, SEQUENCE_LENGTH, NUM_FEATURES), or
        ``(features[:kept], labels)`` when ``return_labels`` is True.
    """
    # NUM_FEATURES is the number of features returned by the pre-trained model.
    features = np.zeros(shape=(EXAMPLES, SEQUENCE_LENGTH, NUM_FEATURES), dtype=np.float32)
    kept_labels = []
    counter = 0  # next free row of `features`

    for class_name in classes_labels:
        print(class_name)
        class_label = 1 if class_name == 'Violence' else 0
        # Read from the same dataset root that classes_labels was listed from.
        class_dir = f'{path}/{class_name}'
        for e, video_name in enumerate(os.listdir(class_dir)):
            print(e)
            frames = frames_extraction(f'{class_dir}/{video_name}', 'sparse')
            if len(frames) < SEQUENCE_LENGTH:
                continue
            # frames_extraction() divides pixels by 255, but the extractor applies
            # Keras' preprocess_input, which expects values in [0, 255]; undo the
            # scaling before predicting.
            batch = np.asarray(frames, dtype=np.float32) * 255.0
            # One predict() call for the whole clip instead of one per frame.
            features[counter] = extractor.predict(batch)
            kept_labels.append(class_label)
            counter += 1

    if return_labels:
        return features[:counter], np.array(kept_labels, dtype=float)
    return features

In [None]:
# Run this cell ONLY when a pre-trained model is used.

features = prepare_all_videos()
# should be (2000,10,1024): 1024 is the number of features MobileNet returns
# when Global Average Pooling is used in the final layer
print(np.shape(features))

In [None]:
# Hold-out split: 80% training, 20% testing.
# Used for the first two models only (no pre-trained extractor), because
# K-fold cross-validation was hard to use with them.

features_train, features_test, labels_train, labels_test = train_test_split(
    features,
    labels,
    test_size=0.2,
    shuffle=True,
    stratify=labels,
    random_state=1,
)

In [None]:
# Stratified K-fold cross-validation with 10 splits.
# Used for every model except the first two, for a better evaluation.

skf = StratifiedKFold(
    n_splits=10,
    shuffle=True,
    random_state=1,
)

In [None]:
# First model - CNN & LSTM (without a pre-trained CNN)

def cnn_lstm_model():
    """Build the CNN + LSTM classifier trained directly on raw frames.

    A small 2D CNN (three Conv2D/BatchNormalization stages ending in global
    average pooling) is applied to every frame through ``TimeDistributed``;
    the per-frame vectors go through an LSTM(32) and a dense head.

    The output layer is a 2-way softmax so that it matches the categorical
    cross-entropy loss and one-hot targets used for training (the previous
    sigmoid produced two independent scores that need not sum to 1).

    Prints the summary of both the per-frame CNN and the full model.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    cnn = keras.Sequential([
        keras.layers.Conv2D(64, (3, 3), padding='same', activation='relu',
                            input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3)),
        keras.layers.BatchNormalization(momentum=0.9),
        keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2),
        keras.layers.Conv2D(128, (3, 3), padding='same', activation='relu'),
        keras.layers.BatchNormalization(momentum=0.9),
        keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2),
        keras.layers.Conv2D(256, (3, 3), padding='same', activation='relu'),
        keras.layers.BatchNormalization(momentum=0.9),
        keras.layers.GlobalAveragePooling2D(),
    ])
    # Named `model` so the local no longer shadows the function itself.
    model = keras.Sequential([
        keras.layers.TimeDistributed(
            cnn, input_shape=(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)),
        keras.layers.LSTM(32),
        keras.layers.Dense(512, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.3),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(2, activation='softmax'),
    ])
    cnn.summary()
    model.summary()
    return model

In [None]:
# Second model - 3D CNN (without a pre-trained CNN)

def create_3dcnn_model():
    """Build the 3D-CNN classifier trained directly on raw frame sequences.

    Three Conv3D/BatchNormalization stages (the first pooling only spatially,
    the second pooling all three axes) end in global average pooling, followed
    by a dense head and a 2-way softmax.

    The container is ``keras.Sequential`` -- the same ``keras`` namespace the
    layers come from and the one the other model builders use -- rather than
    the ``Sequential`` imported from the standalone ``keras`` package.

    Prints the model summary.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    model = keras.Sequential([
        keras.layers.Conv3D(64, (3, 3, 3), padding='same', activation='relu',
                            input_shape=(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)),
        keras.layers.BatchNormalization(momentum=0.9),
        keras.layers.AveragePooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2)),
        keras.layers.Conv3D(128, (3, 3, 3), padding='same', activation='relu'),
        keras.layers.BatchNormalization(momentum=0.9),
        keras.layers.AveragePooling3D(pool_size=(2, 2, 2), strides=2),
        keras.layers.Conv3D(256, (3, 3, 3), padding='same', activation='relu'),
        keras.layers.BatchNormalization(momentum=0.9),
        keras.layers.GlobalAveragePooling3D(),
        keras.layers.Dense(512, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.3),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(2, activation="softmax"),
    ])
    model.summary()
    return model

In [None]:
# Training example for the first model; the second model was trained the same
# way but with epochs = 15.
# NOTE(review): the held-out split is also used as validation data for early
# stopping and LR scheduling, so scores on it are not an untouched test estimate.

model = cnn_lstm_model()
# Callbacks come from the same tf.keras namespace as the model.
earlyStopping = keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=20, mode='min', min_delta=0.001, restore_best_weights=True)
# `learning_rate` replaces the deprecated `lr` argument.
adam = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)
reduce_lr_loss = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', patience=5, verbose=1, factor=0.5, min_lr=0.0000001)
model.compile(loss="categorical_crossentropy", optimizer=adam, metrics=["accuracy"])
history = model.fit(
    features_train, to_categorical(labels_train),
    batch_size=20,
    epochs=100,
    verbose=1,
    validation_data=(features_test, to_categorical(labels_test)),
    callbacks=[earlyStopping, reduce_lr_loss],
)

In [None]:
# Learning curves for the first and second model were plotted this way

history_df = pd.DataFrame(history.history)

# One figure per metric: (training column, validation column,
#                         training label, validation label, title)
curve_specs = (
    ('loss', 'val_loss', 'loss', 'Validation loss',
     "Training Loss and Validation Loss for 3D CNN model"),
    ('accuracy', 'val_accuracy', 'accuracy', 'Validation accuracy',
     "Training Accuracy and Validation Accuracy for 3D CNN model"),
)
for train_col, val_col, train_label, val_label, title in curve_specs:
    plt.plot(history_df[train_col], label=train_label)
    plt.plot(history_df[val_col], label=val_label)
    plt.xlabel("Epoch")
    plt.title(title)
    plt.legend()
    plt.show()

In [None]:
# Computing the confusion matrix
# label = 1 means Violent, label = 0 means non-Violent

pred = model.predict(features_test)
# Pick the class with the highest score. Thresholding column 0 at 0.5 mislabels
# a sample whenever the two outputs are not a normalised probability pair.
ypredicted = np.argmax(pred, axis=1)
# The true labels are already 0/1; no round-trip through to_categorical is needed.
y = np.asarray(labels_test).astype(int)

# labels=[0, 1] keeps the matrix 2x2 even if one class is absent from y and ypredicted.
confusion = confusion_matrix(y, ypredicted, labels=[0, 1])
confusion.shape

In [None]:
# Drawing the confusion matrix

def print_confusion_matrix(confusion_matrix, class_names):
    """Draw a confusion matrix as an annotated seaborn heatmap.

    Args:
        confusion_matrix: 2-D table of counts; rows are labelled as true
            classes and columns as predicted classes.
        class_names: labels used for both the rows and the columns.

    Returns:
        The matplotlib figure holding the heatmap.

    Raises:
        ValueError: if ``sns.heatmap`` raises ValueError (re-raised with the
            message "Confusion matrix values must be integers.").
    """
    df_cm = pd.DataFrame(confusion_matrix, index=class_names, columns=class_names)
    fig, ax = plt.subplots()
    try:
        sns.heatmap(df_cm, annot=True, fmt="d", ax=ax)
    except ValueError:
        raise ValueError("Confusion matrix values must be integers.")
    # Keep row labels horizontal and tilt column labels.
    ax.yaxis.set_ticklabels(ax.yaxis.get_ticklabels(), rotation=0, ha='right')
    ax.xaxis.set_ticklabels(ax.xaxis.get_ticklabels(), rotation=45, ha='right')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    return fig

print_confusion_matrix(confusion, class_names=[0, 1])

In [None]:
# Third, fourth and fifth model (pre-trained CNN features + BiLSTM layer).
# The features extracted by VGG16, ResNet50 and MobileNet were each passed as
# input to this architecture, which gave three models.

def PreTrained_BiLSTM():
    """Build the bidirectional-LSTM classifier fed with pre-extracted features.

    Input is (SEQUENCE_LENGTH, NUM_FEATURES); a Bidirectional LSTM(80) is
    followed by Dense 512/128/64 ReLU layers (dropout 0.3 and 0.2 after the
    first two) and a softmax over ``len(classes_labels)`` classes.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    model = keras.Sequential()
    model.add(keras.layers.Bidirectional(
        keras.layers.LSTM(80), input_shape=(SEQUENCE_LENGTH, NUM_FEATURES)))
    # (units, dropout applied after the layer or None)
    for units, dropout_rate in ((512, 0.3), (128, 0.2), (64, None)):
        model.add(keras.layers.Dense(units, activation='relu'))
        if dropout_rate is not None:
            model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(len(classes_labels), activation="softmax"))
    return model

In [None]:
# Sixth, seventh and eighth model (pre-trained CNN features + LSTM layer).
# The features extracted by VGG16, ResNet50 and MobileNet were each passed as
# input to this architecture, which gave three models.

def PreTrained_LSTM():
    """Build the LSTM classifier fed with pre-extracted features.

    Input is (SEQUENCE_LENGTH, NUM_FEATURES); an LSTM(80) is followed by
    Dense 512/128/64 ReLU layers (dropout 0.3 and 0.2 after the first two)
    and a softmax over ``len(classes_labels)`` classes.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    model = keras.Sequential()
    model.add(keras.layers.LSTM(80, input_shape=(SEQUENCE_LENGTH, NUM_FEATURES)))
    # (units, dropout applied after the layer or None)
    for units, dropout_rate in ((512, 0.3), (128, 0.2), (64, None)):
        model.add(keras.layers.Dense(units, activation='relu'))
        if dropout_rate is not None:
            model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(len(classes_labels), activation="softmax"))
    return model

In [None]:
# Ninth, tenth and eleventh model (pre-trained CNN features + GRU layer).
# The features extracted by VGG16, ResNet50 and MobileNet were each passed as
# input to this architecture, which gave three models.

def PreTrained_GRU():
    """Build the GRU classifier fed with pre-extracted features.

    Input is (SEQUENCE_LENGTH, NUM_FEATURES); a GRU(80) is followed by
    Dense 512/128/64 ReLU layers (dropout 0.3 and 0.2 after the first two)
    and a softmax over ``len(classes_labels)`` classes.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    model = keras.Sequential()
    model.add(keras.layers.GRU(80, input_shape=(SEQUENCE_LENGTH, NUM_FEATURES)))
    # (units, dropout applied after the layer or None)
    for units, dropout_rate in ((512, 0.3), (128, 0.2), (64, None)):
        model.add(keras.layers.Dense(units, activation='relu'))
        if dropout_rate is not None:
            model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(len(classes_labels), activation="softmax"))
    return model

In [None]:
# Twelfth and thirteenth model:
# pre-trained ResNet50 features with a GRU that returns its hidden states,
# pre-trained MobileNet features with a GRU that returns its hidden states.

def model_GRU_hidden():
    """Build the GRU classifier that averages the per-step hidden states.

    Input is (SEQUENCE_LENGTH, NUM_FEATURES); a GRU(80) with
    ``return_sequences=True`` is pooled by GlobalAveragePooling1D, followed
    by Dense 512/128/64 ReLU layers (dropout 0.3 and 0.2 after the first two)
    and a softmax over ``len(classes_labels)`` classes.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    model = keras.Sequential()
    model.add(keras.layers.GRU(
        80, input_shape=(SEQUENCE_LENGTH, NUM_FEATURES), return_sequences=True))
    model.add(keras.layers.GlobalAveragePooling1D())
    # (units, dropout applied after the layer or None)
    for units, dropout_rate in ((512, 0.3), (128, 0.2), (64, None)):
        model.add(keras.layers.Dense(units, activation='relu'))
        if dropout_rate is not None:
            model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(len(classes_labels), activation="softmax"))
    return model

In [None]:
# Training example for models 3 to 13 with 10-fold stratified cross-validation.
# For every fold we record [loss, accuracy] on the test and train parts and the
# macro f1-score; mean / standard deviation are computed in the next cell.
# NOTE(review): each fold's test part is also used as validation data for early
# stopping and LR scheduling, so the reported scores are optimistic.

accuracy = []        # per fold: model.evaluate() result on the test part
train_accuracy = []  # per fold: model.evaluate() result on the train part
f1Score = []         # per fold: macro f1-score on the test part
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=1)
for i, (train_index, test_index) in enumerate(skf.split(features, labels), start=1):
    # Fresh callbacks and optimizer for every fold so no state carries over.
    earlyStopping = keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=20, mode='min', min_delta=0.001, restore_best_weights=True)
    # `learning_rate` replaces the deprecated `lr` argument.
    adam = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)
    reduce_lr_loss = keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss', patience=5, verbose=1, factor=0.5, min_lr=0.0000001)
    print('Fold ', i)
    features_train, features_test = features[train_index], features[test_index]
    labels_train, labels_test = labels[train_index], labels[test_index]
    targets_train = to_categorical(labels_train)
    targets_test = to_categorical(labels_test)

    # The notebook defines no MobileNet_BiLSTM(); the BiLSTM builder is
    # PreTrained_BiLSTM(). Swap in another builder to train a different model.
    model = PreTrained_BiLSTM()
    model.compile(loss="categorical_crossentropy", optimizer=adam, metrics=["accuracy"])
    history = model.fit(
        features_train, targets_train,
        batch_size=20,
        epochs=100,
        verbose=1,
        validation_data=(features_test, targets_test),
        callbacks=[earlyStopping, reduce_lr_loss],
    )

    # Predicted class = index of the highest softmax score.
    ypredic = np.argmax(model.predict(features_test), axis=1)
    fold_f1 = f1_score(labels_test, ypredic, average='macro')
    print(fold_f1)
    print('-------------')
    # Evaluate once per split and reuse the result for printing and bookkeeping.
    test_scores = model.evaluate(features_test, targets_test)
    print(test_scores)
    train_scores = model.evaluate(features_train, targets_train)
    print(train_scores)
    accuracy.append(test_scores)
    train_accuracy.append(train_scores)
    f1Score.append(fold_f1)

In [None]:
# Printing the results of the training process in the previous cell

accuracy = np.array(accuracy)
train_accuracy = np.array(train_accuracy)

# Column 0 of each evaluate() row is read as the loss, column 1 as the accuracy.
summary_rows = (
    ("Mean Accuracy Train: \t", statistics.mean, train_accuracy[:, 1]),
    ("Mean Accuracy: \t\t", statistics.mean, accuracy[:, 1]),
    ("Mean Loss Train: \t", statistics.mean, train_accuracy[:, 0]),
    ("Mean Loss: \t\t", statistics.mean, accuracy[:, 0]),
    ("Mean f1-score: \t\t", statistics.mean, f1Score),
    ("Stdev Accuracy: \t", statistics.stdev, accuracy[:, 1]),
    ("Stdev f1-score: \t", statistics.stdev, f1Score),
)
for caption, statistic, values in summary_rows:
    print(caption, statistic(values))

In [None]:
# Plotting the previous results

Folds = range(1,11)

# One figure per entry: (title, ((values, legend label), ...))
fold_plots = (
    # Accuracy among 10 folds
    ("Accuracy among 10 folds for VGG16 and BiLSTM",
     ((accuracy[:, 1], 'Accuracy'),)),
    # f1-score among 10 folds
    ("f1-score among 10 folds for VGG16 and BiLSTM",
     ((f1Score, 'f1-score'),)),
    # Training and validation loss among 10 folds
    ("Training and Validation loss among 10 folds for VGG16 and BiLSTM",
     ((train_accuracy[:, 0], 'loss'), (accuracy[:, 0], 'val_loss'))),
)
for title, curves in fold_plots:
    plt.xlim(0, 10)
    for values, curve_label in curves:
        plt.plot(Folds, values, label=curve_label)
    plt.axis('on')
    plt.xlabel("Fold")
    plt.title(title)
    plt.legend()
    plt.show()

In [None]:
# Persist the trained model, then read it back from the same file.
saved_model_path = 'C:\\Users\\ASUS\\MyModel.hdf5'
model.save(saved_model_path)
model = load_model(saved_model_path)

In [None]:
# Video segmentation with a sliding-window approach.
# Window size = SEQUENCE_LENGTH, stride = 1 or SEQUENCE_LENGTH (chosen by `mode`).
# `extractor` is the pre-trained CNN to use, `model` is the classifier that follows it.

def predTest(video_file_path,output_file_path,file_name,mode, SEQUENCE_LENGTH, IMAGE_SIZE, NUM_FEATURES,extractor,model):
    """Classify a video window by window and write an annotated copy of it.

    Every frame is resized to IMAGE_SIZE x IMAGE_SIZE and passed through
    ``extractor``; once SEQUENCE_LENGTH feature vectors are queued, ``model``
    classifies the window. The latest label and its score are drawn on every
    output frame, and a per-frame score is saved to
    ``static/json/<file_name without extension>.json``.

    Args:
        video_file_path: input video.
        output_file_path: annotated video to write (MP4V codec).
        file_name: name whose stem is used for the JSON report.
        mode: ``'10 Steps'`` (or its alias ``'Normal'``) empties the window
            after each prediction, i.e. stride = SEQUENCE_LENGTH; any other
            value keeps the sliding window, i.e. stride = 1.
        SEQUENCE_LENGTH: number of frames per window.
        IMAGE_SIZE: side length frames are resized to before feature extraction.
        NUM_FEATURES: length of one feature vector returned by ``extractor``.
        extractor: object whose ``predict`` maps a (1, IMAGE_SIZE, IMAGE_SIZE, 3)
            batch to a feature vector.
        model: object whose ``predict`` maps a (1, SEQUENCE_LENGTH,
            NUM_FEATURES) batch to two class scores (index 0 = NonViolence).

    Returns:
        Tuple ``(violent, ratio, framing_rate, video_frames_count)`` where
        ``violent`` is True when more than one window was classified as
        Violence and ``ratio`` is the fraction of classified windows that were
        Violence (0.0 when no window could be classified).
    """
    video_reader = cv2.VideoCapture(video_file_path)
    video_writer = None
    try:
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        original_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
        framing_rate = int(video_reader.get(cv2.CAP_PROP_FPS))
        video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
                                       framing_rate, (original_video_width, original_video_height))
        frames_queue = deque(maxlen = SEQUENCE_LENGTH)
        # The call site documents 'Normal' as the stride-10 mode, so accept it
        # alongside the original '10 Steps' spelling.
        clear_after_prediction = mode in ('10 Steps', 'Normal')
        predicted_class_name = ''
        m = ''        # text of the highest class score of the latest window
        counter = 0   # windows classified as Violence
        total = 0     # windows classified at all
        frame_num = 0
        # Frames before the first full window get a placeholder score of '0'.
        data = {'framing rate': str(framing_rate),
                'frames': {str(n): '0' for n in range(1, SEQUENCE_LENGTH)}}
        while video_reader.isOpened():
            ok, frame = video_reader.read()
            if not ok:
                break
            frame_num = frame_num + 1
            # NOTE(review): frames reach the extractor unscaled, while
            # frames_extraction() divides by 255 -- confirm which range the
            # loaded extractor expects.
            resized_frame = cv2.resize(frame, (IMAGE_SIZE, IMAGE_SIZE))
            frames_queue.append(extractor.predict(np.array(resized_frame).reshape(1,IMAGE_SIZE,IMAGE_SIZE,3)))
            if len(frames_queue) == SEQUENCE_LENGTH:
                predicted_labels_probabilities = model.predict(
                    np.array(frames_queue).reshape(1,SEQUENCE_LENGTH,NUM_FEATURES))
                total = total + 1
                m = str(np.amax(predicted_labels_probabilities))
                if int(np.argmax(predicted_labels_probabilities[0])) == 0:
                    predicted_class_name = 'NonViolence'
                    # Store the complement of the winning score, as before.
                    data['frames'][str(frame_num)] = str(1-np.amax(predicted_labels_probabilities))
                else:
                    predicted_class_name = 'Violence'
                    counter = counter + 1
                    data['frames'][str(frame_num)] = m
                if clear_after_prediction:
                    frames_queue.clear()
            cv2.putText(frame, predicted_class_name , (100, 25), cv2.FONT_HERSHEY_PLAIN  , 2, (255, 255, 255), 2)
            cv2.putText(frame, m , (100, 50), cv2.FONT_HERSHEY_PLAIN  , 2, (255, 255, 255), 2)
            video_writer.write(frame)
    finally:
        # Release both handles even if decoding or prediction raised.
        video_reader.release()
        if video_writer is not None:
            video_writer.release()

    os.makedirs('static/json', exist_ok=True)
    # splitext handles extensions of any length, unlike slicing off 4 characters.
    report_stem = os.path.splitext(file_name)[0]
    with open('static/json/' + report_stem + '.json', 'w') as f:
        json.dump(data, f, indent=2)
    # A video shorter than one window yields no prediction; avoid dividing by zero.
    violence_ratio = counter / total if total else 0.0
    return (counter > 1), violence_ratio, framing_rate, video_frames_count

In [None]:
# The video 'demo.mp4' located at video_file_path is saved with predictions to output_file_path

extractor = load_model('C:\\Users\\ASUS\\ResNet50.hdf5')
model = load_model('C:\\Users\\ASUS\\ResNet50Improved.hdf5')
video_file_path = 'C:\\Users\\ASUS\\demo.mp4'
output_file_path = 'C:\\Users\\ASUS\\demos\\ResultDemo.mp4'
file_name = 'demo.mp4'
# '10 Steps' is the value predTest() checks for to empty its window after each
# prediction (stride = SEQUENCE_LENGTH); use e.g. 'High' for stride = 1.
mode = '10 Steps'
# predTest() resizes frames to a square; IMAGE_SIZE was never defined in this
# notebook, so derive it from the configured frame height.
IMAGE_SIZE = IMAGE_HEIGHT
# NOTE(review): NUM_FEATURES must equal the length of the vector the loaded
# extractor returns (presumably 2048 for an average-pooled ResNet50, versus the
# 1024 configured for MobileNet) -- confirm before running.

predTest(video_file_path ,output_file_path, file_name, mode, SEQUENCE_LENGTH, IMAGE_SIZE, NUM_FEATURES,extractor,model)