In [2]:
import cv2
import torch
from ultralytics import YOLO
import time
import math

# Run on the GPU when CUDA is usable; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Using device: {device}")

# Load the YOLOv8 nano weights and move the model onto the selected device
model = YOLO('yolov8n.pt').to(device)

# --- Mutable tracking state -------------------------------------------------
# Maps baggage id -> (tracker, last_seen, bbox, stationary_since)
tracked_baggage = dict()
# Counter bumped each time a new bag entry is created
next_baggage_id = 0

# --- Tunables ---------------------------------------------------------------
# Seconds since a bag was last seen before it is flagged "unattended"
unattended_threshold = 20
# Seconds since a bag's stationary timestamp before it is flagged "unattended"
stationary_threshold = 25
# Max pixel distance at which a detection is matched to an existing bag
distance_threshold = 50

# --- Detector class ids -----------------------------------------------------
# Class id the detector reports for a person
person_class_id = 0
# Class ids treated as baggage (bags/suitcases)
# NOTE(review): presumably the COCO ids for backpack/handbag/suitcase —
# confirm against model.names
baggage_classes = [24, 26, 28]

# Function to calculate Euclidean distance between two bounding boxes
def calculate_distance(box1, box2):
    """Return the Euclidean distance between the centres of two boxes.

    Each box is unpacked as four numbers interpreted as corner
    coordinates ``(x1, y1, x2, y2)``; the centre is the midpoint of the
    two corners. Passing boxes in any other layout (for example
    ``(x, y, width, height)``) will not yield the true centre distance.
    """
    left_a, top_a, right_a, bottom_a = box1
    left_b, top_b, right_b, bottom_b = box2

    # Midpoints of each box along both axes
    cx_a = (left_a + right_a) / 2
    cy_a = (top_a + bottom_a) / 2
    cx_b = (left_b + right_b) / 2
    cy_b = (top_b + bottom_b) / 2

    dx = cx_a - cx_b
    dy = cy_a - cy_b
    return math.sqrt(dx ** 2 + dy ** 2)

# Function to detect baggage and persons
def detect_objects(frame):
    """Run the YOLO model on one frame and split detections by class.

    Args:
        frame: Image passed unchanged to ``model``.

    Returns:
        ``(persons, baggage)``: two lists of ``(x1, y1, x2, y2)`` integer
        boxes. ``persons`` holds detections whose class equals
        ``person_class_id``; ``baggage`` holds detections whose class is
        in ``baggage_classes``.
    """
    results = model(frame)  # Run YOLOv8 inference on the frame

    # Convert the whole detection table to nested Python lists in one go.
    # Iterating the tensor and calling int() on every element would read
    # each scalar back individually, which is slow when the tensor lives
    # on the GPU.
    detections = results[0].boxes.data.tolist()

    persons = []
    baggage = []

    for x1, y1, x2, y2, _conf, class_id in detections:
        box = (int(x1), int(y1), int(x2), int(y2))
        class_id = int(class_id)

        if class_id == person_class_id:
            persons.append(box)

        # Kept as an independent check (not elif) so a class id that is
        # configured as both person and baggage still lands in both lists.
        if class_id in baggage_classes:
            baggage.append(box)

    return persons, baggage

# Function to assign a new baggage ID or match it with an existing one
def assign_baggage_id(bbox, frame):
    """Match a detected bag to a tracked one, or start tracking it.

    Args:
        bbox: Detection box as ``(x, y, width, height)``, the layout
            ``process_video`` builds and the trackers are initialised with.
        frame: Current video frame, used to initialise a new CSRT tracker
            when no existing entry is close enough.

    Returns:
        ``(baggage_id, tracker)`` for the matched or newly created entry.
        On a match the entry's last-seen time and box are refreshed; its
        stationary timestamp is left untouched.
    """
    global next_baggage_id

    def _to_corners(box):
        # calculate_distance unpacks boxes as (x1, y1, x2, y2) corners,
        # while boxes stored in tracked_baggage are (x, y, w, h).
        x, y, w, h = box
        return (x, y, x + w, y + h)

    corners = _to_corners(bbox)

    # Check if a close enough baggage already exists
    for baggage_id, (tracker, _last_seen, last_bbox, stationary_since) in tracked_baggage.items():
        distance = calculate_distance(corners, _to_corners(last_bbox))
        if distance < distance_threshold:
            # Re-assigning an existing key does not resize the dict, so
            # this is safe during iteration; we return immediately anyway.
            tracked_baggage[baggage_id] = (tracker, time.time(), bbox, stationary_since)
            return baggage_id, tracker

    # If no match found, assign a new ID and tracker
    tracker = cv2.TrackerCSRT_create()
    tracker.init(frame, tuple(bbox))
    # Embed the counter so each bag gets its own key; a constant id would
    # make every new bag overwrite the previous entry.
    baggage_id = f'bag_{next_baggage_id}'
    next_baggage_id += 1
    now = time.time()
    tracked_baggage[baggage_id] = (tracker, now, bbox, now)  # Set the initial stationary time
    return baggage_id, tracker

# Function to track baggage
def update_baggage_tracking(tracked_baggage, frame):
    """Advance every tracker by one frame, drawing hits and dropping misses.

    Args:
        tracked_baggage: Dict of baggage id ->
            ``(tracker, last_seen, bbox, stationary_since)``; mutated in
            place.
        frame: Current frame. Passed to each tracker's ``update`` and
            drawn on for every bag that is still tracked.

    For a successful update the entry is rewritten with the current time
    and the tracker's new ``(x, y, w, h)`` box, and a blue rectangle plus
    the id label are drawn. For a failed update a message is printed and
    the entry is removed.
    """
    box_colour = (255, 0, 0)
    label_colour = (255, 255, 255)

    # Iterate over a snapshot because entries may be deleted along the way.
    for bag_id, entry in list(tracked_baggage.items()):
        tracker, _seen, _old_box, stationary_since = entry
        ok, new_box = tracker.update(frame)

        if not ok:
            print(f"Lost tracking for {bag_id}")
            del tracked_baggage[bag_id]
            continue

        tracked_baggage[bag_id] = (tracker, time.time(), new_box, stationary_since)

        left, top, box_w, box_h = new_box
        top_left = (int(left), int(top))
        bottom_right = (int(left + box_w), int(top + box_h))
        cv2.rectangle(frame, top_left, bottom_right, box_colour, 2)

        label_origin = (int(left), int(top) - 10)
        cv2.putText(frame, bag_id, label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, label_colour, 1)

# Function to check for unattended baggage
def check_unattended_baggage(tracked_baggage, persons, frame):
    """Mark every tracked bag that has exceeded a time threshold.

    Args:
        tracked_baggage: Dict of baggage id ->
            ``(tracker, last_seen, bbox, stationary_since)`` where
            ``bbox`` is ``(x, y, width, height)``.
        persons: Accepted for the caller's convenience but not read by
            this function.
        frame: Image that the red box and "Unattended" label are drawn on.

    A bag is flagged when the time since ``stationary_since`` exceeds
    ``stationary_threshold`` or the time since ``last_seen`` exceeds
    ``unattended_threshold``. Bags that are not flagged are left undrawn
    here.
    """
    red = (0, 0, 255)

    for _bag_id, (_tracker, last_seen, bbox, stationary_since) in tracked_baggage.items():
        # Box arrives as top-left corner plus size; derive the far corner.
        left, top, box_w, box_h = bbox
        right, bottom = left + box_w, top + box_h

        stationary_too_long = time.time() - stationary_since > stationary_threshold
        unseen_too_long = time.time() - last_seen > unattended_threshold
        if not (stationary_too_long or unseen_too_long):
            continue

        # Red outline with the label just below the box
        cv2.rectangle(frame, (int(left), int(top)), (int(right), int(bottom)), red, 2)
        cv2.putText(
            frame,
            "Unattended",
            (int(left), int(bottom) + 15),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            red,
            2,
        )

# Function to process the video
def process_video(input_video_path, output_video_path):
    """Annotate a video with tracked and unattended baggage and save it.

    Each frame is run through ``detect_objects``, existing trackers are
    advanced, unattended bags are marked, and new detections are
    registered. The annotated frame is written to the output file and
    shown in a window; pressing ``q`` stops early.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the annotated ``mp4v`` video to write.
            Its parent directory is created if it does not exist.

    Raises:
        IOError: If the input video cannot be opened or the output
            writer cannot be created.
    """
    import os  # local import: only this function needs it

    # Capture video from the input file
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open input video: {input_video_path}")

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        # Some sources report no frame rate; a writer opened with 0 fps
        # does not produce a playable file, so use a conventional default.
        fps = 30.0

    # The writer does not create missing directories itself.
    output_dir = os.path.dirname(output_video_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Define the codec and create VideoWriter object to save output
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for mp4
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    if not out.isOpened():
        cap.release()
        out.release()
        raise IOError(f"Could not open output video for writing: {output_video_path}")

    # Trackers left over from an earlier call refer to a different video;
    # start from a clean slate (mutates the shared dict in place).
    tracked_baggage.clear()

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Detect persons and baggage
            persons, baggage = detect_objects(frame)

            # Update tracking for existing baggage
            update_baggage_tracking(tracked_baggage, frame)

            # Check for unattended baggage
            check_unattended_baggage(tracked_baggage, persons, frame)

            # Track new baggage: convert corner boxes to (x, y, w, h)
            for (x1, y1, x2, y2) in baggage:
                bbox = (x1, y1, x2 - x1, y2 - y1)
                assign_baggage_id(bbox, frame)

            # Write the annotated frame to the output video
            out.write(frame)

            # Optionally display the frame
            cv2.imshow('Bag', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always release handles and close windows, even if a frame fails.
        cap.release()
        out.release()
        cv2.destroyAllWindows()

    print(f"Output video saved at {output_video_path}")

# Source clip to analyse and destination for the annotated result
# (replace both with your own file paths)
input_video_path, output_video_path = 'Videos/sample2.mp4', 'Output/result2.mp4'

# Run the unattended-baggage detection and tracking pipeline on that clip
process_video(
    input_video_path=input_video_path,
    output_video_path=output_video_path,
)


Using device: cuda

0: 480x640 1 person, 1 bird, 78.1ms
Speed: 7.0ms preprocess, 78.1ms inference, 2.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 bird, 1 snowboard, 42.0ms
Speed: 8.2ms preprocess, 42.0ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 bird, 1 snowboard, 30.2ms
Speed: 4.0ms preprocess, 30.2ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 bird, 40.8ms
Speed: 2.6ms preprocess, 40.8ms inference, 6.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 bird, 1 snowboard, 45.2ms
Speed: 0.0ms preprocess, 45.2ms inference, 4.6ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 bird, 1 snowboard, 27.5ms
Speed: 6.9ms preprocess, 27.5ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 1 bird, 20.7ms
Speed: 0.0ms preprocess, 20.7ms inference, 0.0ms postprocess per image at shape (1, 3, 4