<a href="https://colab.research.google.com/github/Jasmy-Elzha-Mathew-1715/Crime-Detection-and-Recognition-on-CCTV/blob/main/Deep_Crime_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 1 : Connecting Google Drive

In [None]:
# Colab only: mount Google Drive at /content/drive. The dataset folder and the
# weight files referenced by later cells are addressed through this mount point.
from google.colab import drive
drive.mount('/content/drive')

Step 2 : Importing Packages

In [None]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt

2.1 : Setting Dimensions of Frame

In [None]:
# Fixed geometry of one training clip; read as globals by preprocess(),
# select_frames(), load_video() and create_model().
Width, Height = 256, 256  # size in pixels every frame is resized to
frames = 15               # number of frames kept per video

2.2 : Loading Video Names

In [None]:
def load_video_names(path):
  """Collect video paths and class labels from a ``path/<category>/<video>`` tree.

  Every directory directly under ``path`` is treated as one category and every
  entry inside it as one video of that category. Plain files directly under
  ``path`` are ignored.

  Args:
    path: root folder of the dataset.

  Returns:
    ``(videos, labels)``: two NumPy arrays of equal length holding the full
    path of each video and the name of the category folder it was found in.
  """
  videos = []
  labels = []
  # Sorted so the order does not depend on the file system's listing order.
  for category in sorted(os.listdir(path)):
    category_dir = os.path.join(path, category)
    # The test has to use the full path: a bare folder name is resolved against
    # the current working directory, which made every category look missing.
    if not os.path.isdir(category_dir):
      continue
    for video in sorted(os.listdir(category_dir)):
      videos.append(os.path.join(category_dir, video))
      labels.append(category)
  return np.array(videos), np.array(labels)

2.3 : Resizing Frames and Normalizing Pixel Values

In [None]:
def preprocess(frame):
  """Resize one frame to the model input size and rescale its pixel values.

  Args:
    frame: image array as returned by ``cv2.VideoCapture.read``.

  Returns:
    The frame resized with ``cv2.resize(frame, (Width, Height))`` and mapped
    through ``(v - 127.5) / 127.5``; for 8-bit input this puts every value in
    the range [-1, 1].
  """
  resized = cv2.resize(frame, (Width, Height))
  # Centre on 127.5, then scale by the same amount: 0 -> -1, 255 -> +1.
  return (resized - 127.5) / 127.5

2.4 : Loading Videos from Path

In [None]:
def load_video(video_path):
  """Read a video file and turn it into a fixed-length stack of frames.

  All frames are decoded, ``select_frames`` picks ``frames`` of them, and only
  the picked frames are passed through ``preprocess``.

  Args:
    video_path: path of the video file to open with OpenCV.

  Returns:
    ``(clip, True)`` where ``clip`` is a NumPy array stacking the ``frames``
    preprocessed frames, or ``(0, False)`` when the video does not yield
    exactly ``frames`` frames (too short, or it could not be read at all).
  """
  raw_frames = []
  cap = cv2.VideoCapture(video_path)
  try:
    while True:
      ret, frame = cap.read()
      if not ret:
        break
      raw_frames.append(frame)
  finally:
    # Release the capture even if decoding raises.
    cap.release()
  # Sub-sample before preprocessing: resizing and converting every decoded
  # frame to float only to throw most of them away wastes time and memory.
  # The result is the same because select_frames picks frames by position.
  selected = select_frames(raw_frames)
  if len(selected) != frames:
    print('short_video ', video_path, len(selected))
    return 0, False

  return np.array([preprocess(frame) for frame in selected]), True

2.5 : Choosing Desired No. of Frames

In [None]:
def select_frames(video_frames):
  """Reduce a list of frames to at most ``frames`` entries.

  Inputs that already have ``frames`` entries or fewer are returned unchanged.
  Longer inputs are sub-sampled with a constant stride of
  ``len(video_frames) // frames`` starting at the first frame, stopping as soon
  as ``frames`` entries have been collected.

  Args:
    video_frames: sequence of frames in playback order.

  Returns:
    The input itself, or a new list with exactly ``frames`` entries.
  """
  total = len(video_frames)
  if total <= frames:
    return video_frames
  step = total // frames
  # Positions 0, step, 2*step, ... - exactly `frames` of them.
  return [video_frames[pos] for pos in range(0, frames * step, step)]

2.6 : Load Batches of Videos

In [None]:
def create_dataset(videos, labels, indx):
  """Load the videos at the given positions together with their labels.

  Videos for which ``load_video`` reports failure are left out, so the result
  can hold fewer entries than ``indx``.

  Args:
    videos: NumPy array of video paths.
    labels: NumPy array of labels aligned with ``videos``.
    indx: positions to load (used to index both arrays).

  Returns:
    ``(x, y)``: the stacked clips and the labels of the videos that loaded.
  """
  loaded = ((load_video(path), target)
            for path, target in zip(videos[indx], labels[indx]))
  kept = [(clip, target) for (clip, is_video), target in loaded if is_video]
  x = [clip for clip, _ in kept]
  y = [target for _, target in kept]
  return np.array(x), np.array(y)

### Training Phase

In [None]:
# Root folder of the training videos on the mounted Drive (one sub-folder per category).
DATASET_DIR = '/content/drive/MyDrive/Crime Detection and Recognition on CCTV/Anomaly_Short'
videos, labels = load_video_names(DATASET_DIR)
samples = len(videos)
# Stop here when nothing was found: every later cell (label encoding, model
# building, training) would otherwise run on an empty dataset.
if samples == 0:
  raise FileNotFoundError('No videos found under ' + DATASET_DIR)

In [None]:
# Number of video paths returned by load_video_names.
samples

0

In [None]:
# Number of distinct category labels; create_model() uses it as the width of
# the final softmax layer.
classes = len(np.unique(labels))

In [None]:
# Number of distinct categories found in the dataset.
classes

0

In [None]:
# Print how many videos each category contributes (one "name > count" line per category).
labels_counts = np.unique(labels, return_counts = True)
for category, count in zip(*labels_counts):
  print(category,' > ', count)

In [None]:
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

# Step 1: category names -> integer ids, shaped as a single column because
# OneHotEncoder expects 2-D input.
le = LabelEncoder()
encoded_labels = le.fit_transform(labels).reshape(-1, 1)
# Keep the id -> name mapping on disk so predictions can be decoded later.
np.save("classes.npy", le.classes_)
# Step 2: integer ids -> one-hot rows (converted with .toarray() in a later cell).
encoder = OneHotEncoder()
encoded_labels = encoder.fit_transform(encoded_labels)

In [None]:
# Inspect the one-hot labels; at this point they are still the object returned
# by OneHotEncoder.transform (it is converted with .toarray() in the next cell).
print(encoded_labels)

In [None]:
# Convert the encoder output to a dense NumPy array for Keras. Guarded so that
# re-running this cell is harmless: once converted, the array no longer has a
# .toarray() method and the unguarded call would raise AttributeError.
if hasattr(encoded_labels, "toarray"):
  encoded_labels = encoded_labels.toarray()

In [None]:
# Dense one-hot label matrix: one row per video, one column per category.
encoded_labels

**Model**

Keras Import for Model

In [None]:
from keras.models import Input, Model
from keras.layers import TimeDistributed, LSTM
from keras.layers import ConvLSTM2D
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, LeakyReLU, BatchNormalization
from keras.layers import Dense, Flatten, GlobalMaxPooling2D
from keras.layers import MaxPooling3D
from keras.layers import concatenate

from keras.optimizers import Adam

Residual-Style Block for Feature Extraction (the block input is concatenated with the convolution output)

In [None]:
def res_block(model, filters):
  """Append a conv branch to a tensor and concatenate it with the input.

  The branch is Conv2D (3x3, 'same' padding) -> BatchNormalization ->
  LeakyReLU(0.2). The skip connection is a concatenation, not an addition, so
  the output carries the input's channels plus ``filters`` new ones.

  Args:
    model: input Keras tensor.
    filters: number of filters of the convolution.

  Returns:
    The concatenation of the input tensor and the branch output.
  """
  branch = Conv2D(filters=filters, kernel_size = 3, padding='same')(model)
  branch = BatchNormalization(momentum=0.9)(branch)
  branch = LeakyReLU(0.2)(branch)
  return concatenate([model, branch])

Deep Crime Detection Model

In [None]:
def create_model():
    """Build and compile the video classifier.

    Layers, in order:
      * ``ConvLSTM2D`` (32 filters, 3x3) over the whole clip with
        ``return_sequences=False``, followed by batch norm and LeakyReLU;
      * six stages of ``res_block`` + 2x2 max pooling, using 64, 128, 256, 512,
        512 and 512 convolution filters;
      * ``Flatten`` and a softmax ``Dense`` layer with ``classes`` units.

    Reads the notebook-level ``frames``, ``Width``, ``Height`` and ``classes``.

    Returns:
        The Keras ``Model``, compiled with Adam (learning rate 1e-4),
        categorical cross-entropy loss and the accuracy metric.
    """
    input_layer = Input(shape=(frames, Width, Height,3))
    model =  ConvLSTM2D(32,3,padding='same',return_sequences=False)(input_layer)
    model = BatchNormalization(momentum=0.9)(model)
    model = LeakyReLU(0.2)(model)
    filters = 64
    for _ in range(6):
        model = res_block(model, filters)
        # The ConvLSTM2D above returns only its last output, so the time axis
        # is gone and the tensor is 4-D here. 3-D pooling can never apply, so
        # 2-D pooling is used directly rather than via a bare try/except.
        model = MaxPooling2D((2,2))(model)
        # Double the filter count per stage, capped at 512.
        if filters < 512:
            filters *= 2
    model = Flatten()(model)
    model = Dense(classes, activation='softmax')(model)
    model = Model(input_layer, model)
    model.compile(optimizer=Adam(learning_rate=1e-4),
              loss='categorical_crossentropy',
              metrics=['accuracy'])
    return model

In [None]:
# Build and compile the network. create_model() reads the globals `frames`,
# `Width`, `Height` and `classes`, so the cells defining them must have run.
classifier = create_model()

In [None]:
# Print the layer-by-layer summary of the model.
classifier.summary()

In [None]:
# Render the layer graph of the model as an image.
from keras.utils import plot_model
plot_model(classifier)

Saving the model (still untrained at this point)

In [None]:
# Save the freshly built model to the working directory. No training has
# happened yet at this point, so the file holds the initial weights.
classifier.save("my_model.h5")

In [None]:
# Resume from the checkpoint written by the training loop, if one exists.
# The previous version passed the model object itself as the file path, which
# always raised and was silenced by a bare `except`, so nothing was ever loaded.
if os.path.exists("classifier.h5"):
  classifier.load_weights("classifier.h5")

Training

In [None]:
batch_size = 4
epochs = 100
# Ceiling division: the last batch may hold fewer than batch_size videos.
batches = -(-samples // batch_size)
for e in range(epochs):
  # Visit the videos in a new random order every epoch.
  index = list(range(samples))
  np.random.shuffle(index)
  accuracy = 0
  trained_batches = 0
  for batch in range(batches):
    bs = batch*batch_size
    be = bs+batch_size
    selected_indexes = index[bs:be]
    x,y = create_dataset(videos, encoded_labels, selected_indexes)
    # create_dataset drops videos that are too short; when none is left there
    # is nothing to train on and train_on_batch would fail on the empty array.
    if len(x) == 0:
      continue
    results = classifier.train_on_batch(x,y)
    print('\r',batch,'/',batches,' : ',results[0],results[1],end='')
    accuracy += results[1]
    trained_batches += 1
    # Periodic checkpoint so a disconnected session loses little work.
    if batch%100 == 0:
      classifier.save_weights("classifier.h5")
  # Average over the batches actually trained; max() avoids dividing by zero
  # when the dataset is empty or every batch was skipped.
  print('\r> ',e,', Accuracy = ',accuracy/max(trained_batches, 1))
  classifier.save_weights("classifier.h5")

### Testing Phase

Load Trained Weights

In [None]:
# Load trained weights from Drive into the model built by create_model().
# NOTE(review): the training cell writes "classifier.h5" to the current working
# directory, not to this Drive folder - presumably the file has to be copied
# here first; confirm.
classifier.load_weights('/content/drive/MyDrive/Crime Detection and Recognition on CCTV/Anomaly_Short/classifier.h5')

In [None]:
# Colab only: let the user upload a video and turn it into a model input clip.
from google.colab import files
uploaded = files.upload()

for fn in uploaded.keys():
  # str.format placeholders need braces; with parentheses the text was printed
  # literally instead of the file name and size.
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))
  # NOTE: `videos` is reused here on purpose - the following cells read the
  # uploaded clip from it - but it replaces the array of training paths.
  videos, r = load_video(fn)
  if not r:
    # load_video returns (0, False) on failure; stop with a clear message
    # instead of failing later on `videos.shape`.
    raise ValueError('Could not extract enough frames from "{}"'.format(fn))

In [None]:
# Shape of the clip loaded in the previous cell (`videos` now holds that clip's
# frame array, no longer the array of training video paths).
print(videos.shape)

In [None]:
# Wrapping the clip in a list adds a leading batch axis of size 1 for predict().
result = classifier.predict( np.array([videos]))

In [None]:
# numpy is already imported near the top of the notebook; the re-import is harmless.
import numpy as np
# Index of the highest score; the batch holds a single clip, so the flattened
# argmax is the predicted class index.
np.argmax(result)

In [None]:
# Map the predicted class index back to its category name. `le` is the
# LabelEncoder fitted in the training phase, so that cell must have run.
le.inverse_transform([np.argmax(result)])[0]

### Evaluating Results

In [None]:
import torchvision
import torch
from torch import nn
import torch.nn.functional as F
import torchvision.models as models
import torch.optim as optim
import copy
import os
from tqdm.autonotebook import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import DataLoader
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler
import cv2
import sys

In [None]:
# Loading the Model
# Loads the weight file from Drive into the existing Keras `classifier`
# (same path as the load in the testing phase).
classifier.load_weights('/content/drive/MyDrive/Crime Detection and Recognition on CCTV/Anomaly_Short/classifier.h5')

In [None]:
# PyTorch training/evaluation loop.
# NOTE(review): `num_epochs`, `dataloaders`, `device`, `model`, `optimizer`,
# `cls_criterion`, `onecyc`, `update_lr` and `update_mom` are not defined in
# any cell shown in this notebook - they must exist before this cell can run.
results_dir = '/content/drive/MyDrive/Crime Detection and Recognition on CCTV/Results'
os.makedirs(results_dir, exist_ok = True)
from torch.autograd import Variable  # no longer used below; kept in case other cells rely on it
iteration = 0
acc_all = list()   # mean training accuracy per epoch
loss_all = list()  # mean training loss per epoch

for epoch in range(num_epochs):
    print('')
    print(f"--- Epoch {epoch} ---")
    for phase in dataloaders.keys():
        print('')
        print(f"--- Phase {phase} ---")
        # Only the 'train' phase may update the weights; the other phases are
        # evaluated without gradients so their data never influences the model.
        is_train = (phase == 'train')
        model.train(is_train)
        epoch_metrics = {"loss": [], "acc": []}
        for batch_i, (X, y) in enumerate(dataloaders[phase]):
            image_sequences = X.to(device)
            # Named `targets` so the notebook-level `labels` array is not overwritten.
            targets = y.to(device)
            optimizer.zero_grad()
            with torch.set_grad_enabled(is_train):
                predictions = model(image_sequences)
                loss = cls_criterion(predictions, targets)
            acc = 100 * (predictions.detach().argmax(1) == targets).cpu().numpy().mean()
            if is_train:
                loss.backward()
                optimizer.step()
                # Advance the one-cycle schedule after every optimizer step.
                lr,mom = onecyc.calc()
                update_lr(optimizer, lr)
                update_mom(optimizer, mom)
            epoch_metrics["loss"].append(loss.item())
            epoch_metrics["acc"].append(acc)
            sys.stdout.write(
                    "\r[Epoch %d/%d] [Batch %d/%d] [Loss: %f (%f), Acc: %.2f%% (%.2f%%)]"
                    % (
                        epoch,
                        num_epochs,
                        batch_i,
                        len(dataloaders[phase]),
                        loss.item(),
                        np.mean(epoch_metrics["loss"]),
                        acc,
                        np.mean(epoch_metrics["acc"]),
                    )
                )

            # Empty cache
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        print('')
        print('{} , acc: {}'.format(phase,np.mean(epoch_metrics["acc"])))
        if is_train:
            # One checkpoint per epoch inside the Results folder. The previous
            # target was the Keras weight file `classifier.h5`, which this
            # call overwrote with a PyTorch state dict on every phase.
            torch.save(model.state_dict(),
                       os.path.join(results_dir, 'model_epoch_{}.pth'.format(epoch)))
            acc_all.append(np.mean(epoch_metrics["acc"]))
            loss_all.append(np.mean(epoch_metrics["loss"]))

In [None]:
def _plot_training_curve(values, title, ylabel):
    """Draw one per-epoch curve on a new 10x5 figure and show it."""
    plt.figure(figsize=(10,5))
    plt.plot(values)
    plt.title(title)
    plt.xlabel("epochs")
    plt.ylabel(ylabel)
    plt.show()

def error_plot(loss):
    """Plot the training loss, one value per epoch.

    Args:
        loss: sequence of per-epoch loss values.
    """
    _plot_training_curve(loss, "Training loss plot", "Loss")

def acc_plot(acc):
    """Plot the training accuracy, one value per epoch.

    Args:
        acc: sequence of per-epoch accuracy values.
    """
    _plot_training_curve(acc, "Training accuracy plot", "accuracy")

In [None]:
# Mean training loss of each epoch, as collected by the training loop.
loss_all

In [None]:
# Training loss curve over the epochs.
error_plot(loss_all)

In [None]:
# Training accuracy curve over the epochs.
acc_plot(acc_all)