In [27]:
# GROUP MEMBERS:
# MUHAMMAD ABDULLAH
# HAASHA BIN ATIF
# MUHAMAMAD AHMED
 

In [28]:
import torch
import numpy as np
import pandas as pd
import os
import pickle
import torch.nn as nn
import torchvision
import cv2 as cv
import PIL
from tqdm.notebook import tqdm
from time import perf_counter
import subprocess

In [29]:
# Checkpoint of the 2-way model (loaded below as `identification_model`).
thread_identification_MODEL_PATH = "../input/additional/2-Way-BestModel.pth"

# Checkpoints of the two 3-way classifiers (non-weighted / weighted variants).
thread_classification_nonweighted_MODEL_PATH = "../input/additional/3-Way-NonWeighted-BestModel.pth"
thread_classification_weighted_MODEL_PATH = "../input/additional/3-Way-Weighted-BestModel.pth"

# Checkpoints of the two 4-way classifiers.
# NOTE: these two names hold *paths* here, but the weights-loading cell re-binds
# them to model instances. Re-running that cell therefore requires re-running
# this one first.
four_class_nonweighted_model = "../input/additional/4-Way-NonWeighted-BestModel.pth"
four_class_weighted_model = "../input/additional/4-Way-Weighted-BestModel.pth"


In [30]:
def get_length(filename):
    """Return the duration of a media file in seconds, as reported by ``ffprobe``.

    Args:
        filename: Path of the media file to inspect.

    Returns:
        float: The container duration in seconds.

    Raises:
        ValueError: If ``ffprobe`` exits with a non-zero status, or if what it
            printed on stdout cannot be parsed as a number (e.g. ``N/A``).
        FileNotFoundError: If the ``ffprobe`` executable is not on ``PATH``
            (raised by ``subprocess.run``).
    """
    result = subprocess.run(
        ["ffprobe", "-v", "error",
         "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1",
         filename],
        stdout=subprocess.PIPE,
        # Keep diagnostics separate from stdout: merging them would corrupt the
        # number we are about to parse whenever ffprobe prints a message.
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        details = result.stderr.decode(errors="replace").strip()
        raise ValueError("ffprobe failed for {!r}: {}".format(filename, details))

    raw_duration = result.stdout.decode(errors="replace").strip()
    try:
        return float(raw_duration)
    except ValueError:
        raise ValueError(
            "ffprobe returned no usable duration for {!r}: {!r}".format(filename, raw_duration)
        ) from None

In [31]:
def getFrames(FilePath,n_frame=10):
    """Read a video and keep every ``n_frame``-th frame, starting with frame 0.

    Args:
        FilePath: Path of the video, passed to ``cv.VideoCapture``.
        n_frame: Sampling stride; frames whose index is a multiple of this
            value are kept.

    Returns:
        list: The kept frames, in order, exactly as returned by
        ``VideoCapture.read()``. The list is empty when no frame can be read.
    """
    images = []
    VideoCap = cv.VideoCapture(FilePath)
    try:
        frame_index = 0
        while True:
            hasFrames, image = VideoCap.read()
            if not hasFrames:
                break
            if frame_index % n_frame == 0:
                images.append(image)
            frame_index += 1
    finally:
        # Always give the decoder / file handle back, even if reading fails.
        VideoCap.release()
    return images
def calculate_first_mask(frames,skip=10):
    """Build the initial motion mask from the first frames of a clip.

    Each of the frames ``1 .. skip-1`` is compared with frame 0 (grayscale,
    3x3 Gaussian blur, absolute difference, threshold at 20, one dilation).
    Pixels that changed in frame ``i`` receive the value ``i``; the masks are
    merged with an element-wise maximum, so each pixel ends up holding the
    index of the latest frame in which it differed from frame 0.

    Args:
        frames: Sequence of colour frames (must contain at least one frame);
            they are converted with ``cv.COLOR_BGR2GRAY``.
        skip: Exclusive upper bound on the frame indices inspected. When the
            clip holds fewer than ``skip`` frames only the available ones are
            used, instead of failing with an IndexError.

    Returns:
        numpy.ndarray: ``uint8`` array with the height and width of frame 0.
    """
    reference = frames[0]
    height, width = reference.shape[:2]
    pixel_count = height * width

    gray_reference = cv.cvtColor(reference, cv.COLOR_BGR2GRAY)
    gray_reference = cv.GaussianBlur(gray_reference, (3, 3), 0)

    mask = np.zeros((height, width), dtype=np.uint8)
    # Never index past the end of a short clip.
    last_index = min(skip, len(frames))
    for i in range(1, last_index):
        gray = cv.cvtColor(frames[i], cv.COLOR_BGR2GRAY)
        gray = cv.GaussianBlur(gray, (3, 3), 0)
        delta = cv.absdiff(gray_reference, gray)
        # maxval=i stamps the frame index into the mask.
        motion = cv.threshold(delta, 20, i, cv.THRESH_BINARY)[1]
        motion = cv.dilate(motion, None)
        # Ignore frames that are completely black.
        if np.count_nonzero(gray == 0) < pixel_count:
            mask = np.maximum(mask, motion)
    return mask
def apply_mask(frames,skip=10):
    """Black out the parts of each frame where no recent motion was detected.

    The mask starts as ``calculate_first_mask(frames, skip)``. For every frame,
    in order:

    1. the frame is multiplied by the binary version of the current mask and
       appended to the output;
    2. every non-zero mask value is decremented by one, so a pixel fades out
       after it has been still for a while;
    3. for every frame but the first, pixels that differ from the previous
       frame (grayscale, 3x3 blur, threshold 20, 20 dilation iterations) are
       set to ``skip`` in the mask, unless the frame is entirely black.

    Args:
        frames: Sequence of colour frames of identical shape.
        skip: Forwarded to ``calculate_first_mask`` and used as the value given
            to freshly detected motion pixels.

    Returns:
        list: One masked frame per input frame (empty for an empty input).
    """
    if len(frames) == 0:
        return []

    mask = calculate_first_mask(frames, skip)
    pixel_count = frames[0].shape[0] * frames[0].shape[1]

    output = []
    previous_gray = None
    for frame in frames:
        # Broadcast the 2-D keep-map over the colour channels.
        keep = (mask > 0).astype(np.uint8)
        output.append(frame * keep[:, :, None])

        gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        gray = cv.GaussianBlur(gray, (3, 3), 0)

        # Age the mask before merging in the motion seen in this frame.
        mask[mask > 0] -= 1
        if previous_gray is not None:
            delta = cv.absdiff(previous_gray, gray)
            motion = cv.threshold(delta, 20, skip, cv.THRESH_BINARY)[1]
            motion = cv.dilate(motion, None, iterations=20)
            # Ignore frames that are completely black.
            if np.count_nonzero(gray == 0) < pixel_count:
                mask = np.maximum(mask, motion)
        previous_gray = gray
    return output

In [32]:
class vggCNNEncoder(nn.Module):
    """Clip classifier: per-frame VGG16 features -> FC embedding -> LSTM -> class scores.

    The attribute names (``vgg16``, ``fc1`` ... ``fc5``, ``bn1``, ``bn2``,
    ``LSTM``) are the keys of the state dict, so they must stay as they are for
    saved checkpoints to keep loading.
    """
    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300,h_RNN_layers=3, h_RNN=256, h_FC_dim=128,  num_classes=3):
        """Build a VGG16 backbone (no pretrained weights) plus the FC / LSTM head.

        Args:
            fc_hidden1: Width of the first fully connected layer after the backbone.
            fc_hidden2: Width of the second fully connected layer.
            drop_p: Dropout probability used after fc1, fc2 and fc4.
            CNN_embed_dim: Size of the per-frame embedding fed to the LSTM.
            h_RNN_layers: Number of stacked LSTM layers.
            h_RNN: LSTM hidden size.
            h_FC_dim: Width of the fully connected layer after the LSTM.
            num_classes: Number of output scores.
        """
        super(vggCNNEncoder, self).__init__()

        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p
        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers   # RNN hidden layers
        self.h_RNN = h_RNN                 # RNN hidden nodes
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes
        
        vgg16 = torchvision.models.vgg16(pretrained=False)
        modules = list(vgg16.children())[:-1]      # drop the last child module (presumably VGG's `classifier` head)
        self.vgg16 = nn.Sequential(*modules)
        # fc1 takes the same input width as the first layer of the removed classifier.
        self.fc1 = nn.Linear(vgg16.classifier[0].in_features, fc_hidden1)
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)

        self.LSTM = nn.LSTM(
                        input_size=self.RNN_input_size,
                        hidden_size=self.h_RNN,        
                        num_layers=h_RNN_layers,       
                        batch_first=True)       # input & output have the batch as first dimension, e.g. (batch, time_step, input_size)
        self.fc4 = nn.Linear(self.h_RNN, self.h_FC_dim)
        self.fc5 = nn.Linear(self.h_FC_dim, self.num_classes)

    def forward(self, x_3d):
        """Score a batch of clips.

        Args:
            x_3d: 5-D tensor indexed as ``x_3d[:, t, :, :, :]``, i.e. dimension 1
                is the time step and dimension 0 the batch.

        Returns:
            The output of ``fc5`` for the last time step: one row of
            ``num_classes`` raw scores per clip (no softmax is applied here).
        """
        cnn_embed_seq = []
        for t in range(x_3d.size(1)):
            # The backbone runs without gradient tracking, so it is never trained through.
            with torch.no_grad():
                x = self.vgg16(x_3d[:, t, :, :, :])  # VGG16 backbone
                x = x.view(x.size(0), -1)             # flatten output of conv

            # FC layers
            x = self.bn1(self.fc1(x))
            x = nn.functional.relu(x)
            x = nn.functional.dropout(x, p=self.drop_p, training=self.training)
            x = self.bn2(self.fc2(x))
            x = nn.functional.relu(x)
            x = nn.functional.dropout(x, p=self.drop_p, training=self.training)
            x = self.fc3(x)

            cnn_embed_seq.append(x)
        # Stacking gives (time, batch, embed); swap to (batch, time, embed) for the batch_first LSTM.
        cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)

        self.LSTM.flatten_parameters()
        RNN_out, (h_n, h_c) = self.LSTM(cnn_embed_seq, None)  
        x = self.fc4(RNN_out[:, -1, :])   # choose RNN_out at the last time step
        x = nn.functional.relu(x)
        x = nn.functional.dropout(x, p=self.drop_p, training=self.training)
        x = self.fc5(x)
        return x

In [33]:
# Weights loading.
# map_location="cpu" keeps the deserialized checkpoint tensors in host memory.
# Without it torch.load restores them to the device they were saved from, so
# every checkpoint would occupy GPU memory a second time next to the model
# that .cuda() moves there below.
# NOTE: torch.load unpickles the file - only load checkpoints you trust.
identification_Dict = torch.load(thread_identification_MODEL_PATH, map_location="cpu")
classification_nonweighted_Dict = torch.load(thread_classification_nonweighted_MODEL_PATH, map_location="cpu")
classification_weighted_Dict = torch.load(thread_classification_weighted_MODEL_PATH, map_location="cpu")
four_class_nonweighted_dict = torch.load(four_class_nonweighted_model, map_location="cpu")
four_class_weighted_dict = torch.load(four_class_weighted_model, map_location="cpu")

# Model construction: one network per checkpoint, differing only in output size.
identification_model = vggCNNEncoder(num_classes=2)
classification_nonweighted_model = vggCNNEncoder(num_classes=3)
classification_weighted_model = vggCNNEncoder(num_classes=3)
# NOTE: up to here the two four_class_*_model names held checkpoint *paths*;
# they are re-bound to models now, so this cell cannot be re-run on its own
# without first re-running the cell that defines the paths.
four_class_nonweighted_model = vggCNNEncoder(num_classes=4)
four_class_weighted_model = vggCNNEncoder(num_classes=4)

# Copy the stored parameters (saved under the 'Model' key) into each network.
identification_model.load_state_dict(identification_Dict['Model'])
classification_nonweighted_model.load_state_dict(classification_nonweighted_Dict['Model'])
classification_weighted_model.load_state_dict(classification_weighted_Dict['Model'])
four_class_nonweighted_model.load_state_dict(four_class_nonweighted_dict['Model'])
four_class_weighted_model.load_state_dict(four_class_weighted_dict['Model'])

# Move the networks to the GPU (the last call's return value is the cell output).
identification_model.cuda()
classification_nonweighted_model.cuda()
classification_weighted_model.cuda()
four_class_nonweighted_model.cuda()
four_class_weighted_model.cuda()


vggCNNEncoder(
  (vgg16): Sequential(
    (0): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU(inplace=True)
      (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (3): ReLU(inplace=True)
      (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (6): ReLU(inplace=True)
      (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (8): ReLU(inplace=True)
      (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU(inplace=True)
      (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (13): ReLU(inplace=True)
      (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (15): ReLU(inplace=True)
      

In [34]:
# Clips processed by the demo loop at the end of the notebook.
#FILEPATH='../input/ucf-crime-dataset/Fighting/Fighting028_x264.mp4'
files_for_demo = [
    '../input/ucf-crime-dataset/Fighting/Fighting025_x264.mp4',
    '../input/ucf-crime-dataset/Arson/Arson024_x264.mp4',
    '../input/ucf-crime-dataset/Normal_1/Normal_Videos008_x264.mp4',
]

In [35]:
class VideoLoader(torch.utils.data.Dataset):
    """Sliding-window dataset over the sampled frames of one video.

    All sampled frames are decoded once at construction time and kept in
    memory. Item ``idx`` is the window of ``sequence_length`` consecutive
    sampled frames starting at sampled frame ``idx``, motion-masked with
    ``apply_mask`` and converted with ``transform``.

    Args:
        FilePath: Path of the video file.
        n_frame: Sampling stride forwarded to ``getFrames``.
        sequence_length: Number of sampled frames per item.
        transform: Callable applied to each masked frame after it has been
            wrapped in a ``PIL.Image``; defaults to ``ToTensor()``.
        mask_skip: ``skip`` argument forwarded to ``apply_mask``.
    """

    def __init__(self, FilePath, n_frame=10, sequence_length=16, transform=None, mask_skip=10):
        self.FilePath = FilePath
        if transform is None:
            transform = torchvision.transforms.ToTensor()
        self.transform = transform
        self.n_frame = n_frame
        self.sequence_length = sequence_length
        self.mask_skip = mask_skip
        self.LoadVideo()

    def LoadVideo(self):
        """Decode the video and store the sampled frames in ``self.Frames``."""
        self.Frames = getFrames(self.FilePath, n_frame=self.n_frame)

    def __len__(self):
        """Number of complete windows; 0 when the video is shorter than one window."""
        # N frames hold N - L + 1 windows of length L (the last one starts at N - L).
        return max(0, len(self.Frames) - self.sequence_length + 1)

    def __getitem__(self, idx):
        """Return window ``idx`` as a tensor stacked along a new first (time) dimension."""
        if not 0 <= idx < len(self):
            raise IndexError("window index {} out of range for {} windows".format(idx, len(self)))
        window = self.Frames[idx:idx + self.sequence_length]
        window = apply_mask(window, self.mask_skip)
        # NOTE(review): frames from OpenCV are presumably BGR while PIL assumes RGB;
        # left untouched because the checkpoints were presumably trained this way.
        tensors = [self.transform(PIL.Image.fromarray(frame)) for frame in window]
        # torch.stack already yields (sequence, *transform_output_shape); no
        # hard-coded 264x264 reshape, so any transform output size works.
        return torch.stack(tensors)

In [36]:
# Per-frame preprocessing: resize to 264x264, then convert to a tensor.
Transform = torchvision.transforms.Compose(
    [
        torchvision.transforms.Resize((264, 264)),
        torchvision.transforms.ToTensor(),
    ]
)
# Number of sampled frames that make up one model input.
SEQUENCE_LENGTH = 18
# The dataset and loader are built per video in the demo loop further down:
#Test = VideoLoader(files_for_demo,n_frame=5,sequence_length=SEQUENCE_LENGTH,transform = Transform)
#TestLoader = torch.utils.data.DataLoader(Test,batch_size=1,num_workers=4,shuffle=False)


In [37]:
# Softmax over dimension 1; applied to the model outputs below to obtain one
# probability per class for each row of the batch.
SOFTMAX = nn.Softmax(dim=1)

In [38]:
##Binary Classification
# SOFTMAX = nn.Softmax(dim=1)
# identification_model.eval()
# List = []
# timing=[]
# Prev=float('inf')
# VideoCap = cv.VideoCapture(FILEPATH)
# start=perf_counter()
# for idx,data in enumerate(tqdm(TestLoader,desc='Testing')):
#     Images = data.cuda()
#     Results = identification_model(Images)
#     T = SOFTMAX(Results.to("cpu"))
#     List.append(T[0][0])
#     if T[0][0]>0.48:
#         timing.append([((idx*5)),(((idx*5+SEQUENCE_LENGTH*5)))])
# frames_with_threat=[]
# for i in timing:
#     temp=np.arange(i[0],i[1])
#     frames_with_threat.append(temp)
# frames_with_threat=np.unique(np.array(frames_with_threat))
# images=[]
# hasFrames,image = VideoCap.read()
# video=cv.VideoWriter(FILEPATH[:-3].split('/')[-1]+'avi',cv.VideoWriter_fourcc(*'DIVX'), 30,(image.shape[1],image.shape[0]))
# #images.append(image)
# video.write(image)
# total=1
# font = cv.FONT_HERSHEY_PLAIN  
# while hasFrames:
#     hasFrames,image = VideoCap.read()
#     if hasFrames:
#         total+=1
# #        images.append(image)
#         if total in frames_with_threat:
#             cv.putText(image,'THREAT',(5, 15),font,1,(0,0,255),1,cv.LINE_4) 
#             video.write(image)
#         else:
#             cv.putText(image,'NON-THREAT',(5, 15),font,1,(255, 255, 255),1,cv.LINE_4) 
#             video.write(image)

# end=perf_counter()
# print('Time Taken:',end-start,'Video Duration:',total//30)
# video.release()

In [39]:
# 3 class classification
def hierarchical_model(FILEPATH,identification_model,classification_model,model_name,
                       loader=None,frame_stride=5,threat_threshold=0.6):
    """Annotate a video with a two-stage (identify, then classify) prediction.

    Every window yielded by the loader is first scored by
    ``identification_model``. When the probability of class index 1 exceeds
    ``threat_threshold``, ``classification_model`` picks one of the three
    threat classes and that label is written to all video frames spanned by the
    window (later windows overwrite earlier ones). The video is then re-read
    and saved as ``3-way-<model_name>-<video name>.avi`` in the working
    directory, with ``THREAT:<class>`` or ``NON-THREAT`` drawn on every frame.

    Args:
        FILEPATH: Path of the source video.
        identification_model: 2-class model; index 1 is treated as "threat".
        classification_model: 3-class model (Fighting / Gun Event / Arson_Explosion).
        model_name: Tag inserted into the output file name.
        loader: Iterable of window batches for this video. Defaults to the
            module-level ``TestLoader``.
        frame_stride: Number of video frames between two consecutive sampled
            frames; must match the ``n_frame`` the loader's dataset was built with.
        threat_threshold: Minimum "threat" probability required to run the classifier.

    Raises:
        IOError: If no frame can be read from ``FILEPATH``.
    """
    if loader is None:
        loader = TestLoader

    identification_model.eval()
    classification_model.eval()

    classes={0:'Fighting',1:'Gun Event',2:'Arson_Explosion'}
    window_span = SEQUENCE_LENGTH * frame_stride   # video frames covered by one window

    VideoCap = cv.VideoCapture(FILEPATH)
    video = None
    try:
        start = perf_counter()
        length = int(VideoCap.get(cv.CAP_PROP_FRAME_COUNT))
        # Per-frame label; -1 means "not flagged as a threat".
        array = [-1] * max(length, 0)
        print(len(array))

        # Inference only: without no_grad every forward pass builds an autograd graph.
        with torch.no_grad():
            for idx, data in enumerate(loader):
                Images = data.cuda()
                probabilities = SOFTMAX(identification_model(Images).to("cpu"))
                if probabilities[0][1].item() > threat_threshold:
                    scores = SOFTMAX(classification_model(Images).to("cpu")).numpy()
                    label = int(np.argmax(scores[0]))
                    first = idx * frame_stride
                    # Clamp to the array so wrong frame-count metadata cannot overflow it.
                    for i in range(first, min(first + window_span, len(array))):
                        array[i] = label

        hasFrames, image = VideoCap.read()
        if not hasFrames:
            raise IOError('Could not read any frame from ' + FILEPATH)

        stem = os.path.splitext(os.path.basename(FILEPATH))[0]
        # NOTE(review): the output is written at a fixed 30 fps, as before.
        video = cv.VideoWriter('3-way-'+model_name+'-'+stem+'.avi',
                               cv.VideoWriter_fourcc(*'DIVX'), 30,
                               (image.shape[1], image.shape[0]))
        font = cv.FONT_HERSHEY_PLAIN
        total = 0
        while hasFrames:
            # Frame k is labelled with array[k]; frames beyond the reported
            # frame count are treated as not flagged.
            label = array[total] if total < len(array) else -1
            if label != -1:
                cv.putText(image,'THREAT:'+classes[label],(5, 15),font,1,(0,0,255),1,cv.LINE_4)
            else:
                cv.putText(image,'NON-THREAT',(5, 15),font,1,(255, 255, 255),1,cv.LINE_4)
            video.write(image)
            total += 1
            hasFrames, image = VideoCap.read()

        end = perf_counter()
        print('Time Taken:',end-start,'Video Duration:',total//30)
    finally:
        VideoCap.release()
        if video is not None:
            video.release()

In [40]:
# 4 class classification
def single_model(FILEPATH,four_class_model,model_name,loader=None,frame_stride=5):
    """Annotate a video with the prediction of a single 4-class model.

    Every window yielded by the loader is classified and its label is written
    to all video frames spanned by the window (later windows overwrite earlier
    ones). The video is then re-read and saved as
    ``4-way-<model_name>-<video name>.avi`` in the working directory: class 3
    is drawn as ``NON-THREAT``, every other class as ``THREAT:<class>``.
    Writing stops at the first frame that no window covered, so the output may
    be shorter than the source.

    Args:
        FILEPATH: Path of the source video.
        four_class_model: 4-class model (Fighting / Gun Event / Arson_Explosion / Normal).
        model_name: Tag inserted into the output file name.
        loader: Iterable of window batches for this video. Defaults to the
            module-level ``TestLoader``.
        frame_stride: Number of video frames between two consecutive sampled
            frames; must match the ``n_frame`` the loader's dataset was built with.

    Raises:
        IOError: If no frame can be read from ``FILEPATH``.
    """
    if loader is None:
        loader = TestLoader

    four_class_model.eval()
    classes={0:'Fighting',1:'Gun Event',2:'Arson_Explosion',3:'Normal'}
    window_span = SEQUENCE_LENGTH * frame_stride   # video frames covered by one window

    VideoCap = cv.VideoCapture(FILEPATH)
    video = None
    try:
        start = perf_counter()
        length = int(VideoCap.get(cv.CAP_PROP_FRAME_COUNT))
        # Per-frame label; -1 means "not covered by any classified window".
        array = [-1] * max(length, 0)
        print(len(array))

        # Inference only: without no_grad every forward pass builds an autograd graph.
        with torch.no_grad():
            for idx, data in enumerate(loader):
                Images = data.cuda()
                scores = SOFTMAX(four_class_model(Images).to('cpu')).numpy()
                label = int(np.argmax(scores[0]))
                first = idx * frame_stride
                # Clamp to the array; the last frame is labelled too.
                for i in range(first, min(first + window_span, len(array))):
                    array[i] = label

        hasFrames, image = VideoCap.read()
        if not hasFrames:
            raise IOError('Could not read any frame from ' + FILEPATH)

        stem = os.path.splitext(os.path.basename(FILEPATH))[0]
        # NOTE(review): the output is written at a fixed 30 fps, as before.
        video = cv.VideoWriter('4-way-'+model_name+'-'+stem+'.avi',
                               cv.VideoWriter_fourcc(*'DIVX'), 30,
                               (image.shape[1], image.shape[0]))
        font = cv.FONT_HERSHEY_PLAIN
        total = 0
        while hasFrames:
            # Frame k is labelled with array[k].
            label = array[total] if total < len(array) else -1
            if label == -1:
                # No prediction for this frame: stop writing, as the original did.
                break
            if label != 3:
                cv.putText(image,'THREAT:'+classes[label],(5, 15),font,1,(0,0,255),1,cv.LINE_4)
            else:
                cv.putText(image,'NON-THREAT',(5, 15),font,1,(255, 255, 255),1,cv.LINE_4)
            video.write(image)
            total += 1
            hasFrames, image = VideoCap.read()

        end = perf_counter()
        print('Time Taken:',end-start,'Video Duration:',total//30)
    finally:
        VideoCap.release()
        if video is not None:
            video.release()

In [41]:
# Run every demo clip through the four model configurations.
for i, video_path in enumerate(files_for_demo):
    # The dataset / loader are rebuilt for each clip and bound to module-level
    # names, which is where hierarchical_model and single_model look for them.
    Test = VideoLoader(
        video_path,
        n_frame=5,
        sequence_length=SEQUENCE_LENGTH,
        transform=Transform,
    )
    TestLoader = torch.utils.data.DataLoader(
        Test,
        batch_size=1,
        num_workers=4,
        shuffle=False,
    )

    # Two-stage pipeline: identification followed by 3-way classification.
    hierarchical_model(video_path, identification_model, classification_nonweighted_model, 'non-weighted')
    hierarchical_model(video_path, identification_model, classification_weighted_model, 'weighted')

    # Single 4-way model.
    single_model(video_path, four_class_nonweighted_model, 'non-weighted')
    single_model(video_path, four_class_weighted_model, 'weighted')
    

4781


KeyboardInterrupt: 

In [24]:
# Entries of the current working directory, which is also where the annotated
# .avi files are written (their VideoWriter names are relative).
files=os.listdir('./')
#for file in files:
    