# YOLO Detection

In [7]:
import cv2
import json
from ultralytics import YOLO
import numpy as np

def run_yolo_detection(video_path, output_json_path, model_path='yolov8n.pt'):
    """
    Run YOLO detection on video and save results in JSON format
    
    Args:
        video_path: Path to input video
        output_json_path: Path to save JSON detections
        model_path: Path to YOLO model (or model name)
    """
    
    # Load YOLO model
    model = YOLO(model_path)
    
    # Open video
    cap = cv2.VideoCapture(video_path)
    
    all_detections = {}
    frame_idx = 0
    
    print("Processing video with YOLO...")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Run YOLO detection
        results = model(frame, verbose=False)
        
        # Extract person detections (class 0 is person in COCO)
        frame_detections = []
        
        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    # Filter for person class (class 0) and confidence > 0.5
                    if box.cls.item() == 0 and box.conf.item() > 0.5:
                        # Get bounding box coordinates
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        frame_detections.append([x1, y1, x2, y2])
        
        # Store detections for this frame
        all_detections[str(frame_idx)] = frame_detections
        
        if frame_idx % 30 == 0:
            print(f"Processed {frame_idx} frames")
        
        frame_idx += 1
    
    cap.release()
    
    # Save detections to JSON
    with open(output_json_path, 'w') as f:
        json.dump(all_detections, f, indent=2)
    
    print(f"YOLO detection complete! Saved to {output_json_path}")
    print(f"Total frames processed: {frame_idx}")
    
    return all_detections

# Usage example
if __name__ == "__main__":
    video_path = "15sec_input_720p.mp4"
    detections_output = "yolo_detections.json"
    
    # Run YOLO detection
    detections = run_yolo_detection(video_path, detections_output)
    
    print("Now you can run the ReID tracking system!")

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:05<00:00, 1.29MB/s]


Processing video with YOLO...
Processed 0 frames
Processed 30 frames
Processed 60 frames
Processed 90 frames
Processed 120 frames
Processed 150 frames
Processed 180 frames
Processed 210 frames
Processed 240 frames
Processed 270 frames
Processed 300 frames
Processed 330 frames
Processed 360 frames
YOLO detection complete! Saved to yolo_detections.json
Total frames processed: 375
Now you can run the ReID tracking system!


# Player Re-ID Class using ViT


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import cv2
import numpy as np
from transformers import ViTModel, ViTFeatureExtractor
from collections import defaultdict, deque
import json
import os

class PlayerReID:
    """
    Simple but effective player re-ID using pre-trained ViT.
    Optimized for GPU execution.
    """
    def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.device = device

        # Load pre-trained ViT model and move to device
        self.feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224')
        self.model = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.model.to(self.device)
        self.model.eval()

        self.player_features = defaultdict(deque)
        self.player_metadata = {}
        self.next_id = 0

        # Configuration parameters by optuna
        self.max_history = 36
        self.similarity_threshold = 0.414 
        self.max_disappeared = 54

        self.last_positions = {}
        self.disappeared_count = defaultdict(int)

    def extract_features(self, image_crop):
        """Extract features from player crop using pre-trained ViT"""
        if image_crop.size == 0:
            return None

        image_crop = cv2.resize(image_crop, (224, 224))
        image_crop = cv2.cvtColor(image_crop, cv2.COLOR_BGR2RGB)

        inputs = self.feature_extractor(images=image_crop, return_tensors="pt")
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            features = outputs.last_hidden_state[:, 0, :]  # CLS token
            features = F.normalize(features, p=2, dim=1)

        return features.cpu().numpy().flatten()

    def compute_similarity(self, feat1, feat2):
        """Compute cosine similarity using GPU-enabled PyTorch"""
        feat1 = torch.tensor(feat1, device=self.device)
        feat2 = torch.tensor(feat2, device=self.device)
        similarity = F.cosine_similarity(feat1.unsqueeze(0), feat2.unsqueeze(0))
        return similarity.item()

    def get_temporal_features(self, player_id):
        """Aggregate historical features using weighted average on GPU"""
        if player_id not in self.player_features:
            return None

        features_list = list(self.player_features[player_id])
        if not features_list:
            return None

        weights = np.exp(np.linspace(-1, 0, len(features_list)))
        weights = weights / weights.sum()

        features_tensor = torch.tensor(features_list, dtype=torch.float32, device=self.device)
        weights_tensor = torch.tensor(weights, dtype=torch.float32, device=self.device).unsqueeze(1)

        temporal_features = torch.sum(features_tensor * weights_tensor, dim=0)
        temporal_features = F.normalize(temporal_features, p=2, dim=0)

        return temporal_features.cpu().numpy()

    def motion_consistency_check(self, player_id, current_bbox):
        """Check if detection matches expected player motion"""
        if player_id not in self.last_positions:
            return True

        last_bbox = self.last_positions[player_id]
        last_center = ((last_bbox[0] + last_bbox[2]) / 2, (last_bbox[1] + last_bbox[3]) / 2)
        current_center = ((current_bbox[0] + current_bbox[2]) / 2, (current_bbox[1] + current_bbox[3]) / 2)

        distance = np.sqrt((current_center[0] - last_center[0]) ** 2 + (current_center[1] - last_center[1]) ** 2)
        max_movement = 100  # pixels

        return distance <= max_movement

    def find_best_match(self, query_features, current_bbox):
        """Find the best matching player based on similarity and motion"""
        if query_features is None:
            return None, 0.0

        best_id = None
        best_similarity = 0.0

        for player_id in self.player_features:
            if self.disappeared_count[player_id] > self.max_disappeared:
                continue

            temporal_features = self.get_temporal_features(player_id)
            if temporal_features is None:
                continue

            similarity = self.compute_similarity(query_features, temporal_features)

            if not self.motion_consistency_check(player_id, current_bbox):
                similarity *= 0.5

            if similarity > best_similarity:
                best_similarity = similarity
                best_id = player_id

        return best_id, best_similarity

    def update_player(self, player_id, features, bbox):
        """Update player database with new features and position"""
        self.player_features[player_id].append(features)
        if len(self.player_features[player_id]) > self.max_history:
            self.player_features[player_id].popleft()

        self.last_positions[player_id] = bbox
        self.disappeared_count[player_id] = 0

        if player_id not in self.player_metadata:
            self.player_metadata[player_id] = {'first_seen': 0, 'total_detections': 0}
        self.player_metadata[player_id]['total_detections'] += 1

    def process_frame(self, frame, detections, frame_idx):
        """Process single frame and assign player IDs"""
        player_ids = []

        for player_id in self.player_features:
            self.disappeared_count[player_id] += 1

        for bbox in detections:
            x1, y1, x2, y2 = bbox
            player_crop = frame[int(y1):int(y2), int(x1):int(x2)]

            features = self.extract_features(player_crop)
            if features is None:
                player_ids.append(-1)
                continue

            best_id, similarity = self.find_best_match(features, bbox)

            if best_id is not None and similarity > self.similarity_threshold:
                self.update_player(best_id, features, bbox)
                player_ids.append(best_id)
            else:
                new_id = self.next_id
                self.next_id += 1
                self.update_player(new_id, features, bbox)
                self.player_metadata[new_id]['first_seen'] = frame_idx
                player_ids.append(new_id)

        return player_ids

    def cleanup_old_players(self):
        """Remove stale player tracks"""
        to_remove = [pid for pid, count in self.disappeared_count.items() if count > self.max_disappeared * 2]
        for pid in to_remove:
            del self.player_features[pid]
            del self.disappeared_count[pid]
            self.last_positions.pop(pid, None)

    def save_results(self, output_path):
        """Save player tracking results"""
        results = {
            'player_metadata': self.player_metadata,
            'total_players': self.next_id,
            'configuration': {
                'similarity_threshold': self.similarity_threshold,
                'max_history': self.max_history,
                'max_disappeared': self.max_disappeared
            }
        }
        with open(output_path, 'w') as f:
            json.dump(results, f, indent=2)


def process_video_with_yolo_detections(video_path, detections_file, output_video_path):
    """
    Process video with YOLO detections and track players using re-ID
    """
    with open(detections_file, 'r') as f:
        all_detections = json.load(f)

    reid_system = PlayerReID()

    cap = cv2.VideoCapture(video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_detections = all_detections.get(str(frame_idx), [])
        player_ids = reid_system.process_frame(frame, frame_detections, frame_idx)

        for detection, player_id in zip(frame_detections, player_ids):
            x1, y1, x2, y2 = detection
            color = (0, 255, 0) if player_id != -1 else (0, 0, 255)
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
            if player_id != -1:
                cv2.putText(frame, f'Player {player_id}', (int(x1), int(y1) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

        out.write(frame)

        if frame_idx % 30 == 0:
            reid_system.cleanup_old_players()
            print(f"Processed {frame_idx} frames")

        frame_idx += 1

    cap.release()
    out.release()
    reid_system.save_results('tracking_results.json')
    print(f"Total unique players detected: {reid_system.next_id}")
    return reid_system


# Main Pipeline

In [9]:
# Example usage
if __name__ == "__main__":
        # GPU optimization checks
    print("GPU Optimization Checks:")
    print(f"PyTorch version: {torch.__version__}")
    print(f"CUDA available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"CUDA version: {torch.version.cuda}")
        print(f"GPU device: {torch.cuda.get_device_name(0)}")
        print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
    
    video_path = "15sec_input_720p.mp4"
    # detections_file = "tracking_results.json"  # You need to create this
    detections_file = "yolo_detections.json"  # You need to create this
    output_video_path = "tracked_output_new.mp4"
    
    # Process video
    reid_system = process_video_with_yolo_detections(
        video_path, detections_file, output_video_path
    )
    
    print("Processing complete!")
    print(f"Results saved to: tracking_results.json")
    print(f"Output video saved to: {output_video_path}")

GPU Optimization Checks:
PyTorch version: 2.5.1+cu121
CUDA available: True
CUDA version: 12.1
GPU device: NVIDIA GeForce RTX 3050 6GB Laptop GPU
GPU memory: 6.0 GB


Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Processed 0 frames
Processed 30 frames
Processed 60 frames
Processed 90 frames
Processed 120 frames
Processed 150 frames
Processed 180 frames
Processed 210 frames
Processed 240 frames
Processed 270 frames
Processed 300 frames
Processed 330 frames
Processed 360 frames
Total unique players detected: 38
Processing complete!
Results saved to: tracking_results.json
Output video saved to: tracked_output_new.mp4
