In [None]:
import argparse
from collections import defaultdict
from pathlib import Path

import cv2
import numpy as np
from shapely.geometry import Polygon
from shapely.geometry.point import Point

from ultralytics import YOLO
from ultralytics.utils.files import increment_path
from ultralytics.utils.plotting import Annotator, colors

In [None]:
# Define global variables
track_history = defaultdict(list)
current_region = None

# Dictionary to keep track of previous positions of cars
previous_positions = {}

# Function to check if a car is moving
# def is_moving(car_id, current_position, threshold=2):
#     if car_id in previous_positions:
#         prev_position = previous_positions[car_id]
#         dist = np.linalg.norm(np.array(current_position) - np.array(prev_position))
#         return dist > threshold
#     return False

# Function to create a dynamic polygon mask for the middle road
def create_dynamic_polygon_mask(frame, width_percentage=0.3, height_percentage=0.9):
    height, width = frame.shape[:2]
    poly_width = int(width * width_percentage)
    poly_height = int(height * height_percentage)
    start_x = (width - poly_width) // 2
    end_x = start_x + poly_width
    start_y = (height - poly_height) // 2
    end_y = start_y + poly_height
    
    polygon = Polygon([(start_x, start_y), (end_x, start_y), (end_x, end_y), (start_x, end_y)])
    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
    cv2.fillPoly(mask, [np.array(polygon.exterior.coords, np.int32)], 255)
    return mask

# Function to run the main logic
def run(
    weights="yolov8n.pt",
    source=None,
    device="cpu",
    save_img=False,
    exist_ok=False,
    classes=None,
    line_thickness=2,
    track_thickness=2,
    region_thickness=2,
):
    vid_frame_count = 0
    unique_class_0_ids = set()
    unique_class_1_ids = set()
    unique_class_2_ids = set()
    # moving_cars = set()

    if not Path(source).exists():
        raise FileNotFoundError(f"Source path '{source}' does not exist.")

    model = YOLO(f"{weights}")
    model.to(device)
    names = model.model.names

    reader = imageio.get_reader(source, 'ffmpeg')
    meta_data = reader.get_meta_data()
    fps = meta_data['fps']
    frame_width, frame_height = meta_data['size']
    num_frames = meta_data['nframes']  # Total number of frames in the video
    save_dir = increment_path(Path("ultralytics_rc_output") / "exp", exist_ok)
    save_dir.mkdir(parents=True, exist_ok=True)
    video_writer = cv2.VideoWriter(str(save_dir / f"{Path(source).stem}.mp4"), cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame_width, frame_height))

    frame_skip_count = 5  # Skip every 5th frame

    for i, frame in enumerate(reader):
       # if i % frame_skip_count != 0:
           ## continue

        vid_frame_count += 1
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        # Create a dynamic polygon mask for the middle road region
        middle_road_mask = create_dynamic_polygon_mask(frame)

        # Apply the middle road mask to the frame
        masked_frame = cv2.bitwise_and(frame, frame, mask=middle_road_mask)

        results = model.track(masked_frame, persist=True, classes=classes)

        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu()
            track_ids = results[0].boxes.id.int().cpu().tolist()
            clss = results[0].boxes.cls.cpu().tolist()

            annotator = Annotator(masked_frame, line_width=line_thickness, example=str(names))

            for box, track_id, cls in zip(boxes, track_ids, clss):
                annotator.box_label(box, str(names[cls]), color=colors(cls, True))
                bbox_center = (box[0] + box[2]) / 2, (box[1] + box[3]) / 2

                track = track_history[track_id]
                track.append((float(bbox_center[0]), float(bbox_center[1])))
                if len(track) > 30:
                    track.pop(0)
                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(masked_frame, [points], isClosed=False, color=colors(cls, True), thickness=track_thickness)

                if cls == 0:
                    unique_class_0_ids.add(track_id)
                elif cls == 1:
                    unique_class_1_ids.add(track_id)
                elif cls == 2:
                    unique_class_2_ids.add(track_id)

                # if cls == 2:  # Assuming class 2 is car
                #     if is_moving(track_id, bbox_center):
                #         moving_cars.add(track_id)
                #     previous_positions[track_id] = bbox_center

        if save_img:
            video_writer.write(masked_frame)

        # Print progress every 10 processed frames
        if vid_frame_count % 10 == 0:
            progress = (vid_frame_count * frame_skip_count) / num_frames * 100
            print(f"Progress: {progress:.2f}%")

    print(f"Total unique objects of class 0 in the whole video: {len(unique_class_0_ids)}")
    print(f"Total unique objects of class 1 in the whole video: {len(unique_class_1_ids)}")
    print(f"Total unique objects of class 2 in the whole video: {len(unique_class_2_ids)}")
    # print(f"Total moving cars in the whole video: {len(moving_cars)}")

    video_writer.release()

# Function to parse command line arguments
def parse_opt():
    parser = argparse.ArgumentParser()
    parser.add_argument("--weights", type=str, default="yolov8n.pt", help="initial weights path")
    parser.add_argument("--device", default="cpu", help="cuda device, i.e. 0 or 0,1,2,3 or cpu")
    parser.add_argument("--source", type=str, required=True, help="video file path")
    parser.add_argument("--save-img", action="store_true", help="save results")
    parser.add_argument("--exist-ok", action="store_true", help="existing project/name ok, do not increment")
    parser.add_argument("--classes", nargs="+", type=int, help="filter by class: --classes 0 1 2")
    parser.add_argument("--line-thickness", type=int, default=2, help="bounding box thickness")
    parser.add_argument("--track-thickness", type=int, default=2, help="Tracking line thickness")
    parser.add_argument("--region-thickness", type=int, default=4, help="Region thickness")
    return parser.parse_args()

# Main function
def main(opt):
    run(**vars(opt))

if __name__ == "__main__":
    opt = parse_opt()
    main(opt)
