# CCTV Person Detection and Tracking System
# Optimized for Large-Scale Video Processing with Re-identification

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [1]:
## Installation and Setup

# Install required packages
!pip install ultralytics
!pip install deep-sort-realtime
!pip install opencv-python
!pip install torch torchvision torchaudio
!pip install numpy pandas
!pip install tqdm

Collecting ultralytics
  Downloading ultralytics-8.3.176-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.15-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading nv

In [2]:
import sqlite3

In [3]:
import cv2
import numpy as np
import torch
import sqlite3
import json
import time
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict, deque
from typing import Dict, List, Tuple, Optional
import threading
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import gc

In [4]:
# Import YOLO and tracking libraries
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

print("All packages installed successfully!")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
All packages installed successfully!
CUDA available: True
GPU: Tesla T4
CUDA version: 12.4


In [13]:
VIDEO_PATH = "/content/store-aisle-detection.mp4"
OUTPUT_PATH= "/content/tracked_output.mp4"

 Core Classes and Functions

In [17]:

class PersonDetectionTracker:
    def __init__(self,
                 model_path: str = "yolov8n.pt",
                 tracking_algorithm: str = "deepsort",
                 device: str = "cuda" if torch.cuda.is_available() else "cpu",
                 confidence_threshold: float = 0.5,
                 batch_size: int = 8):

        self.device = device
        self.confidence_threshold = confidence_threshold
        self.batch_size = batch_size

        # Initialize YOLO model
        print(f"Loading YOLO model on {device}...")
        self.model = YOLO(model_path)
        self.model.to(device)

        # Initialize tracker
        if tracking_algorithm.lower() == "deepsort":
            self.tracker = DeepSort(
                max_age=60,
                n_init=3,
                max_cosine_distance=0.2,
                nn_budget=100,
                embedder="mobilenet"
            )

        # Track management
        self.active_tracks = {}
        self.track_history = defaultdict(list)
        self.person_timestamps = defaultdict(list)

        # Performance monitoring
        self.processing_times = []
        self.frame_count = 0

    def detect_persons(self, frames: List[np.ndarray]) -> List:
        """Batch detection of persons in frames"""
        if not frames:
            return []

        # Run batch inference
        results = self.model(frames,
                           classes=[0],  # Person class only
                           conf=self.confidence_threshold,
                           device=self.device,
                           verbose=False)

        return results

    def track_persons(self, frame: np.ndarray, detections) -> Tuple[List, np.ndarray]:
        """Track detected persons and return track info"""
        if detections is None or len(detections.boxes) == 0:
            return [], frame

        # Convert YOLO detections to DeepSORT format
        detection_list = []
        for box in detections.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = box.conf[0].cpu().numpy()
            detection_list.append(([x1, y1, x2-x1, y2-y1], conf))

        # Update tracker
        tracks = self.tracker.update_tracks(detection_list, frame=frame)

        # Draw tracks and collect info
        track_info = []
        for track in tracks:
            if track.is_confirmed():
                track_id = track.track_id
                ltrb = track.to_ltrb()

                track_info.append({
                    'id': track_id,
                    'bbox': ltrb,
                    'timestamp': time.time()
                })

                # Draw bounding box and ID
                x1, y1, x2, y2 = map(int, ltrb)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f'ID: {track_id}',
                           (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX,
                           0.9, (0, 255, 0), 2)

        return track_info, frame

In [18]:
class VideoProcessor:
    def __init__(self, detector_tracker: PersonDetectionTracker):
        self.detector_tracker = detector_tracker
        self.database = PersonDatabase()

    def process_video_optimized(self,
                              video_path: str,
                              output_path: Optional[str] = None,
                              frame_skip: int = 1,
                              max_workers: int = 4) -> Dict:
        """
        Optimized video processing with multi-threading and batch processing
        """
        print(f"Processing video: {video_path}")

        cap = cv2.VideoCapture(video_path)

        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties: {width}x{height}, {fps} FPS, {total_frames} frames")

        # Setup video writer if output path is provided
        out = None
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # Processing variables
        frame_buffer = []
        results = {
            'total_persons_detected': 0,
            'unique_persons': set(),
            'person_timestamps': defaultdict(list),
            'processing_time': 0,
            'fps_achieved': 0
        }

        start_time = time.time()
        frame_idx = 0

        # Process video in batches
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Skip frames if needed
                if frame_idx % (frame_skip + 1) != 0:
                    frame_idx += 1
                    continue

                frame_buffer.append((frame.copy(), frame_idx))

                # Process batch when buffer is full
                if len(frame_buffer) >= self.detector_tracker.batch_size:
                    self._process_frame_batch(frame_buffer, results, out, cap, frame_idx)
                    frame_buffer = []

                frame_idx += 1

                # Progress update
                if frame_idx % (fps * 10) == 0:  # Every 10 seconds
                    elapsed = time.time() - start_time
                    progress = (frame_idx / total_frames) * 100
                    est_remaining = (elapsed / progress * 100) - elapsed if progress > 0 else 0
                    print(f"Progress: {progress:.1f}%, "
                          f"Elapsed: {elapsed:.1f}s, "
                          f"ETA: {est_remaining:.1f}s")

        # Process remaining frames
        if frame_buffer:
            self._process_frame_batch(frame_buffer, results, out, cap, frame_idx)

        # Cleanup
        cap.release()
        if out:
            out.release()

        # Calculate final statistics
        end_time = time.time()
        total_time = end_time - start_time
        results['processing_time'] = total_time
        results['fps_achieved'] = frame_idx / total_time
        results['unique_persons'] = len(results['unique_persons'])

        print(f"Processing completed!")
        print(f"Total time: {total_time:.2f}s")
        print(f"FPS achieved: {results['fps_achieved']:.2f}")
        print(f"Unique persons detected: {results['unique_persons']}")

        return results

    def _process_frame_batch(self, frame_buffer: List, results: Dict,
                           out, cap, current_frame_idx):
        """Process a batch of frames"""
        frames = [f[0] for f in frame_buffer]
        frame_indices = [f[1] for f in frame_buffer]

        # Batch detection
        detections_batch = self.detector_tracker.detect_persons(frames)

        # Process each frame with tracking
        for i, (frame, frame_idx) in enumerate(frame_buffer):
            if i < len(detections_batch):
                detections = detections_batch[i]

                # Get timestamp for this frame
                timestamp = self._frame_to_timestamp(frame_idx, cap)

                # Track persons
                tracks, processed_frame = self.detector_tracker.track_persons(
                    frame, detections)

                # Update results
                for track in tracks:
                    person_id = track['id']
                    results['unique_persons'].add(person_id)
                    results['person_timestamps'][person_id].append({
                        'timestamp': timestamp,
                        'frame_idx': frame_idx,
                        'bbox': track['bbox']
                    })
                    results['total_persons_detected'] += 1

                # Write frame if output is specified
                if out:
                    out.write(processed_frame)

                # Store in database
                self.database.store_detections(tracks, timestamp, frame_idx)

    def _frame_to_timestamp(self, frame_idx: int, cap) -> str:
        """Convert frame index to timestamp"""
        fps = cap.get(cv2.CAP_PROP_FPS)
        seconds = frame_idx / fps
        return str(timedelta(seconds=seconds))

In [19]:
class PersonDatabase:
    def __init__(self, db_path: str = "person_tracking.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize SQLite database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS person_detections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                person_id INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                frame_idx INTEGER NOT NULL,
                bbox_x1 REAL NOT NULL,
                bbox_y1 REAL NOT NULL,
                bbox_x2 REAL NOT NULL,
                bbox_y2 REAL NOT NULL,
                confidence REAL DEFAULT 1.0
            )
        ''')

        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_person_id ON person_detections(person_id)
        ''')

        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_timestamp ON person_detections(timestamp)
        ''')

        conn.commit()
        conn.close()

    def store_detections(self, tracks: List, timestamp: str, frame_idx: int):
        """Store detection results in database"""
        if not tracks:
            return

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for track in tracks:
            cursor.execute('''
                INSERT INTO person_detections
                (person_id, timestamp, frame_idx, bbox_x1, bbox_y1, bbox_x2, bbox_y2)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                track['id'],
                timestamp,
                frame_idx,
                track['bbox'][0],
                track['bbox'][1],
                track['bbox'][2],
                track['bbox'][3]
            ))

        conn.commit()
        conn.close()

    def get_person_appearances(self, person_id: int) -> List:
        """Get all appearances of a specific person"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT timestamp, frame_idx, bbox_x1, bbox_y1, bbox_x2, bbox_y2
            FROM person_detections
            WHERE person_id = ?
            ORDER BY frame_idx
        ''', (person_id,))

        results = cursor.fetchall()
        conn.close()

        return results

    def get_unique_persons(self) -> List:
        """Get list of all unique person IDs"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('SELECT DISTINCT person_id FROM person_detections')
        results = [row[0] for row in cursor.fetchall()]

        conn.close()
        return results

In [20]:
def optimize_for_gpu():
    """Apply GPU optimizations"""
    if torch.cuda.is_available():
        # Set memory allocation strategy
        torch.cuda.empty_cache()

        # Enable mixed precision if supported
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.deterministic = False

        print("GPU optimizations applied")
    else:
        print("CUDA not available - running on CPU")

In [22]:
## Main Processing Function
#option1

def main_processing_pipeline(video_path: str,
                           output_path: Optional[str] = None,
                           model_size: str = "n",  # n, s, m, l, x
                           tracking_algorithm: str = "deepsort"):
    """
    Main processing pipeline for CCTV person detection and tracking
    """

    print("=" * 60)
    print("CCTV Person Detection and Tracking System")
    print("=" * 60)

    # Apply optimizations
    optimize_for_gpu()

    # Initialize detector and tracker
    model_path = f"yolov8{model_size}.pt"
    detector = PersonDetectionTracker(
        model_path=model_path,
        tracking_algorithm=tracking_algorithm,
        confidence_threshold=0.5,
        batch_size=8 if torch.cuda.is_available() else 4
    )

    # Initialize video processor
    processor = VideoProcessor(detector)

    # Process video
    results = processor.process_video_optimized(
        video_path=video_path,
        output_path=output_path,
        frame_skip=0,  # Process every frame
        max_workers=4
    )

    # Generate report
    generate_tracking_report(results, processor.database)

    return results

def generate_tracking_report(results: Dict, database: PersonDatabase):
    """Generate a comprehensive tracking report"""

    print("\n" + "=" * 40)
    print("TRACKING REPORT")
    print("=" * 40)

    print(f"Processing Time: {results['processing_time']:.2f} seconds")
    print(f"FPS Achieved: {results['fps_achieved']:.2f}")
    print(f"Total Person Detections: {results['total_persons_detected']}")
    print(f"Unique Persons: {results['unique_persons']}")

    # Get detailed person information
    unique_persons = database.get_unique_persons()

    print(f"\nDetailed Person Tracking:")
    print("-" * 40)

    for person_id in unique_persons[:10]:  # Show first 10 persons
        appearances = database.get_person_appearances(person_id)
        print(f"Person ID {person_id}: {len(appearances)} appearances")

        if appearances:
            first_seen = appearances[0][0]
            last_seen = appearances[-1][0]
            print(f"  First seen: {first_seen}")
            print(f"  Last seen: {last_seen}")
            print(f"  Duration: {len(appearances)} frames")
        print()

    # Save detailed results to JSON
    report_data = {
        'summary': results,
        'person_details': {}
    }

    for person_id in unique_persons:
        appearances = database.get_person_appearances(person_id)
        report_data['person_details'][person_id] = {
            'total_appearances': len(appearances),
            'timestamps': [app[0] for app in appearances],
            'frame_indices': [app[1] for app in appearances]
        }

    with open('tracking_report.json', 'w') as f:
        json.dump(report_data, f, indent=2, default=str)

    print("Detailed report saved to: tracking_report.json")

In [23]:
if __name__ == "__main__":

    VIDEO_PATH = "/content/store-aisle-detection.mp4"
    OUTPUT_PATH = "/content/tracked_output.mp4"

    # Run the main processing pipeline
    results = main_processing_pipeline(
        video_path=VIDEO_PATH,
        output_path=OUTPUT_PATH,
        model_size="s",  # Use YOLOv8s for good balance of speed/accuracy
        tracking_algorithm="deepsort"
    )

    print("Processing complete!")


CCTV Person Detection and Tracking System
GPU optimizations applied
Loading YOLO model on cuda...


Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt': 100%|██████████| 21.5M/21.5M [00:00<00:00, 121MB/s] 


Processing video: /content/store-aisle-detection.mp4
Video properties: 720x404, 59 FPS, 3921 frames
Progress: 15.0%, Elapsed: 7.6s, ETA: 43.1s
Progress: 30.1%, Elapsed: 30.1s, ETA: 70.0s
Progress: 45.1%, Elapsed: 54.3s, ETA: 66.0s
Progress: 60.2%, Elapsed: 87.3s, ETA: 57.8s
Progress: 75.2%, Elapsed: 113.7s, ETA: 37.4s
Progress: 90.3%, Elapsed: 140.7s, ETA: 15.1s
Processing completed!
Total time: 160.65s
FPS achieved: 24.41
Unique persons detected: 13

TRACKING REPORT
Processing Time: 160.65 seconds
FPS Achieved: 24.41
Total Person Detections: 13804
Unique Persons: 13

Detailed Person Tracking:
----------------------------------------
Person ID 1: 1007 appearances
  First seen: 0:00:07.891217
  Last seen: 0:00:25.241883
  Duration: 1007 frames

Person ID 2: 3246 appearances
  First seen: 0:00:11.261250
  Last seen: 0:01:05.398667
  Duration: 3246 frames

Person ID 5: 2961 appearances
  First seen: 0:00:16.016000
  Last seen: 0:01:05.398667
  Duration: 2961 frames

Person ID 6: 79 appear