# In [None]:
import cv2
import numpy as np
import torch
import os
from ultralytics import YOLO
from typing import List, Union, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import math
class VehicleTracker:
    """Track vehicles with YOLO and count those moving through a central ROI band.

    For every input video the tracker runs YOLO tracking frame by frame,
    writes an annotated copy of the video and counts each track at most once,
    when its box centre is inside the ROI and its measured velocity exceeds
    ``velocity_threshold``. Class index 2 is counted as a car; every other
    tracked class is counted as a bike.
    """

    # BGR colours used when drawing bounding boxes.
    _COUNTED_COLOR = (0, 0, 255)
    _CAR_COLOR = (50, 158, 168)
    _BIKE_COLOR = (255, 255, 255)

    def __init__(self,
                 yolo_model_path: str,
                 input_videos: List[str],
                 output_dir: str,
                 conf: float,
                 iou: float,
                 velocity_threshold: float,
                 roi_height_percentage: float = 0.2,  # Fraction of video height for ROI
                 classes_to_track: Union[List[str], List[int], None] = None,
                 meters_per_pixel: float = 0.02,
                 device: str = "cuda"):
        """
        Store the tracking configuration and make sure the output directory exists.

        Args:
            yolo_model_path (str): Path to YOLO model weights
            input_videos (List[str]): List of input video paths
            output_dir (str): Directory to save output videos and log files
            conf (float): Detection confidence threshold passed to the tracker
            iou (float): IoU threshold passed to the tracker
            velocity_threshold (float): Minimum velocity (km/h, as computed by
                ``_calculate_velocity``) a vehicle needs in order to be counted
            roi_height_percentage (float): Fraction of the video height covered by the ROI
            classes_to_track (List): Class names or indices to track
                (default: ``[2, 3]``, car and motorcycle)
            meters_per_pixel (float): Scale used to convert pixel displacement to meters
            device (str): Device string handed to the YOLO tracker
        """
        self.model_path = yolo_model_path
        self.conf = conf
        self.iou = iou
        self.velocity_threshold = velocity_threshold
        self.input_videos = input_videos
        self.output_dir = output_dir
        self.roi_height_percentage = roi_height_percentage
        self.meters_per_pixel = meters_per_pixel
        self.device = device

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # A fresh list per instance: a mutable default would be shared by all trackers.
        if classes_to_track is None:
            classes_to_track = [2, 3]

        # Convert class names to class indices (needs self.model_path, set above)
        self.classes_to_track = self._get_class_indices(classes_to_track)

    def _get_class_indices(self, classes_to_track: Union[List[str], List[int]]) -> List[int]:
        """
        Convert class names or indices to class indices

        Args:
            classes_to_track (List): List of class names or indices

        Returns:
            List[int]: List of class indices. Names the model does not know
            are dropped; an empty input yields an empty list.
        """
        if not classes_to_track:
            return []

        # Already indices: return a copy so the caller's list is never aliased.
        if not isinstance(classes_to_track[0], str):
            return list(classes_to_track)

        # Names have to be resolved against the model; it is only loaded on this path.
        names = YOLO(self.model_path).names
        # Ultralytics exposes ``names`` as an {index: name} mapping; a plain
        # sequence of names is accepted as well.
        pairs = names.items() if isinstance(names, dict) else enumerate(names)
        index_by_name = {name: idx for idx, name in pairs}
        return [index_by_name[cls] for cls in classes_to_track if cls in index_by_name]

    def _create_roi(self, frame_width: int, frame_height: int) -> np.ndarray:
        """
        Create a full-width Region of Interest (ROI) band centred vertically in the frame

        Args:
            frame_width (int): Width of the video frame
            frame_height (int): Height of the video frame

        Returns:
            np.ndarray: ROI polygon points (int32), ordered top-left, top-right,
            bottom-right, bottom-left
        """
        # Calculate ROI height based on percentage
        roi_height = int(frame_height * self.roi_height_percentage)

        # Calculate ROI vertical position (middle of the frame)
        roi_y_center = int(frame_height / 2)
        half_height = int(roi_height / 2)
        roi_y_top = roi_y_center - half_height
        roi_y_bottom = roi_y_center + half_height

        return np.array([
            [0, roi_y_top],               # Top left
            [frame_width, roi_y_top],     # Top right
            [frame_width, roi_y_bottom],  # Bottom right
            [0, roi_y_bottom],            # Bottom left
        ], dtype=np.int32)

    def _is_inside_roi(self, bbox: np.ndarray, roi: np.ndarray) -> bool:
        """
        Check whether the centre of a bounding box lies inside (or on the edge of) the ROI

        Args:
            bbox (np.ndarray): Bounding box coordinates [x1, y1, x2, y2]
            roi (np.ndarray): Region of Interest polygon points

        Returns:
            bool: True if the bbox centre is inside or on the ROI, False otherwise
        """
        # Plain Python floats: numpy scalar types are not always accepted by
        # OpenCV where it expects a point.
        center_x = float((bbox[0] + bbox[2]) / 2)
        center_y = float((bbox[1] + bbox[3]) / 2)
        return bool(cv2.pointPolygonTest(roi, (center_x, center_y), False) >= 0)

    def _calculate_velocity(self, prev_position: Tuple[int, int], current_position: Tuple[int, int], fps: float) -> float:
        """
        Calculate the velocity of an object from two positions assumed to be one frame apart.

        Args:
            prev_position (Tuple[int, int]): Previous position of the object (x, y)
            current_position (Tuple[int, int]): Current position of the object (x, y)
            fps (float): Frames per second of the video

        Returns:
            float: Velocity in km/h
        """
        # Euclidean distance between the previous and current positions
        distance_pixels = math.hypot(
            current_position[0] - prev_position[0],
            current_position[1] - prev_position[1],
        )

        # Pixels -> meters using the configured camera scale
        distance_meters = distance_pixels * self.meters_per_pixel

        # Meters per frame -> meters per second -> km/h (1 m/s = 3.6 km/h)
        return distance_meters * fps * 3.6

    def process_video(self, model_name, video_path):
        """
        Track and count vehicles in one video, writing an annotated video and a log file.

        Args:
            model_name: Model weights handed to ``YOLO``
            video_path: Path of the video to process

        Returns:
            dict: ``{"Car": <count>, "Bike": <count>}`` for this video
        """
        # Each call builds its own model instance.
        model = YOLO(model_name)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        output_video_path = os.path.join(self.output_dir, f'{video_name}_labels.mp4')
        # The log name carries the last tracked class index, as existing outputs
        # do; fall back to a plain name when no classes are configured.
        if self.classes_to_track:
            log_name = f'{video_name}_{self.classes_to_track[-1]}_count.log'
        else:
            log_name = f'{video_name}_count.log'
        log_file_path = os.path.join(self.output_dir, log_name)

        # Tracking state for this video
        car_counts = 0
        bike_counts = 0
        tracked_ids = set()       # track ids that have already been counted
        vehicle_positions = {}    # track id -> last stored top-left corner (x1, y1)

        cap = cv2.VideoCapture(video_path)
        out = None
        try:
            if not cap.isOpened():
                print(f"Could not open video: {video_path}")

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Create ROI dynamically based on frame dimensions
            roi = self._create_roi(width, height)

            # Video writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

            # Foreground mask is used below to skip detections on static background.
            back_sub = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100, detectShadows=True)

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Best effort per frame: a failing frame is reported and skipped
                # (it is not written to the output video).
                try:
                    fg_mask = back_sub.apply(frame)
                    # Detect and track objects using YOLO with BoT-SORT
                    results = model.track(
                        frame,
                        persist=True,  # Enables tracking across frames
                        classes=self.classes_to_track,  # Specific classes to track
                        conf=self.conf,  # Confidence threshold
                        iou=self.iou,   # IoU threshold for tracking
                        imgsz=(1440, 2560),
                        tracker="botsort.yaml",
                        device=self.device
                    )

                    # Draw ROI
                    cv2.polylines(frame, [roi], True, (0, 255, 0), 2)

                    boxes = results[0].boxes
                    if boxes.id is not None:
                        for box in boxes:
                            # Extract tracking information
                            bbox = box.xyxy[0].cpu().numpy()
                            cls = int(box.cls[0])
                            track_id = int(box.id[0])

                            if cls not in self.classes_to_track:
                                continue

                            x1, y1, x2, y2 = map(int, bbox)
                            # Skip boxes whose region is mostly background in the mask.
                            if fg_mask[y1:y2, x1:x2].mean() < 50:  # Adjust the threshold value as needed
                                continue

                            # Class 2 is a car; everything else is handled as a
                            # bike, matching the counting rule below.
                            if track_id in tracked_ids:
                                color = self._COUNTED_COLOR
                            elif cls == 2:
                                color = self._CAR_COLOR
                            else:
                                color = self._BIKE_COLOR

                            # Count only objects inside the ROI that were not counted before
                            if track_id not in tracked_ids and self._is_inside_roi(bbox, roi):
                                prev_position = vehicle_positions.get(track_id)
                                if prev_position is not None:
                                    velocity = self._calculate_velocity(prev_position, (x1, y1), fps)
                                    if velocity < self.velocity_threshold:
                                        # NOTE(review): the stored position is not refreshed
                                        # here, so later velocities are measured against an
                                        # older frame - confirm this is intended.
                                        continue
                                    if velocity > self.velocity_threshold:
                                        if cls == 2:
                                            car_counts += 1
                                        else:
                                            bike_counts += 1
                                        tracked_ids.add(track_id)
                                vehicle_positions[track_id] = (x1, y1)  # Update the vehicle's last position
                                color = self._COUNTED_COLOR

                            # Draw bounding box
                            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                            # Add label with class name and track ID
                            label = f'{model.names[cls]} ID:{track_id}'
                            cv2.putText(frame,
                                        label,
                                        (x1, y1 - 10),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

                    # Running totals are drawn on every frame so the overlay
                    # does not disappear when a frame has no tracks.
                    counter_text = f"Cars: {car_counts} | Bikes: {bike_counts}"
                    cv2.putText(frame, counter_text, (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 4)

                    # Write frame to output video
                    out.write(frame)

                except Exception as e:
                    print(f"Error processing frame: {e}")
                    continue
        finally:
            # Release resources even if setup or reading fails
            cap.release()
            if out is not None:
                out.release()

        # Write log file
        with open(log_file_path, 'w') as log:
            log.write(f"Config:\nModel:{self.model_path}\nConf:{self.conf}\nIOU:{self.iou}\nROI_height:{self.roi_height_percentage}\nVelocity threshold:{self.velocity_threshold}\n")
            log.write(f"Video: {video_name}\n")
            log.write(f"Total Cars Counted: {car_counts}\n")
            log.write(f"Total Bikes Counted: {bike_counts}")

        # Print summary
        print("Tracking completed. Check output directory for results.")

        return {"Car": car_counts, "Bike": bike_counts}

    def process_video_concurrently(self):
        """
        Run ``process_video`` for every input video on a thread pool.

        A video whose processing raises is reported and left out of the result.

        Returns:
            list: Per-video count dicts, in completion order
        """
        final_res = []
        with ThreadPoolExecutor(max_workers=20) as executor:
            # Remember which video each future belongs to for error reporting.
            future_to_video = {
                executor.submit(self.process_video, self.model_path, vid_path): vid_path
                for vid_path in self.input_videos
            }
            for future in as_completed(future_to_video):
                vid_path = future_to_video[future]
                try:
                    final_res.append(future.result())
                except Exception as exc:
                    print('%r generated an exception: %s' % (vid_path, exc))
        return final_res
                    

# Example Usage
def main():
    """Run the vehicle tracker over every file under ``videos`` whose path contains "D2"."""

    def list_files_recursively(directory):
        """
        Collect the paths of all files below a directory.

        Args:
            directory (str): The directory to search in.

        Returns:
            list: Full paths of every file in the directory and its subdirectories.
        """
        return [
            os.path.join(root, file)
            for root, _, files in os.walk(directory)
            for file in files
        ]

    # Keep only the paths that contain "D2"
    input_videos = [vid for vid in list_files_recursively("videos") if "D2" in vid]
    print("Processing", input_videos)

    # Initialize tracker
    car_tracker = VehicleTracker(
        yolo_model_path='yolo11n.pt',  # Use appropriate YOLO model
        input_videos=input_videos,
        output_dir='output_tracking_v11',
        roi_height_percentage=0.3,  # ROI band covers 30% of the video height
        classes_to_track=[2, 3],  # Can use class names or indices
        conf=0.5,
        iou=0.5,
        velocity_threshold=2.0,
    )

    # Process videos; the per-video counts are also written to log files
    car_tracker.process_video_concurrently()
 
# Run the batch tracking job only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()