environment setup

In [3]:
%matplotlib inline
import cv2
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from ultralytics import YOLO
import os
import csv
from tqdm import tqdm
from IPython.display import HTML, Video
import base64


In [4]:
print(f"✓ OpenCV: {cv2.__version__}")
print(f"✓ NumPy: {np.__version__}")
print(f"✓ OS: {os.name}")
print(f"✓ CSV: {csv.__version__}")

✓ OpenCV: 4.12.0
✓ NumPy: 2.2.6
✓ OS: nt
✓ CSV: 1.0


frame preprocessing

In [5]:
def preprocess_frame(frame):
    """
    Preprocess frame using CLAHE for better detection
    
    Args:
        frame: Input BGR image
    
    Returns:
        enhanced: Enhanced BGR image
    """
    # Convert to LAB color space (correct approach)
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    
    # Split channels
    l, a, b = cv2.split(lab)
    
    # Apply CLAHE to L channel
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    l_enhanced = clahe.apply(l)
    
    # Merge channels
    enhanced_lab = cv2.merge((l_enhanced, a, b))
    
    # Convert back to BGR
    enhanced = cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)
    
    # Apply slight blur to reduce noise
    enhanced = cv2.GaussianBlur(enhanced, (3, 3), 0)
    
    return enhanced

model configuration

In [6]:
model=YOLO('yolov8n.pt')

TARGET_CLASS = 0  # Person class
CONFIDENCE_THRESHOLD = 0.5
IOU_THRESHOLD = 0.45


In [7]:
print(f"Model: YOLOv8 Nano")
print(f"Target Class: Person (class_id={TARGET_CLASS})")
print(f"Confidence Threshold: {CONFIDENCE_THRESHOLD}")
print(f"IoU Threshold: {IOU_THRESHOLD}")
print(f"Device: CPU")

Model: YOLOv8 Nano
Target Class: Person (class_id=0)
Confidence Threshold: 0.5
IoU Threshold: 0.45
Device: CPU


video configuration

In [8]:
INPUT_VIDEO = r"C:\yolovideo\parkour.mp4"  
OUTPUT_DIR = "output"
OUTPUT_VIDEO = os.path.join(OUTPUT_DIR, "parkour_tracked.mp4")
OUTPUT_CSV = os.path.join(OUTPUT_DIR, "tracking_data.csv")

In [9]:
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"Output directory created: {OUTPUT_DIR}")


Output directory created: output


In [10]:
if not os.path.exists(INPUT_VIDEO):
    print(f"Warning: Video not found at {INPUT_VIDEO}")
    print("Please update the INPUT_VIDEO path to your actual video location")
    # 如果找不到C盘的视频，尝试当前目录
    alternate_path = "0f4398279b513133a7bdfc0f82fe4633.MP4"
    if os.path.exists(alternate_path):
        INPUT_VIDEO = alternate_path
        print(f"Using alternate video: {INPUT_VIDEO}")

In [11]:
cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
    raise ValueError(f"Cannot open video: {INPUT_VIDEO}")\

fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0

In [12]:
print(f"Input Video: {INPUT_VIDEO}")
print(f"Resolution: {width}x{height}")
print(f"FPS: {fps}")
print(f"Total Frames: {total_frames}")
print(f"Duration: {duration:.2f} seconds")
print(f"Output Video: {OUTPUT_VIDEO}")
print(f"Output CSV: {OUTPUT_CSV}")

cap.release()   

Input Video: C:\yolovideo\parkour.mp4
Resolution: 640x360
FPS: 14
Total Frames: 282
Duration: 20.14 seconds
Output Video: output\parkour_tracked.mp4
Output CSV: output\tracking_data.csv


tracking  system implementation

In [13]:
class SimpleTracker:
    """简化的质心追踪器"""
    def __init__(self, max_lost=30, max_dist=100):
        self.next_id = 0
        self.objects = {}  # {id: {'center': [x,y], 'box': [x1,y1,x2,y2], 'lost': 0}}
        self.max_lost = max_lost
        self.max_dist = max_dist
    
    def update(self, boxes):
        """更新追踪"""
        # 计算新检测的中心点
        new_centers = []
        for box in boxes:
            x1, y1, x2, y2 = box
            center = [(x1+x2)/2, (y1+y2)/2]
            new_centers.append(center)
        
        # 如果没有检测到任何对象
        if len(boxes) == 0:
            # 增加所有对象的丢失计数
            for obj_id in list(self.objects.keys()):
                self.objects[obj_id]['lost'] += 1
                # 删除丢失太久的对象
                if self.objects[obj_id]['lost'] > self.max_lost:
                    del self.objects[obj_id]
            return self.objects
        
        # 如果没有已追踪对象，全部注册为新对象
        if len(self.objects) == 0:
            for center, box in zip(new_centers, boxes):
                self.objects[self.next_id] = {
                    'center': center,
                    'box': box,
                    'lost': 0
                }
                self.next_id += 1
            return self.objects
        
        # 匹配现有对象和新检测
        matched = set()  # 已匹配的新检测索引
        
        for obj_id, obj in list(self.objects.items()):
            best_match = -1
            best_dist = self.max_dist
            
            # 找最近的新检测
            for i, center in enumerate(new_centers):
                if i in matched:
                    continue
                # 计算距离
                dist = ((obj['center'][0] - center[0])**2 + 
                       (obj['center'][1] - center[1])**2)**0.5
                if dist < best_dist:
                    best_dist = dist
                    best_match = i
            
            # 更新匹配的对象
            if best_match >= 0:
                self.objects[obj_id] = {
                    'center': new_centers[best_match],
                    'box': boxes[best_match],
                    'lost': 0
                }
                matched.add(best_match)
            else:
                # 没找到匹配，增加丢失计数
                self.objects[obj_id]['lost'] += 1
                if self.objects[obj_id]['lost'] > self.max_lost:
                    del self.objects[obj_id]
        
        # 注册未匹配的新检测
        for i, (center, box) in enumerate(zip(new_centers, boxes)):
            if i not in matched:
                self.objects[self.next_id] = {
                    'center': center,
                    'box': box,
                    'lost': 0
                }
                self.next_id += 1
        
        return self.objects

video processing pipeline

In [14]:
def process_video(input_path, output_path, csv_path):
    """
    Process video with detection and tracking
    """
    # Open video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {input_path}")
    
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # Initialize video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    # Initialize tracker
    tracker = SimpleTracker(max_lost=30, max_dist=100)
    
    # Data storage
    tracking_data = []
    sample_frames = []
    sample_interval = max(1, total_frames // 6)
    
    print(f"\nProcessing {total_frames} frames...")
    
    # Process each frame
    frame_count = 0
    pbar = tqdm(total=total_frames, desc="Processing frames")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Preprocess frame
        preprocessed = preprocess_frame(frame)
        
        # Run YOLO detection
        results = model(preprocessed,
                       classes=[TARGET_CLASS],
                       conf=CONFIDENCE_THRESHOLD,
                       iou=IOU_THRESHOLD,
                       verbose=False)
        
        # Extract detections
        detections = []
        if len(results[0].boxes) > 0:
            for box in results[0].boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                detections.append([x1, y1, x2, y2])
        
        # Update tracker
        tracked_objects = tracker.update(detections)
        
        # Draw annotations
        annotated_frame = frame.copy()
        timestamp = frame_count / fps if fps > 0 else 0
        
        # Draw timestamp
        cv2.putText(annotated_frame, f"Time: {timestamp:.2f}s",
                   (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(annotated_frame, f"Frame: {frame_count}",
                   (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        # Draw tracked objects
        for object_id, obj_data in tracked_objects.items():
            bbox = obj_data['box']
            x1, y1, x2, y2 = [int(v) for v in bbox]
            
            # Draw bounding box
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
            
            # Draw ID label
            label = f"ID: {object_id}"
            label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
            cv2.rectangle(annotated_frame, (x1, y1 - 25),
                         (x1 + label_size[0] + 10, y1), (0, 255, 255), -1)
            cv2.putText(annotated_frame, label, (x1 + 5, y1 - 7),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
            
            # Save tracking data
            tracking_data.append({
                'frame': frame_count,
                'timestamp': timestamp,
                'person_id': object_id,
                'x1': x1,
                'y1': y1,
                'x2': x2,
                'y2': y2
            })
        
        # Save sample frames
        if frame_count % sample_interval == 0 and len(sample_frames) < 6:
            sample_frames.append({
                'frame': annotated_frame.copy(),
                'timestamp': timestamp,
                'frame_num': frame_count
            })
        
        # Write frame
        out.write(annotated_frame)
        
        frame_count += 1
        pbar.update(1)
    
    pbar.close()
    
    # Release resources
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    
    print(f"Video processing completed!")
    print(f"Output saved to: {output_path}")
    
    return tracking_data, sample_frames

In [15]:
tracking_data, sample_frames = process_video(INPUT_VIDEO, OUTPUT_VIDEO, OUTPUT_CSV)



Processing 282 frames...


Processing frames: 100%|██████████| 282/282 [00:03<00:00, 77.07it/s]

Video processing completed!
Output saved to: output\parkour_tracked.mp4





data logging

In [16]:
def save_tracking_data(data, csv_path):
    """
    Save tracking data to CSV file
    """
    if not data:
        print("No tracking data to save")
        return
    
    # Write CSV
    with open(csv_path, 'w', newline='') as csvfile:
        fieldnames = ['frame', 'timestamp', 'person_id', 'x1', 'y1', 'x2', 'y2']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        
        writer.writeheader()
        writer.writerows(data)
    
    print(f"\nTracking data saved to: {csv_path}")
    
    # Display statistics
    unique_ids = set(row['person_id'] for row in data)
    total_detections = len(data)
    
    print(f"\nTracking Statistics:")
    print(f"  - Total detections: {total_detections}")
    print(f"  - Number of people tracked: {len(unique_ids)}")
    print(f"  - Person IDs: {sorted(unique_ids)}")
    
    # Per-person statistics
    person_frames = {}
    for row in data:
        pid = row['person_id']
        if pid not in person_frames:
            person_frames[pid] = []
        person_frames[pid].append(row['frame'])
    
    print(f"\nPer-person tracking duration:")
    for pid in sorted(person_frames.keys()):
        frames = person_frames[pid]
        duration = (max(frames) - min(frames)) / fps if fps > 0 else 0
        print(f"  - Person {pid}: {len(frames)} frames ({duration:.2f}s)")


In [17]:
save_tracking_data(tracking_data, OUTPUT_CSV)



Tracking data saved to: output\tracking_data.csv

Tracking Statistics:
  - Total detections: 904
  - Number of people tracked: 17
  - Person IDs: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]

Per-person tracking duration:
  - Person 0: 282 frames (20.07s)
  - Person 1: 126 frames (8.93s)
  - Person 2: 69 frames (4.86s)
  - Person 3: 31 frames (2.14s)
  - Person 4: 37 frames (2.57s)
  - Person 5: 31 frames (2.14s)
  - Person 6: 33 frames (2.29s)
  - Person 7: 31 frames (2.14s)
  - Person 8: 31 frames (2.14s)
  - Person 9: 31 frames (2.14s)
  - Person 10: 31 frames (2.14s)
  - Person 11: 60 frames (4.21s)
  - Person 12: 31 frames (2.14s)
  - Person 13: 54 frames (3.79s)
  - Person 14: 15 frames (1.00s)
  - Person 15: 10 frames (0.64s)
  - Person 16: 1 frames (0.00s)


result visualization

In [None]:
def visualize_results(sample_frames):
    """Safer visualization that prevents kernel crashes"""
    if not sample_frames:
        print("No sample frames to display")
        return
    
    try:
        # Use Agg backend to avoid GUI issues
        import matplotlib
        matplotlib.use('Agg')
        import matplotlib.pyplot as plt
        
        # Create figure with error handling
        fig = plt.figure(figsize=(15, 10))
        
        # Display only available frames (max 6)
        num_frames = min(len(sample_frames), 6)
        
        for i in range(num_frames):
            plt.subplot(2, 3, i+1)
            frame_rgb = cv2.cvtColor(sample_frames[i]['frame'], cv2.COLOR_BGR2RGB)
            plt.imshow(frame_rgb)
            plt.title(f"Frame {sample_frames[i]['frame_num']}")
            plt.axis('off')
        
        plt.suptitle('Parkour Tracking Results')
        plt.tight_layout()
        
        # Save instead of show (prevents GUI issues)
        output_path = os.path.join(OUTPUT_DIR, 'tracking_visualization.png')
        plt.savefig(output_path, dpi=100, bbox_inches='tight')
        plt.close()  # Important: close the figure to free memory
        
        print(f"Visualization saved to: {output_path}")
        
        # Display the saved image in notebook
        from IPython.display import Image, display
        display(Image(output_path))
        
    except Exception as e:
        print(f"Visualization error: {e}")
        print("Skipping visualization but continuing with the rest of the code")

: 

In [None]:
visualize_results(sample_frames)