### Prediction Model

In [1]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model

IMAGE_HEIGHT, IMAGE_WIDTH = 64, 64
SEQUENCE_LENGTH = 20
CLASSES_LIST = ["smoke", "shoot_gun", "run", "hit"]
convlstm_model_h5 = load_model("./convlstm_model.h5", compile=False)
convlstm_model_h5.compile(
    optimizer="rmsprop",
    loss=None,
    metrics=None,
    loss_weights=None,
    weighted_metrics=None,
    run_eagerly=None,
    steps_per_execution=None,
    jit_compile=None,
)


def predict_single_action(video_file_path, SEQUENCE_LENGTH):
    video_reader = cv2.VideoCapture(video_file_path)
    frames_list = []
    predicted_class_name = ""
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
    skip_frames_window = max(int(video_frames_count / SEQUENCE_LENGTH), 1)
    for frame_counter in range(SEQUENCE_LENGTH):
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
        success, frame = video_reader.read()
        if not success:
            break
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))
        normalized_frame = resized_frame / 255
        frames_list.append(normalized_frame)
    predicted_labels_probabilities = convlstm_model_h5.predict(
        np.expand_dims(frames_list, axis=0)
    )[0]
    predicted_label = np.argmax(predicted_labels_probabilities)
    predicted_class_name = CLASSES_LIST[predicted_label]
    print(
        f"Action Predicted: {predicted_class_name}\nConfidence: {predicted_labels_probabilities[predicted_label]}"
    )
    video_reader.release()


### Run Engine

In [2]:
# Libraries

from ultralytics import YOLO
import cv2

import util
from sort.sort import *
from util import get_car, read_license_plate, write_csv

import os
import shutil
import threading


Using CPU. Note: This module is much faster with a GPU.


In [3]:
# Prediction handler

def prediction_engine():
    while True:
        dir = os.listdir("D:/Project_v0.0.0/live_footages")
        for i in dir:
            if i and i.startswith("f"):
                path = os.path.join(
                    "D:/Project_v0.0.0/live_footages", i
                )
                print(path)
                predict_single_action(path, 20)
                shutil.move(path, "D:/Project_v0.0.0/upload_engine")


In [4]:
def main():

    # for vechile tracking {
    results = {}
    
    mot_tracker = Sort()
    
    # load models
    coco_model = YOLO("yolov8n.pt")
    license_plate_detector = YOLO("license_plate_detector.pt")
    
    # load video
    cap = cv2.VideoCapture("C:/Users/toufiqhussain/Downloads/test_2.mp4")
    
    vehicles = [2, 3, 5, 7]
    # }

    # for action recognition {
    
    thread_1 = threading.Thread(target=prediction_engine)
    thread_1.start()

    if not cap.isOpened():
        print("Error: Could not open camera.")
        return

    frame_width = int(cap.get(3))
    frame_height = int(cap.get(4))
    fps = int(cap.get(5))

    fourcc = cv2.VideoWriter_fourcc(*"H264")  # Codec for MP4 format
    stream_1 = None
    stream_2 = None
    s1_fileName = None
    s2_fileName = None
    recording_timer = 0
    file_no = 0

    # }
    
    # read frames
    frame_nmr = -1
    ret = True
    while ret:
        frame_nmr += 1
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame.")
            if len(os.listdir("D:/Project_v0.0.0/live_footages"))>0:
                for i in os.listdir("D:/Project_v0.0.0/live_footages"):
                    os.rename(
                    f"D:/Project_v0.0.0/live_footages/{i}",
                    f"D:/Project_v0.0.0/live_footages/f_{i}",
                )
            thread_1.join()
            break
        if ret:
            if recording_timer == fps * 0:
                stream_1 = cv2.VideoWriter(
                    "D:/Project_v0.0.0/live_footages/output_{}_s1.mp4".format(
                        file_no
                    ),
                    fourcc,
                    fps,
                    (frame_width, frame_height),
                )

                s1_fileName = "output_{}_s1.mp4".format(file_no)
                file_no += 1
            if recording_timer == fps * 7:
                stream_2 = cv2.VideoWriter(
                    "D:/Project_v0.0.0/live_footages/output_{}_s2.mp4".format(
                        file_no
                    ),
                    fourcc,
                    fps,
                    (frame_width, frame_height),
                )
                s2_fileName = "output_{}_s2.mp4".format(file_no)
                file_no += 1

            if recording_timer >= fps * 0 and recording_timer <= fps * 10:
                stream_1.write(frame)
                if recording_timer == fps * 10:
                    stream_1.release()
                    os.rename(
                        f"D:/Project_v0.0.0/live_footages/{s1_fileName}",
                        f"D:/Project_v0.0.0/live_footages/f_{s1_fileName}",
                    )
            if recording_timer >= fps * 7 and recording_timer <= fps * 17:
                stream_2.write(frame)
                if recording_timer == fps * 17:
                    stream_2.release()
                    os.rename(
                        f"D:/Project_v0.0.0/live_footages/{s2_fileName}",
                        f"D:/Project_v0.0.0/live_footages/f_{s2_fileName}",
                    )

            if recording_timer == fps * 17:
                recording_timer = 0
            else:
                recording_timer += 1


            results[frame_nmr] = {}
            # detect vehicles
            detections = coco_model(frame)[0]
            detections_ = []
            for detection in detections.boxes.data.tolist():
                x1, y1, x2, y2, score, class_id = detection
                if int(class_id) in vehicles:
                    detections_.append([x1, y1, x2, y2, score])
    
            # track vehicles
            track_ids = mot_tracker.update(np.asarray(detections_))
    
            # detect license plates
            license_plates = license_plate_detector(frame)[0]
            for license_plate in license_plates.boxes.data.tolist():
                x1, y1, x2, y2, score, class_id = license_plate
    
                # assign license plate to car
                xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids)
    
                if car_id != -1:
                    # crop license plate
                    license_plate_crop = frame[int(y1) : int(y2), int(x1) : int(x2), :]
    
                    # process license plate
                    license_plate_crop_gray = cv2.cvtColor(
                        license_plate_crop, cv2.COLOR_BGR2GRAY
                    )
                    _, license_plate_crop_thresh = cv2.threshold(
                        license_plate_crop_gray, 64, 255, cv2.THRESH_BINARY_INV
                    )

                    # read license plate number
                    license_plate_text, license_plate_text_score = read_license_plate(
                        license_plate_crop_thresh
                    )
    
                    if license_plate_text is not None:
                        results[frame_nmr][car_id] = {
                            "car": {"bbox": [xcar1, ycar1, xcar2, ycar2]},
                            "license_plate": {
                                "bbox": [x1, y1, x2, y2],
                                "text": license_plate_text,
                                "bbox_score": score,
                                "text_score": license_plate_text_score,
                            },
                        }
            cv2.imshow("Recording", frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    if stream_1 is not None:
        stream_1.release()
    if stream_2 is not None:
        stream_2.release()
    
    cap.release()
    cv2.destroyAllWindows()
    thread_1.join()
    # write results
    write_csv(results, "test.csv")



if __name__ == "__main__":
    main()
    


0: 384x640 21 cars, 1 bus, 3 trucks, 100.3ms
Speed: 5.0ms preprocess, 100.3ms inference, 8.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 license_plates, 49.1ms
Speed: 2.0ms preprocess, 49.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 23 cars, 1 bus, 2 trucks, 52.1ms
Speed: 2.0ms preprocess, 52.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 license_plates, 46.1ms
Speed: 2.0ms preprocess, 46.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 22 cars, 1 bus, 2 trucks, 52.1ms
Speed: 2.0ms preprocess, 52.1ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 license_plates, 47.1ms
Speed: 1.0ms preprocess, 47.1ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 22 cars, 1 bus, 1 truck, 51.1ms
Speed: 1.0ms preprocess, 51.1ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 license_plates,

Error: Could not read frame.
D:/Project_v0.0.0/live_footages\f_output_0_s1.mp4


Exception in thread Thread-5 (prediction_engine):
Traceback (most recent call last):
  File "c:\Users\toufiqhussain\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "c:\Users\toufiqhussain\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\toufiqhussain\AppData\Local\Temp\ipykernel_17508\1665289333.py", line 12, in prediction_engine
  File "C:\Users\toufiqhussain\AppData\Local\Temp\ipykernel_17508\1681270134.py", line 35, in predict_single_action
  File "c:\Users\toufiqhussain\AppData\Local\Programs\Python\Python311\Lib\site-packages\keras\src\utils\traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "C:\Users\TOUFIQ~1\AppData\Local\Temp\__autograph_generated_filemjopmzv5.py", line 15, in tf__predict_function
    retval_ = ag__.converted_call(ag__.ld(step_function), (ag__.ld(self), ag__