In [None]:
# Mount Google Drive at /content/gdrive so files stored there are reachable
# from this Colab runtime.
from google.colab import drive

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Project folder on the mounted Drive.
# NOTE(review): no other visible code reads ROOT_DIR; the example video path
# further down repeats this prefix as a literal.
ROOT_DIR = '/content/gdrive/My Drive/Yolov8'

In [None]:
# Install the third-party packages imported by the tracking cell below.
!pip install ultralytics
!pip install deep-sort-realtime
!pip install filterpy
# NOTE(review): tensorflow is imported below but not referenced by the visible
# code -- confirm it is still needed.
!pip install tensorflow

# Object Tracking with Adaptive Tracker Switching

This notebook implements an object tracking system that uses YOLOv8, trained on a custom and sorted dataset, for object detection and adaptively switches between different tracking algorithms (Simple, DeepSORT, Particle Filter) to improve tracking performance.

## Functionality:

1. **Object Detection:**
   - The system uses a pre-trained YOLOv8 model (`yolov8n.pt`) to detect objects in video frames.
   - Detections are represented by bounding boxes and confidence scores.

2. **Tracking Algorithms:**
   - **Simple Tracker:** A basic tracker that assigns unique IDs to detected objects and updates their positions in subsequent frames.
   - **DeepSORT Tracker:** A more sophisticated tracker that utilizes deep learning features and a Kalman filter for robust tracking and re-identification of objects.
   - **Particle Filter Tracker:** Intended to provide an alternative tracking approach for potentially challenging scenarios using a Kalman filter.

3. **Adaptive Switching:**
   - The system continuously monitors the quality of tracking using a heuristic function (`tracking_quality_low`).
   - If the tracking quality falls below a certain threshold, the system automatically switches to a more robust tracker (from Simple to DeepSORT to Particle Filter).
   - Hysteresis is implemented to prevent rapid and unstable switching.

4. **Visualization:**
   - Tracked objects are visualized by drawing bounding boxes and displaying their IDs on the video frames.

## Code Structure:

- **`SimpleTracker` Class:** Implements the basic tracking logic.
- **`DeepSORTTracker` Class:** Integrates DeepSORT for robust tracking.
- **`ParticleFilterTracker` Class:** Provides an alternative tracking approach using a Kalman filter.
- **`tracking_quality_low` Function:** Evaluates the quality of tracking based on various metrics.
- **`track_video` Function:** Orchestrates the entire tracking process, including object detection, tracker updates, and visualization.

## Usage:

1. A test video is already provided; to use it, the notebook needs access to the Google Drive folder that contains the video.
2. Execute the notebook cells to run the object tracking system.

## Limitations:

- The particle filter implementation might need further refinement.
- The tracking quality metric could be fine-tuned for better performance.
- More comprehensive visualization and quantitative evaluation would be beneficial.

## Future Enhancements:

- Implement a true particle filter to account for non-linear systems and non-gaussian noise.
- Explore more advanced tracking quality metrics and switching strategies.
- Add visualizations and quantitative evaluation using metrics like MOTA, MOTP, and IDF1.
- Enhance documentation and add comments for better readability.


In [None]:
import os
import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from filterpy.kalman import KalmanFilter
import tensorflow as tf
from google.colab.patches import cv2_imshow

# Load a model
# Builds the detector from the 'yolov8n.pt' weights file; `model` is the
# module-level object that track_video calls on every frame.
model = YOLO('yolov8n.pt')

# Train the model
# Trains on the dataset described by 'coco8.yaml' for 50 epochs at image size 640.
results = model.train(data='coco8.yaml', epochs=50, imgsz=640)

# Evaluate the model
# NOTE: this rebinds `results`, so the training result above is no longer
# reachable under that name.
results = model.val()

# Feature Extraction Hook
# Module-level buffer filled by the forward hook below. Nothing here bounds its
# size, so consumers must clear it themselves (track_video clears it each frame).
features = []

def hook_fn(module, input, output):
    """Forward-hook callback: append the hooked layer's `output` to `features`.

    `module` and `input` are accepted because the hook is called with them,
    but they are not used.
    """
    features.append(output)

# Register hook for feature extraction
# Hooks the layer at index 2 of the wrapped model, so every forward pass through
# that layer appends its output to `features`.
# NOTE(review): the returned handle is kept in `hook` but never used in the
# visible code -- presumably for a later hook.remove(); confirm.
layer_to_hook = model.model.model[2]
hook = layer_to_hook.register_forward_hook(hook_fn)

# Helper function to format YOLO detections
def format_detections(results):
    """Keep only ``[x1, y1, x2, y2, conf]`` from each detection row.

    Args:
        results: Iterable of detection rows. The first five entries of each
            row are taken as the box corners and the confidence; any
            trailing entries (e.g. a class id) are dropped.

    Returns:
        A 2-D ``np.ndarray`` with one row per detection and five columns.
        Empty input yields an array of shape ``(0, 5)`` (not a 1-D empty
        array), so callers can index columns without special-casing frames
        that have no detections.

    Raises:
        ValueError: If a row has fewer than five entries.
    """
    formatted_detections = []
    for det in results:
        # Unpacking (rather than slicing alone) rejects rows that are too short.
        x1, y1, x2, y2, conf = det[:5]
        formatted_detections.append([x1, y1, x2, y2, conf])
    if not formatted_detections:
        # np.array([]) would have shape (0,), losing the column dimension.
        return np.empty((0, 5))
    return np.array(formatted_detections)

# SimpleTracker implementation
class SimpleTracker:
    """Minimal tracker that keeps object IDs stable across frames via IoU matching.

    Each incoming box is matched to the existing track whose last known box
    overlaps it most (greedy, best pairs first). Boxes that match nothing
    start a new track; tracks that stay unmatched for more than
    ``max_missed`` consecutive updates are forgotten, so ``tracks`` does not
    grow without bound.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, iou_threshold=0.3, max_missed=30):
        """Create an empty tracker.

        Args:
            iou_threshold: Minimum IoU for a detection to continue a track.
            max_missed: Number of consecutive unmatched updates a track
                survives before it is dropped.
        """
        self.tracks = {}  # track id -> last matched bbox
        self.next_id = 0  # id handed to the next new track
        self.iou_threshold = iou_threshold
        self.max_missed = max_missed
        self._missed = {}  # track id -> consecutive updates without a match

    @staticmethod
    def _iou(box_a, box_b):
        """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes."""
        ax1, ay1, ax2, ay2 = (float(v) for v in box_a)
        bx1, by1, bx2, by2 = (float(v) for v in box_b)
        inter_w = min(ax2, bx2) - max(ax1, bx1)
        inter_h = min(ay2, by2) - max(ay1, by1)
        if inter_w <= 0 or inter_h <= 0:
            return 0.0
        inter = inter_w * inter_h
        union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter
        return inter / union if union > 0 else 0.0

    def update(self, detections):
        """Associate this frame's detections with existing tracks.

        Args:
            detections: Iterable of rows whose first four entries are
                ``x1, y1, x2, y2``; further entries are ignored.

        Returns:
            A list with one ``{"id": ..., "bbox": ...}`` dict per detection,
            in input order. ``bbox`` is ``det[:4]`` of the matching row.
        """
        boxes = [det[:4] for det in detections]

        # Score every (track, detection) pair and walk them best-first, so a
        # track is always claimed by its best-overlapping free detection.
        candidates = sorted(
            (
                (self._iou(track_box, box), track_id, det_idx)
                for track_id, track_box in self.tracks.items()
                for det_idx, box in enumerate(boxes)
            ),
            key=lambda candidate: candidate[0],
            reverse=True,
        )
        det_to_track = {}
        matched_tracks = set()
        for score, track_id, det_idx in candidates:
            if score < self.iou_threshold:
                break  # sorted descending: nothing further can qualify
            if track_id in matched_tracks or det_idx in det_to_track:
                continue
            det_to_track[det_idx] = track_id
            matched_tracks.add(track_id)

        # Age tracks that found no detection; drop the ones lost for too long.
        for track_id in list(self.tracks):
            if track_id in matched_tracks:
                continue
            self._missed[track_id] = self._missed.get(track_id, 0) + 1
            if self._missed[track_id] > self.max_missed:
                del self.tracks[track_id]
                del self._missed[track_id]

        updated_tracks = []
        for det_idx, bbox in enumerate(boxes):
            track_id = det_to_track.get(det_idx)
            if track_id is None:
                # Unmatched detection: start a new track.
                track_id = self.next_id
                self.next_id += 1
            self.tracks[track_id] = bbox
            self._missed[track_id] = 0
            updated_tracks.append({"id": track_id, "bbox": bbox})
        return updated_tracks

# DeepSORT Tracker
class DeepSORTTracker:
    """Thin wrapper that owns a ``DeepSort`` instance and forwards updates to it."""

    def __init__(self):
        """Build the underlying tracker with ``max_age=30`` and ``n_init=3``."""
        deep_sort = DeepSort(max_age=30, n_init=3)
        self.tracker = deep_sort

    def update(self, detections, features):
        """Forward one frame of detections to ``DeepSort.update_tracks``.

        Both arguments are passed through positionally and unchanged, and the
        library's return value is handed back as-is.

        NOTE(review): deep-sort-realtime documents raw detections as
        ``([left, top, w, h], confidence, class)`` tuples and its second
        positional parameter as per-detection embeddings -- confirm callers
        supply that shape; no conversion happens here.
        """
        tracks = self.tracker.update_tracks(detections, features)
        return tracks

# Particle Filter Tracker
class ParticleFilterTracker:
    """Kalman-filter tracker with IoU association on motion-predicted boxes.

    One constant-velocity Kalman filter is kept per track. On every update
    each filter is advanced one frame, detections are matched greedily (best
    IoU first) against the predicted boxes, matched filters are corrected
    with the measurement, and unmatched detections start new tracks. Tracks
    unmatched for more than ``max_missed`` consecutive updates are dropped,
    so ``filters`` stays bounded.

    Despite the name, the estimator is a Kalman filter, not a particle filter.
    """

    def __init__(self, iou_threshold=0.3, max_missed=30):
        """Create an empty tracker.

        Args:
            iou_threshold: Minimum IoU between a predicted box and a
                detection for the detection to continue that track.
            max_missed: Number of consecutive unmatched updates a track
                survives before its filter is discarded.
        """
        self.filters = {}  # track id -> KalmanFilter
        self.next_id = 0  # id handed to the next new track
        self.iou_threshold = iou_threshold
        self.max_missed = max_missed
        self._missed = {}  # track id -> consecutive updates without a match

    def create_particle_filter(self):
        """Return a fresh 8-state / 4-measurement constant-velocity Kalman filter.

        The state is ``[x1, y1, x2, y2, vx1, vy1, vx2, vy2]`` (the four box
        coordinates as measured, followed by their per-frame velocities).
        """
        pf = KalmanFilter(dim_x=8, dim_z=4)
        pf.x = np.zeros(8)  # Initial state
        # Constant-velocity transition: each coordinate advances by its velocity.
        pf.F = np.eye(8)
        pf.F[:4, 4:] = np.eye(4)
        # The measurement observes the four coordinates directly. filterpy's
        # default H is all zeros, which would make update() a no-op.
        pf.H = np.hstack([np.eye(4), np.zeros((4, 4))])
        pf.P *= 10  # Covariance matrix
        pf.R *= 10  # Measurement noise
        return pf

    @staticmethod
    def _iou(box_a, box_b):
        """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes."""
        ax1, ay1, ax2, ay2 = (float(v) for v in box_a)
        bx1, by1, bx2, by2 = (float(v) for v in box_b)
        inter_w = min(ax2, bx2) - max(ax1, bx1)
        inter_h = min(ay2, by2) - max(ay1, by1)
        if inter_w <= 0 or inter_h <= 0:
            return 0.0
        inter = inter_w * inter_h
        union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter
        return inter / union if union > 0 else 0.0

    def update(self, detections):
        """Advance all filters one frame and associate the new detections.

        Args:
            detections: Iterable of rows whose first four entries are
                ``x1, y1, x2, y2``; further entries are ignored.

        Returns:
            A list with one ``{"id": ..., "bbox": ...}`` dict per detection,
            in input order. ``bbox`` is the measured ``det[:4]``; the filter
            state is used for association only.
        """
        boxes = [np.asarray(det[:4], dtype=float) for det in detections]

        # Predict where every known track should be in this frame.
        predicted = {}
        for track_id, pf in self.filters.items():
            pf.predict()
            predicted[track_id] = np.ravel(pf.x)[:4].copy()

        # Greedy association, best-overlapping pairs first.
        candidates = sorted(
            (
                (self._iou(pred_box, box), track_id, det_idx)
                for track_id, pred_box in predicted.items()
                for det_idx, box in enumerate(boxes)
            ),
            key=lambda candidate: candidate[0],
            reverse=True,
        )
        det_to_track = {}
        matched_tracks = set()
        for score, track_id, det_idx in candidates:
            if score < self.iou_threshold:
                break  # sorted descending: nothing further can qualify
            if track_id in matched_tracks or det_idx in det_to_track:
                continue
            det_to_track[det_idx] = track_id
            matched_tracks.add(track_id)

        # Age tracks without a measurement; drop the ones lost for too long.
        for track_id in list(self.filters):
            if track_id in matched_tracks:
                continue
            self._missed[track_id] = self._missed.get(track_id, 0) + 1
            if self._missed[track_id] > self.max_missed:
                del self.filters[track_id]
                del self._missed[track_id]

        updated_tracks = []
        for det_idx, det in enumerate(detections):
            track_id = det_to_track.get(det_idx)
            if track_id is None:
                # New object: seed a filter at the measured box, zero velocity.
                track_id = self.next_id
                self.next_id += 1
                pf = self.create_particle_filter()
                pf.x[:4] = boxes[det_idx]
                self.filters[track_id] = pf
            else:
                self.filters[track_id].update(boxes[det_idx])  # Measurement update
            self._missed[track_id] = 0
            updated_tracks.append({"id": track_id, "bbox": det[:4]})
        return updated_tracks

# Helper function to calculate tracking quality metrics
def calculate_quality_score(tracked_objects, previous_objects):
    """Return a heuristic quality score for the current frame's tracks.

    The score is the mean of the ``'id'`` values in ``tracked_objects`` (used
    as a stand-in for average tracking duration) plus an overlap term that is
    currently a constant-zero placeholder. An empty ``tracked_objects``
    scores 0. ``previous_objects`` is accepted but not used yet.
    """
    if tracked_objects:
        track_ids = [entry['id'] for entry in tracked_objects]
        duration_term = np.mean(track_ids)
    else:
        duration_term = 0

    # Placeholder until a real bounding-box overlap measure is implemented.
    overlap_term = 0
    return duration_term + overlap_term

# Helper function to determine tracking quality with hysteresis
def tracking_quality_low(tracked_objects, previous_objects, tracker_type_history):
    """Return whether the current frame's quality score is below the threshold.

    The score comes from ``calculate_quality_score``; the threshold is fixed
    at 5 (adjust as needed). ``tracker_type_history`` is accepted but not
    consulted here -- any hysteresis has to be applied by the caller.
    """
    minimum_acceptable_score = 5  # Adjust as needed
    score = calculate_quality_score(tracked_objects, previous_objects)
    return score < minimum_acceptable_score

def _to_deepsort_detections(detections):
    """Convert ``[x1, y1, x2, y2, conf, cls?]`` rows into DeepSORT raw-detection tuples."""
    raw_detections = []
    for det in detections:
        x1, y1, x2, y2, conf = (float(v) for v in det[:5])
        cls = int(det[5]) if len(det) > 5 else None
        # deep-sort-realtime expects ([left, top, width, height], confidence, class).
        raw_detections.append(([x1, y1, x2 - x1, y2 - y1], conf, cls))
    return raw_detections


def _deepsort_tracks_to_objects(tracks):
    """Turn confirmed DeepSORT tracks into the ``{"id", "bbox"}`` dicts the other trackers emit."""
    objects = []
    for track in tracks:
        if not track.is_confirmed():
            continue  # tentative tracks have not yet been seen n_init times
        # track_id is a string in deep-sort-realtime; the quality heuristic
        # averages ids, so convert it (ids are purely numeric unless `today`
        # is passed to update_tracks, which is not done here).
        objects.append({"id": int(track.track_id), "bbox": track.to_ltrb()})
    return objects


# Main function for video processing and tracking
def track_video(video_path):
    """Detect and track objects in a video, escalating trackers on low quality.

    Every frame is run through the module-level YOLO ``model`` once, the
    detections are fed to the active tracker, and the tracked boxes and IDs
    are drawn and displayed. Tracking starts with ``SimpleTracker``; after
    ``quality_threshold_frames`` consecutive frames for which
    ``tracking_quality_low`` is true it escalates to DeepSORT, and by the
    same rule from DeepSORT to ``ParticleFilterTracker`` (the final stage).

    Args:
        video_path: Path of the video file to process.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    cap = cv2.VideoCapture(video_path)

    simple_tracker = SimpleTracker()
    deepsort_tracker = DeepSORTTracker()
    particle_filter_tracker = ParticleFilterTracker()

    # Start with the SimpleTracker
    current_tracker = simple_tracker
    tracker_type = "simple"

    # Escalation order; the particle stage has no successor.
    escalation = {
        "simple": ("deepsort", deepsort_tracker),
        "deepsort": ("particle", particle_filter_tracker),
    }

    previous_objects = []
    tracker_type_history = []
    low_quality_frames = 0
    quality_threshold_frames = 5

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # The forward hook appends to `features` on every inference, so
            # empty it first to keep the module-level buffer from growing.
            features.clear()

            # YOLOv8 detection (single inference per frame)
            results = model(frame)[0]
            detections = results.boxes.data.cpu().numpy()

            # Update tracker based on type
            if tracker_type == "deepsort":
                # DeepSORT needs the frame to compute appearance embeddings,
                # which the wrapper's update(detections, features) signature
                # cannot carry, so the underlying DeepSort instance is driven
                # directly with the documented update_tracks(..., frame=...).
                tracks = current_tracker.tracker.update_tracks(
                    _to_deepsort_detections(detections), frame=frame
                )
                tracked_objects = _deepsort_tracks_to_objects(tracks)
            else:
                tracked_objects = current_tracker.update(detections)

            if tracker_type in escalation:
                if tracking_quality_low(tracked_objects, previous_objects, tracker_type_history):
                    low_quality_frames += 1
                    if low_quality_frames >= quality_threshold_frames:
                        tracker_type, current_tracker = escalation[tracker_type]
                        low_quality_frames = 0
                else:
                    # Only a run of consecutive bad frames should trigger a
                    # switch; isolated bad frames must not accumulate.
                    low_quality_frames = 0

            # Update previous objects
            previous_objects = tracked_objects

            # Visualize results
            for obj in tracked_objects:
                x1, y1, x2, y2 = map(int, obj['bbox'])
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"ID: {obj['id']}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

            cv2_imshow(frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the capture even if detection or tracking raises.
        cap.release()
        cv2.destroyAllWindows()

# Example usage
# Runs the tracking pipeline on a clip stored on the mounted Drive; track_video
# raises FileNotFoundError if this path does not exist.
video_path = '/content/gdrive/My Drive/Yolov8/test.mp4'
track_video(video_path)