In [None]:
# Mount Google Drive at /content/drive so the model weights and other
# project files referenced later in the notebook are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pytube

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0


In [None]:
import os
import io
import cv2
import time
import copy
import glob
import torch
import gdown
import argparse
import statistics
import threading
import torchvision
import numpy as np
import pandas as pd
import torch.nn as nn
import albumentations as A
from pytube import YouTube
from moviepy.editor import *
from base64 import b64encode
from collections import deque
from IPython.display import HTML

In [None]:
# Run on the first CUDA GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Module-level default for the predicted label (the video functions below keep their own local copy).
predicted_class_name = ""
# Root folder expected to contain one sub-folder per class (read by create_dataset).
DATASET_DIR = ''
# Class names; the position in this list is the class index used for labels and predictions.
CLASSES_LIST = ['fight','noFight']
# Number of frames passed to the model as one clip.
SEQUENCE_LENGTH = 16


# Alternative: download the checkpoint with gdown instead of reading it from Drive.
# url = 'https://drive.google.com/uc?id=1MWDeLnpEaZDrKK-OjmzvYLxfjwp-GDcp'
# output = 'model_16_m3_0.8888.pth'
# gdown.download(url, output, quiet=False)

# Checkpoint location on the mounted Drive.
output = '/content/drive/MyDrive/TheProject/Fight_Detection_From_Surveillance_Cameras-PyTorch_Project/Models/model_16_m3_0.8888.pth'
# Directory holding the checkpoint, resolved against the current working directory.
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(output)))
# Full path handed to torch.load in loadModel().
modelPath= os.path.join(__location__, 'model_16_m3_0.8888.pth')
###############################################################################

# Define the transforms
def transform_():
    '''
    Build the per-frame preprocessing pipeline.
    Returns:
        An albumentations Compose that resizes to 128x171, center-crops to 112x112
        and normalizes with the mean/std constants listed below.
    '''
    channel_mean = [0.43216, 0.394666, 0.37645]
    channel_std = [0.22803, 0.22145, 0.216989]

    steps = [
        A.Resize(128, 171, always_apply=True),
        A.CenterCrop(112, 112, always_apply=True),
        A.Normalize(mean=channel_mean, std=channel_std, always_apply=True),
    ]
    return A.Compose(steps)


def frames_extraction(video_path,SEQUENCE_LENGTH):
    '''
    Sample up to SEQUENCE_LENGTH evenly spaced frames from a video and preprocess them.
    Args:
        video_path: The path of the video in the disk, whose frames are to be extracted.
        SEQUENCE_LENGTH: The number of frames we want.
    Returns:
        frames_list: A list containing the resized and normalized frames of the video.
                     It is shorter than SEQUENCE_LENGTH when a frame could not be read.
    '''
    frames_list = []

    # Read the video file using the VideoCapture object.
    video_reader = cv2.VideoCapture(video_path)

    # try/finally guarantees the capture is released even if preprocessing raises.
    try:
        # Total number of frames reported by the container.
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Distance (in frames) between two sampled frames; never less than 1.
        skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1)

        transform = transform_()

        for frame_counter in range(SEQUENCE_LENGTH):
            # Seek to the next sampling position and read that frame.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()

            # Stop at the first frame that cannot be read (e.g. past the end of the video).
            if not success:
                break

            # OpenCV decodes to BGR; the preprocessing pipeline is fed RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames_list.append(transform(image=frame)['image'])
    finally:
        video_reader.release()

    return frames_list


def create_dataset(DATASET_DIR,CLASSES_LIST,SEQUENCE_LENGTH):
    '''
    Build the dataset tensors from the class sub-folders of DATASET_DIR.
    Args:
        DATASET_DIR:     Folder containing one sub-folder per class name.
        CLASSES_LIST:    Class names; the position in the list is used as the label.
        SEQUENCE_LENGTH: Number of frames extracted from every video.
    Returns:
        A pair (features, labels) of stacked tensors: one clip tensor per kept video
        and the matching class index. Videos that yield fewer than SEQUENCE_LENGTH
        frames are left out.
    '''
    clip_tensors = []
    label_tensors = []

    for class_index, class_name in enumerate(CLASSES_LIST):
        print(f'Extracting Data of Class: {class_name}')

        class_dir = os.path.join(DATASET_DIR, class_name)
        for file_name in os.listdir(class_dir):
            video_file_path = os.path.join(DATASET_DIR, class_name, file_name)
            frames = frames_extraction(video_file_path, SEQUENCE_LENGTH)

            # Skip videos that did not yield a full clip.
            if len(frames) != SEQUENCE_LENGTH:
                continue

            # Move the last (channel) axis to the front: [channels, frames, height, width].
            clip = np.transpose(np.array(frames), (3, 0, 1, 2))

            clip_tensors.append(torch.tensor(clip, dtype=torch.float32))
            label_tensors.append(torch.tensor(int(class_index)))

    return torch.stack(clip_tensors), torch.stack(label_tensors)

# Function To Train the Model From Pytorch Documentation
def train_model(device,model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
    '''
    Train `model` and keep the weights of the epoch with the best validation accuracy.
    Args:
        device:       Device the input and label batches are moved to.
        model:        The network to train; updated in place.
        dataloaders:  Mapping with 'train' and 'val' loaders yielding (inputs, labels).
        criterion:    Loss function called as criterion(outputs, labels).
        optimizer:    Optimizer stepping the model parameters.
        num_epochs:   Number of passes over both phases.
        is_inception: When True, the model returns (outputs, aux_outputs) in training
                      and the loss is loss(outputs) + 0.4 * loss(aux_outputs).
    Returns:
        (model, val_acc_history): the model loaded with the best validation weights
        and the list of per-epoch validation accuracies.
    '''
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; gradients are tracked only in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    if is_inception and phase == 'train':
                        # Inception has an auxiliary output in training; both losses are combined.
                        # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics: loss is weighted by batch size so the epoch average is per sample
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # deep copy the model whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    # Four decimals, matching the per-epoch accuracy line above.
    print('Best val Acc: {:.4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

def loadModel():
  '''
  Build an MC3-18 video classifier with a 2-output head and load the fine-tuned
  checkpoint found at `modelPath`.
  Returns:
      The model moved to `device` and switched to evaluation mode.
  '''
  # No pretrained download: load_state_dict below (strict by default) overwrites
  # every parameter and buffer, so fetching the Kinetics weights is wasted work.
  model_ft = torchvision.models.video.mc3_18(pretrained=False, progress=False)

  # Replace the classification head with a 2-output linear layer.
  num_ftrs = model_ft.fc.in_features
  model_ft.fc = torch.nn.Linear(num_ftrs, 2)

  # map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
  model_ft.load_state_dict(torch.load(modelPath, map_location=device))
  model_ft.to(device)
  model_ft.eval()
  return model_ft

# Single shared model instance used by the prediction helpers below.
model = loadModel()

def _predict_topk(k, clips):
  '''
  Run the module-level `model` on one clip and return its k most probable classes.
  Args:
      k:     Number of classes to return.
      clips: Sequence of preprocessed frames forming one clip.
  Returns:
      (names, probs): class names from CLASSES_LIST and their softmax
      probabilities rounded to 5 decimals, most probable first.
  '''
  # Add a batch axis, then move the last (channel) axis to position 1:
  # [1, 3, num_clips, height, width].
  batch = np.expand_dims(np.array(clips), axis=0)
  batch = np.transpose(batch, (0, 4, 1, 2, 3))

  with torch.no_grad(): # we do not want to backprop any gradients
      inputs = torch.tensor(batch, dtype=torch.float32).to(device)
      probs = torch.softmax(model(inputs), dim=1)
      top_probs, top_indices = torch.topk(probs, k)

  names = [CLASSES_LIST[index].strip() for index in top_indices[0].tolist()]
  rounded = [round(value, 5) for value in top_probs[0].tolist()]
  return names, rounded


def PredTopKClass(k, clips):
  '''
  Return the name of the most probable class for one clip.
  Args:
      k:     Number of top classes computed (only the first one is returned).
      clips: Sequence of preprocessed frames forming one clip.
  '''
  names, _ = _predict_topk(k, clips)
  return names[0]


def PredTopKProb(k,clips):
  '''
  Return the k most probable classes for one clip as (class_name, probability) pairs.
  Args:
      k:     Number of top classes to return.
      clips: Sequence of preprocessed frames forming one clip.
  '''
  names, probs = _predict_topk(k, clips)
  return list(zip(names, probs))

def downloadYouTube(videourl, path):
    '''
    Download the highest-resolution progressive mp4 stream of a YouTube video.
    Args:
        videourl: URL of the video.
        path:     Destination folder; created when missing.
    Returns:
        Whatever the stream's download() call returns (pytube documents this as
        the path of the saved file).
    Raises:
        ValueError: when the video offers no progressive mp4 stream.
    '''
    stream = (
        YouTube(videourl)
        .streams.filter(progressive=True, file_extension='mp4')
        .order_by('resolution')
        .desc()
        .first()
    )
    # first() yields None when the filter matches nothing.
    if stream is None:
        raise ValueError(f"No progressive mp4 stream available for {videourl}")

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(path, exist_ok=True)
    return stream.download(path)

def show_video(file_name, width=640):
  '''
  Embed an mp4 file in the notebook output as an inline HTML5 video.
  Args:
      file_name: Path of the mp4 file to display.
      width:     Width of the video element in pixels.
  Returns:
      An IPython HTML object with the video embedded as a base64 data URL.
  '''
  # Context manager closes the file handle as soon as the bytes are read.
  with open(file_name,'rb') as video_file:
    mp4 = video_file.read()
  data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
  return HTML("""
  <video width="{0}" controls>
        <source src="{1}" type="video/mp4">
  </video>
  """.format(width, data_url))

def FightInference(video_path,SEQUENCE_LENGTH=64):
  '''
  Classify one video and print the top class followed by the top-2 (class, probability) pairs.
  Args:
      video_path:      Path of the video to classify.
      SEQUENCE_LENGTH: Number of frames sampled from the video.
  Returns:
      A fixed separator string.
  '''
  sampled_frames = frames_extraction(video_path, SEQUENCE_LENGTH)

  top_class = PredTopKClass(1, sampled_frames)
  print(top_class)

  top_two = PredTopKProb(2, sampled_frames)
  print(top_two)

  return "***********"


def FightInference_Time(video_path,SEQUENCE_LENGTH=64):
  '''
  Classify one video and print how long frame extraction plus prediction took.
  Args:
      video_path:      Path of the video to classify.
      SEQUENCE_LENGTH: Number of frames sampled from the video.
  Returns:
      The name of the most probable class.
  '''
  began_at = time.time()
  label = PredTopKClass(1, frames_extraction(video_path, SEQUENCE_LENGTH))
  print("time is:", time.time() - began_at)
  return label




def predict_on_video(video_file_path, output_file_path, image_path,SEQUENCE_LENGTH,skip=2,showInfo=False):
    '''
    Classify a video clip by clip with PredTopKClass and write an annotated copy.
    Args:
    video_file_path:  The path of the video stored in the disk on which the recognition is to be performed.
    output_file_path: The path where the output video with the predicted label overlaid will be stored.
    image_path:       Folder receiving snapshots (fight_<n>.jpg) while the label is "fight"; created when missing.
    SEQUENCE_LENGTH:  The fixed number of frames of a video that can be passed to the model as one sequence.
    skip:             Only every `skip`-th frame is added to the clip sent to the model.
    showInfo:         When True, print every prediction and the total number of frames read.
    Returns:
    1 if at least one snapshot was saved for a "fight" prediction, otherwise 0.
    '''
    saved_snapshots = 0   # snapshots written so far (used in the file name)
    fight_frames = 0      # frames seen while the current label is "fight"
    flag = 0
    counter = 0
    # The label persists between predictions, so frames in between keep the last one.
    predicted_class_name = ''

    # cv2.imwrite fails silently when the folder is missing, so create it up front.
    if image_path:
        os.makedirs(image_path, exist_ok=True)

    # Initialize the VideoCapture object to read from the video file.
    video_reader = cv2.VideoCapture(video_file_path)

    # Get the width and height of the video.
    original_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Initialize the VideoWriter Object to store the output video in the disk.
    video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
                                   video_reader.get(cv2.CAP_PROP_FPS), (original_video_width, original_video_height))

    # Queue holding the preprocessed frames of the clip being assembled.
    frames_queue = deque(maxlen = SEQUENCE_LENGTH)
    transform = transform_()

    # try/finally releases the reader and writer even if a frame fails to process.
    try:
        while video_reader.isOpened():

            # Read the frame; stop at the end of the video or on a read error.
            ok, frame = video_reader.read()
            if not ok:
                break

            # Preprocess only the frames that actually enter the clip.
            if counter % skip == 0:
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames_queue.append(transform(image=rgb_frame)['image'])

            # A full clip triggers a prediction, then the queue starts over.
            if len(frames_queue) == SEQUENCE_LENGTH:
                predicted_class_name = PredTopKClass(1, frames_queue)
                if showInfo:
                    print(predicted_class_name)
                frames_queue.clear()

            # Write predicted class name on top of the frame.
            if predicted_class_name == "fight":
                # Save a clean snapshot on every 30th "fight" frame, starting with the first.
                if fight_frames % 30 == 0:
                    flag = 1
                    saved_snapshots += 1
                    file_name = os.path.join(image_path, f"fight_{saved_snapshots}.jpg")
                    cv2.imwrite(file_name, frame)
                fight_frames += 1
                cv2.putText(frame, predicted_class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
            else:
                cv2.putText(frame, predicted_class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            counter += 1

            # Write the frame into the disk using the VideoWriter Object.
            video_writer.write(frame)
    finally:
        video_reader.release()
        video_writer.release()

    if showInfo:
        print(counter)

    # fight detected
    return flag

def fightDetection(inputPath,seq,skip,outputPath,image_path,showInfo=False):
    '''
    Convenience wrapper around predict_on_video with a different argument order.
    Args:
    inputPath:  Video to analyse.
    seq:        Clip length forwarded as SEQUENCE_LENGTH.
    skip:       Frame stride forwarded as skip.
    outputPath: Where the annotated video is written.
    image_path: Folder receiving the snapshots.
    showInfo:   Forwarded verbosity switch.
    Returns:
    The flag returned by predict_on_video.
    '''
    return predict_on_video(
        video_file_path=inputPath,
        output_file_path=outputPath,
        image_path=image_path,
        SEQUENCE_LENGTH=seq,
        skip=skip,
        showInfo=showInfo,
    )




Downloading: "https://download.pytorch.org/models/mc3_18-a90a0ba3.pth" to /root/.cache/torch/hub/checkpoints/mc3_18-a90a0ba3.pth


In [None]:
# Reference timestamp read by the elapsed-time report at the end of this cell.
start=time.time()
# Module-level counter: fight_detection() increments it once per processed image
# and recognize2() reads it to name the face image it writes.
frame_count = 0
def fight_detection(input_path, output_path, image_folder, input_criminal_face, output_face_folder):
    '''
    Run fight detection on a video and, when a fight was found, run face
    recognition on every image present in the snapshot folder.
    Args:
        input_path:          Video to analyse.
        output_path:         Where the annotated video is written.
        image_folder:        Folder receiving the fight snapshots; every file in it is processed.
        input_criminal_face: Reference face image forwarded to recognize2.
        output_face_folder:  Folder forwarded to recognize2 for its output images.
    Returns:
        The flag returned by fightDetection (1 when a fight was detected).
    '''
    # frame_count is shared with recognize2, which uses it in the output file name.
    global frame_count

    # Clip length 16, every 2nd frame, no verbose output.
    # The detection result is used as-is; it must not be reset before the check below.
    flag = fightDetection(input_path,16,2,output_path,image_folder,False)
    print("flag = {}".format(flag))

    if flag==1:
      for image_file in os.listdir(image_folder):
          frame_count += 1
          path = os.path.join(image_folder,image_file)
          print(path)
          # load_image_file yields the image in the form recognize2 hands to face_recognition.
          image = face_recognition.load_image_file(path)
          recognize2(image, input_criminal_face, output_face_folder)

    return flag

# Report the wall-clock time elapsed since `start` was set at the top of this cell,
# in seconds and in minutes.
# NOTE(review): nothing between `start` and this point calls fight_detection(); the
# cell only defines it, so this presumably times the definition, not a detection run.
elapsed=time.time()-start
print("Take:",f'{elapsed:.20f}'," Second")
print("Take:",f'{elapsed/60:.20f}'," min")


Take: 0.00029945373535156250  Second
Take: 0.00000499089558919271  min


In [None]:
!pip install face_recognition

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting face_recognition
  Downloading face_recognition-1.3.0-py2.py3-none-any.whl (15 kB)
Collecting face-recognition-models>=0.3.0 (from face_recognition)
  Downloading face_recognition_models-0.3.0.tar.gz (100.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.1/100.1 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: face-recognition-models
  Building wheel for face-recognition-models (setup.py) ... [?25l[?25hdone
  Created wheel for face-recognition-models: filename=face_recognition_models-0.3.0-py2.py3-none-any.whl size=100566173 sha256=6acfc2d22fe17ef0a105d0f7841c61df4c53456dd32dea6eebd07075047cb32a
  Stored in directory: /root/.cache/pip/wheels/7a/eb/cf/e9eced74122b679557f597bb7c8e4c739cfcac526db1fd523d
Successfully built face-recognition-models
Installing colle

In [None]:
import face_recognition
import glob
import cv2
import os
import matplotlib.pyplot as plt
import numpy as np
import time

### Graduation Image

In [None]:
def recognize4(img, encoded_faces, names):
    '''
    Label every face in `img` that matches one of the known encodings.
    Args:
        img:           Image array to search; annotations are drawn onto it.
        encoded_faces: List of known face encodings.
        names:         Names aligned index-by-index with `encoded_faces`.
    Returns:
        (img, detected): the annotated image and True when at least one face matched.
    '''
    faces_locations = face_recognition.face_locations(img)
    # Reuse the locations found above so faces are not detected a second time
    # and encodings stay aligned with their boxes.
    faces_encodings = face_recognition.face_encodings(img, known_face_locations=faces_locations)
    detected = False

    # Pixels added on each side of the detected box.
    change = 7

    for img_encoding, face_location in zip(faces_encodings, faces_locations):
        results = face_recognition.compare_faces(encoded_faces, img_encoding, tolerance=0.5)
        if not any(results):
            continue

        # Sticky: a later non-matching face must not reset an earlier match.
        detected = True

        # Name of the first known face that matched.
        first_match = next(idx for idx, result in enumerate(results) if result)
        matched_name = names[first_match]

        # face_location is indexed as (top, right, bottom, left); grow the box and
        # clamp it to the image bounds.
        face_top = max(0, face_location[0] - change)
        face_right = min(img.shape[1], face_location[1] + change)
        face_bottom = min(img.shape[0], face_location[2] + change)
        face_left = max(0, face_location[3] - change)

        img = cv2.putText(img, matched_name, (face_left, face_top - change), cv2.FONT_HERSHEY_DUPLEX, 0.7, (255, 255, 255), 2)
        img = cv2.rectangle(img, (face_left, face_top), (face_right, face_bottom), (255, 255, 255), 2)

    return img, detected
# Folder with one reference photo per known person; the file name (without
# extension) is used as the person's name.
our_img_folder = '/content/drive/MyDrive/TheProject/our_img_folder'
list_our_images = os.listdir(our_img_folder)
criminal_images = [face_recognition.load_image_file(os.path.join(our_img_folder, image_name)) for image_name in list_our_images]

# Build names and encodings together so they stay aligned: a reference photo in
# which no face is found is skipped instead of raising IndexError.
image_names = []
encoded_faces = []
for reference_file, reference_image in zip(list_our_images, criminal_images):
    reference_encodings = face_recognition.face_encodings(reference_image)
    if not reference_encodings:
        print(f'No face found in {reference_file}; skipping')
        continue
    image_names.append(os.path.splitext(reference_file)[0])
    encoded_faces.append(reference_encodings[0])
print(len(encoded_faces))

img_num =1
path = f'/content/drive/MyDrive/TheProject/grad/grad{img_num}.jpg'
img = cv2.imread(path)
# cv2.imread returns None instead of raising when the file cannot be read.
if img is None:
    raise FileNotFoundError(f'Could not read image: {path}')

# cv2.imread yields BGR while face_recognition works on RGB images (the reference
# photos above were loaded as RGB), so convert before recognition and back before saving.
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Call the recognize4 function with the encodings and their names.
img_result, is_detected = recognize4(img_rgb, encoded_faces, image_names)
img_result = cv2.cvtColor(img_result, cv2.COLOR_RGB2BGR)

result_path = '/content/drive/MyDrive/TheProject/grad_result'
# cv2.imwrite fails silently when the folder is missing.
os.makedirs(result_path, exist_ok=True)
cv2.imwrite(os.path.join(result_path, f'result{img_num}.jpg'), img_result)


9


True

In [None]:
def recognize2(img, input_criminal_face, output_face_folder):
    '''
    Look for one reference face in `img`; when found, annotate the image and save it.
    Args:
        img:                 RGB image array to search (annotations are drawn onto it).
        input_criminal_face: Path of the reference face image.
        output_face_folder:  Folder receiving face_<frame_count>.jpg, where
                             frame_count is the module-level counter.
    Returns:
        The image in the same channel order it was passed in, annotated when a
        face matched.
    Raises:
        ValueError: when no face is found in the reference image.
    '''
    cina_image = face_recognition.load_image_file(input_criminal_face)
    cina_encodings = face_recognition.face_encodings(cina_image)
    if not cina_encodings:
        raise ValueError(f"No face found in reference image: {input_criminal_face}")
    cina_encoding = cina_encodings[0]

    faces_locations = face_recognition.face_locations(img)
    # Reuse the locations so faces are detected only once.
    faces_encodings = face_recognition.face_encodings(img, known_face_locations=faces_locations)

    matched = False
    for img_encoding , face_location in zip(faces_encodings , faces_locations):
        results = face_recognition.compare_faces([cina_encoding], img_encoding , tolerance = 0.6)
        if not any(results):
            continue
        matched = True
        # face_location is indexed as (top, right, bottom, left).
        img = cv2.putText(img, 'cina' , (face_location[3],face_location[0]-20), cv2.FONT_HERSHEY_DUPLEX, 2.0, (255,255,255), 2)
        img = cv2.rectangle(img, (face_location[3],face_location[0]), (face_location[1],face_location[2]) , (255,255,255), 2)

    if matched:
        # Convert once, after all annotations: converting inside the loop would swap
        # the channels back on every second match. cv2.imwrite expects BGR.
        file_name = os.path.join(output_face_folder, f"face_{frame_count}.jpg")
        cv2.imwrite(file_name, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
    return img


# The fight and face together

### Fight and face models together

In [None]:
def recognize3(img, encoded_face):
    '''
    Mark every face in `img` that matches one of the known encodings as 'Criminal'.
    Args:
        img:          Image array to search; annotations are drawn onto it.
        encoded_face: List of known face encodings to compare against.
    Returns:
        (img, detected): the annotated image and True when at least one face matched.
    '''
    faces_locations = face_recognition.face_locations(img)
    # Reuse the locations so faces are detected only once.
    faces_encodings = face_recognition.face_encodings(img, known_face_locations=faces_locations)
    detected = False
    for img_encoding , face_location in zip(faces_encodings , faces_locations):
        results = face_recognition.compare_faces(encoded_face, img_encoding , tolerance = 0.5)
        if not any(results):
            continue
        # Sticky: a later non-matching face must not reset an earlier match.
        detected = True
        # face_location is indexed as (top, right, bottom, left).
        img = cv2.putText(img, 'Criminal' , (face_location[3],face_location[0]-20), cv2.FONT_HERSHEY_DUPLEX, 1.0, (255,255,255), 2)
        img = cv2.rectangle(img, (face_location[3],face_location[0]), (face_location[1],face_location[2]) , (255,255,255), 2)
    return img, detected
# def predict_on_video(video_file_path, output_file_path, image_path,SEQUENCE_LENGTH,skip=2,showInfo=False):
def fightDetection2(video_file_path,output_file_path,image_path,input_criminal_folder,list_criminal_images,ouput_face_folder,face_model=False,SEQUENCE_LENGTH=16,skip=2,showInfo=False):
    '''
    Classify a video clip by clip with PredTopKClass, optionally run face
    recognition on every frame, and write an annotated copy of the video.
    Args:
    video_file_path:       The path of the video stored in the disk on which the recognition is to be performed.
    output_file_path:      The path where the output video with the overlays will be stored.
    image_path:            Folder receiving snapshots (fight_<n>.jpg) while the label is "fight"; created when missing.
    input_criminal_folder: Folder containing the reference face images.
    list_criminal_images:  File names (inside input_criminal_folder) of the reference face images.
    ouput_face_folder:     Folder receiving frames (face_<n>.jpg) in which a known face matched; created when missing.
    face_model:            Enables face recognition; switched off when no reference image yields an encoding.
    SEQUENCE_LENGTH:       The fixed number of frames of a video that can be passed to the model as one sequence.
    skip:                  Only every `skip`-th frame is added to the clip sent to the model.
    showInfo:              When True, print every prediction and the total number of frames read.
    Returns:
    1 if at least one snapshot was saved for a "fight" prediction, otherwise 0.
    '''
    # These module-level names are assigned here as in the original implementation;
    # note that frame_count is overwritten with the video's total frame count.
    global frame_count
    global frame_number

    saved_fight_frames = 0   # fight snapshots written so far
    fight_frames_seen = 0    # frames seen while the current label is "fight"
    saved_face_frames = 0    # face snapshots written so far
    flag = 0
    counter = 0
    # The label persists between predictions, so frames in between keep the last one.
    predicted_class_name = ''

    # Encode the reference faces before any video resource is opened.
    criminal_encodings = []
    if face_model:
        for image_name in list_criminal_images:
            reference = face_recognition.load_image_file(os.path.join(input_criminal_folder, image_name))
            face_encodings = face_recognition.face_encodings(reference)
            # Reference images without a detectable face are skipped.
            if len(face_encodings) > 0:
                criminal_encodings.append(face_encodings[0])

        if not (len(criminal_encodings) > 0):
            face_model = False
        print("************")
        print("Number of face encodings: ", len(criminal_encodings))
        print("************")
    print(face_model)

    # cv2.imwrite fails silently when the folder is missing, so create them up front.
    if image_path:
        os.makedirs(image_path, exist_ok=True)
    if face_model and ouput_face_folder:
        os.makedirs(ouput_face_folder, exist_ok=True)

    # Initialize the VideoCapture object to read from the video file.
    video_reader = cv2.VideoCapture(video_file_path)

    frame_number = 0
    frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    # Get the width and height of the video.
    original_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Initialize the VideoWriter Object to store the output video in the disk.
    video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
                                   video_reader.get(cv2.CAP_PROP_FPS), (original_video_width, original_video_height))

    # Queue holding the preprocessed frames of the clip being assembled.
    frames_queue = deque(maxlen = SEQUENCE_LENGTH)
    transform = transform_()

    # try/finally releases the reader and writer even if a frame fails to process.
    try:
        while video_reader.isOpened():

            # Read the frame; stop at the end of the video or on a read error.
            ok, frame = video_reader.read()
            if not ok:
                break

            # Preprocess only the frames that actually enter the clip.
            if counter % skip == 0:
                clip_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames_queue.append(transform(image=clip_frame)['image'])

            # A full clip triggers a prediction, then the queue starts over.
            if len(frames_queue) == SEQUENCE_LENGTH:
                predicted_class_name = PredTopKClass(1, frames_queue)
                if showInfo:
                    print(predicted_class_name)
                frames_queue.clear()

            # Write predicted class name on top of the frame.
            if predicted_class_name == "fight":
                # Save a clean snapshot on every 30th "fight" frame, starting with the first.
                if fight_frames_seen % 30 == 0:
                    flag = 1
                    saved_fight_frames += 1
                    file_name = os.path.join(image_path, f"fight_{saved_fight_frames}.jpg")
                    cv2.imwrite(file_name, frame)
                fight_frames_seen += 1
                cv2.putText(frame, predicted_class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
            else:
                cv2.putText(frame, predicted_class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            counter += 1

            if face_model:
                # OpenCV frames are BGR while face_recognition works on RGB images:
                # convert for recognition, then back for saving and writing.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                rgb_frame, face_detected = recognize3(rgb_frame, criminal_encodings)
                frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
                # Save at most one face snapshot every 15 frames.
                if (face_detected) and (frame_number%15==0):
                    saved_face_frames += 1
                    file_name = os.path.join(ouput_face_folder, f"face_{saved_face_frames}.jpg")
                    cv2.imwrite(file_name, frame)

            # Write the frame into the disk using the VideoWriter Object.
            video_writer.write(frame)
            frame_number += 1
    finally:
        video_reader.release()
        video_writer.release()

    if showInfo:
        print(counter)

    # fight detected
    return flag

### Car crash

In [None]:
from itertools import combinations
# Locations of the YOLOv3 Darknet weights and network configuration on Drive.
weights_path = "/content/drive/MyDrive/TheProject/yolov3.weights"
cfg_path = "/content/drive/MyDrive/TheProject/yolov3.cfg.txt"

# Network shared (as the module-level name `net`) by the detection functions below.
net = cv2.dnn.readNetFromDarknet(cfg_path, weights_path)

In [None]:
def car_accident(img) :
  '''
  Flag a possible crash when two detected vehicle boxes overlap by more than a margin.
  Args:
      img: Image array (as read by OpenCV); boxes and a status text are drawn onto it.
  Returns:
      (img, detected): the annotated image and 'crash' when at least one pair of
      boxes overlaps, otherwise ''. When nothing is detected the image is returned
      without any annotation.
  '''
  (H,W)=img.shape[:2]
  # NOTE(review): swapRB=False feeds the image's channel order unchanged to the
  # network; confirm this matches what the weights expect for BGR input.
  blob= cv2.dnn.blobFromImage(img,1/255.0,(416,416),crop=False,swapRB=False)
  net.setInput(blob)
  detected = ''
  layers_output=net.forward(['yolo_82', 'yolo_94', 'yolo_106'])

  car_boxes=[]        # [x, y, w, h] with (x, y) the top-left corner, in pixels
  car_confidences=[]
  for output in layers_output:
    for detection in output:
        scores=detection[5:]
        classID=np.argmax(scores)
        confidence=scores[classID]
        # Keep confident detections of class index 2 only
        # (presumably the 'car' class of the names file — confirm).
        if classID==2 and confidence>0.5:
            # Network outputs are relative (center x, center y, width, height).
            box=detection[:4] * np.array([W,H,W,H])
            bx,by,bw,bh=box.astype("int")
            x=int(bx-(bw/2))
            y=int(by-(bh/2))
            car_boxes.append([x,y,int(bw),int(bh)])
            car_confidences.append(float(confidence))

  # Non-maximum suppression: score threshold 0.5, overlap threshold 0.4.
  car_idx=cv2.dnn.NMSBoxes(car_boxes,car_confidences,0.5,0.4)
  if len(car_idx) == 0:
      return img,detected

  # Corner form (xmin, ymin, xmax, ymax) of every kept box, keyed by a running id.
  # np.array(...) accepts whichever sequence type NMSBoxes returned.
  corner_boxes = {}
  for object_id, i in enumerate(np.array(car_idx).flatten()):
      x, y, w, h = car_boxes[i]
      corner_boxes[object_id] = (x, y, x + w, y + h)

  #=================================================================#
  # Purpose : Determine which car bboxes overlap each other
  #=================================================================#
  margin = 30
  red_zone_ids = set()
  for (id1, b1), (id2, b2) in combinations(corner_boxes.items(), 2):
      xmin1, ymin1, xmax1, ymax1 = b1
      xmin2, ymin2, xmax2, ymax2 = b2
      separated = (
          (xmin1 + margin >= xmax2)
          or (xmax1 <= xmin2 + margin)
          or (ymax1 <= ymin2 + margin)
          or (ymin1 + margin >= ymax2)
      )
      if not separated:
          red_zone_ids.add(id1)
          red_zone_ids.add(id2)

  # Red boxes for vehicles in an overlapping pair, green for the others.
  for object_id, (xmin, ymin, xmax, ymax) in corner_boxes.items():
      color = (0, 0, 255) if object_id in red_zone_ids else (0, 255, 0)
      cv2.rectangle(img, (xmin, ymin), (xmax, ymax), color, 2)

  # Keep the text origin inside the frame for images narrower than 550 px.
  location = (max(W-550, 0), 80)
  if red_zone_ids:
      cv2.putText(img, "Crash Detected", location, cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2, cv2.LINE_AA)
      detected = 'crash'
  else:
      cv2.putText(img, "Crash Not Detected", location, cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2, cv2.LINE_AA)
  return img,detected

### Human fall detection

In [None]:
# Locations of the YOLOv3 Darknet weights and network configuration on Drive.
weights_path = "/content/drive/MyDrive/TheProject/yolov3.weights"
cfg_path = "/content/drive/MyDrive/TheProject/yolov3.cfg.txt"
# Alternative (disabled): the tiny variant of the network.
# weights_path = "/content/drive/MyDrive/TheProject/yolov3-tiny.weights"
# cfg_path = "/content/drive/MyDrive/TheProject/yolov3-tiny.cfg"

# Network read by human_fall() through the module-level name `net`.
net = cv2.dnn.readNetFromDarknet(cfg_path, weights_path)

def human_fall(img):
    """Mark detections whose box is much wider than tall as falls.

    The module-level ``net`` is run on ``img``. Detections are kept when their
    best-scoring class is 0 (presumably "person" in the label order of the
    loaded weights -- confirm) with a class score above 0.5, then filtered by
    non-maximum suppression. Every surviving box whose width exceeds its
    height by at least 100 pixels is drawn on ``img`` in red with a
    "fall detected" label.

    Args:
        img: image array; ``img.shape[:2]`` is read as (height, width) and the
            array is drawn on in place.

    Returns:
        ``(img, detected)`` where ``detected`` is ``'fall'`` if at least one
        fall box was drawn, otherwise ``''``.
    """
    frame_h, frame_w = img.shape[:2]

    # NOTE(review): swapRB=False keeps the incoming channel order. If frames arrive
    # as BGR and the weights expect RGB this should probably be True -- confirm.
    net.setInput(cv2.dnn.blobFromImage(img, 1/255.0, (416, 416), crop=False, swapRB=False))

    # Factors that map the normalised (cx, cy, w, h) of a detection to pixels.
    to_pixels = np.array([frame_w, frame_h, frame_w, frame_h])

    boxes, confidences = [], []
    for layer_out in net.forward(['yolo_82', 'yolo_94', 'yolo_106']):
        for row in layer_out:
            class_scores = row[5:]
            best = np.argmax(class_scores)
            score = class_scores[best]
            if best != 0 or not (score > 0.5):
                continue
            cx, cy, bw, bh = (row[:4] * to_pixels).astype("int")
            # Store the box as top-left corner plus size, which is what NMSBoxes expects.
            boxes.append([int(cx - (bw / 2)), int(cy - (bh / 2)), int(bw), int(bh)])
            confidences.append(float(score))

    kept = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
    detected = ''
    if len(kept) == 0:
        return img, detected

    for k in kept.flatten():
        x, y, w, h = boxes[k]
        if h - w > -100:
            # Not at least 100 px wider than tall: not treated as a fall.
            continue
        detected = 'fall'
        cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 255), 2)
        cv2.putText(img, "fall detected", (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    return img, detected

### Mask Detection

In [None]:
# Work from the Darknet directory on Drive: the darknet commands in this notebook use
# paths relative to it (./darknet, data/obj.data, cfg/...). Then make the binary executable.
%cd /content/drive/MyDrive/TheProject/darknet/
!chmod +x darknet

/content/drive/MyDrive/TheProject/darknet


In [None]:
# Run the custom YOLOv4-tiny detector on a single test image stored on Drive.
# The -thresh flag sets the minimum detection confidence (not accuracy) for a box to be reported.
!./darknet detector test data/obj.data cfg/yolov4-tiny-custom.cfg /content/drive/MyDrive/TheProject/yolov4-tiny/training/yolov4-tiny-custom_best.weights {'/content/drive/MyDrive/TheProject/test_mask.jpg'} -thresh 0.2

# Read the annotated result from predictions.jpg in the current directory
# and keep a copy under /content.
result_image = cv2.imread('predictions.jpg')
cv2.imwrite('/content/mask_tested.jpg', result_image)

 CUDA-version: 11080 (12000), cuDNN: 8.9.0, CUDNN_HALF=1, GPU count: 1  
 CUDNN_HALF=1 
 OpenCV version: 4.2.0
 0 : compute_capability = 750, cudnn_half = 1, GPU: Tesla T4 
net.optimized_memory = 0 
mini_batch = 1, batch = 1, time_steps = 1, train = 0 
   layer   filters  size/strd(dil)      input                output
   0 Create CUDA-stream - 0 
 Create cudnn-handle 0 
conv     32       3 x 3/ 2    416 x 416 x   3 ->  208 x 208 x  32 0.075 BF
   1 conv     64       3 x 3/ 2    208 x 208 x  32 ->  104 x 104 x  64 0.399 BF
   2 conv     64       3 x 3/ 1    104 x 104 x  64 ->  104 x 104 x  64 0.797 BF
   3 route  2 		                       1/2 ->  104 x 104 x  32 
   4 conv     32       3 x 3/ 1    104 x 104 x  32 ->  104 x 104 x  32 0.199 BF
   5 conv     32       3 x 3/ 1    104 x 104 x  32 ->  104 x 104 x  32 0.199 BF
   6 route  5 4 	                           ->  104 x 104 x  64 
   7 conv     64       1 x 1/ 1    104 x 104 x  64 ->  104 x 104 x  64 0.089 BF
   8 route  2 7 	     

True

In [None]:
def mask_detection(frame):
    """Run the custom YOLOv4-tiny darknet detector on one frame.

    The frame is written to ``frame.jpg``, the ``./darknet`` binary is run on
    it with ``-thresh 0.2``, and the annotated image is read back from
    ``predictions.jpg``. All paths are relative, so the current working
    directory must be the darknet directory.

    Args:
        frame: image array to analyse (written to disk with ``cv2.imwrite``).

    Returns:
        ``(image, '')``. ``image`` is the annotated result, or the unchanged
        input ``frame`` when darknet produced no readable ``predictions.jpg``.
        The second element is always the empty string.
    """
    # Local import: subprocess is not part of the notebook's import cell.
    import subprocess

    # Remove the previous result first, so a failed darknet run cannot make us
    # return the annotations of an earlier frame.
    try:
        os.remove('predictions.jpg')
    except OSError:
        pass  # nothing to remove (or not removable): continue best-effort

    # darknet takes its input as an image file on disk.
    cv2.imwrite('frame.jpg', frame)

    command = [
        './darknet', 'detector', 'test',
        'data/obj.data',
        'cfg/yolov4-tiny-custom.cfg',
        '/content/drive/MyDrive/TheProject/yolov4-tiny/training/yolov4-tiny-custom_best.weights',
        'frame.jpg',
        '-thresh', '0.2',
    ]
    try:
        # Output is discarded, as it was with "> /dev/null 2>&1".
        subprocess.run(
            command,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError:
        # Binary missing or not executable. The shell version only printed an
        # (already discarded) error here, so stay best-effort and fall through.
        pass

    result_image = cv2.imread('predictions.jpg')
    if result_image is None:
        # No readable result: hand back the input frame so callers still get an image.
        return frame, ''
    return result_image, ''

## Crash and Fall Detection

In [None]:
def fall_crash(img):
    """Detect falls and car collisions in one pass of the module-level ``net``.

    Detections are kept when their best-scoring class is 0 (handled as a
    person) or 2 (handled as a car) with a class score above 0.5. A single
    non-maximum suppression is then run over both classes together.

    * Class 0: a box at least 100 px wider than tall is drawn in red and
      labelled "fall detected".
    * Other kept boxes (cars): two boxes that overlap by more than 30 px on
      both axes are treated as a crash and drawn in red; the remaining car
      boxes are drawn in green.

    When at least one box survives suppression, "Crash Detected" or
    "Crash Not Detected" is written near the top-right corner of ``img``.

    Args:
        img: image array; ``img.shape[:2]`` is read as (height, width) and the
            array is drawn on in place.

    Returns:
        ``(img, result)`` with ``result`` equal to ``"crash"`` if a crash was
        found (even when a fall was found too), ``"fall"`` if only a fall was
        found, and the string ``"false"`` otherwise.
    """
    # Local import so the function does not depend on another cell having
    # imported `combinations` (it is not in the notebook's import cell).
    from itertools import combinations

    frame_h, frame_w = img.shape[:2]
    blob = cv2.dnn.blobFromImage(img, 1/255.0, (416, 416), crop=False, swapRB=False)
    net.setInput(blob)
    layer_outputs = net.forward(['yolo_82', 'yolo_94', 'yolo_106'])

    crash_detected = False
    fall_detected = False

    # Factors that map the normalised (cx, cy, w, h) of a detection to pixels.
    to_pixels = np.array([frame_w, frame_h, frame_w, frame_h])
    boxes, confidences, class_ids = [], [], []
    for output in layer_outputs:
        for detection in output:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if (class_id != 0 and class_id != 2) or not (confidence > 0.5):
                continue
            bx, by, bw, bh = (detection[:4] * to_pixels).astype("int")
            # Stored as top-left corner plus size, which is what NMSBoxes expects.
            boxes.append([int(bx - (bw / 2)), int(by - (bh / 2)), int(bw), int(bh)])
            confidences.append(float(confidence))
            class_ids.append(class_id)

    kept = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)

    if len(kept) > 0:
        # Car boxes keyed by a running id, as (xmin, ymin, xmax, ymax) corner coordinates.
        car_boxes = {}
        for i in kept.flatten():
            x, y, w, h = boxes[i]
            if class_ids[i] == 0:
                # Person: a box at least 100 px wider than tall counts as a fall.
                if w - h >= 100:
                    fall_detected = True
                    cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 255), 2)
                    cv2.putText(img, "fall detected", (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            else:
                car_boxes[len(car_boxes)] = (x, y, x + w, y + h)

        # Ids of cars whose box overlaps another car's box by more than 30 px
        # horizontally and vertically.
        colliding_ids = set()
        for (id_a, a), (id_b, b) in combinations(car_boxes.items(), 2):
            overlaps_x = a[0] + 30 < b[2] and a[2] > b[0] + 30
            overlaps_y = a[1] + 30 < b[3] and a[3] > b[1] + 30
            if overlaps_x and overlaps_y:
                colliding_ids.add(id_a)
                colliding_ids.add(id_b)

        # Red for cars involved in a collision, green for the rest.
        for car_id, (xmin, ymin, xmax, ymax) in car_boxes.items():
            colour = (0, 0, 255) if car_id in colliding_ids else (0, 255, 0)
            cv2.rectangle(img, (xmin, ymin), (xmax, ymax), colour, 2)

        crash_detected = bool(colliding_ids)
        if crash_detected:
            text, text_colour = "Crash Detected", (0, 0, 255)
        else:
            text, text_colour = "Crash Not Detected", (0, 255, 0)
        cv2.putText(img, text, (frame_w - 350, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, text_colour, 2, cv2.LINE_AA)

    # A crash takes precedence over a fall; "false" (a string) means neither.
    if crash_detected:
        result = "crash"
    elif fall_detected:
        result = "fall"
    else:
        result = "false"
    return img, result

In [None]:
def model_call(func,input_path,output_path,fall_image_folder,crash_folder,mask_model=False):
    """Run a per-frame detector over a video and write the annotated result.

    Every frame of ``input_path`` is passed through ``func`` and, optionally,
    ``mask_detection``, then written to ``output_path``. Snapshots of detected
    events are saved as JPEG files while the video is processed.

    Args:
        func: callable taking a frame and returning ``(frame, detected)``.
            The values ``'fall'`` and ``'crash'`` of ``detected`` trigger snapshots.
        input_path: path of the video to read.
        output_path: path of the video to write (same fps and frame size as
            reported by the input).
        fall_image_folder: folder receiving ``fall_<n>.jpg`` snapshots, taken
            on fall frames whose index is a multiple of 30.
        crash_folder: folder receiving ``crash_<n>.jpg`` snapshots, taken on
            crash frames whose index is a multiple of 3.
        mask_model: when true, each frame is additionally passed through
            ``mask_detection`` before being written.

    Side effects:
        Sets the module globals ``frame_count`` (frame count reported by the
        input) and ``frame_number`` (frames processed so far) -- presumably
        read elsewhere for progress reporting; confirm against callers.
    """
    global frame_count
    global frame_number

    fall_index = 1
    crash_index = 1

    video = cv2.VideoCapture(input_path)
    output_video = None
    try:
        frame_number = 0
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

        # Mirror the input's properties in the output video.
        fps = video.get(cv2.CAP_PROP_FPS)
        frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

        output_video = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
            fps,
            (frame_width, frame_height),
        )

        while True:
            ret, frame = video.read()
            if not ret:
                break

            frame, detected = func(frame)

            # Snapshots are throttled by frame index so an ongoing event does
            # not produce one image per frame.
            if detected == 'fall' and frame_number % 30 == 0:
                file_name = os.path.join(fall_image_folder, f"fall_{fall_index}.jpg")
                cv2.imwrite(file_name, frame)
                fall_index += 1
            elif detected == 'crash' and frame_number % 3 == 0:
                file_name = os.path.join(crash_folder, f"crash_{crash_index}.jpg")
                cv2.imwrite(file_name, frame)
                crash_index += 1

            if mask_model:
                masked_frame, _ = mask_detection(frame)
                # Keep the current frame if the mask detector returned no image;
                # writing None to the VideoWriter would raise.
                if masked_frame is not None:
                    frame = masked_frame

            output_video.write(frame)
            frame_number += 1
    finally:
        # Always release both handles, even when func or the writer raises,
        # so the output file is finalised and the capture is not leaked.
        video.release()
        if output_video is not None:
            output_video.release()