In [1]:
import cv2
import numpy as np
from ultralytics import YOLO
import cvzone
import base64
import os
import time
import threading
import pygame  # For playing alert sounds
from openai import OpenAI

# Initialize OpenAI client for OpenRouter.ai
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key="",  # Replace with your OpenRouter API key
)

class PackageDetectionProcessor:
    def __init__(self, video_file, yolo_model_path="best.pt"):
        """Initialize package detection processor for conveyor belt monitoring."""
        try:
            self.yolo_model = YOLO(yolo_model_path)
            self.names = self.yolo_model.names
        except Exception as e:
            raise RuntimeError(f"Error loading YOLO model: {e}")

        self.cap = cv2.VideoCapture(video_file)
        if not self.cap.isOpened():
            raise FileNotFoundError("Error: Could not open video file.")

        self.processed_track_ids = set()
        self.current_date = time.strftime("%Y-%m-%d")
        self.output_filename = f"package_data_{self.current_date}.txt"
        self.cropped_images_folder = "cropped_packages"
        os.makedirs(self.cropped_images_folder, exist_ok=True)

        # Detection line parameters
        self.cx1 = 416  # X-position of the vertical line
        self.offset = 6  # Offset for detection tolerance

        # Sound alert setup
        pygame.mixer.init()
        self.alert_sound = "alert.mp3"  # Ensure this file exists in the same directory
        self.sound_playing = False  # Track sound state
        
        # Initialize report file
        if not os.path.exists(self.output_filename):
            with open(self.output_filename, "w", encoding="utf-8") as file:
                file.write("Timestamp | Track ID | Package Type | Front Open | Damage Condition\n")
                file.write("-" * 80 + "\n")

    def play_alert(self):
        """Play an alert sound if it's not already playing."""
        if not self.sound_playing:
            pygame.mixer.music.load(self.alert_sound)
            pygame.mixer.music.play(-1)  # Loop indefinitely
            self.sound_playing = True

    def stop_alert(self):
        """Stop the alert sound if playing."""
        if self.sound_playing:
            pygame.mixer.music.stop()
            self.sound_playing = False

    def analyze_image_with_openai(self, image_path):
        """Analyze the package image using OpenAI via OpenRouter.ai."""
        try:
            with open(image_path, "rb") as img_file:
                base64_image = base64.b64encode(img_file.read()).decode("utf-8")

            # Prepare the prompt for OpenAI
            prompt = """
            Analyze the given image of a package and extract the following details:
            
            - **Box Front Open (Yes/No)**
            - **Damage Condition (Yes/No)**
            
            Return results in table format only:
            | Package Type (box) | Box Front Flap Open (Yes/No) | Damage Condition (Yes/No) |
            |--------------------|----------------------------|--------------------------|
            """

            # Send the request to OpenAI via OpenRouter.ai
            completion = client.chat.completions.create(
                extra_headers={
                    "HTTP-Referer": "<YOUR_SITE_URL>",  # Optional. Replace with your site URL.
                    "X-Title": "<YOUR_SITE_NAME>",     # Optional. Replace with your site name.
                },
                extra_body={},
                model="google/gemini-2.0-flash-lite-preview-02-05:free",  # Replace with your preferred model
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                        ]
                    }
                ]
            )

            response_text = completion.choices[0].message.content.strip()
            return response_text
        except Exception as e:
            print(f"❌ Error invoking OpenAI model: {e}")
            return "Error processing image."

    def process_crop_image(self, image, track_id):
        """Save and analyze cropped package images."""
        timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
        image_filename = os.path.join(self.cropped_images_folder, f"package_{track_id}_{timestamp}.jpg")
        cv2.imwrite(image_filename, image)

        response_content = self.analyze_image_with_openai(image_filename)
        extracted_data = response_content.split("\n")[2:]

        alert_triggered = False  # Flag to track if an alert is needed
        
        if extracted_data:
            with open(self.output_filename, "a", encoding="utf-8") as file:
                for row in extracted_data:
                    if "--------------" in row or not row.strip():
                        continue
                    values = [col.strip() for col in row.split("|")[1:-1]]
                    if len(values) == 3:
                        package_type, box_open, damage_status = values
                        file.write(f"{timestamp} | Track ID: {track_id} | {package_type} | {box_open} | {damage_status}\n")
                        
                        if box_open.lower() == "yes":
                            alert_triggered = True  # Trigger alert

        if alert_triggered:
            self.play_alert()
        else:
            self.stop_alert()

    def crop_and_process(self, frame, box, track_id):
        """Crop and process detected packages."""
        if track_id in self.processed_track_ids:
            return  

        x1, y1, x2, y2 = map(int, box)
        cropped_image = frame[y1:y2, x1:x2]
        self.processed_track_ids.add(track_id)
        threading.Thread(target=self.process_crop_image, args=(cropped_image, track_id), daemon=True).start()

    def process_video_frame(self, frame):
        """Process each video frame to detect and analyze packages."""
        frame = cv2.resize(frame, (1020, 600))
        results = self.yolo_model.track(frame, persist=True)

        if results and results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.int().cpu().tolist()
            track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)

            for box, track_id in zip(boxes, track_ids):
                x1, y1, x2, y2 = map(int, box)
                cx = (x1 + x2) // 2 
                if self.cx1 - self.offset < cx < self.cx1 + self.offset:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 2)
                    cvzone.putTextRect(frame, f"ID: {track_id}", (x2, y2), 1, 1)
                    self.crop_and_process(frame, box, track_id)
        return frame


    def start_processing(self):

        """Start video processing."""
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                self.stop_alert()
                break
            frame = self.process_video_frame(frame)
            cv2.line(frame, (416, 2), (416, 599), (0, 255, 0), 2)
            cv2.imshow("Package Detection", frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        self.cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    video_file = "co.mp4"
    processor = PackageDetectionProcessor(video_file)
    processor.start_processing()

pygame 2.6.1 (SDL 2.28.4, Python 3.11.7)
Hello from the pygame community. https://www.pygame.org/contribute.html

0: 384x640 1 object, 90.7ms
Speed: 8.1ms preprocess, 90.7ms inference, 363.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 object, 33.4ms
Speed: 2.0ms preprocess, 33.4ms inference, 3.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 object, 24.0ms
Speed: 3.9ms preprocess, 24.0ms inference, 2.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 object, 25.6ms
Speed: 3.9ms preprocess, 25.6ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 object, 24.0ms
Speed: 3.4ms preprocess, 24.0ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 object, 50.3ms
Speed: 2.4ms preprocess, 50.3ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 object, 30.8ms
Speed: 3.0ms preprocess, 30.8ms inference, 3.2ms postprocess per image at shape (1, 3, 384, 640)

0:

In [2]:
import cv2
import numpy as np
from ultralytics import YOLO
import cvzone
import base64
import os
import time
import threading
import pygame  # For playing alert sounds
from openai import OpenAI

# Initialize OpenAI client for OpenRouter.ai
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key="",  # Replace with your OpenRouter API key
)

class PackageDetectionProcessor:
    def __init__(self, video_file, yolo_model_path="best.pt", output_video_file="output_video.mp4"):
        """Initialize package detection processor for conveyor belt monitoring."""
        try:
            self.yolo_model = YOLO(yolo_model_path)
            self.names = self.yolo_model.names
        except Exception as e:
            raise RuntimeError(f"Error loading YOLO model: {e}")

        self.cap = cv2.VideoCapture(video_file)
        if not self.cap.isOpened():
            raise FileNotFoundError("Error: Could not open video file.")

        # Get video properties for VideoWriter
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))

        # Initialize VideoWriter to save the output video
        self.output_video_file = output_video_file
        self.video_writer = cv2.VideoWriter(
            self.output_video_file,
            cv2.VideoWriter_fourcc(*'mp4v'),  # Codec for .mp4 files
            self.fps,
            (self.frame_width, self.frame_height)
        )

        self.processed_track_ids = set()
        self.current_date = time.strftime("%Y-%m-%d")
        self.output_filename = f"package_data_{self.current_date}.txt"
        self.cropped_images_folder = "cropped_packages"
        os.maked(self.cropped_images_folder, exist_ok=True)

        # Detection line parameters
        self.cx1 = 416  # X-position of the vertical line
        self.offset = 6  # Offset for detection tolerance

        # Sound alert setup
        pygame.mixer.init()
        self.alert_sound = "alert.mp3"  # Ensure this file exists in the same directory
        self.sound_playing = False  # Track sound state
        
        # Initialize report file
        if not os.path.exists(self.output_filename):
            with open(self.output_filename, "w", encoding="utf-8") as file:
                file.write("Timestamp | Track ID | Package Type | Front Open | Damage Condition\n")
                file.write("-" * 80 + "\n")

    def play_alert(self):
        """Play an alert sound if it's not already playing."""
        if not self.sound_playing:
            pygame.mixer.music.load(self.alert_sound)
            pygame.mixer.music.play(-1)  # Loop indefinitely
            self.sound_playing = True

    def stop_alert(self):
        """Stop the alert sound if playing."""
        if self.sound_playing:
            pygame.mixer.music.stop()
            self.sound_playing = False

    def analyze_image_with_openai(self, image_path):
        """Analyze the package image using OpenAI via OpenRouter.ai."""
        try:
            with open(image_path, "rb") as img_file:
                base64_image = base64.b64encode(img_file.read()).decode("utf-8")

            # Prepare the prompt for OpenAI
            prompt = """
            Analyze the given image of a package and extract the following details:
            
            - **Box Front Open (Yes/No)**
            - **Damage Condition (Yes/No)**
            
            Return results in table format only:
            | Package Type (box) | Box Front Flap Open (Yes/No) | Damage Condition (Yes/No) |
            |--------------------|----------------------------|--------------------------|
            """

            # Send the request to OpenAI via OpenRouter.ai
            completion = client.chat.completions.create(
                extra_headers={
                    "HTTP-Referer": "<YOUR_SITE_URL>",  # Optional. Replace with your site URL.
                    "X-Title": "<YOUR_SITE_NAME>",     # Optional. Replace with your site name.
                },
                extra_body={},
                model="google/gemini-2.0-pro-exp-02-05:free",  # Replace with your preferred model
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
                        ]
                    }
                ]
            )

            response_text = completion.choices[0].message.content.strip()
            return response_text
        except Exception as e:
            print(f"❌ Error invoking OpenAI model: {e}")
            return "Error processing image."

    def process_crop_image(self, image, track_id):
        """Save and analyze cropped package images."""
        timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
        image_filename = os.path.join(self.cropped_images_folder, f"package_{track_id}_{timestamp}.jpg")
        cv2.imwrite(image_filename, image)

        response_content = self.analyze_image_with_openai(image_filename)
        extracted_data = response_content.split("\n")[2:]

        alert_triggered = False  # Flag to track if an alert is needed
        
        if extracted_data:
            with open(self.output_filename, "a", encoding="utf-8") as file:
                for row in extracted_data:
                    if "--------------" in row or not row.strip():
                        continue
                    values = [col.strip() for col in row.split("|")[1:-1]]
                    if len(values) == 3:
                        package_type, box_open, damage_status = values
                        file.write(f"{timestamp} | Track ID: {track_id} | {package_type} | {box_open} | {damage_status}\n")
                        
                        if box_open.lower() == "yes":
                            alert_triggered = True  # Trigger alert

        if alert_triggered:
            self.play_alert()
        else:
            self.stop_alert()

    def crop_and_process(self, frame, box, track_id):
        """Crop and process detected packages."""
        if track_id in self.processed_track_ids:
            return  

        x1, y1, x2, y2 = map(int, box)
        cropped_image = frame[y1:y2, x1:x2]
        self.processed_track_ids.add(track_id)
        threading.Thread(target=self.process_crop_image, args=(cropped_image, track_id), daemon=True).start()

    def process_video_frame(self, frame):
        """Process each video frame to detect and analyze packages."""
        frame = cv2.resize(frame, (1020, 600))
        results = self.yolo_model.track(frame, persist=True)

        if results and results[0].boxes is not None:
            boxes = results[0].boxes.xyxy.int().cpu().tolist()
            track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)

            for box, track_id in zip(boxes, track_ids):
                x1, y1, x2, y2 = map(int, box)
                cx = (x1 + x2) // 2 
                if self.cx1 - self.offset < cx < self.cx1 + self.offset:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 2)
                    cvzone.putTextRect(frame, f"ID: {track_id}", (x2, y2), 1, 1)
                    self.crop_and_process(frame, box, track_id)
        return frame

    @staticmethod
    def mouse_callback(event, x, y, flags, param):
        if event == cv2.EVENT_MOUSEMOVE:
            print(f"Mouse Position: ({x}, {y})")

    def start_processing(self):
        cv2.namedWindow("Package Detection")
        cv2.setMouseCallback("Package Detection", self.mouse_callback)

        """Start video processing."""
        while self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                self.stop_alert()
                break

            # Process the frame
            processed_frame = self.process_video_frame(frame)

            # Write the processed frame to the output video
            self.video_writer.write(processed_frame)

            # Display the processed frame
            cv2.imshow("Package Detection", processed_frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

        # Release resources
        self.cap.release()
        self.video_writer.release()
        cv2.destroyAllWindows()
        print(f"✅ Processing completed. Output video saved to {self.output_video_file}")

if __name__ == "__main__":
    video_file = "co.mp4"
    output_video_file = "output_video.mp4"  # Output video file name
    processor = PackageDetectionProcessor(video_file, output_video_file=output_video_file)
    processor.start_processing()

AttributeError: module 'os' has no attribute 'maked'