In [1]:
# Mount Google Drive into the Colab runtime at /content/drive. The model file,
# the class-name list and the input/output videos used later in this notebook
# are all addressed through paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
# Install the required libraries.
!pip install tensorflow
!pip install opencv-contrib-python
!pip install moviepy
!pip install ultralytics



In [2]:
# Import the required libraries.
import cv2
import math
import numpy as np
import tensorflow as tf
from collections import deque
from moviepy.editor import *
import pandas as pd
from ultralytics import YOLO

In [3]:
# Height and width (in pixels) that every frame is resized to before it is
# handed to the model.
IMAGE_HEIGHT = 64
IMAGE_WIDTH = 64

# Number of frames that are fed to the model together as one sequence.
SEQUENCE_LENGTH = 20

# Names of the classes the model predicts. Feel free to choose any set of
# classes, as long as it matches the model that is loaded.
#
# Earlier experiments, kept for reference:
#   One of the two models that performed very well:
#   LRCN_model_TPU_2.17.0_Date_Time_2024_08_06__04_39_16___Loss_0.19148118793964386___Accuracy_0.93388432264328.h5
#   CLASSES_LIST = ['TaiChi','JugglingBalls','Basketball','PommelHorse']
#
#   CLASSES_LIST = ['walking', 'handwaving', 'running', 'jogging', 'boxing']
#   CLASSES_LIST = ['Running', 'TaiChi', 'Punch', 'Walking', 'Holding Gun']
#   CLASSES_LIST = ['WalkingWithDog', 'TaiChi', 'Punch', 'Basketball', 'Holding Gun']
CLASSES_LIST = [
    'TaiChi',
    'Punch',
    'Basketball',
    'Holding Gun',
]

In [4]:
# Load the Model.
# Reads the saved Keras model from its HDF5 (.h5) file on the mounted Drive.
# NOTE(review): the file name suggests sequence length 20, 64x64 frames and the
# classes T/P/B/HG — presumably these must agree with SEQUENCE_LENGTH,
# IMAGE_HEIGHT/IMAGE_WIDTH and the order of CLASSES_LIST; confirm before
# swapping in a different model file.
LRCN_model = tf.keras.models.load_model('/content/drive/MyDrive/Action Detection/Model/LRCN_TPU_2.15.0_sqLen_20_widthHeight_64_Loss_0.11_Acc_0.97_T_P_B_HG.h5')

In [5]:
def dis(pt1,pt2):
    """Return the Euclidean distance between the centres of two boxes.

    Each box is an indexable ``(x1, y1, x2, y2)``. Every centre coordinate is
    truncated to an integer before the distance is measured.
    """
    centre_a = (int((pt1[0] + pt1[2]) / 2), int((pt1[1] + pt1[3]) / 2))
    centre_b = (int((pt2[0] + pt2[2]) / 2), int((pt2[1] + pt2[3]) / 2))
    delta_x = centre_b[0] - centre_a[0]
    delta_y = centre_b[1] - centre_a[1]
    return math.hypot(delta_x, delta_y)

def getarea(x1,y1,x2,y2):
    """Return the area of the box with corners ``(x1, y1)`` and ``(x2, y2)``.

    The result is simply ``(x2 - x1) * (y2 - y1)``; no ordering of the corners
    is enforced.
    """
    return (x2 - x1) * (y2 - y1)

# Padding, in pixels, added on every side of a detected box before cropping.
val = 80
def biggerFrame(pt,w,h):
    """Grow a box by ``val`` pixels on every side, clamped to the frame.

    Args:
        pt: Indexable ``(x1, y1, x2, y2)`` box.
        w: Frame width; upper limit for the returned ``x2``.
        h: Frame height; upper limit for the returned ``y2``.

    Returns:
        Tuple ``(x1, y1, x2, y2)`` of the padded box. ``x1``/``y1`` never go
        below 0 and ``x2``/``y2`` never exceed ``w``/``h``.
    """
    x1,y1,x2,y2 = pt[0],pt[1],pt[2],pt[3]
    # The previous form, ``x2 += val if x2+val <= w else w``, parsed as
    # ``x2 += (val if ... else w)`` and therefore pushed boxes near the right
    # or bottom edge far outside the frame; explicit clamping avoids that.
    return (max(x1 - val, 0), max(y1 - val, 0), min(x2 + val, w), min(y2 + val, h))

In [6]:
# Load the YOLOv8-small detector from its 'yolov8s.pt' weights file.
model=YOLO('yolov8s.pt')
# Read the class names, one per line. object_list is later indexed with the
# class id reported by the detector. The context manager closes the file
# handle, which was previously left open.
with open("/content/drive/MyDrive/Action Detection/coco.txt", "r") as my_file:
    data = my_file.read()
object_list = data.split("\n")

Downloading https://github.com/ultralytics/assets/releases/download/v8.2.0/yolov8s.pt to 'yolov8s.pt'...


100%|██████████| 21.5M/21.5M [00:00<00:00, 152MB/s] 


In [7]:
from google.colab.patches import cv2_imshow

In [8]:
def _crop_for_model(frame, box, frame_width, frame_height):
    """Cut the padded region around ``box`` out of ``frame`` and scale it for the classifier.

    The box is enlarged with ``biggerFrame``, the crop is resized with
    ``cv2.resize`` and the pixel values are divided by 255.
    """
    x1, y1, x2, y2 = biggerFrame(box, frame_width, frame_height)
    person_crop = frame[y1:y2, x1:x2]
    # NOTE(review): cv2.resize takes dsize as (width, height). The
    # (height, width) order is kept from the original code; it makes no
    # difference while both constants are equal.
    resized_frame = cv2.resize(person_crop, (IMAGE_HEIGHT, IMAGE_WIDTH))
    return resized_frame / 255


def predict_on_video(video_file_path, output_file_path, SEQUENCE_LENGTH):
    """Detect and track people in a video, label each one's action, and write an annotated copy.

    On every third frame the YOLO ``model`` is run and boxes whose class name
    contains 'person' and whose area is at least 30000 are kept. Boxes are
    linked to existing track ids when their centres are less than 50 apart
    (see ``dis``). For every tracked id a deque of cropped, normalised frames
    is collected; once it holds ``SEQUENCE_LENGTH`` frames it is passed to
    ``LRCN_model`` and, if the top probability exceeds 0.8, the matching entry
    of ``CLASSES_LIST`` becomes that id's label and its deque is emptied.
    Every frame is written to the output with the last known box and label of
    each id seen in the most recent detection step.

    Args:
        video_file_path: Path of the video to read.
        output_file_path: Path the annotated video is written to.
        SEQUENCE_LENGTH: The fixed number of frames of a video that can be
            passed to the model as one sequence.

    Raises:
        IOError: If the input video cannot be opened.
    """
    detect_every_n_frames = 3     # run the detector on every third frame only
    min_person_area = 30000       # smaller person boxes are ignored
    max_centre_distance = 50      # centres closer than this count as the same object
    min_confidence = 0.8          # minimum probability for accepting a label
    font = cv2.FONT_HERSHEY_DUPLEX

    prev_frames = []              # boxes kept by the previous detection step (start-up only)
    track_id = 0                  # next id to hand out
    frame_list = {}               # id -> deque of prepared crops
    tracking_object = {}          # id -> latest box
    predicted_class_name = {}     # id -> latest accepted label
    count = 0                     # number of detection steps performed so far
    i = 0                         # index of the frame being processed
    prev_frames_id = []           # ids seen in the latest detection step

    # Initialize the VideoCapture object to read from the video file.
    video_reader = cv2.VideoCapture(video_file_path)
    video_writer = None
    try:
        if not video_reader.isOpened():
            raise IOError(f"Could not open video file: {video_file_path}")

        # Get the width and height of the video.
        original_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Initialize the VideoWriter Object to store the output video in the disk.
        video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
                                       video_reader.get(cv2.CAP_PROP_FPS), (original_video_width, original_video_height))

        while True:
            res, frame = video_reader.read()
            if not res:
                break

            if i % detect_every_n_frames == 0:
                # Detect objects on frame
                count += 1
                results = model.predict(frame)
                a = results[0].boxes.data
                px = pd.DataFrame(a).astype("float")
                cur_frames = []
                for ind, box in px.iterrows():
                    (x1, y1, x2, y2, d) = (int(box[0]), int(box[1]), int(box[2]), int(box[3]), int(box[5]))
                    c = object_list[d]
                    if 'person' in c and getarea(x1, y1, x2, y2) >= min_person_area:
                        cur_frames.append((x1, y1, x2, y2))

                curr_frames_id = []
                if count <= 2:
                    # Start-up: an id is only created for a box that also has a
                    # nearby box in the previous detection step.
                    for pt1 in cur_frames:
                        for pt2 in prev_frames:
                            if dis(pt1, pt2) < max_centre_distance:
                                tracking_object[track_id] = pt1
                                curr_frames_id.append(track_id)
                                # The deque length follows SEQUENCE_LENGTH (it used to be
                                # hard-coded to 20) so the length check below can be met.
                                frame_list[track_id] = deque(maxlen=SEQUENCE_LENGTH)
                                frame_list[track_id].append(
                                    _crop_for_model(frame, pt1, original_video_width, original_video_height))
                                track_id += 1
                                break
                    prev_frames = cur_frames.copy()
                else:
                    tracking_object_copy = tracking_object.copy()
                    curr_frames_copy = cur_frames.copy()
                    for obj_id, pt2 in tracking_object_copy.items():
                        id_exists = False
                        # NOTE(review): candidates come from the unmodified copy, so two
                        # ids can latch onto the same detection; behaviour kept as is.
                        for pt in curr_frames_copy:
                            if dis(pt, pt2) < max_centre_distance:
                                # The id already exists: update its box and extend its sequence.
                                tracking_object[obj_id] = pt
                                curr_frames_id.append(obj_id)
                                frame_list[obj_id].append(
                                    _crop_for_model(frame, pt, original_video_width, original_video_height))

                                id_exists = True
                                if pt in cur_frames:
                                    cur_frames.remove(pt)
                                break
                        if not id_exists:
                            # The object disappeared: forget everything stored for it.
                            tracking_object.pop(obj_id)
                            frame_list.pop(obj_id)
                            predicted_class_name.pop(obj_id, None)

                    # add new ids
                    for pt in cur_frames:
                        tracking_object[track_id] = pt
                        curr_frames_id.append(track_id)
                        # adding new frames to deque list
                        frame_list[track_id] = deque(maxlen=SEQUENCE_LENGTH)
                        frame_list[track_id].append(
                            _crop_for_model(frame, pt, original_video_width, original_video_height))
                        track_id += 1

                for obj_id in curr_frames_id:
                    frame_queue = frame_list[obj_id]
                    if len(frame_queue) == SEQUENCE_LENGTH:
                        predicted_labels_probabilities = LRCN_model.predict(np.expand_dims(frame_queue, axis = 0))[0]
                        predicted_label = np.argmax(predicted_labels_probabilities)
                        print(predicted_labels_probabilities)
                        if predicted_labels_probabilities[predicted_label] > min_confidence:
                            predicted_class_name[obj_id] = CLASSES_LIST[predicted_label]
                            print(predicted_class_name[obj_id])
                            # Start collecting a fresh sequence for this id.
                            frame_list[obj_id] = deque(maxlen=SEQUENCE_LENGTH)
                prev_frames_id = curr_frames_id
            else:
                # No detection on this frame: reuse the ids of the latest detection step.
                curr_frames_id = prev_frames_id

            # Draw the last known box and label of every active id.
            for obj_id in curr_frames_id:
                name = ''
                if predicted_class_name.get(obj_id) is not None:
                    name = predicted_class_name[obj_id]
                x1, y1, x2, y2 = tracking_object[obj_id]
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, name, (x1 + 6, y1 - 6), font, 1.0, (255, 0, 0), 1)
            i += 1
            video_writer.write(frame)
    finally:
        # Release the VideoCapture and VideoWriter objects, even if an error occurred.
        video_reader.release()
        if video_writer is not None:
            video_writer.release()

In [11]:
# Input video on the mounted Drive and the path the annotated copy is written to.
input_video_file_path = '/content/drive/MyDrive/Action Detection/Input/a.mp4'
output_video_file_path = '/content/drive/MyDrive/Action Detection/Output/a.mp4'

# Perform Action Recognition on the Test Video.
# SEQUENCE_LENGTH is the module-level constant defined in the configuration cell.
predict_on_video(input_video_file_path, output_video_file_path, SEQUENCE_LENGTH)

# Display the output video.
# The clip is loaded without audio and at a target height of 300 (width left to MoviePy).
VideoFileClip(output_video_file_path, audio=False, target_resolution=(300,None)).ipython_display()


0: 640x384 5 persons, 1 chair, 90.6ms
Speed: 2.3ms preprocess, 90.6ms inference, 1.1ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 70.5ms
Speed: 2.0ms preprocess, 70.5ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 73.1ms
Speed: 2.0ms preprocess, 73.1ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 72.5ms
Speed: 2.0ms preprocess, 72.5ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 74.1ms
Speed: 2.0ms preprocess, 74.1ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 73.0ms
Speed: 2.2ms preprocess, 73.0ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 75.1ms
Speed: 2.0ms preprocess, 75.1ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 5 persons, 1 chair, 73.7ms
Speed: 1.

                                                               

Moviepy - Done !
Moviepy - video ready __temp__.mp4


