# CV Pipeline Prototype
**YOLOv8 → ByteTrack → MediaPipe Pose → Anomaly Detection**

This notebook prototypes the full computer-vision pipeline before porting it to `cv_backend/core/`.

## 1. Setup & Installs

In [None]:
# Run once per Colab session
# Installs the packages imported below (ultralytics, mediapipe) plus `lap`.
# NOTE(review): `lap` is presumably required by ByteTrack's matching code — confirm.
!pip install ultralytics mediapipe lap --quiet

# ByteTrack — install from source
# NOTE(review): re-running this cell clones into an already existing
# /content/ByteTrack; `git clone` is expected to refuse that — confirm it is harmless.
!git clone https://github.com/ifzhang/ByteTrack /content/ByteTrack --quiet
%cd /content/ByteTrack
# Editable install so that the ByteTrack checkout becomes importable.
!pip install -e . --quiet
%cd /content

import cv2
import numpy as np
import torch
from ultralytics import YOLO
import mediapipe as mp
from IPython.display import display, Image as IPImage
# Report which device torch sees: the first CUDA device's name, or 'CPU'.
print('Setup complete. GPU:', torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU')

## 2. Upload Test Video

In [None]:
# Choose the input video. Every later cell reads it through VIDEO_PATH.
from google.colab import files
import os

# Option A: upload from local machine
# uploaded = files.upload()
# VIDEO_PATH = list(uploaded.keys())[0]

# Option B: mount Drive and point to a file
# from google.colab import drive
# drive.mount('/content/drive')
# VIDEO_PATH = '/content/drive/MyDrive/test_video.mp4'

# Option C (active): use a sample clip located at the path below, e.g. a
# Creative Commons fall detection video.
# NOTE: nothing in this cell downloads the file; it must already exist there.
# TODO: replace with your own test video URL
VIDEO_PATH = '/content/test_video.mp4'
# Prints the chosen path and whether a file currently exists at it.
print(f'Using video: {VIDEO_PATH}, exists: {os.path.exists(VIDEO_PATH)}')

## 3. YOLOv8 Person Detection

In [None]:
# Load the YOLOv8 nano weights; `model` is reused by the later cells.
model = YOLO('yolov8n.pt')  # auto-downloads on first run

# Only the first frame of the video is needed for this sanity check.
cap = cv2.VideoCapture(VIDEO_PATH)
ret, frame = cap.read()
cap.release()

if ret:
    # The model returns one result per input image; take the only one.
    results = model(frame, verbose=False)[0]

    # Show the frame with every detection drawn, encoded as an inline JPEG.
    annotated = results.plot()
    _, buf = cv2.imencode('.jpg', annotated)
    display(IPImage(data=buf.tobytes()))

    # Keep class-id-0 boxes (treated as "person" throughout this notebook)
    # whose confidence exceeds 0.45.
    person_boxes = [
        box
        for box in results.boxes
        if int(box.cls[0]) == 0 and float(box.conf[0]) > 0.45
    ]
    print(f'Detected {len(person_boxes)} person(s)')
else:
    print('Could not read video — check VIDEO_PATH')

## 4. ByteTrack Integration

In [None]:
from yolox.tracker.byte_tracker import BYTETracker

class Args:
    """Plain settings holder passed to ``BYTETracker`` as its ``args`` object.

    The attribute names are the ones the tracker is expected to read.
    NOTE(review): the per-field meanings below are assumptions based on the
    names — confirm against the ByteTrack source.
    """

    # Presumably marks MOT20-specific handling; disabled here.
    mot20: bool = False
    # Presumably the association (matching) threshold.
    match_thresh: float = 0.8
    # Presumably how many frames a lost track is kept alive.
    track_buffer: int = 30
    # Presumably the detection-score cutoff for starting/continuing tracks;
    # same value as the 0.45 confidence filter used in the detection cells.
    track_thresh: float = 0.45

# Demo: run detection + ByteTrack on the first 60 frames and show the last
# annotated frame together with the number of tracks alive in it.
tracker = BYTETracker(Args(), frame_rate=30)

cap = cv2.VideoCapture(VIDEO_PATH)
frames_processed = 0
last_frame = None      # most recent frame that was decoded and annotated
online_targets = []    # stays empty when not a single frame could be read

try:
    while frames_processed < 60:  # process first 60 frames as demo
        ret, frame = cap.read()
        if not ret:
            # End of video or read failure; `frame` is None here, so it must
            # not be used after the loop.
            break

        h, w = frame.shape[:2]
        results = model(frame, verbose=False)[0]

        # One row per class-0 (person) box: x1, y1, x2, y2, score.
        person_rows = [
            [*b.xyxy[0].tolist(), float(b.conf[0])]
            for b in results.boxes
            if int(b.cls[0]) == 0
        ]
        # reshape(-1, 5) keeps the array 2-D even when there are detections
        # but none of them is a person (an empty list would otherwise become
        # a 1-D array of shape (0,)).
        dets = np.asarray(person_rows, dtype=np.float32).reshape(-1, 5)

        # Image info and target size are identical: boxes are already in
        # the coordinates of the original frame.
        online_targets = tracker.update(dets, [h, w], [h, w])

        for t in online_targets:
            x1, y1, bw, bh = t.tlwh  # top-left corner plus width/height
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x1 + bw), int(y1 + bh)), (0, 255, 0), 2)
            cv2.putText(frame, f'ID:{t.track_id}', (int(x1), int(y1) - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        last_frame = frame
        frames_processed += 1
finally:
    # Release the capture even if detection or tracking raised.
    cap.release()

if last_frame is None:
    print('Could not read video — check VIDEO_PATH')
else:
    _, buf = cv2.imencode('.jpg', last_frame)
    display(IPImage(data=buf.tobytes()))
    print(f'Processed {frames_processed} frames, {len(online_targets)} tracked persons in last frame')

## 5. MediaPipe Pose on Tracked Persons

In [None]:
# Run MediaPipe Pose on each person crop of the first video frame and draw
# the resulting skeletons back onto the frame.
mp_pose = mp.solutions.pose
mp_draw = mp.solutions.drawing_utils

# static_image_mode=True: every crop is processed as an independent image.
pose = mp_pose.Pose(static_image_mode=True, model_complexity=1, min_detection_confidence=0.5)

cap = cv2.VideoCapture(VIDEO_PATH)
ret, frame = cap.read()
cap.release()

if not ret:
    # Without this guard the detector would be called with frame=None.
    print('Could not read video — check VIDEO_PATH')
else:
    results_yolo = model(frame, verbose=False)[0]
    # xyxy boxes of class-0 (person) detections above 0.45 confidence.
    person_boxes = [
        b.xyxy[0].tolist()
        for b in results_yolo.boxes
        if int(b.cls[0]) == 0 and float(b.conf[0]) > 0.45
    ]

    poses_found = 0  # crops in which landmarks were actually detected
    for bbox in person_boxes:
        x1, y1, x2, y2 = [int(v) for v in bbox]
        # Clamp the top-left corner so negative coordinates do not wrap
        # around when slicing.
        x1, y1 = max(0, x1), max(0, y1)
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            # Degenerate box: colour conversion would fail on an empty image.
            continue

        crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
        pose_result = pose.process(crop_rgb)

        if pose_result.pose_landmarks:
            mp_draw.draw_landmarks(crop, pose_result.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            frame[y1:y2, x1:x2] = crop
            poses_found += 1

    _, buf = cv2.imencode('.jpg', frame)
    display(IPImage(data=buf.tobytes()))
    # Report poses actually found, not merely the number of person boxes.
    print(f'Pose detected for {poses_found} of {len(person_boxes)} person(s)')

## 6. Anomaly Logic — Fall & Erratic-Motion Detection

In [None]:
# Tunable thresholds and shared state for the anomaly checks in this cell.
# TODO: fine-tune these thresholds once you have real fall videos
FALL_ASPECT_RATIO = 1.4   # width/height > this → possible fall
# Summed variance of the recent x and y track centres above this → erratic motion.
ERRATIC_VAR_THRESHOLD = 800.0

from collections import defaultdict, deque
# NOTE(review): `time` is not used by any cell visible in this notebook.
import time

# Maps a track id to a rolling window (at most 30 entries) of (cx, cy) box
# centres; the end-to-end pipeline cell appends to it for every tracked box.
track_history = defaultdict(lambda: deque(maxlen=30))

def check_fall(bbox, *, threshold=None):
    """Flag a possible fall from the aspect ratio of a bounding box.

    Args:
        bbox: Four numbers ``(x1, y1, x2, y2)`` describing the box corners.
        threshold: Width/height ratio above which the box counts as a fall.
            ``None`` (the default) uses the module-level ``FALL_ASPECT_RATIO``.

    Returns:
        ``(is_fall, confidence)`` where ``is_fall`` is a ``bool`` and
        ``confidence`` is ``min(ratio / 3, 1)`` rounded to three decimals.
        Boxes with non-positive width or height yield ``(False, 0.0)``.
    """
    x1, y1, x2, y2 = bbox
    w, h = x2 - x1, y2 - y1
    # Degenerate or inverted boxes carry no usable shape information; without
    # the width check an inverted box would produce a negative confidence.
    if w <= 0 or h <= 0:
        return False, 0.0

    limit = FALL_ASPECT_RATIO if threshold is None else threshold
    # Convert to builtin floats so the result never leaks numpy scalar types.
    ratio = float(w) / float(h)
    return bool(ratio > limit), round(min(ratio / 3.0, 1.0), 3)

def check_erratic(history, *, threshold=None):
    """Flag erratic motion from the positional spread of recent track centres.

    Args:
        history: Sized iterable of points whose first two items are the x and
            y coordinates (e.g. a deque of ``(cx, cy)`` tuples).
        threshold: Variance above which motion counts as erratic. ``None``
            (the default) uses the module-level ``ERRATIC_VAR_THRESHOLD``.

    Returns:
        ``(is_erratic, confidence)``. Fewer than 10 points yield
        ``(False, 0.0)``; otherwise ``confidence`` is
        ``min(variance / (3 * threshold), 1)`` rounded to three decimals.

    Raises:
        ValueError: If the effective threshold is not positive.
    """
    if len(history) < 10:
        return False, 0.0

    limit = ERRATIC_VAR_THRESHOLD if threshold is None else threshold
    if limit <= 0:
        # The confidence divides by the threshold, so it must be positive.
        raise ValueError('threshold must be positive')

    # Only the 10 most recent points contribute.
    recent = list(history)[-10:]
    xs = [p[0] for p in recent]
    ys = [p[1] for p in recent]
    # NOTE(review): this is the variance of positions, so steady straight-line
    # movement also increases it — confirm that is the intended signal.
    variance = float(np.var(xs) + np.var(ys))
    return bool(variance > limit), round(min(variance / (limit * 3), 1.0), 3)

# Smoke test for check_fall with one wide and one tall sample box.
test_bbox_fall = [10, 100, 200, 130]  # very wide — should flag as fall
test_bbox_normal = [10, 50, 80, 200]  # tall — normal standing
for _caption, _sample in (
    ('Fall test (wide bbox):', test_bbox_fall),
    ('Normal test (tall bbox):', test_bbox_normal),
):
    print(_caption, check_fall(_sample))

## 7. End-to-End Pipeline — Annotated Output

In [None]:
# End-to-end run: detect → track → classify anomalies → write annotated video.
# TODO: fine-tune INFERENCE_STRIDE for your target FPS
INFERENCE_STRIDE = 2   # run inference on every 2nd frame; others are copied as-is
OUTPUT_PATH = '/content/annotated_output.mp4'

# NOTE(review): the tracker is told 30 fps although updates happen only every
# INFERENCE_STRIDE-th frame of a video whose real fps is read below — confirm
# whether frame_rate should reflect that.
tracker2 = BYTETracker(Args(), frame_rate=30)
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    # Fail loudly instead of writing an empty output and reporting "Done".
    cap.release()
    raise RuntimeError(f'Could not open video: {VIDEO_PATH}')

fps = cap.get(cv2.CAP_PROP_FPS) or 30  # fall back to 30 when fps is reported as 0
fw = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
fh = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
writer = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (fw, fh))

frame_id = 0
anomaly_log = []  # one dict per flagged track per processed frame

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_id += 1

        # Frames skipped by the stride are written unannotated so the output
        # keeps the original length and timing.
        if frame_id % INFERENCE_STRIDE != 0:
            writer.write(frame)
            continue

        results_y = model(frame, verbose=False)[0]

        # One row per class-0 (person) box: x1, y1, x2, y2, score.
        person_rows = [
            [*b.xyxy[0].tolist(), float(b.conf[0])]
            for b in results_y.boxes
            if int(b.cls[0]) == 0
        ]
        # reshape(-1, 5) keeps the array 2-D even when there are detections
        # but none of them is a person (an empty list would otherwise become
        # a 1-D array of shape (0,)).
        dets = np.asarray(person_rows, dtype=np.float32).reshape(-1, 5)

        tracks = tracker2.update(dets, [fh, fw], [fh, fw])

        for t in tracks:
            x1, y1, bw, bh = t.tlwh  # top-left corner plus width/height
            bbox = [x1, y1, x1 + bw, y1 + bh]
            cx, cy = x1 + bw / 2, y1 + bh / 2
            track_history[t.track_id].append((cx, cy))

            is_fall, fall_conf = check_fall(bbox)
            is_erratic, erratic_conf = check_erratic(track_history[t.track_id])

            # Fall takes precedence over erratic motion; default is green.
            color = (0, 255, 0)
            label = f'ID:{t.track_id}'
            if is_fall:
                color = (0, 0, 255)
                label += ' FALL'
                anomaly_log.append({'frame': frame_id, 'type': 'FALL',
                                    'track_id': t.track_id, 'confidence': float(fall_conf)})
            elif is_erratic:
                color = (0, 165, 255)
                label += ' ERRATIC'
                anomaly_log.append({'frame': frame_id, 'type': 'ERRATIC_MOTION',
                                    'track_id': t.track_id, 'confidence': float(erratic_conf)})

            top_left = (int(x1), int(y1))
            bottom_right = (int(x1 + bw), int(y1 + bh))
            cv2.rectangle(frame, top_left, bottom_right, color, 2)
            cv2.putText(frame, label, (top_left[0], top_left[1] - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        writer.write(frame)
finally:
    # Always release both handles so the output file is finalised and the
    # capture is closed even if inference or tracking raised.
    cap.release()
    writer.release()

print(f'Done. Output: {OUTPUT_PATH}')
print(f'Anomaly events detected: {len(anomaly_log)}')
for ev in anomaly_log[:10]:
    print(ev)

## 8. Export Colab → FastAPI

| Notebook cell | Maps to FastAPI file |
|---|---|
| `model = YOLO(...)` + detection loop | `cv_backend/core/detector.py` → `AnomalyDetector` |
| `BYTETracker` + update loop | `cv_backend/core/tracker.py` → `PersonTracker` |
| `mp_pose.Pose` + crop inference | `cv_backend/core/pose.py` → `PoseAnalyzer` |
| `check_fall`, `check_erratic` functions | `cv_backend/core/anomaly.py` → `AnomalyClassifier` |
| Full while-loop pipeline | `cv_backend/routers/stream.py` → `_run_pipeline()` |

When porting:
1. Replace `print()` with proper logging.
2. Wrap blocking calls in `asyncio.to_thread()`.
3. Add full type hints to all methods.