In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install gradio

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import numpy as np
from skimage.transform import resize
from skimage.io import imread
import cv2
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import datasets, models, transforms  
import os
import time

import gradio as gr

In [None]:
def _frame_to_chw(frame, rgb, h, w):
    """Resize one decoded frame and return it as a ``(1, rgb, h, w)`` array.

    The frame is resized to ``(h, w, rgb)`` with ``resize``, given a leading
    axis, and its last (channel) axis is moved to position 1.
    """
    frm = resize(frame, (h, w, rgb))
    frm = np.expand_dims(frm, axis = 0)
    frm = np.moveaxis(frm, -1, 1)
    # Safety net: rescale only if the resized values are still above 1.
    # NOTE(review): resize presumably already returns values in [0, 1] for
    # 8-bit input, in which case this branch never fires - confirm.
    if np.max(frm) > 1:
        frm = frm / 255.0
    return frm


def capture(filename, timesep, rgb, h, w):
    """Read the first ``timesep`` frames of a video into one float array.

    Args:
        filename: Path handed to ``cv2.VideoCapture``.
        timesep: Number of frames to read; also the length of the first axis
            of the result.
        rgb: Number of channels each frame is resized to.
        h: Target frame height.
        w: Target frame width.

    Returns:
        ``numpy.ndarray`` of shape ``(timesep, rgb, h, w)`` and dtype float64.

    Raises:
        IOError: if the video cannot be opened.
        ValueError: if the video yields fewer than ``timesep`` frames.

    Frame layout (unchanged from the original implementation): the first frame
    is written to slot 0, and every later frame ``i`` is written to slot
    ``i - 1``. For ``timesep > 1`` slot 0 therefore ends up holding the second
    frame and the last slot stays all zeros.
    NOTE(review): this looks like an off-by-one, but the checkpoint loaded by
    this notebook may have been trained on exactly this layout, so it is kept
    as is - confirm against the training pipeline before changing it.
    """
    # np.float64 is what the removed ``np.float`` alias (builtin float) meant.
    frames = np.zeros((timesep, rgb, h, w), dtype = np.float64)
    vc = cv2.VideoCapture(filename)
    try:
        if not vc.isOpened():
            raise IOError('Could not open video file: {}'.format(filename))

        rval, frame = vc.read()
        if not rval or frame is None:
            raise ValueError('Video contains no readable frames: {}'.format(filename))
        frames[0][:] = _frame_to_chw(frame, rgb, h, w)

        i = 1
        while i < timesep:
            rval, frame = vc.read()
            if not rval or frame is None:
                raise ValueError(
                    'Video has only {} readable frame(s) but {} are required: {}'.format(
                        i, timesep, filename))
            # Deliberately i - 1, see the layout note in the docstring.
            frames[i - 1][:] = _frame_to_chw(frame, rgb, h, w)
            i += 1
    finally:
        # Always free the decoder handle, including on the error paths.
        vc.release()

    return frames

In [None]:
class TimeWarp(nn.Module):
    """Run a per-frame module over every time step of a batch of clips.

    ``forward`` unpacks its input as ``(batch, time, C, H, W)`` and returns
    the flattened per-frame outputs as ``(batch, time, features)``.

    Args:
        baseModel: Module applied to the individual frames.
        method: ``'loop'`` calls ``baseModel`` once per time step; any other
            value folds the time axis into the batch axis and calls it once.
    """

    def __init__(self, baseModel, method = 'squeeze'):
        super().__init__()
        self.baseModel = baseModel
        self.method = method

    def forward(self, x):
        n_batch, n_steps, channels, height, width = x.size()

        if self.method != 'loop':
            # Single pass: treat every frame of every clip as its own sample.
            folded = x.contiguous().view(n_batch * n_steps, channels, height, width)
            feats = self.baseModel(folded)
            feats = feats.view(feats.size(0), -1)
            return feats.contiguous().view(n_batch, n_steps, feats.size(-1))

        # One pass per time step, each flattened to (batch, features).
        per_step = []
        for t in range(n_steps):
            step_out = self.baseModel(x[:, t, :, :, :])
            per_step.append(step_out.view(step_out.size(0), -1))

        # Stacked as (time, batch, features); swap to (batch, time, features).
        return torch.stack(per_step, dim = 0).transpose(0, 1)

In [None]:
class extractlastcell(nn.Module):
    """Pick the last time step out of an ``(output, state)`` pair.

    The input is unpacked into exactly two items; the second one is ignored.
    The first is indexed as ``[:, -1, :]``, i.e. the final entry along axis 1.
    In this notebook it sits directly after the ``nn.LSTM`` layer inside
    ``VideoFightModel``.
    """

    def forward(self, x):
        sequence_output, _state = x
        return sequence_output[:, -1, :]

In [None]:
def VideoFightModel(device, weights = '/content/drive/MyDrive/AnomalyDetection/Statemamonmixed96accviolance.pth'):
    """Build the VGG19 + bidirectional LSTM classifier and load its checkpoint.

    Args:
        device: ``torch.device`` the checkpoint tensors are mapped to while
            loading, so a checkpoint saved on a GPU can also be restored on a
            CPU-only machine. The returned model itself is not moved; the
            caller is expected to call ``model.to(device)``.
        weights: Path to a checkpoint whose ``'state_dict'`` entry matches
            this architecture. Defaults to the file on the mounted Drive.

    Returns:
        ``nn.Sequential`` that takes a ``(batch, time, C, H, W)`` tensor and
        returns one logit per clip (``num_classes = 1``).
    """
    # Only the architecture is needed here: load_state_dict below runs in its
    # default strict mode, so every parameter is overwritten by the checkpoint
    # and downloading the ImageNet weights first would be wasted work.
    baseModel = models.vgg19(pretrained = False).features

    # Freeze the first 28 child layers of the feature extractor and leave the
    # remaining ones trainable.
    for index, child in enumerate(baseModel.children()):
        trainable = index >= 28
        for param in child.parameters():
            param.requires_grad = trainable

    # NOTE(review): 12800 presumably equals 512 * 5 * 5, the flattened VGG19
    # feature map for the 170x170 frames used by detect_violence - confirm.
    num_features = 12800
    rnn_hidden_size = 80
    rnn_num_layers = 1
    num_classes = 1
    dropout_rate = 0.2

    model = nn.Sequential(TimeWarp(baseModel, method = 'loop'),
                          nn.LSTM(num_features, rnn_hidden_size, rnn_num_layers, batch_first = True, bidirectional = True),
                          extractlastcell(),
                          # A bidirectional LSTM concatenates both directions.
                          nn.Linear(2 * rnn_hidden_size, 256),
                          nn.ReLU(),
                          nn.Dropout(dropout_rate),
                          nn.Linear(256, num_classes))

    # torch.load unpickles the file, so only load checkpoints you trust.
    checkpoint = torch.load(weights, map_location = device)
    model.load_state_dict(checkpoint['state_dict'])
    return model

In [None]:
def pred_fight(model, video, accuracy = 0.58):
    """Score one clip and compare the score against a decision threshold.

    Args:
        model: Module returning logits; it is switched to eval mode here.
        video: Input tensor passed to ``model`` unchanged.
        accuracy: Decision threshold applied to the sigmoid score. The name is
            kept for backward compatibility; it is not an accuracy metric.

    Returns:
        ``(flag, score)`` where ``score`` is element ``[0][0]`` of the
        sigmoid-activated model output (so only the first item of the batch is
        considered) and ``flag`` is ``True`` when ``score >= accuracy``.
    """
    model.eval()
    # Inference only: without no_grad autograd would record a graph through
    # the whole network for every frame and hold on to its activations.
    with torch.no_grad():
        outputs = model(video)
        preds = torch.sigmoid(outputs).to('cpu')
    preds = preds.detach().numpy()
    score = preds[0][0]
    return bool(score >= accuracy), score

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# The bare name on the last line displays the selected device as cell output.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [None]:
# Build the network with its checkpoint loaded, then move it to `device`.
# `model` is read as a global by detect_violence further down.
# The second line is the cell's last expression, so the module repr is shown
# as output.
model = VideoFightModel(device)
model.to(device)

Sequential(
  (0): TimeWarp(
    (baseModel): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU(inplace=True)
      (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (3): ReLU(inplace=True)
      (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (6): ReLU(inplace=True)
      (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (8): ReLU(inplace=True)
      (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU(inplace=True)
      (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (13): ReLU(inplace=True)
      (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (15): ReLU(inplace=True)
      (

In [None]:
def detect_violence(filename, accuracy = 0.65):
    """Classify a video file and return display-ready strings.

    Reads 40 frames of 3 x 170 x 170 with ``capture``, runs them as a batch of
    one clip through the global ``model`` on the global ``device``, and
    formats the result.

    Args:
        filename: Path of the video to analyse.
        accuracy: Decision threshold forwarded to ``pred_fight``.

    Returns:
        ``[violence, score]`` where ``violence`` is ``'Detected'`` or
        ``'Not detected'`` and ``score`` is the sigmoid score as a percentage
        with two decimals, e.g. ``'87.52 %'``.

    Raises:
        ValueError: if ``filename`` is ``None`` (e.g. the form was submitted
            without a video).
    """
    if filename is None:
        raise ValueError('No video file was provided.')

    video = capture(filename, 40, 3, 170, 170)
    # Add the batch axis as a view instead of copying the whole clip, then
    # convert to float32 on the target device.
    batch = torch.from_numpy(video[np.newaxis]).to(device, dtype = torch.float)

    is_violent, probability = pred_fight(model, batch, accuracy)

    violence = 'Detected' if is_violent else 'Not detected'
    # Fixed precision keeps the text readable and independent of NumPy's
    # scalar promotion rules, which differ between NumPy versions.
    detection_accuracy = '{:.2f} %'.format(float(probability) * 100)
    return [violence, detection_accuracy]

In [None]:
# Run the detector once on a sample clip stored on the mounted Drive, using
# the default threshold of detect_violence.
result = detect_violence('/content/drive/MyDrive/AnomalyDetection/MenHavingArgument.mp4')

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  This is separate from the ipykernel package so we can avoid doing imports until


In [None]:
result

['Detected', '87.51553297042847 %']

In [None]:
# Web UI: one uploaded video in, the two strings returned by detect_violence out.
# NOTE(review): the `source` keyword presumably ties this cell to Gradio 3.x
# (later releases are believed to use `sources`) - confirm the installed version.
inputs = gr.Video(source = 'upload', label = 'Input video', show_label =  True)
# The second textbox shows the percentage string built from the sigmoid score.
# NOTE(review): that value is a model score, not an accuracy measurement, so
# the 'Percentage Accuracy' label may mislead users - consider renaming it.
outputs = [gr.Textbox(label = 'Violence', show_label = True),
           gr.Textbox(label = 'Percentage Accuracy', show_label = True)]
demo = gr.Interface(fn = detect_violence, 
                    inputs = inputs, 
                    outputs = outputs,
                    title = 'Violence Detection Using Conv2D + LSTM Model'
                    )
# share = True asks Gradio for a public URL in addition to the local one.
demo.launch(share = True)

Colab notebook detected. To show errors in colab notebook, set `debug=True` in `launch()`
Running on public URL: https://36635.gradio.app

This share link expires in 72 hours. For free permanent hosting, check out Spaces (https://huggingface.co/spaces)


(<gradio.routes.App at 0x7f03eaa41d90>,
 'http://127.0.0.1:7864/',
 'https://36635.gradio.app')

Exception in callback None(<Task finishe...> result=None>)
handle: <Handle>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
TypeError: 'NoneType' object is not callable
