# Evaluating CrimeNet on RLVS test videos

## Load test videos

In [3]:
import cv2
import numpy as np
import os
import time

# Root folders of the two RLVS classes (trailing slash required: file names
# are appended to these strings by plain concatenation further down).
path_videos_nv = 'E:/RLVS/NonViolence/'
path_videos_v = 'E:/RLVS/Violence/'

# File names per class. The listing order is kept exactly as returned by the
# OS because the seeded shuffle split later in the notebook depends on it.
videos_v = os.listdir(path_videos_v)
videos_nv = os.listdir(path_videos_nv)

# Class labels aligned with the listings: 1 = violence, 0 = non-violence.
label_videos_v = [1] * len(videos_v)
label_videos_nv = [0] * len(videos_nv)

# Violence videos first, then non-violence; labels follow the same order.
videos = videos_v + videos_nv
label_videos = label_videos_v + label_videos_nv

# Frame geometry used when frames are resized for the model.
width, height, channels = 224, 224, 3

## Functions to read videos and optical flow

In [4]:
# Read a video and compute dense optical flow between consecutive frames
def read_video_optical_flow(vid, width, height, resize=False):
    """Return one colour-coded optical-flow image per consecutive frame pair.

    Dense Farneback flow is computed between each frame and the one before
    it. Every flow field is rendered as an HSV image (hue = flow direction,
    saturation = 255, value = min-max normalised magnitude) and converted to
    BGR. A video with N readable frames therefore yields N - 1 images.

    Args:
        vid: Path (or any source accepted by ``cv2.VideoCapture``) of the video.
        width: Target frame width in pixels, used only when ``resize`` is true.
        height: Target frame height in pixels, used only when ``resize`` is true.
        resize: When true, every frame is resized to ``(width, height)``
            before the flow is computed.

    Returns:
        A list of BGR arrays with the same shape as the (optionally resized)
        frames. The list is empty when the video cannot be opened or has
        fewer than two readable frames.
    """
    video_frames_optical_flow = []
    cap = cv2.VideoCapture(vid)
    try:
        # Check the capture before touching any frame: reading from a source
        # that failed to open returns None and would crash in cv2.resize.
        if not cap.isOpened():
            print("Error opening video stream or file")
            return video_frames_optical_flow

        ret1, frame1 = cap.read()
        if not ret1:
            return video_frames_optical_flow
        if resize:
            frame1 = cv2.resize(frame1, (width, height))
        prev_gray = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

        # HSV canvas reused for every pair; saturation stays at its maximum.
        hsv = np.zeros_like(frame1)
        hsv[..., 1] = 255

        while True:
            ret2, frame2 = cap.read()
            if not ret2:
                break
            if resize:
                frame2 = cv2.resize(frame2, (width, height))
            next_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
            flow = cv2.calcOpticalFlowFarneback(prev_gray, next_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            # Angle in radians -> degrees, halved to fit OpenCV's 0-180 hue range.
            hsv[..., 0] = ang * 180 / np.pi / 2
            hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
            # cvtColor allocates a fresh array, so the stored image is not
            # affected when the hsv canvas is overwritten on the next pair.
            # No reshape is applied: the image already has the frame's
            # (rows, cols, 3) layout, and forcing (width, height, channels)
            # would scramble pixels whenever width != height.
            video_frames_optical_flow.append(cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR))
            prev_gray = next_gray
    finally:
        # Always free the capture handle, even if OpenCV raises mid-video.
        cap.release()
    return video_frames_optical_flow

# Read every frame of a video
def read_video(vid, width, height, resize=False):
    """Return all readable frames of a video as a list of BGR arrays.

    Args:
        vid: Path (or any source accepted by ``cv2.VideoCapture``) of the video.
        width: Target frame width in pixels, used only when ``resize`` is true.
        height: Target frame height in pixels, used only when ``resize`` is true.
        resize: When true, every frame is resized to ``(width, height)``.

    Returns:
        A list with one array per frame, in playback order. The list is
        empty when the video cannot be opened or contains no readable frame.
    """
    video_frames = []
    cap = cv2.VideoCapture(vid)
    try:
        if not cap.isOpened():
            # Same message as read_video_optical_flow, so a broken file is
            # reported no matter which reader hits it.
            print("Error opening video stream or file")
            return video_frames

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if resize:
                # cv2.resize already returns a (height, width, 3) array; the
                # former reshape to (width, height, channels) was a no-op for
                # square targets and corrupted the image otherwise.
                frame = cv2.resize(frame, (width, height))
            video_frames.append(frame)
    finally:
        # Always free the capture handle, even if OpenCV raises mid-video.
        cap.release()
    return video_frames

## Load pre-trained CrimeNet model

In [6]:
from ViT import *
import neural_structured_learning as nsl

# Build the base classifier. `create_vit_classifier` (and `tf`, used below) are
# not defined in the visible cells - presumably they come from
# `from ViT import *`; verify against ViT.py.
vit_model = create_vit_classifier()


# Adversarial-regularization settings: the adversarial loss is weighted by 0.2,
# perturbations use a step size of 0.05 and the infinity norm for the gradient.
adv_config = nsl.configs.make_adv_reg_config(multiplier=0.2,
                                             adv_step_size=0.05,
                                             adv_grad_norm='infinity')

# Wrap the base model. With label_keys=['label'] the wrapped model takes a dict
# input whose 'label' entry holds the target; the data generator and
# predict_frames in this notebook build their inputs accordingly.
adv_model = nsl.keras.AdversarialRegularization(vit_model,
                                                label_keys=['label'],
                                                adv_config=adv_config)

# Compile with the configuration used for evaluation. Only the two AUC metrics
# (ROC and PR curves) are active; the commented-out entries are kept as they
# were in the original notebook.
# NOTE(review): compiling before load_weights is presumably required so the
# checkpoint's optimizer/metric state can be matched - confirm before reordering.
adv_model.compile(optimizer=tf.optimizers.Adam(learning_rate=0.0001),
                  loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
                  metrics=[# tf.keras.metrics.TruePositives(name='tp'),
                           # tf.keras.metrics.FalsePositives(name='fp'),
                           # tf.keras.metrics.TrueNegatives(name='tn'),
                           # tf.keras.metrics.FalseNegatives(name='fn'),
                           # tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
                           tf.keras.metrics.AUC(curve="ROC"),
                           tf.keras.metrics.AUC(curve="PR")])


# Restore the trained weights from the checkpoint prefix (relative path, so the
# notebook must run from the project root).
adv_model.load_weights('Results/logs/checkpoint/20240329-143918')

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x20de2e392b0>

## Load and shuffle test dataset videos

In [8]:
from sklearn.model_selection import StratifiedShuffleSplit

# --- Stratified 80 / 10 / 10 split of the video file names -----------------
# First split: 80% train vs. 20% held out, preserving the class ratio.
split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
train_index, test_valid_index = next(split.split(videos, label_videos))

X_train = [videos[k] for k in train_index]
y_train = [label_videos[k] for k in train_index]
X_valid_test = [videos[k] for k in test_valid_index]
y_valid_test = [label_videos[k] for k in test_valid_index]

# Second split: the held-out 20% is halved. The first index set returned by
# the splitter becomes the test subset and the second one the validation
# subset, exactly as in the original notebook.
split2 = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=0)
test_index, valid_index = next(split2.split(X_valid_test, y_valid_test))

X_test = [X_valid_test[k] for k in test_index]
y_test = [y_valid_test[k] for k in test_index]
X_valid = [X_valid_test[k] for k in valid_index]
y_valid = [y_valid_test[k] for k in valid_index]

# --- Load the frames of every test video ----------------------------------
# Each entry is a (frame, label) tuple with label 1 = violence and
# 0 = non-violence. Frames are stored at 20x20 here; the data generator
# resizes them to the model input size when batches are built.
test_total_op = []
test_total_rgb = []
for video_name, label in zip(X_test, y_test):
    # The label comes from the folder the file was listed in, so it also
    # identifies the folder to read from.
    video_dir = path_videos_v if label == 1 else path_videos_nv
    video_path = video_dir + video_name
    print('Loading Test: ' + video_path)
    video_frames_op = read_video_optical_flow(video_path, 20, 20, resize=True)
    video_frames_rgb = read_video(video_path, 20, 20, resize=True)

    # The optical-flow list has one frame fewer than the RGB list; zip pairs
    # flow image j with RGB frame j and drops the unmatched last RGB frame.
    # The label is the video's own label. The previous check,
    # `'NV' in X_test`, tested list membership instead of the file name, was
    # always False and tagged every frame as violent.
    for fr_op, fr_rgb in zip(video_frames_op, video_frames_rgb):
        test_total_op.append((fr_op, label))
        test_total_rgb.append((fr_rgb, label))

Loading Test: E:/RLVS/NonViolence/NV_368.mp4
Loading Test: E:/RLVS/Violence/V_817.mp4
Loading Test: E:/RLVS/Violence/V_485.mp4
Loading Test: E:/RLVS/Violence/V_630.mp4
Loading Test: E:/RLVS/Violence/V_365.mp4
Loading Test: E:/RLVS/Violence/V_587.mp4
Loading Test: E:/RLVS/Violence/V_549.mp4
Loading Test: E:/RLVS/Violence/V_641.mp4
Loading Test: E:/RLVS/NonViolence/NV_334.mp4
Loading Test: E:/RLVS/NonViolence/NV_974.mp4
Loading Test: E:/RLVS/Violence/V_30.mp4
Loading Test: E:/RLVS/NonViolence/NV_570.mp4
Loading Test: E:/RLVS/Violence/V_217.mp4
Loading Test: E:/RLVS/Violence/V_471.mp4
Loading Test: E:/RLVS/NonViolence/NV_119.mp4
Loading Test: E:/RLVS/NonViolence/NV_847.mp4
Loading Test: E:/RLVS/Violence/V_830.mp4
Loading Test: E:/RLVS/NonViolence/NV_807.mp4
Loading Test: E:/RLVS/NonViolence/NV_242.mp4
Loading Test: E:/RLVS/Violence/V_983.mp4
Loading Test: E:/RLVS/Violence/V_500.mp4
Loading Test: E:/RLVS/NonViolence/NV_357.mp4
Loading Test: E:/RLVS/NonViolence/NV_55.mp4
Loading Test: E:/RL

## Function to generate dataset

In [9]:
# Test

def generatorTestData(batch_size_test=16):
    """Yield test batches forever, in the dict format expected by the model.

    Frames are taken in order from the module-level ``test_total_op`` and
    ``test_total_rgb`` lists, resized to ``(width, height)`` and scaled from
    [0, 255] to [-1, 1]. Labels are one-hot encoded with ``num_classes``
    classes. Only complete batches are produced: trailing frames that do not
    fill a batch are skipped. After the last complete batch the generator
    starts again from the first frame.

    Args:
        batch_size_test: Number of frames per batch; must be positive.

    Yields:
        ``{'feature_1': optical-flow batch, 'feature_2': RGB batch,
        'label': one-hot labels}`` as float32 tensors.

    Raises:
        ValueError: If ``batch_size_test`` is not positive, or if there are
            fewer frames than one batch. Without this check the endless loop
            would spin forever without yielding anything.
    """
    if batch_size_test <= 0:
        raise ValueError('batch_size_test must be a positive integer')

    while True:
        n_batches = len(test_total_op) // batch_size_test
        if n_batches == 0:
            raise ValueError(
                'Not enough test frames ({}) to build a batch of size {}'.format(
                    len(test_total_op), batch_size_test))

        for count in range(n_batches):
            batch_start = batch_size_test * count
            batch_stop = batch_start + batch_size_test
            lx_op = []
            lx_rgb = []
            ly = []

            for (frame_op, label), (frame_rgb, _) in zip(
                    test_total_op[batch_start:batch_stop],
                    test_total_rgb[batch_start:batch_stop]):
                # Resize to the model input size and map pixel values
                # from [0, 255] to [-1, 1].
                frame_op = cv2.resize(frame_op, (width, height))
                lx_op.append((frame_op.astype('float32') - 127.5) / 127.5)

                frame_rgb = cv2.resize(frame_rgb, (width, height))
                lx_rgb.append((frame_rgb.astype('float32') - 127.5) / 127.5)

                # Both lists carry the same label; the optical-flow one is used.
                ly.append(label)

            x_op = np.array(lx_op).astype('float32')
            x_rgb = np.array(lx_rgb).astype('float32')

            y = np.array(ly).astype('float32')
            y = tf.keras.utils.to_categorical(y, num_classes=num_classes, dtype='float32')

            yield {'feature_1': tf.convert_to_tensor(x_op),
                   'feature_2': tf.convert_to_tensor(x_rgb),
                   'label': tf.convert_to_tensor(y)}

## Evaluating CrimeNet on the test dataset videos

In [6]:
# Evaluate one frame per step so the elapsed time can be averaged per frame.
# The measured time covers the whole evaluate() call, i.e. it also includes
# the resizing / normalisation done inside the generator.
eval_batch_size = 1
n_test_frames = len(test_total_op)

start_time_test = time.time()
adv_model.evaluate(generatorTestData(batch_size_test=eval_batch_size),
                   steps=n_test_frames // eval_batch_size)
elapsed_test = time.time() - start_time_test

print('Inference time: ' + str(elapsed_test / n_test_frames))

2024-03-30 11:58:06.631548: I tensorflow/stream_executor/cuda/cuda_blas.cc:1614] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Inference time: 0.04974933064586892


In [10]:
from IPython.display import HTML
import base64

def display_video(frames):
    """Render an HTML ``<video>`` element for ``frames`` in the notebook.

    Only the first frame is embedded: it is PNG-encoded, base64-encoded and
    inserted as the data URI of the ``<source>`` tag.

    NOTE(review): the payload is a PNG image while the tag declares
    ``video/mp4``, so browsers will most likely show an empty player. The
    markup is left unchanged here; embedding the bytes of the saved .mp4 (or
    switching to an ``<img>`` tag) is probably what was intended - confirm.

    Args:
        frames: Sequence of image arrays accepted by ``cv2.imencode``.

    Raises:
        IndexError: If ``frames`` is empty.
    """
    # Only frames[0] ever reaches the markup, so encode just that frame
    # instead of PNG-encoding the whole clip and discarding the rest.
    first_frame = frames[0]
    first_frame_png = cv2.imencode('.png', first_frame)[1]
    first_frame_b64 = base64.b64encode(first_frame_png).decode()
    video_tag = '''
    <video width="640" height="480" controls>
        <source src="data:video/mp4;base64,{0}" type="video/mp4">
    Your browser does not support the video tag.
    </video>'''.format(first_frame_b64)
    display(HTML(video_tag))


def predict_frames(frames_op, frames_rgb, model, width, height):
    """Classify a clip frame by frame and return the predicted class indices.

    Each optical-flow / RGB frame pair is resized to ``(width, height)``,
    scaled from [0, 255] to [-1, 1], given a leading batch axis of size 1 and
    passed to ``model.predict`` as a dict with keys ``'feature_1'``
    (optical flow), ``'feature_2'`` (RGB) and ``'label'``.

    Args:
        frames_op: Optical-flow frames.
        frames_rgb: RGB frames. Frames are paired positionally with
            ``frames_op``; the longer sequence is truncated.
        model: Object exposing ``predict`` that accepts the dict above.
        width: Model input width in pixels.
        height: Model input height in pixels.

    Returns:
        A list with one ``np.argmax`` result per processed frame pair.
    """
    def _to_model_input(frame):
        # Resize, map [0, 255] -> [-1, 1], add the batch axis.
        resized = cv2.resize(frame, (width, height))
        normalized = (resized.astype('float32') - 127.5) / 127.5
        return tf.convert_to_tensor(np.expand_dims(normalized, axis=0))

    # Placeholder target: the one-hot encoding of class 0 over 2 classes,
    # shape (1, 2). It is the same for every frame, so it is built once.
    # NOTE(review): the model used in this notebook is wrapped with
    # label_keys=['label'], hence the key; the value is presumably ignored
    # when predicting - confirm.
    dummy_label = np.array([[1.0, 0.0]], dtype='float32')

    predictions = []
    for frame_op, frame_rgb in zip(frames_op, frames_rgb):
        pred = model.predict({'feature_1': _to_model_input(frame_op),
                              'feature_2': _to_model_input(frame_rgb),
                              'label': dummy_label})
        predictions.append(np.argmax(pred))
    return predictions

def overlay_labels_on_frames(frames, predictions):
    """Return copies of ``frames`` with the predicted class written on them.

    Frame ``k`` is annotated according to ``predictions[k]``: a prediction
    equal to 1 is drawn as "Violencia" with colour (0, 0, 255), anything else
    as "Normal" with colour (255, 0, 0). The text is placed near the top-left
    corner. The input frames are not modified.

    Args:
        frames: Sequence of image arrays.
        predictions: Indexable sequence with at least ``len(frames)`` entries.

    Returns:
        A new list containing the annotated copies, in the same order.
    """
    annotated = []
    for idx, frame in enumerate(frames):
        if predictions[idx] == 1:
            text, text_color = "Violencia", (0, 0, 255)
        else:
            text, text_color = "Normal", (255, 0, 0)

        # Draw on a copy so the caller's frame stays untouched.
        canvas = frame.copy()
        cv2.putText(canvas, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, text_color, 2, cv2.LINE_AA)
        annotated.append(canvas)
    return annotated

def save_video(frames, output_path, fps=30):
    """Write ``frames`` to ``output_path`` as a video using the 'mp4v' codec.

    The frame size of the output is taken from the first frame.

    Args:
        frames: Non-empty sequence of image arrays with a 3-element shape
            ``(rows, cols, channels)``.
        output_path: Destination file path.
        fps: Frame rate of the written video.

    Raises:
        IndexError: If ``frames`` is empty.
        OSError: If the video writer cannot be opened for ``output_path``
            (for instance a missing target directory or unavailable codec).
            Previously this case went unnoticed and no file was produced.
    """
    height, width, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # VideoWriter expects the frame size as (width, height).
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        if not out.isOpened():
            raise OSError('Could not open video writer for {!r}'.format(output_path))
        for frame in frames:
            out.write(frame)
    finally:
        # Release in every case so the file is finalised / the handle freed.
        out.release()

# Demo: classify a single video frame by frame and export an annotated copy.
input_video_path = 'E:/RLVS/Violence/V_830.mp4'
output_video_path = 'Results/V_830.mp4'

# Both streams are read at the model input size.
video_frames_op = read_video_optical_flow(input_video_path, width, height, resize=True)
video_frames_rgb = read_video(input_video_path, width, height, resize=True)

predictions = predict_frames(video_frames_op, video_frames_rgb, adv_model, width, height)

# The last RGB frame is left out before overlaying; the optical-flow reader
# yields one image per consecutive frame pair, i.e. one fewer than the frames.
labeled_frames = overlay_labels_on_frames(video_frames_rgb[:-1], predictions)

save_video(labeled_frames, output_video_path, fps=30)
print("Save video path:", output_video_path)

display_video(labeled_frames)



Save video path: Results/V_830.mp4
