In [None]:
import os
import cv2
import pandas as pd
import torch
import json
from ultralytics import YOLO
from datetime import datetime


class Tracker:
    def __init__(self, config_path: str):
        """
        Initialize the Tracker with a configuration file.
        Args:
            config_path (str): Path to the JSON configuration file.
        """
        # Load configuration
        with open(config_path, "r") as config_file:
            self.config = json.load(config_file)

        # Extract settings from the configuration
        self.model_path = self.config["model"]["weights"]
        self.conf_threshold = self.config["model"]["conf_threshold"]
        self.iou_threshold = self.config["model"]["iou_threshold"]

        self.input_dir = self.config["input"]["images_dir"]
        self.output_dir = self.config["output"]["output_dir"]
        self.save_video = self.config["output"]["save_video"]


        # Ensure output directory exists
        os.makedirs(self.output_dir, exist_ok=True)

        # Initialize YOLO model
        if not os.path.exists(self.model_path):
            raise FileNotFoundError(f"Model file {self.model_path} does not exist.")
        self.model = YOLO(self.model_path)
        self.model.to("cuda" if torch.cuda.is_available() else "cpu")

        # Prepare for tracking
        self.global_track_id = 0
        self.track_memory = {}  # Tracks global IDs for cross-image tracking
        self.tracking_data = []  # Store results for all files

    def _process_video(self, video_path: str):
        """
        Process a video file for object tracking.
        Args:
            video_path (str): Path to the video file.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Unable to open video file {video_path}")
            return

        video_name = os.path.basename(video_path)
        video_output_dir = os.path.join(self.output_dir, "annotated_videos")
        os.makedirs(video_output_dir, exist_ok=True)
        output_video_path = os.path.join(video_output_dir, f"annotated_{video_name}")

        try:
            while cap.isOpened():
                success, frame = cap.read()
                if not success:
                    break

                # Get current frame number
                frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES))

                # Run YOLO tracking
                try:
                    results = self.model.track(
                        frame,
                        conf=self.conf_threshold,
                        iou=self.iou_threshold,
                        persist=True,
                        save=self.save_video,
                    )

                    # Save annotated frame to video if `save_video` is True
                    if self.save_video:
                        annotated_frame = results[0].plot()
                        if "video_writer" not in locals():
                            # Initialize video writer if not already done
                            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                            frame_height, frame_width = annotated_frame.shape[:2]
                            video_writer = cv2.VideoWriter(
                                output_video_path, fourcc, cap.get(cv2.CAP_PROP_FPS), (frame_width, frame_height)
                            )
                        video_writer.write(annotated_frame)

                    # Check if there are detections
                    if results and results[0].boxes is not None:
                        boxes = results[0].boxes.xywh.cpu().numpy()  # Bounding boxes
                        track_ids = results[0].boxes.id.cpu().numpy().tolist()  # Track IDs

                        # Append tracking data
                        for box, track_id in zip(boxes, track_ids):
                            x, y, w, h = box
                            self.tracking_data.append([track_id, x, y, w, h, os.path.basename(video_path), frame_number])
                    else:
                        print(f"No detections in frame {frame_number}.")

                except Exception as e:
                    print(f"Error processing frame {frame_number}: {e}")

        except Exception as e:
            print(f"Error processing video {video_path}: {e}")
        finally:
            cap.release()
            # Release video writer
            if "video_writer" in locals():
                video_writer.release()

    def _process_image(self, image_path: str):
        """
        Process a single image for object tracking.
        Args:
            image_path (str): Path to the image file.
        """
        frame = cv2.imread(image_path)
        if frame is None:
            print(f"Error: Unable to open image file {image_path}")
            return

        results = self.model.track(frame, conf=self.conf_threshold, iou=self.iou_threshold, persist=True)

        if results and results[0].boxes is not None:
            boxes = results[0].boxes.xywh.cpu().numpy()
            track_ids = results[0].boxes.id.cpu().numpy().tolist()

            for box, track_id in zip(boxes, track_ids):
                if track_id not in self.track_memory:
                    self.global_track_id += 1
                    self.track_memory[track_id] = self.global_track_id

                global_id = self.track_memory[track_id]
                x, y, w, h = box
                self.tracking_data.append([global_id, x, y, w, h, os.path.basename(image_path), 0])

    def __call__(self):
        """
        Process all images and videos in the input directory or a single file.
        """
        if os.path.isfile(self.input_dir):
            # Process a single file
            input_files = [self.input_dir]
        elif os.path.isdir(self.input_dir):
            # Get all files from the input directory
            input_files = [
                os.path.join(self.input_dir, f)
                for f in os.listdir(self.input_dir)
                if f.lower().endswith((".mp4", ".avi", ".jpg", ".jpeg", ".png"))
            ]
        else:
            raise NotADirectoryError(f"The input path {self.input_dir} is neither a file nor a directory.")

        for input_file in sorted(input_files):
            if input_file.lower().endswith((".mp4", ".avi")):
                print(f"Processing video: {input_file}")
                self._process_video(input_file)
            elif input_file.lower().endswith((".jpg", ".jpeg", ".png")):
                print(f"Processing image: {input_file}")
                self._process_image(input_file)

        # Save results to a CSV file
        output_csv = os.path.join(self.output_dir, "tracking_results.csv")
        df = pd.DataFrame(self.tracking_data, columns=["track_id", "x", "y", "w", "h", "file_name", "frame_number"])
        df.to_csv(output_csv, index=False)
        print(f"Tracking results saved to {output_csv}")

# Usage
if __name__ == "__main__":
    tracker = Tracker(config_path="config.json")
    tracker()  # Call the instance to process files


Processing video: src/tests/13413819-hd_1920_1080_30fps.mp4

0: 384x640 (no detections), 139.8ms
Speed: 16.9ms preprocess, 139.8ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns\detect\track[0m
Error processing frame 1: 'NoneType' object has no attribute 'cpu'

0: 384x640 (no detections), 61.6ms
Speed: 3.0ms preprocess, 61.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns\detect\track[0m
Error processing frame 2: 'NoneType' object has no attribute 'cpu'

0: 384x640 (no detections), 65.6ms
Speed: 2.0ms preprocess, 65.6ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns\detect\track[0m
Error processing frame 3: 'NoneType' object has no attribute 'cpu'

0: 384x640 (no detections), 79.1ms
Speed: 11.0ms preprocess, 79.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns\detect\track[0m
Error processing frame 4: 'None

In [19]:
import os
os.chdir("c:\\Users\\bohbot-lab\\git\\Mosquito_Supermodel")