In [1]:
import cv2
import torch
import numpy as np
from ultralytics import YOLO
import pandas as pd
from pathlib import Path
from collections import defaultdict

# ------------------------
# GPU check
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

# ------------------------
# Load YOLO model (your trained model)
model_path = "f:/skills-copilot-codespaces-vscode/thesis/runs/detect/rsud20k_yolo114/weights/best.pt"
model = YOLO(model_path)
model.to(device)
print(f"‚úì Loaded YOLO model from: {model_path}")
print(f"Model classes: {model.names}")

# ------------------------
# CLASS NAME REMAPPING
CLASS_REMAP = {
    'person': 'person',
    'rickshaw': 'rickshaw',
    'rickshaw_van': 'rickshaw_van',
    'auto_rickshaw': 'auto_rickshaw',
    'truck': 'truck',
    'pickup_truck': 'pickup_truck',
    'private_car': 'car',
    'motorcycle': 'motorcycle',
    'bicycle': 'bicycle',
    'bus': 'bus',
    'micro_bus': 'micro_bus',
    'covered_van': 'covered_van',
    'human_hauler': 'human_hauler',
    'motorbike': 'motorcycle',
    'car': 'car',
}

# REAL-WORLD OBJECT SIZES (average dimensions in meters)
OBJECT_REAL_SIZES = {
    'person': {'width': 0.5, 'height': 1.7, 'length': 0.3},
    'rickshaw': {'width': 1.2, 'height': 1.8, 'length': 2.5},
    'rickshaw_van': {'width': 1.5, 'height': 2.0, 'length': 3.0},
    'auto_rickshaw': {'width': 1.3, 'height': 1.6, 'length': 2.8},
    'truck': {'width': 2.5, 'height': 3.5, 'length': 8.0},
    'pickup_truck': {'width': 2.0, 'height': 2.2, 'length': 5.5},
    'car': {'width': 1.8, 'height': 1.5, 'length': 4.5},
    'motorcycle': {'width': 0.8, 'height': 1.2, 'length': 2.2},
    'bicycle': {'width': 0.6, 'height': 1.1, 'length': 1.8},
    'bus': {'width': 2.5, 'height': 3.2, 'length': 12.0},
    'micro_bus': {'width': 2.0, 'height': 2.5, 'length': 6.0},
    'covered_van': {'width': 2.0, 'height': 2.3, 'length': 5.0},
    'human_hauler': {'width': 1.5, 'height': 2.0, 'length': 3.5},
}

# ------------------------
# CAMERA CALIBRATION - PINHOLE CAMERA MODEL
# ------------------------
FOCAL_LENGTH_PX = 1000  # Focal length in pixels
FRAME_RATE_SAMPLES = 5  # Frames to average for speed

print(f"\nCamera calibration:")
print(f"  Focal length: {FOCAL_LENGTH_PX} pixels")
print(f"  Speed averaging: {FRAME_RATE_SAMPLES} frames")

# ------------------------
# Video Setup
video_path = r"F:\skills-copilot-codespaces-vscode\thesis\Night Driving Seoul City _ Gangnam and Expressway with Chill Lofi Hiphop Beats POV 4K HDR.mp4"
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    raise ValueError(f"Cannot open video: {video_path}")

fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"\nVideo Info:")
print(f"  Resolution: {width}x{height}")
print(f"  FPS: {fps}")
print(f"  Total Frames: {total_frames}")
print(f"  Duration: {total_frames/fps:.1f} seconds")

DELTA_TIME = 1.0 / fps

# Display settings
display_width = 1280
display_height = int(height * (display_width / width))

# ------------------------
# Output setup
output_dir = Path("runs/detect/distance_tracking")
output_dir.mkdir(parents=True, exist_ok=True)

output_video = output_dir / "annotated_video.mp4"
output_csv = output_dir / "detections_with_distance.csv"

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(str(output_video), fourcc, fps, (width, height))

# ------------------------
# Detection settings
CONFIDENCE_THRESHOLD = 0.15
IOU_THRESHOLD = 0.3
IMAGE_SIZE = 1280
MAX_TRACKING_DISTANCE = 150

print(f"\nDetection settings:")
print(f"  Confidence: {CONFIDENCE_THRESHOLD}")
print(f"  IoU: {IOU_THRESHOLD}")
print(f"  Image size: {IMAGE_SIZE}px")

# ------------------------
# Tracking variables
detection_data = []
frame_id = 0
tracked_objects = {}
next_object_id = 0

def estimate_distance(bbox_width, bbox_height, object_class, focal_length):
    """
    Pinhole camera model: Distance = (Real_Size √ó Focal_Length) / Pixel_Size
    """
    if object_class not in OBJECT_REAL_SIZES:
        return None
    
    real_width = OBJECT_REAL_SIZES[object_class]['width']
    real_height = OBJECT_REAL_SIZES[object_class]['height']
    
    distance_from_width = (real_width * focal_length) / bbox_width if bbox_width > 0 else 999
    distance_from_height = (real_height * focal_length) / bbox_height if bbox_height > 0 else 999
    
    distance = (distance_from_width + distance_from_height) / 2.0
    return distance

def find_closest_tracked_object(center_x, center_y, object_class, frame_num):
    """Find closest previously tracked object of same class"""
    min_distance = float('inf')
    closest_id = None
    
    for obj_id, obj_data in tracked_objects.items():
        if obj_data['class'] != object_class:
            continue
        
        if len(obj_data['positions']) > 0:
            last_x, last_y, last_frame = obj_data['positions'][-1]
            
            if frame_num - last_frame > 10:
                continue
            
            dist = np.sqrt((center_x - last_x)**2 + (center_y - last_y)**2)
            
            if dist < min_distance and dist < MAX_TRACKING_DISTANCE:
                min_distance = dist
                closest_id = obj_id
    
    return closest_id

def calculate_speed(obj_data, current_distance, current_frame):
    """
    Calculate real-world speed using 3D displacement
    """
    if len(obj_data['positions']) < 2:
        return 0.0
    
    positions = obj_data['positions'][-FRAME_RATE_SAMPLES:]
    distances = obj_data['distances'][-FRAME_RATE_SAMPLES:]
    
    if len(positions) < 2:
        return 0.0
    
    first_x, first_y, first_frame = positions[0]
    last_x, last_y, last_frame = positions[-1]
    
    first_distance = distances[0]
    last_distance = distances[-1]
    
    # Convert pixel movement to meters
    avg_distance = (first_distance + last_distance) / 2.0
    meters_per_pixel = avg_distance / FOCAL_LENGTH_PX
    
    lateral_displacement_x = (last_x - first_x) * meters_per_pixel
    lateral_displacement_y = (last_y - first_y) * meters_per_pixel
    depth_displacement = last_distance - first_distance
    
    # Total 3D displacement
    total_displacement = np.sqrt(
        lateral_displacement_x**2 + 
        lateral_displacement_y**2 + 
        depth_displacement**2
    )
    
    frames_elapsed = last_frame - first_frame
    time_elapsed = frames_elapsed * DELTA_TIME
    
    if time_elapsed == 0:
        return 0.0
    
    speed_ms = total_displacement / time_elapsed
    speed_kmh = speed_ms * 3.6
    
    return abs(speed_kmh)

print("\nProcessing video...")
print("Press 'q' to stop early\n")

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_id += 1
    
    # YOLO detection with improved settings
    results = model(
        frame, 
        conf=CONFIDENCE_THRESHOLD,
        iou=IOU_THRESHOLD,
        imgsz=IMAGE_SIZE,
        verbose=False,
        agnostic_nms=False,
        max_det=300
    )[0]
    
    current_detections = []
    
    # Process detections
    for box in results.boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
        conf = float(box.conf[0])
        cls = int(box.cls[0])
        original_label = results.names[cls]
        
        label = CLASS_REMAP.get(original_label, original_label)
        
        cx = (x1 + x2) / 2
        cy = (y1 + y2) / 2
        bbox_width = x2 - x1
        bbox_height = y2 - y1
        
        if bbox_width < 10 or bbox_height < 10:
            continue
        
        distance = estimate_distance(bbox_width, bbox_height, label, FOCAL_LENGTH_PX)
        
        if distance is None or distance > 200:
            continue
        
        current_detections.append({
            'cx': cx,
            'cy': cy,
            'class': label,
            'original_label': original_label,
            'conf': conf,
            'bbox': (x1, y1, x2, y2),
            'distance': distance,
            'bbox_width': bbox_width,
            'bbox_height': bbox_height
        })
    
    # Track objects and calculate speeds
    for det in current_detections:
        cx, cy = det['cx'], det['cy']
        label = det['class']
        distance = det['distance']
        
        obj_id = find_closest_tracked_object(cx, cy, label, frame_id)
        
        if obj_id is None:
            obj_id = next_object_id
            next_object_id += 1
            tracked_objects[obj_id] = {
                'class': label,
                'positions': [],
                'distances': [],
                'speeds': []
            }
        
        tracked_objects[obj_id]['positions'].append((cx, cy, frame_id))
        tracked_objects[obj_id]['distances'].append(distance)
        
        speed_kmh = calculate_speed(tracked_objects[obj_id], distance, frame_id)
        tracked_objects[obj_id]['speeds'].append(speed_kmh)
        
        det['speed_kmh'] = speed_kmh
        det['object_id'] = obj_id
    
    # Draw detections with distance and speed
    for det in current_detections:
        x1, y1, x2, y2 = det['bbox']
        label = det['class']
        conf = det['conf']
        distance = det['distance']
        speed_kmh = det['speed_kmh']
        
        # Color based on distance
        if distance < 10:
            color = (0, 0, 255)  # Red - close
        elif distance < 25:
            color = (0, 165, 255)  # Orange - medium
        else:
            color = (0, 255, 0)  # Green - far
        
        # Draw bounding box
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 3)
        
        # Class label
        label_text = f"{label}: {conf:.2f}"
        (text_width, text_height), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.9, 2)
        cv2.rectangle(frame, (int(x1), int(y1)-text_height-10), (int(x1)+text_width+10, int(y1)), color, -1)
        cv2.putText(frame, label_text, (int(x1)+5, int(y1)-5), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        
        # Distance
        distance_text = f"Dist: {distance:.1f}m"
        (dist_width, dist_height), _ = cv2.getTextSize(distance_text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
        cv2.rectangle(frame, (int(x1), int(y2)+5), (int(x1)+dist_width+10, int(y2)+dist_height+15), (255, 165, 0), -1)
        cv2.putText(frame, distance_text, (int(x1)+5, int(y2)+dist_height+10), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        
        # Speed (only if moving)
        if speed_kmh > 1.0:
            speed_text = f"Speed: {speed_kmh:.1f} km/h"
            (speed_width, speed_height), _ = cv2.getTextSize(speed_text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
            cv2.rectangle(frame, (int(x1), int(y2)+dist_height+20), 
                         (int(x1)+speed_width+10, int(y2)+dist_height+speed_height+30), (0, 0, 255), -1)
            cv2.putText(frame, speed_text, (int(x1)+5, int(y2)+dist_height+speed_height+25), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        
        # Store data
        detection_data.append({
            'frame': frame_id,
            'time_sec': frame_id / fps,
            'object_id': det['object_id'],
            'class': label,
            'original_class': det['original_label'],
            'confidence': conf,
            'x1': x1,
            'y1': y1,
            'x2': x2,
            'y2': y2,
            'center_x': det['cx'],
            'center_y': det['cy'],
            'distance_meters': distance,
            'speed_kmh': speed_kmh,
            'bbox_width': det['bbox_width'],
            'bbox_height': det['bbox_height']
        })
    
    # Frame info
    cv2.rectangle(frame, (5, 5), (450, 95), (0, 0, 0), -1)
    cv2.putText(frame, f"Frame: {frame_id}/{total_frames}", (10, 30), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    cv2.putText(frame, f"Objects: {len(current_detections)}", (10, 60), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    cv2.putText(frame, f"Tracked: {len(tracked_objects)}", (10, 90), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    
    # Write output
    out.write(frame)
    
    # Display
    display_frame = cv2.resize(frame, (display_width, display_height))
    cv2.imshow("Object Detection - Distance & Speed (Press Q to quit)", display_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        print("\nStopped by user")
        break
    
    # Progress
    if frame_id % 30 == 0:
        progress = (frame_id / total_frames) * 100
        num_detections = len(current_detections)
        if num_detections > 0:
            avg_dist = np.mean([d['distance'] for d in current_detections])
            avg_conf = np.mean([d['conf'] for d in current_detections])
            print(f"Progress: {progress:.1f}% ({frame_id}/{total_frames}) - Objects: {num_detections} - Avg Conf: {avg_conf:.2f} - Avg Dist: {avg_dist:.1f}m")
        else:
            print(f"Progress: {progress:.1f}% ({frame_id}/{total_frames})")

# Cleanup
cap.release()
out.release()
cv2.destroyAllWindows()

# Save CSV
df = pd.DataFrame(detection_data)
df.to_csv(output_csv, index=False)

# Summary
print("\n" + "="*70)
print("‚úì Video processing complete!")
print("="*70)
print(f"Total frames processed: {frame_id}")
print(f"Total detections: {len(detection_data)}")
print(f"\nOutput files:")
print(f"  Video: {output_video.absolute()}")
print(f"  CSV:   {output_csv.absolute()}")

if len(detection_data) > 0:
    print(f"\nDetection Summary by Class:")
    class_summary = df.groupby('class').agg({
        'object_id': 'count',
        'confidence': 'mean',
        'distance_meters': 'mean',
        'speed_kmh': lambda x: x[x > 1.0].mean() if len(x[x > 1.0]) > 0 else 0
    }).round(2)
    class_summary.columns = ['Detections', 'Avg Confidence', 'Avg Distance (m)', 'Avg Speed (km/h)']
    print(class_summary.to_string())
    
    print(f"\nOverall Stats:")
    print(f"  Average confidence: {df['confidence'].mean():.3f}")
    print(f"  Average distance: {df['distance_meters'].mean():.1f} meters")
    print(f"  Closest object: {df['distance_meters'].min():.1f} meters")
    print(f"  Farthest object: {df['distance_meters'].max():.1f} meters")
    
    moving_objects = df[df['speed_kmh'] > 1.0]
    if len(moving_objects) > 0:
        print(f"  Average speed (moving): {moving_objects['speed_kmh'].mean():.1f} km/h")
        print(f"  Max speed: {moving_objects['speed_kmh'].max():.1f} km/h")
    
    print(f"\nTracked Objects: {len(tracked_objects)}")

print("="*70)
print("\nüìè Distance Estimation: Pinhole Camera Model")
print(f"Current FOCAL_LENGTH_PX = {FOCAL_LENGTH_PX}")
print("Calibrate for accurate results - see README for instructions")
print("="*70)


Using device: cuda
GPU: NVIDIA GeForce RTX 3060
‚úì Loaded YOLO model from: f:/skills-copilot-codespaces-vscode/thesis/runs/detect/rsud20k_yolo114/weights/best.pt
Model classes: {0: 'person', 1: 'rickshaw', 2: 'rickshaw_van', 3: 'auto_rickshaw', 4: 'truck', 5: 'pickup_truck', 6: 'private_car', 7: 'motorcycle', 8: 'bicycle', 9: 'bus', 10: 'micro_bus', 11: 'covered_van', 12: 'human_hauler'}

Camera calibration:
  Focal length: 1000 pixels
  Speed averaging: 5 frames

Video Info:
  Resolution: 640x360
  FPS: 30
  Total Frames: 102550
  Duration: 3418.3 seconds

Detection settings:
  Confidence: 0.15
  IoU: 0.3
  Image size: 1280px

Processing video...
Press 'q' to stop early

‚úì Loaded YOLO model from: f:/skills-copilot-codespaces-vscode/thesis/runs/detect/rsud20k_yolo114/weights/best.pt
Model classes: {0: 'person', 1: 'rickshaw', 2: 'rickshaw_van', 3: 'auto_rickshaw', 4: 'truck', 5: 'pickup_truck', 6: 'private_car', 7: 'motorcycle', 8: 'bicycle', 9: 'bus', 10: 'micro_bus', 11: 'covered_