In [2]:
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO
from scipy.ndimage import center_of_mass
import csv
import os

# Load the YOLOv8 model
model = YOLO("best_osmu-e.pt")  # Ensure 'best.pt' is the correct path to your model

input_folder = "Dataset/Video"
output_folder = "Dataset/trajectories"

# YOLO prediction resolution (assuming 640x320)
yolo_width = 640
yolo_height = 320

# Store the track history
track_history = defaultdict(list)

# Function to process each video
def process_video(video_path, output_csv, output_video):
    cap = cv2.VideoCapture(video_path)

    # Check if video opened successfully
    if not cap.isOpened():
        print(f"Error: Cannot open video file {video_path}")
        return

    # Get the original video dimensions
    original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Initialize VideoWriter to save the annotated video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for mp4 format
    out = cv2.VideoWriter(output_video, fourcc, cap.get(cv2.CAP_PROP_FPS), (original_width, original_height))

    # Define data structures for CSV
    combined_data = defaultdict(lambda: defaultdict(dict))  # combined_data[track_id][frame] = {...}

    frame_number = 0

    # Loop through the video frames
    while cap.isOpened():
        # Read a frame from the video
        success, frame = cap.read()
        frame_number += 1

        if not success:
            break

        # Run YOLOv8 tracking on the frame
        results = model.track(frame, persist=True, tracker="bytetrack.yaml")

        # Initialize masks and track_ids
        masks = []
        track_ids = []
        bboxes = []  # YOLO bounding boxes

        # Check if there are any detections
        if results and len(results) > 0 and results[0].boxes is not None and results[0].boxes.id is not None:
            masks = results[0].masks.data.cpu().numpy()
            track_ids = results[0].boxes.id.int().cpu().tolist()
            bboxes = results[0].boxes.xyxy.cpu().numpy()  # YOLO bounding boxes

        # Visualize the results on the frame
        annotated_frame = results[0].plot() if results and len(results) > 0 else frame.copy()

        for mask, track_id, bbox in zip(masks, track_ids, bboxes):
            if mask.sum() > 0:
                # Convert mask to binary image
                binary_mask = mask.astype(np.uint8)

                # Bounding box coordinates from YOLO
                top_left = (bbox[0], bbox[1])
                bottom_right = (bbox[2], bbox[3])
                top_right = (bbox[2], bbox[1])
                bottom_left = (bbox[0], bbox[3])

                # Scale bounding box coordinates based on the difference in resolution between YOLO input and original video
                x_scale_factor = original_width / yolo_width
                y_scale_factor = original_height / yolo_height

                # Scale bounding box coordinates
                top_left_scaled = (int(top_left[0] * x_scale_factor), int(top_left[1] * y_scale_factor))
                bottom_right_scaled = (int(bottom_right[0] * x_scale_factor), int(bottom_right[1] * y_scale_factor))

                # Calculate the center of mass of the mask
                mask_center_of_mass = center_of_mass(mask)
                center_y, center_x = mask_center_of_mass

                # Calculate the mask area (number of non-zero pixels)
                mask_area = np.sum(binary_mask)

                # Scale center of mass coordinates
                center_x_scaled = int(center_x * x_scale_factor)
                center_y_scaled = int(center_y * y_scale_factor)

                # Draw the center of mass (optional)
                cv2.circle(annotated_frame, (center_x_scaled, center_y_scaled), radius=5, color=(0, 255, 0), thickness=-1)

                # Log the tracking information: bounding box coordinates and area
                print(f"Frame {frame_number} - Track ID: {track_id}")
                print(f"  YOLO Bounding Box: Top Left: {top_left_scaled}, Bottom Right: {bottom_right_scaled}")
                print(f"  Mask Area: {mask_area}")
                print(f"  Center of Mass: X={center_x_scaled}, Y={center_y_scaled}")

                # Update track history with the center of mass
                track_history[track_id].append((float(center_x_scaled), float(center_y_scaled)))

                # Store data for CSV (including bounding box and mask area)
                combined_data[track_id][frame_number]['Center_X'] = center_x_scaled
                combined_data[track_id][frame_number]['Center_Y'] = center_y_scaled
                combined_data[track_id][frame_number]['Mask_Area'] = mask_area

        # Write the annotated frame to the output video
        out.write(annotated_frame)

        # Display the annotated frame
        cv2.imshow("YOLO Bounding Box Tracking", annotated_frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    # Release the video capture and writer objects, and close the display window
    cap.release()
    out.release()  # Release the VideoWriter
    cv2.destroyAllWindows()

    # Save combined data to CSV
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        # Header
        header = ["Track ID", "Frame", "Center X", "Center Y", "Mask Area"]
        writer.writerow(header)

        for track_id, frames in combined_data.items():
            for frame_num, data in frames.items():
                row = [
                    track_id,
                    frame_num,
                    data.get("Center_X", ""),
                    data.get("Center_Y", ""),
                    data.get("Mask_Area", "")
                ]
                writer.writerow(row)

    print(f"Combined tracking data saved to {output_csv}")


# Example of how you would process a folder of videos
video_files = [f for f in os.listdir(input_folder) if f.endswith(".mp4")]

for video_file in video_files:
    video_path = os.path.join(input_folder, video_file)
    output_csv = os.path.join(output_folder, f"{os.path.splitext(video_file)[0]}.csv")
    output_video = os.path.join(output_folder, f"{os.path.splitext(video_file)[0]}_annotated.mp4")
    process_video(video_path, output_csv, output_video)


0: 320x640 6 Clouds, 1458.7ms
Speed: 21.4ms preprocess, 1458.7ms inference, 57.3ms postprocess per image at shape (1, 3, 320, 640)
Frame 1 - Track ID: 1
  YOLO Bounding Box: Top Left: (4163, 1387), Bottom Right: (5751, 2796)
  Mask Area: 12315
  Center of Mass: X=1643, Y=703
Frame 1 - Track ID: 2
  YOLO Bounding Box: Top Left: (3669, 876), Bottom Right: (3963, 1038)
  Mask Area: 418
  Center of Mass: X=1264, Y=325
Frame 1 - Track ID: 3
  YOLO Bounding Box: Top Left: (3743, 1124), Bottom Right: (4130, 1233)
  Mask Area: 406
  Center of Mass: X=1320, Y=397
Frame 1 - Track ID: 4
  YOLO Bounding Box: Top Left: (3607, 1478), Bottom Right: (3823, 1548)
  Mask Area: 145
  Center of Mass: X=1239, Y=510
Frame 1 - Track ID: 5
  YOLO Bounding Box: Top Left: (2931, 2181), Bottom Right: (3682, 2485)
  Mask Area: 780
  Center of Mass: X=1120, Y=778
Frame 1 - Track ID: 6
  YOLO Bounding Box: Top Left: (4208, 2514), Bottom Right: (4529, 2794)
  Mask Area: 407
  Center of Mass: X=1451, Y=887

0: 320x6