In [9]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
import os
import pandas as pd
from torchvision.models import densenet121
from convlstm import ConvLSTM
from tqdm import tqdm
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
import matplotlib.pyplot as plt

In [None]:
# modify this to your own directory
directory = 'C:/Users/Akv3/Downloads/archive_violence_detection/SCVD/SCVD_converted_sec_split/Train'

In [16]:
# # TESTING THAT A VIDEO CAN BE READ
# # Create a VideoCapture object
# cap = cv2.VideoCapture(directory + '/' + 'Violence' + '/' + 'Violence001.avi')

# # Check if camera opened successfully
# if (cap.isOpened()== False): 
#     print("Error opening video file")

# # Read until video is completed
# while(cap.isOpened()):
#     # Capture frame-by-frame
#     ret, frame = cap.read()
#     if ret == True:
#         # Display the resulting frame
#         cv2.imshow('Frame', frame)
#         # Press Q on keyboard to exit
#         if cv2.waitKey(25) & 0xFF == ord('q'):
#             break
#     # Break the loop
#     else: 
#         break

# # When everything done, release the video capture object
# cap.release()

# # Closes all the frames
# cv2.destroyAllWindows()

In [11]:
def calculate_optical_flow(video_path, frame_skip=8):
    cap = cv2.VideoCapture(video_path)
    ret, frame1 = cap.read()
    prvs = cv2.resize(frame1, (224, 224))
    prvs = cv2.cvtColor(prvs, cv2.COLOR_BGR2GRAY)
    hsv = np.zeros((prvs.shape[0], prvs.shape[1], 3))
    hsv[..., 1] = 255

    optical_flows = []  # list to store optical flow of each frame
    frame_count = 0
    while True:
        ret, frame2 = cap.read()
        if not ret:
            break
        frame_count += 1
        if frame_count % frame_skip != 0:
            continue
        next = cv2.resize(frame2, (224, 224))
        next = cv2.cvtColor(next, cv2.COLOR_BGR2GRAY)

        flow = cv2.calcOpticalFlowFarneback(
            prvs, next, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        hsv[..., 0] = ang * 180 / np.pi / 2
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
        bgr = cv2.cvtColor(hsv.astype(np.float32), cv2.COLOR_HSV2BGR)

        optical_flows.append(bgr)  # store optical flow of current frame

        prvs = next

    cap.release()

    return optical_flows  # return list of optical flows

### Loading all videos, transforming them into optical flow representations, storing in memory, and creating a training data loader.

**Now the model is ready to be trained**

In [15]:
Normal = []
Violence = []
Weaponized = []
labels = []
for subdir in os.listdir(directory):
    for filename in tqdm(os.listdir(os.path.join(directory, subdir))):
        if filename.endswith(".avi"):  # videos are in .avi format
            video_path = os.path.join(directory, subdir, filename)
            optical_flows = calculate_optical_flow(video_path)  # assuming this now returns a list of optical flows
            optical_flows = np.stack([np.transpose(flow, (2, 0, 1)) for flow in optical_flows])  # transpose each optical flow
            if subdir == 'Normal':
                Normal.append(optical_flows)
                labels.append(0)
            elif subdir == 'Violence':
                Violence.append(optical_flows)
                labels.append(1)
            elif subdir == 'Weaponized':
                Weaponized.append(optical_flows)
                labels.append(2)
            break
        
        
print(len(Normal), len(Violence), len(Weaponized))
# put the data into a tensor dataset
Normal = np.array(Normal, dtype=np.float32)
Violence = np.array(Violence, dtype=np.float32)
Weaponized = np.array(Weaponized, dtype=np.float32)
train_data = np.concatenate([Normal, Violence, Weaponized])
train_data = torch.tensor(train_data)
train_data = train_data.permute(0, 2, 1, 3, 4)
print(train_data.shape)
train_labels = torch.tensor(labels).long()
train_dataset = TensorDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
print(len(train_loader))
print(train_loader.dataset.tensors[0].shape)

  0%|          | 0/872 [00:00<?, ?it/s]
  0%|          | 0/970 [00:00<?, ?it/s]
  0%|          | 0/832 [00:00<?, ?it/s]

1 1 1
torch.Size([3, 3, 3, 224, 224])
1
torch.Size([3, 3, 3, 224, 224])





### DenseNet121 model adapted for video data and ConvLSTM added

In [None]:
model = densenet121(pretrained=True)
# DenseBlock 1
new_layers = nn.Sequential()
for i in range(len(model.features.denseblock1)):
    new_layer = nn.Sequential(
    nn.BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(64, 128, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(128, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False))
    new_layers.add_module(str(i), new_layer)
model.features.denseblock1 = new_layers
# print(model.features.denseblock1)
# print(len(model.features.denseblock1))
# DenseBlock 2
new_layers = nn.Sequential()
for i in range(len(model.features.denseblock2)):
    new_layer = nn.Sequential(
    nn.BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(128, 128, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(128, 128, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False))
    new_layers.add_module(str(i), new_layer)
model.features.denseblock2 = new_layers
# print(model.features.denseblock2)
# print(len(model.features.denseblock2))
# DenseBlock 3
new_layers = nn.Sequential()
for i in range(len(model.features.denseblock3)):
    new_layer = nn.Sequential(
    nn.BatchNorm3d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(256, 128, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(128, 256, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False))
    new_layers.add_module(str(i), new_layer)
model.features.denseblock3 = new_layers
# print(model.features.denseblock3)
# print(len(model.features.denseblock3))
# DenseBlock 4
new_layers = nn.Sequential()
for i in range(len(model.features.denseblock4)):
    new_layer = nn.Sequential(
    nn.BatchNorm3d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(512, 128, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(128, 512, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False))
    new_layers.add_module(str(i), new_layer)
model.features.denseblock4 = new_layers
# print(model.features.denseblock4)
# print(len(model.features.denseblock4))
# Transition Layers
new_layers = nn.Sequential(
    nn.BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(64, 128, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.AvgPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2), padding=0))
model.features.transition1 = new_layers
# print(model.features.transition1)
new_layers = nn.Sequential(
    nn.BatchNorm3d(128, eps=1e-05, momentum=0.1,
                   affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(128, 256, kernel_size=(1, 1, 1), stride=(
        1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.AvgPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2), padding=0))
model.features.transition2 = new_layers
# print(model.features.transition2)
new_layers = nn.Sequential(
    nn.BatchNorm3d(256, eps=1e-05, momentum=0.1,
                   affine=True, track_running_stats=True),
    nn.ReLU(inplace=True),
    nn.Conv3d(256, 512, kernel_size=(1, 1, 1), stride=(
        1, 1, 1), padding=(1, 1, 1), bias=False),
    nn.AvgPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2), padding=0))
model.features.transition3 = new_layers
# print(model.features.transition3)
# Replace the first four layers
model.features.conv0 = nn.Conv3d(3, 64, kernel_size=(7, 7, 7), stride=(2, 2, 2), padding=(3, 3, 3), bias=False)
model.features.norm0 = nn.BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
model.features.relu0 = nn.ReLU(inplace=True)
model.features.pool0 = nn.MaxPool3d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
model.features.norm5 = nn.BatchNorm3d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
# print(model.features)
# Flatten Layer
model.add_module('flatten', nn.Flatten())

# ConvLSTM Layer
model.add_module('convlstm', ConvLSTM(input_dim=512,
                                      hidden_dim=[64, 64, 128],
                                      kernel_size=(3, 3),
                                      num_layers=3,
                                      batch_first=True,
                                      bias=True,
                                      return_all_layers=False))

# Classification Layer
model.classifier = nn.Linear(33280, 3)
print(model)



DenseNet(
  (features): Sequential(
    (conv0): Conv3d(3, 64, kernel_size=(7, 7, 7), stride=(2, 2, 2), padding=(3, 3, 3), bias=False)
    (norm0): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool3d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): Sequential(
      (0): Sequential(
        (0): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU(inplace=True)
        (2): Conv3d(64, 128, kernel_size=(1, 1, 1), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
        (3): BatchNorm3d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU(inplace=True)
        (5): Conv3d(128, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
      )
      (1): Sequential(
        (0): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU(inp

**The cell below tests that the model is working as expected**

In [None]:
# Test model on one optical flow example
test_vid_path = directory + '/' + 'Violence' + '/' + 'Violence001.avi'
test_optical_flow = calculate_optical_flow(test_vid_path)
# assuming this now returns a list of optical flows
test_optical_flow = np.stack([np.transpose(flow, (2, 0, 1))
                             for flow in test_optical_flow])
test_optical_flow = torch.tensor(test_optical_flow).float().unsqueeze(0)
test_optical_flow = test_optical_flow.permute(0, 2, 1, 3, 4)
print(test_optical_flow.shape)
output = model(test_optical_flow)
print(output.shape)

torch.Size([1, 3, 3, 224, 224])
torch.Size([1, 3])


## Training the model

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.001)
lossfun = nn.CrossEntropyLoss()
# a function that trains the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
def trainModel():

  # number of epochs
  numepochs = 3

  # create a new model
  net = model
  net = net.to(device)

  # initialize losses
  losses = torch.zeros(numepochs)
  trainAcc = []
  # loop over epochs
  for epochi in range(numepochs):

    # switch on training mode
    net.train()

    # loop over training data batches
    batchAcc = []
    batchLoss = []
    for X, y in train_loader:
      X = X.to(device)
      y = y.to(device)
      # forward pass and loss
      yHat = net(X)
      loss = lossfun(yHat, y)

      # backprop
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      # loss from this batch
      batchLoss.append(loss.item())
      # compute accuracy
      batchAcc.append(
          100*torch.mean((torch.argmax(yHat, axis=1) == y).float()).item())
    # end of batch loop...

    # now that we've trained through the batches, get their average training accuracy
    trainAcc.append(np.mean(batchAcc))

    # and get average losses across the batches
    losses[epochi] = np.mean(batchLoss)
    
    print(
        f'Epoch {epochi+1}/{numepochs}, Loss: {losses[epochi]}, Accuracy: {trainAcc[-1]}d')
  # end epochs
  # output
  return trainAcc, losses

cuda


In [None]:
trainAcc, losses = trainModel()
# Save the model
torch.save(model.state_dict(), 'model.pth')

## Testing Model Performance
**If model has been trained (or trained and the model is saved in a pth file), run the cells below**

In [None]:
# # Uncomment this cell only if you have the saved model and wish to load it. Note: model variable must be initialized above before loading the model
# def getSavedModel():
#     net = model
#     net.load_state_dict(torch.load('model.pth'))
#     return net
# model = getSavedModel()
    

In [None]:
# Directory of test videos
directory = 'C:/Users/Akv3/Downloads/archive_violence_detection/SCVD/SCVD_converted_sec_split/Test' # modify this to your own directory

**Load test data**

In [None]:
Normal = []
Violence = []
Weaponized = []
labels = []
for subdir in os.listdir(directory):
    for filename in tqdm(os.listdir(os.path.join(directory, subdir))):
        if filename.endswith(".avi"):  # videos are in .avi format
            video_path = os.path.join(directory, subdir, filename)
            # assuming this now returns a list of optical flows
            optical_flows = calculate_optical_flow(video_path)
            # transpose each optical flow
            optical_flows = np.stack(
                [np.transpose(flow, (2, 0, 1)) for flow in optical_flows])
            if subdir == 'Normal':
                Normal.append(optical_flows)
                labels.append(0)
            elif subdir == 'Violence':
                Violence.append(optical_flows)
                labels.append(1)
            elif subdir == 'Weaponized':
                Weaponized.append(optical_flows)
                labels.append(2)
            break


print(len(Normal), len(Violence), len(Weaponized))
# put the data into a tensor dataset
Normal = np.array(Normal, dtype=np.float32)
Violence = np.array(Violence, dtype=np.float32)
Weaponized = np.array(Weaponized, dtype=np.float32)
test_data = np.concatenate([Normal, Violence, Weaponized])
test_data = torch.tensor(test_data)
test_data = test_data.permute(0, 2, 1, 3, 4)
print(test_data.shape)
test_labels = torch.tensor(labels).long()
test_dataset = TensorDataset(test_data, test_labels)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True)
print(len(test_loader))
print(test_loader.dataset.tensors[0].shape)

In [None]:
model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
testAcc = []
with torch.no_grad():
    yTrue = []
    yPred = []
    batchAcc = []
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        batchAcc.append(100*torch.mean((torch.argmax(outputs, axis=1) == labels).float()).item())
        yTrue.extend(labels.cpu().numpy())
        yPred.extend(torch.argmax(outputs, axis=1).cpu().numpy())
    testAcc.append(np.mean(batchAcc))
print(classification_report(yTrue, yPred))
print('Accuracy:', accuracy_score(yTrue, yPred))
print('F1:', f1_score(yTrue, yPred, average='weighted'))
print('Precision:', precision_score(yTrue, yPred, average='weighted'))
print('Recall:', recall_score(yTrue, yPred, average='weighted'))
sns.heatmap(confusion_matrix(yTrue, yPred), annot=True, fmt='d', cmap='Blues', cbar=False, xticklabels=['Normal', 'Violence', 'Weaponized'], yticklabels=['Normal', 'Violence', 'Weaponized'])
plt.show()

In [None]:
# Plot trainAcc/testAcc and losses
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(trainAcc)
plt.plot(testAcc)
plt.title('Training and Test Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend(['Train', 'Test'])
plt.subplot(1, 2, 2)
plt.plot(losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()