In [1]:
import random
import time
import tensorflow as tf
import os
import sklearn
import cv2
import numpy as np

from sklearn.model_selection import StratifiedKFold
from keras.utils import np_utils

## Utility Functions for Pre-processing Data

In [2]:
def extract_videos3D(video_input_file_path, height, width):
    '''
    Read a video file and return its frames, each resized to (width, height).

    Parameters
    ----------
    video_input_file_path : str
        Path handed to ``cv2.VideoCapture``.
    height, width : int
        Target size of every returned frame, in pixels.

    Returns
    -------
    list
        The resized frames in reading order. The list is empty when the
        capture cannot be opened or delivers no frame.
    '''
    video_frames = list()
    cap = cv2.VideoCapture(video_input_file_path)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a decoding failure): stop reading.
                break
            # cv2.resize expects the target size as (width, height).
            video_frames.append(cv2.resize(frame, (width, height)))
    finally:
        # Release the capture handle even when reading or resizing raises.
        # No cv2.destroyAllWindows() here: this function never opens a window.
        cap.release()

    return video_frames

def extract_videos3D_optical_flow(video_input_file_path, height, width):
    '''
    Read a video and return one colour-coded dense optical-flow image per
    pair of consecutive frames.

    Every frame is resized to (width, height) and converted to grayscale;
    Farneback optical flow is computed between each frame and its predecessor
    and rendered as a BGR image through an HSV encoding (hue = flow direction,
    value = min-max normalised flow magnitude, saturation fixed at 255).

    Parameters
    ----------
    video_input_file_path : str
        Path handed to ``cv2.VideoCapture``.
    height, width : int
        Size the frames are resized to before the flow is computed.

    Returns
    -------
    list
        The flow images in reading order; a video with N readable frames
        gives N - 1 images. The list is empty when the file cannot be opened
        (a message is printed in that case) or when no first frame can be read.
    '''
    video_frames_optical_flow = list()
    cap = cv2.VideoCapture(video_input_file_path)
    try:
        # Check the capture BEFORE touching the first frame: resizing the
        # None frame of an unopened capture would raise inside OpenCV.
        if not cap.isOpened():
            print("Error opening video stream or file")
            return video_frames_optical_flow

        ret1, frame1 = cap.read()
        if not ret1:
            return video_frames_optical_flow

        frame1 = cv2.resize(frame1, (width, height))
        prev_gray = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

        # HSV canvas reused for every pair; saturation stays at its maximum.
        hsv = np.zeros_like(frame1)
        hsv[..., 1] = 255

        while cap.isOpened():
            ret2, frame2 = cap.read()
            if not ret2:
                break

            frame2 = cv2.resize(frame2, (width, height))
            next_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

            # Positional arguments after `None`: pyr_scale=0.5, levels=3,
            # winsize=15, iterations=3, poly_n=5, poly_sigma=1.2, flags=0.
            flow = cv2.calcOpticalFlowFarneback(prev_gray, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

            # Angle (radians) -> degrees, halved for the hue channel.
            hsv[..., 0] = ang * 180 / np.pi / 2
            hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
            video_frames_optical_flow.append(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR))

            prev_gray = next_gray
    finally:
        # Release the capture on every exit path, including exceptions.
        # No cv2.destroyAllWindows() here: this function never opens a window.
        cap.release()

    return video_frames_optical_flow


def extract_videos3D_frames_substraction(video_input_file_path, height, width):
    '''
    Read a video and return the difference between each pair of consecutive
    frames.

    Every frame is resized to (width, height); each output element is
    ``previous_frame - current_frame``.

    Parameters
    ----------
    video_input_file_path : str
        Path handed to ``cv2.VideoCapture``.
    height, width : int
        Size the frames are resized to before subtraction.

    Returns
    -------
    list
        The frame differences in reading order; a video with N readable
        frames gives N - 1 differences. The list is empty when no first frame
        can be read (for example when the file cannot be opened).
    '''
    video_frames = list()
    cap = cv2.VideoCapture(video_input_file_path)
    try:
        ret1, frame1 = cap.read()
        if not ret1:
            # Nothing to subtract from; resizing the missing frame would raise.
            return video_frames

        frame1 = cv2.resize(frame1, (width, height))

        while cap.isOpened():
            ret2, frame2 = cap.read()
            if not ret2:
                break

            frame2 = cv2.resize(frame2, (width, height))
            # NOTE(review): plain `-` on integer image arrays wraps around
            # instead of clipping or taking the absolute value. Kept as-is so
            # the output is unchanged; confirm this is the intended encoding.
            video_frames.append(frame1 - frame2)

            frame1 = frame2
    finally:
        # Release the capture on every exit path, including exceptions.
        # No cv2.destroyAllWindows() here: this function never opens a window.
        cap.release()

    return video_frames

## Classifier Model Definition

In [3]:
class ClassifierHAR3D(object):
    '''
    Holds the configuration of a 3D video classifier run and produces the
    batches used to evaluate it.

    All attributes start as None and are filled in by the caller after
    construction.
    '''

    def __init__(self):

        # Frame geometry: frames are resized to (width, height) and batches are
        # reshaped to (batch, time, height, width, channels).
        self.width = None
        self.height = None
        self.channels = None
        # Number of frames kept per video.
        self.time = None
        # Batch sizes; only batch_size_test is read by code in this class.
        self.batch_size_train = None
        self.batch_size_validation = None
        self.batch_size_test = None
        # Class labels; its length sets the width of the one-hot targets.
        self.labels = None
        # Directories holding the videos; only pathTest is read in this class.
        self.pathTrain = None
        self.pathTest = None
        self.pathValidation = None
        # File names and labels; presumably train / validation / test in that
        # order -- only fts and fts_labels (test) are read in this class.
        self.ftr = None
        self.ftv = None
        self.fts = None
        self.ftr_labels = None
        self.ftv_labels = None
        self.fts_labels = None
        # Filled by the caller with the model output.
        self.predictions = None


    def generatorTest3D(self):
        '''
        Endlessly yield ``(x, y)`` test batches built from ``self.fts``.

        Each pass walks ``len(self.fts) // self.batch_size_test`` whole
        batches in file order; files left over after the last whole batch are
        not yielded. Entries named '.ipynb_checkpoints' are skipped, so a
        batch containing one is smaller than ``batch_size_test``.

        Yields
        ------
        x1 : numpy.ndarray
            float32 array of shape (batch, time, height, width, channels)
            with pixel values divided by 255. Videos shorter than
            ``self.time`` are padded by repeating their last frame; longer
            ones are cut to their first ``self.time`` frames.
        y : numpy.ndarray
            One-hot labels with ``len(self.labels)`` columns.

        Raises
        ------
        ValueError
            If a video yields no frame at all, so it cannot be padded.
        '''
        while True:

            for count in range(len(self.fts) // self.batch_size_test):

                batch_start = self.batch_size_test * count
                batch_stop = batch_start + self.batch_size_test

                lx1 = list()
                ly = list()

                for i in range(batch_start, batch_stop):

                    if self.fts[i] == '.ipynb_checkpoints':
                        continue

                    video_path = os.path.join(self.pathTest, self.fts[i])
                    frames = extract_videos3D(video_path, self.height, self.width)

                    if len(frames) < self.time:
                        if not frames:
                            # Fail with the offending file instead of a bare
                            # IndexError from frames[-1].
                            raise ValueError('No frames could be read from ' + video_path)
                        # Pad short clips by repeating their final frame.
                        frames.extend([frames[-1]] * (self.time - len(frames)))
                    else:
                        frames = frames[0:self.time]

                    ly.append(self.fts_labels[i])
                    lx1.append(frames)

                x1 = np.array(lx1)
                x1 = x1.astype('float32')
                x1 /= 255
                x1 = x1.reshape((x1.shape[0], self.time, self.height, self.width, self.channels))

                y = np.array(ly)
                y = np_utils.to_categorical(y, len(self.labels))

                yield x1, y

## Load and Evaluate Model

In [4]:
iteration = 1

# Dataset roots; index 2 selects 'violent_flows/' and its 'Data/' folder.
datasets_path = ['movies_fights/', 'hockey_fights/', 'violent_flows/']
data_set = datasets_path[2] + 'Data/'


# File names and their labels: 0 when the name contains 'NV', 1 otherwise
# (presumably NV = non-violent -- confirm against the dataset naming).
X, y = list(), list()

for file in os.listdir(data_set):
    # The Jupyter checkpoint folder is not a video: the test generator skips
    # it, so keeping it here would leave labels and predictions misaligned.
    if file == '.ipynb_checkpoints':
        continue
    X.append(file)
    y.append(0 if 'NV' in file else 1)

X = np.array(X)
y = np.array(y)

X_train, X_test, y_train, y_test = None, None, None, None

# NOTE(review): every fold overwrites the previous one, so the split that is
# actually used is the LAST of the five. os.listdir order is not guaranteed,
# so the fold membership may differ from the one used when the model was
# trained -- confirm before trusting the reported accuracy.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=3)
for train_index, test_index in skf.split(X, y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

trainXY = list(zip(X_train, y_train))
testXY = list(zip(X_test, y_test))

# Shuffle file/label pairs together so they stay aligned.
random.shuffle(trainXY)
random.shuffle(testXY)

X_train, y_train = zip(*trainXY)
X_test, y_test = zip(*testXY)

classifier = ClassifierHAR3D()
classifier.channels = 3
classifier.width = 100
classifier.height = 100
classifier.time = 50
classifier.batch_size_train = 8
classifier.batch_size_test = 1
classifier.labels = [0, 1]
# Train and test files live in the same folder; the split is by file name.
classifier.pathTrain = data_set
classifier.pathTest = data_set
classifier.ftr = X_train
classifier.fts = X_test
classifier.ftr_labels = y_train
classifier.fts_labels = y_test

# The model is compiled explicitly right below, so the compile state stored in
# the file is not restored first.
model = tf.keras.models.load_model('violent_flows/Results/model.h5', compile=False)

model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=[tf.keras.metrics.CategoricalAccuracy()])

In [5]:
# Whole batches the test generator yields per pass; `steps` must be an integer.
test_steps = len(classifier.fts) // classifier.batch_size_test

# perf_counter is monotonic, so the interval cannot be skewed by clock changes.
start = time.perf_counter()
class_probabilities = model.predict(classifier.generatorTest3D(),
                                    steps=test_steps,
                                    max_queue_size=10,
                                    verbose=2)
end = time.perf_counter()
# Average wall-clock time per test file, in seconds.
print('Inference time: ' + str((end - start)/len(classifier.fts)))

# Predicted class = index of the highest score in each row.
classifier.predictions = np.argmax(class_probabilities, axis=1)
test_accuracy = sklearn.metrics.accuracy_score(classifier.fts_labels, classifier.predictions, normalize=True)

49/49 - 13s - 13s/epoch - 260ms/step
Inference time: 0.26110004892154615


In [6]:
# Report the accuracy computed in the inference cell.
print(f'Accurcay Score: {test_accuracy}')

Accurcay Score: 1.0


In [7]:
# Advance the run counter. NOTE(review): this cell is not idempotent (every
# execution increments again) and the setup cell resets `iteration` to 1.
iteration = iteration + 1