In [None]:
import pygame
import time

def play_sound(file_path, duration):
    """Play an audio file through pygame's music mixer for a fixed time.

    Args:
        file_path: Path of the audio file passed to ``pygame.mixer.music.load``.
        duration: Number of seconds to block (via ``time.sleep``) while the
            sound plays.

    The call blocks the calling thread for ``duration`` seconds. pygame and its
    mixer are shut down again before returning, including when the mixer cannot
    be initialised or loading/playing the file raises.
    """
    pygame.init()

    try:
        # The mixer is initialised inside the guarded region so that a failure
        # here (e.g. no usable audio device) still reaches the clean-up below
        # instead of leaving pygame initialised.
        pygame.mixer.init()
        pygame.mixer.music.load(file_path)
        pygame.mixer.music.play()
        time.sleep(duration)
    finally:
        pygame.mixer.quit()
        pygame.quit()
# Alarm clip and playback length. Not read by the visible cells: the play_sound
# call inside prediction_engine passes its own literal values instead.
sound_file_path = 'emergency_sound.mp3'
duration = 5  # in seconds

### Prediction Model

In [1]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Size every sampled video frame is resized to before it is fed to the model.
IMAGE_HEIGHT, IMAGE_WIDTH = 64, 64
# Frames sampled per clip. predict_single_action has a parameter of the same
# name, and the visible caller passes 20 explicitly rather than reading this global.
SEQUENCE_LENGTH = 20
# Presumably an earlier label set, kept for reference — TODO confirm or delete.
# CLASSES_LIST = ["Shooting", "Fighting", "Explosion", "Accident"]
# Class names, indexed by the argmax of the model output in predict_single_action.
CLASSES_LIST = ["Robbery", "Fighting","Shooting"]
# Load the saved Keras model from the current working directory.
convlstm_model_h5 = load_model("./convlstm_model.h5", compile=True)
# Re-compile with the rmsprop optimizer and no loss/metrics; the visible code
# only calls .predict() on this model.
convlstm_model_h5.compile(
    optimizer="rmsprop",
    loss=None,
    metrics=None,
    loss_weights=None,
    weighted_metrics=None,
    run_eagerly=None,
    steps_per_execution=None,
    jit_compile=None,
)
# 6630497459
def predict_single_action(video_file_path, SEQUENCE_LENGTH):
    """Classify the action in one video clip with ``convlstm_model_h5``.

    ``SEQUENCE_LENGTH`` evenly spaced frames are read from the clip, resized to
    ``IMAGE_WIDTH`` x ``IMAGE_HEIGHT``, divided by 255 and passed to the model
    as a single batch.

    Args:
        video_file_path: Either a usable path to a video file (absolute, or
            pointing at an existing file) which is opened as given, or a bare
            clip name that is resolved to ``D:/ProjectX/<name>.mp4``.
        SEQUENCE_LENGTH: Number of frames to sample from the clip.

    Returns:
        str: A list literal of the form ``"['<class name>','<confidence>']"``;
        callers in this notebook parse it with ``ast.literal_eval``.

    Raises:
        ValueError: If the video cannot be opened or fewer than
            ``SEQUENCE_LENGTH`` frames could be read from it.
    """
    import os  # local import: the cell defining this function does not import os

    # Callers such as prediction_engine pass a complete path that already ends
    # in ".mp4"; wrapping that in the D:/ProjectX/<name>.mp4 template would
    # yield a path that does not exist. Only bare clip names get the template.
    if os.path.isabs(video_file_path) or os.path.isfile(video_file_path):
        video_file_path_full = video_file_path
    else:
        video_file_path_full = f"D:/ProjectX/{video_file_path}.mp4"

    video_reader = cv2.VideoCapture(video_file_path_full)
    frames_list = []
    try:
        if not video_reader.isOpened():
            raise ValueError(f"Could not open video: {video_file_path_full}")

        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        # Step between sampled frames; at least 1 so short clips still advance.
        skip_frames_window = max(int(video_frames_count / SEQUENCE_LENGTH), 1)
        for frame_counter in range(SEQUENCE_LENGTH):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()
            if not success:
                break
            # cv2.resize takes dsize as (width, height).
            resized_frame = cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT))
            normalized_frame = resized_frame / 255
            frames_list.append(normalized_frame)
    finally:
        # Always free the file handle, even on errors, so the caller can
        # rename/move the clip afterwards.
        video_reader.release()

    if len(frames_list) != SEQUENCE_LENGTH:
        # Fail with a clear message instead of letting the model reject a
        # wrongly shaped batch.
        raise ValueError(
            f"Expected {SEQUENCE_LENGTH} frames from {video_file_path_full}, "
            f"got {len(frames_list)}"
        )

    print("-------------")
    predicted_labels_probabilities = convlstm_model_h5.predict(
        np.expand_dims(frames_list, axis=0)
    )[0]
    predicted_label = np.argmax(predicted_labels_probabilities)
    predicted_class_name = CLASSES_LIST[predicted_label]
    print(
        f"Action Predicted: {predicted_class_name}\nConfidence: {predicted_labels_probabilities[predicted_label]}"
    )
    return f"['{predicted_class_name}','{predicted_labels_probabilities[predicted_label]}']"






In [None]:
# predict_single_action("2",20)

### CONTROLLING ENGINE 

In [4]:
import cv2
import os
import csv
import shutil
import threading
from datetime import datetime
import time
import uuid
import ast
# from face.get_face_data import fetch_face_data
# from number_plate.main import fetch_vehicle_number_plate

# Calendar date captured once, when this cell runs; formatted_date (dd-mm-yy)
# names the per-day folder used by prediction_engine.
today_date = datetime.now().date()
formatted_date = format(today_date, "%d-%m-%y")


def fetch_face_data(path):
    """Stand-in for the commented-out ``face.get_face_data`` import.

    Ignores ``path`` and always returns the same list literal (as a string)
    of image file names.
    """
    stub_face_files = "['21763781.jpg', '1362647.jpg' ,'76721532.jpg']"
    return stub_face_files


def fetch_vehicle_number_plate(path):
    """Stand-in for the commented-out ``number_plate.main`` import.

    Ignores ``path`` and always returns the same list literal (as a string)
    of number plates.
    """
    stub_plates = "['JH20324', 'JH3247', 'TH2374', 'WB2324']"
    return stub_plates


# Camera identifier; used as the prefix of archived clip file names.
CAM_ID = "526376"



def prediction_engine():
    """Classify finished clips from the live-footage folder and archive them.

    Runs forever. Every file in ``D:/ProjectX/live_footages`` whose name starts
    with ``f`` is treated as a finished clip and is:

    1. classified with ``predict_single_action`` and annotated with the face
       and vehicle helpers,
    2. logged as a row in ``D:/ProjectX/data_storage/<formatted_date>/data.csv``
       (a header row is written when the file is created),
    3. if the predicted class is "Shooting" or "Fighting" and the interrogation
       CSV is still empty: announced ("Crime Detected" plus alarm sound) and
       recorded in that CSV,
    4. moved into the per-day storage folder under the name
       ``<CAM_ID>_<unix timestamp>_<8 hex chars>.mp4``.

    The function never returns; any exception raised while processing a clip
    propagates and ends the thread it runs in.
    """
    live_dir = "D:/ProjectX/live_footages"
    interrogation_csv = "D:/ProjectX/interogation/interrogation_data.csv"

    while True:
        ready_clips = [name for name in os.listdir(live_dir) if name.startswith("f")]
        if not ready_clips:
            # Nothing to process yet: sleep briefly instead of spinning on
            # os.listdir at full CPU.
            time.sleep(0.5)
            continue

        for clip_name in ready_clips:
            unique_id = str(uuid.uuid4().hex)[:8]
            current_timestamp = int(time.time())
            path = os.path.join("D:/ProjectX/live_footages/", clip_name)
            avg_action = predict_single_action(path, 20)
            facial_data = fetch_face_data(path)
            vehicle_data = fetch_vehicle_number_plate(path)

            # NOTE(review): formatted_date is computed once at module level, so
            # a run that crosses midnight keeps using the start date — confirm
            # whether that is intended.
            day_dir = f"D:/ProjectX/data_storage/{formatted_date}"
            archived_name = f"{CAM_ID}_{current_timestamp}_{unique_id}.mp4"
            archived_path = f"{day_dir}/{archived_name}"
            os.makedirs(day_dir, exist_ok=True)

            # Append to the daily log, writing the header only when the file
            # is being created.
            daily_csv = f"{day_dir}/data.csv"
            needs_header = not os.path.isfile(daily_csv)
            with open(daily_csv, mode="a", newline="") as file:
                writer = csv.writer(file)
                if needs_header:
                    writer.writerow(
                        [
                            "footage_location",
                            "action",
                            "facial_data_files",
                            "vehicle_tracked",
                        ]
                    )
                writer.writerow([archived_path, avg_action, facial_data, vehicle_data])

            # avg_action is a list literal "['<class>','<confidence>']".
            predicted_class = ast.literal_eval(avg_action)[0]
            print(predicted_class)
            if predicted_class in ("Shooting", "Fighting"):
                os.makedirs(os.path.dirname(interrogation_csv), exist_ok=True)
                # "a+" creates the file when it is missing (the former "r+"
                # raised FileNotFoundError); seek(0) lets us check for rows.
                with open(interrogation_csv, mode="a+", newline="") as i_file:
                    i_file.seek(0)
                    is_empty = next(csv.reader(i_file), None) is None
                    if is_empty:
                        print("Crime Detected")
                        play_sound('emergency_sound.mp3',5)
                        data = [
                            [
                                "footage_location",
                                "searching_time",
                                "action_confidence",
                                "facial_data_files",
                                "vehicle_tracked",
                            ],
                            [
                                f"{formatted_date}/{archived_name}",
                                "[]",
                                avg_action,
                                facial_data,
                                # NOTE(review): hard-coded plate rather than
                                # vehicle_data — confirm this is intentional.
                                ["JH10N9350"],
                            ],
                        ]
                        csv.writer(i_file).writerows(data)

            # Move straight to the final location. Renaming inside the live
            # folder first would briefly expose a file without the "f" prefix
            # that main() could pick up and re-flag.
            shutil.move(path, archived_path)


def main():
    """Cut the input video into overlapping clips for the prediction engine.

    Frames are read from ``gun_shooting.mp4``, shown in a preview window and
    written to two staggered writers under ``D:/ProjectX/live_footages``:
    stream 1 records seconds 0-10 and stream 2 seconds 7-17 of every 17-second
    cycle. When a clip is complete it is renamed with an ``f_`` prefix, which is
    the marker ``prediction_engine`` looks for.

    When the source runs out, unfinished clips are flagged too and the function
    waits until the worker has emptied the folder (or has stopped) before
    returning. Pressing ``q`` in the preview window stops immediately.
    """
    live_dir = "D:/ProjectX/live_footages"

    cap = cv2.VideoCapture("gun_shooting.mp4")

    if not cap.isOpened():
        print("Error: Could not open camera.")
        cap.release()
        return

    # Start the worker only once the source is usable. prediction_engine loops
    # forever, so it runs as a daemon and is never join()ed: joining it would
    # block this function indefinitely.
    thread_1 = threading.Thread(target=prediction_engine, daemon=True)
    thread_1.start()

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    if fps <= 0:
        # Some sources report no frame rate; with 0 every timing threshold
        # below collapses to 0. Fall back to an assumed 30 fps.
        fps = 30

    fourcc = cv2.VideoWriter_fourcc(*"H264")  # Codec for MP4 format

    stream_1 = None
    stream_2 = None
    s1_fileName = None
    s2_fileName = None
    recording_timer = 0  # frame counter within the current 17-second cycle
    file_no = 0

    try:
        while True:
            ret, frame = cap.read()

            if not ret:
                print("Error: Could not read frame.")
                # Close the writers first so their files can be renamed.
                for stream in (stream_1, stream_2):
                    if stream is not None:
                        stream.release()

                # Flag whatever is still unflagged so the worker processes it.
                for name in os.listdir(live_dir):
                    if not name.startswith("f"):
                        os.rename(f"{live_dir}/{name}", f"{live_dir}/f_{name}")

                # Wait for the worker to drain the folder; stop waiting if the
                # worker thread has died so we cannot hang here.
                while thread_1.is_alive() and os.listdir(live_dir):
                    time.sleep(0.5)
                break

            if recording_timer == 0:
                s1_fileName = "output_{}_s1.mp4".format(file_no)
                stream_1 = cv2.VideoWriter(
                    f"{live_dir}/{s1_fileName}",
                    fourcc,
                    fps,
                    (frame_width, frame_height),
                )
                file_no += 1

            if recording_timer == fps * 7:
                s2_fileName = "output_{}_s2.mp4".format(file_no)
                stream_2 = cv2.VideoWriter(
                    f"{live_dir}/{s2_fileName}",
                    fourcc,
                    fps,
                    (frame_width, frame_height),
                )
                file_no += 1

            if recording_timer <= fps * 10:
                stream_1.write(frame)

                if recording_timer == fps * 10:
                    stream_1.release()
                    os.rename(
                        f"{live_dir}/{s1_fileName}",
                        f"{live_dir}/f_{s1_fileName}",
                    )

            if fps * 7 <= recording_timer <= fps * 17:
                stream_2.write(frame)

                if recording_timer == fps * 17:
                    stream_2.release()
                    os.rename(
                        f"{live_dir}/{s2_fileName}",
                        f"{live_dir}/f_{s2_fileName}",
                    )

            if recording_timer == fps * 17:
                recording_timer = 0
            else:
                recording_timer += 1

            cv2.imshow("Recording", frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Release everything even if the loop exits through an exception.
        if stream_1 is not None:
            stream_1.release()

        if stream_2 is not None:
            stream_2.release()

        cap.release()

        cv2.destroyAllWindows()


# Start recording and prediction when this cell/script is executed directly.
if __name__ == "__main__":
    main()

Error: Could not read frame.
-------------
Action Predicted: Shooting
Confidence: 0.9909492135047913
Shooting
Crime Detected
-------------
Action Predicted: Shooting
Confidence: 0.9698354005813599
Shooting
