In [1]:
import numpy as np
import os
import cv2
import keras
import math
import tensorflow as tf
from keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, LSTM
from keras.optimizers import Adam
from mtcnn.mtcnn import MTCNN

class CustomLSTM(LSTM):
    def __init__(self, *args, **kwargs):
        kwargs.pop('time_major', None)
        super().__init__(*args, **kwargs)

class ShopliftingPrediction:
    def __init__(self, model_path, frame_width, frame_height, sequence_length):
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.sequence_length = sequence_length
        self.model_path = model_path
        self.message = ""

    def load_model(self):
        self.model = tf.keras.models.load_model(
            self.model_path,
            custom_objects={
                'Conv2D': Conv2D,
                'MaxPooling2D': MaxPooling2D,
                'Dense': Dense,
                'Flatten': Flatten,
                'Dropout': Dropout,
                'LSTM': CustomLSTM
            }
        )
        self.model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    def generate_message_content(self, probability, label):
        if label == 0:
            if probability <= 75:
                self.message = "There is little chance of theft"
            elif probability <= 85:
                self.message = "High probability of theft"
            else:
                self.message = "Very high probability of theft"
        elif label == 1:
            if probability <= 75:
                self.message = "The movement is confusing, watch"
            elif probability <= 85:
                self.message = "I think it's normal, but it's better to watch"
            else:
                self.message = "Movement is normal"

    def Pre_Process_Video(self, current_frame, previous_frame):
        diff = cv2.absdiff(current_frame, previous_frame)
        diff = cv2.GaussianBlur(diff, (3, 3), 0)
        resized_frame = cv2.resize(diff, (self.frame_height, self.frame_width))
        gray_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)
        normalized_frame = gray_frame / 255.0
        return normalized_frame

    def Read_Video(self, filePath):
        self.video_reader = cv2.VideoCapture(filePath)
        self.original_video_width = int(self.video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.original_video_height = int(self.video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.fps = self.video_reader.get(cv2.CAP_PROP_FPS)

    def Single_Frame_Predict(self, frames_queue):
        probabilities = self.model.predict(np.expand_dims(frames_queue, axis=0))[0]
        predicted_label = np.argmax(probabilities)
        probability = math.floor(max(probabilities[0], probabilities[1]) * 100)
        return [probability, predicted_label]

    def save_short_video(self, video_file_path, start_frame, end_frame, output_folder):
        cap = cv2.VideoCapture(video_file_path)
        if not cap.isOpened():
            print("Error: Cannot open video file")
            return
        os.makedirs(output_folder, exist_ok=True)
        fps = cap.get(cv2.CAP_PROP_FPS)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out_path = os.path.join(output_folder, f"theft_{start_frame}_{end_frame}.mp4")
        out = cv2.VideoWriter(out_path, fourcc, fps, (int(cap.get(3)), int(cap.get(4))))
        current_frame = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret or current_frame > end_frame:
                break
            if start_frame <= current_frame <= end_frame:
                out.write(frame)
            current_frame += 1
        cap.release()
        out.release()

    def detect_face(self, image_path, output_folder):
        detector = MTCNN()
        image = cv2.imread(image_path)
        faces = detector.detect_faces(image)
        valid_faces = 0

        for face in faces:
            x, y, w, h = face['box']
            face_image = image[y:y+h, x:x+w]

            # Check aspect ratio for filtering valid faces
            face_aspect_ratio = float(w) / h
            if 0.75 < face_aspect_ratio < 1.33:
                unique_filename = f"face_{os.path.splitext(os.path.basename(image_path))[0]}_{valid_faces}.jpg"
                face_image_path = os.path.join(output_folder, unique_filename)
                cv2.imwrite(face_image_path, face_image)
                valid_faces += 1

    def Predict_Video(self, video_file_path, output_file_path):
        global message
        message = 'I will start analysis video now'
        self.Read_Video(video_file_path)
        video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('M', 'P', '4', 'V'),
                                       self.fps, (self.original_video_width, self.original_video_height))
        success, frame = self.video_reader.read()
        previous = frame.copy()
        frames_queue = []
        start_frame, end_frame = None, None
        while self.video_reader.isOpened():
            ok, frame = self.video_reader.read()
            if not ok:
                break
            normalized_frame = self.Pre_Process_Video(frame, previous)
            previous = frame.copy()
            frames_queue.append(normalized_frame)
            if len(frames_queue) == self.sequence_length:
                [probability, predicted_label] = self.Single_Frame_Predict(frames_queue)
                self.generate_message_content(probability, predicted_label)
                message = "{}:{}%".format(self.message, probability)
                if probability > 85 and predicted_label == 0:
                    theft_image_path = os.path.join("run-images", f"theft_{self.video_reader.get(cv2.CAP_PROP_POS_FRAMES)}.jpg")
                    cv2.imwrite(theft_image_path, frame)
                    start_frame = int(self.video_reader.get(cv2.CAP_PROP_POS_FRAMES)) - self.sequence_length
                    end_frame = int(self.video_reader.get(cv2.CAP_PROP_POS_FRAMES))
                    self.save_short_video(video_file_path, start_frame, end_frame, "short-video")
                    self.detect_face(theft_image_path, "face-images")
                frames_queue = []
                print(message)
            cv2.rectangle(frame, (0, 0), (640, 40), (255, 255, 255), -1)
            cv2.putText(frame, message, (1, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            video_writer.write(frame)
        self.video_reader.release()
        video_writer.release()

# استفاده از کلاس
m = ShopliftingPrediction(model_path="lrcn_160S_90_90Q.h5",
                          frame_width=90, frame_height=90, sequence_length=160)
m.load_model()

# نام فایل‌های ویدیویی ورودی و خروجی
input_video_file_path = "video.mp4"
output_video_file_path = 'output.mp4'
m.Predict_Video(input_video_file_path, output_video_file_path)


  super().__init__(**kwargs)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 15s/step
Very high probability of theft:99%
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 83ms/step
Very high probability of theft:99%
