
# Premiere-Style Object Tracking Effects (Colab)
This notebook downloads CC0/public-domain demo clips, runs Ultralytics YOLO + ByteTrack/BoT-SORT, parses natural-language effect prompts into a tiny DSL, plans keyframes, and renders multiple object-aware video effects (zoom/follow, spotlight, blur, reframe, callouts, PiP magnifier, trajectory overlay). It is organized so teammates can later wrap the same functions behind a UXP-friendly service layer.


In [1]:

#@title Run Me (installs runtime dependencies)
!pip install -q ultralytics opencv-python moviepy numpy pandas tqdm matplotlib scipy ipywidgets fastapi uvicorn python-multipart lapx


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.1/1.1 MB[0m [31m73.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m23.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m20.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m23.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:

#@title Imports & global config
from __future__ import annotations
import json
import math
import os
import re
import shutil
import textwrap
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

import numpy as np
import pandas as pd
import cv2
from tqdm import tqdm
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
from moviepy.editor import VideoFileClip
from IPython.display import Video, display
import torch
import ipywidgets as widgets

from ultralytics import YOLO

plt.style.use("ggplot")

# Working directories. Everything in this notebook reads from / writes to BASE_DIR,
# and all generated artifacts (manifest, tracks, keyframes, renders) go to EXPORT_DIR.
BASE_DIR = Path('/content')
EXPORT_DIR = BASE_DIR / 'exports'
EXPORT_DIR.mkdir(parents=True, exist_ok=True)
# Logical clip name -> local MP4 path.
CLIP_PATHS = {
    'clipA': BASE_DIR / 'clipA.mp4'
}
# Default Ultralytics weights: 'det' for plain detection, 'seg' when masks are requested.
DEFAULT_MODELS = {'det': 'yolo11n.pt', 'seg': 'yolo11n-seg.pt'}
# Inference device: CUDA when torch can see a GPU, otherwise CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {DEVICE.upper()} | torch {torch.__version__}")
if DEVICE == 'cpu':
    print('⚠️ GPU not detected. Enable a T4/other GPU runtime for best performance.')

# Named easing curves mapping a progress value x to an eased value.
# NOTE(review): the formulas presumably expect x in [0, 1] — confirm at the call sites.
EASINGS = {
    'linear': lambda x: x,
    'ease': lambda x: 3 * x**2 - 2 * x**3,
    'ease-in': lambda x: x**2,
    'ease-out': lambda x: 1 - (1 - x)**2,
    'ease-in-out': lambda x: (math.cos(math.pi + math.pi * x) + 1) / 2
}

# Keyword arguments forwarded to the Savitzky-Golay smoothing of track signals.
SAVGOL_DEFAULT = {'window_length': 9, 'polyorder': 2}

# Print floats without scientific notation.
np.set_printoptions(suppress=True)

print('Exports directory:', EXPORT_DIR)


  IMAGEMAGICK_BINARY = r"C:\Program Files\ImageMagick-6.8.8-Q16\magick.exe"
  lines_video = [l for l in lines if ' Video: ' in l and re.search('\d+x\d+', l)]
  rotation_lines = [l for l in lines if 'rotate          :' in l and re.search('\d+$', l)]
  match = re.search('\d+$', rotation_line)
  if event.key is 'enter':



Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Using device: CUDA | torch 2.8.0+cu126
Exports directory: /content/exports



## Download CC0 / Public Domain demo clips
We pull one Wikimedia Commons clip that ships with explicit `{{cc-zero}}` licensing:

* **clipA** – `Jay_Prakash_Guiding_at_Wikimedia_Hackathon_Kochi_2024.webm` (speaker + participants)

The clip is transcoded to MP4 (downscaled only if it is wider than 1280 px) and saved under `/content`. Its license credit is logged in the clip manifest for auditability.


In [3]:
#@title Download demo clips (idempotent)
import urllib.request
from urllib.error import HTTPError
import requests

# Demo clip catalogue: target filename (relative to BASE_DIR) -> download metadata.
#   url          - remote file to fetch
#   credit       - license/attribution string copied into the manifest
#   convert_from - when set (truthy), the download is transcoded to MP4 instead of moved as-is
CLIP_SOURCES = {
    'clipA.mp4': {
        'url': 'https://upload.wikimedia.org/wikipedia/commons/c/c5/Jay_Prakash_Guiding_at_Wikimedia_Hackathon_Kochi_2024.webm',
        'credit': 'Wikimedia Commons · CC0 1.0',
        'convert_from': 'webm'
    }
}


def _download_file(url: str, target: Path, timeout: float = 60.0) -> Path:
    """Stream ``url`` into a temporary ``<target>.dl`` file and return that path.

    Args:
        url: Remote file to download.
        target: Final destination; only used to derive the temp path and to make
            sure its parent directory exists. The caller moves/transcodes the
            temp file into place.
        timeout: Seconds to wait for the server (connect/read) before giving up.

    Returns:
        Path of the downloaded temporary file.

    Raises:
        requests.exceptions.RequestException: On network errors, timeouts or
            4xx/5xx responses. Any partially written temp file is removed first.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = target.with_suffix(target.suffix + '.dl')
    if tmp_path.exists():
        tmp_path.unlink()
    print(f'Downloading {url} → {tmp_path.name}')
    # Use requests with a custom User-Agent
    headers = {"User-Agent": "Mozilla/5.0 (Colab demo)"}
    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, headers=headers, timeout=timeout) as response:
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(f"Error downloading {url}: {e}")
        # Do not leave a truncated file behind for later steps to pick up.
        tmp_path.unlink(missing_ok=True)
        raise
    return tmp_path


def _transcode_to_mp4(src: Path, dst: Path, max_width: int = 1280):
    """Re-encode ``src`` as an H.264/AAC MP4 at ``dst``.

    The clip is downscaled to ``max_width`` pixels wide only when it is wider
    than that; otherwise it is written at its native size. Audio is kept when
    the source has an audio track. Falls back to 24 fps if the source reports
    no frame rate.

    Args:
        src: Source video readable by MoviePy.
        dst: Output MP4 path.
        max_width: Maximum output width in pixels.
    """
    clip = VideoFileClip(str(src))
    resized = clip
    try:
        if clip.w > max_width:
            resized = clip.resize(width=max_width)
        print(f'Transcoding {src.name} ({clip.duration:.2f}s) → {dst.name}')
        resized.write_videofile(
            str(dst),
            codec='libx264',
            audio=clip.audio is not None,
            audio_codec='aac',
            bitrate='4M',
            fps=clip.fps or 24,
            logger=None
        )
    finally:
        # Always release the readers, even when encoding fails.
        clip.close()
        if resized is not clip:
            resized.close()


def _clip_manifest_entry(path: Path, credit: str) -> Dict[str, Any]:
    """Probe a local clip and return its manifest record (path, duration, size, credit)."""
    clip = VideoFileClip(str(path))
    try:
        return {'path': str(path), 'duration': clip.duration, 'size': [clip.w, clip.h], 'credit': credit}
    finally:
        clip.close()


def download_demo_clips(force: bool = False):
    """Fetch every clip in ``CLIP_SOURCES`` and write ``clip_manifest.json``.

    Clips already present under ``BASE_DIR`` are reused unless ``force`` is true.
    Sources flagged with ``convert_from`` are transcoded to MP4; others are moved
    into place unchanged.

    Args:
        force: Re-download (and re-transcode) even when the target file exists.

    Returns:
        List of manifest records, one per clip, in ``CLIP_SOURCES`` order.
    """
    manifest = []
    for filename, meta in CLIP_SOURCES.items():
        target = BASE_DIR / filename
        if force or not target.exists():
            tmp = _download_file(meta['url'], target)
            if meta.get('convert_from'):
                try:
                    _transcode_to_mp4(tmp, target)
                finally:
                    # Remove the raw download whether or not transcoding succeeded.
                    tmp.unlink(missing_ok=True)
            else:
                shutil.move(tmp, target)
        manifest.append(_clip_manifest_entry(target, meta['credit']))
    manifest_path = EXPORT_DIR / 'clip_manifest.json'
    with open(manifest_path, 'w', encoding='utf-8') as f:
        json.dump(manifest, f, indent=2)
    print('Saved manifest →', manifest_path)
    return manifest

# Idempotent: existing clips under BASE_DIR are reused; pass force=True to re-download.
clip_manifest = download_demo_clips(force=False)
clip_manifest

Downloading https://upload.wikimedia.org/wikipedia/commons/c/c5/Jay_Prakash_Guiding_at_Wikimedia_Hackathon_Kochi_2024.webm → clipA.mp4.dl
Transcoding clipA.mp4.dl (5.83s) → clipA.mp4
Saved manifest → /content/exports/clip_manifest.json


[{'path': '/content/clipA.mp4',
  'duration': 5.83,
  'size': [608, 1080],
  'credit': 'Wikimedia Commons · CC0 1.0'}]


## Detect & Track (YOLO + ByteTrack/BoT-SORT)
This section exposes a service-like API:
```
detect_and_track(video_path, classes=None, tracker='bytetrack', use_seg=False)
```
It returns structured frame/track metadata and persists to `/content/exports/tracks.json`.


In [4]:

#@title Tracking utilities
# Loaded YOLO models keyed by weights name, so each checkpoint is loaded once per kernel.
MODEL_CACHE: Dict[str, YOLO] = {}
# Class-id -> class-name maps, keyed by id(model) (see _build_name_map).
CLASS_NAME_CACHE: Dict[int, Dict[int, str]] = {}


def load_model(model_name: Optional[str] = None, use_seg: bool = False) -> YOLO:
    """Return a cached YOLO model, loading it on first use.

    Args:
        model_name: Weights name/path. When falsy, the default segmentation or
            detection weights from ``DEFAULT_MODELS`` are used depending on ``use_seg``.
        use_seg: Select the segmentation default instead of the detection default.

    Returns:
        The YOLO instance stored in ``MODEL_CACHE`` under the resolved name.
    """
    if model_name:
        name = model_name
    else:
        name = DEFAULT_MODELS['seg' if use_seg else 'det']

    if name in MODEL_CACHE:
        return MODEL_CACHE[name]

    print(f'Loading model {name} → {DEVICE}')
    model = YOLO(name)
    MODEL_CACHE[name] = model
    return model


def _build_name_map(model: YOLO) -> Dict[int, str]:
    """Return the ``{class_id: class_name}`` map for ``model`` (memoised per model object).

    Names are read from ``model.names`` or, when that is ``None``, from
    ``model.model.names``. Dict and list forms are supported; anything else
    yields an empty map. The result is cached in ``CLASS_NAME_CACHE`` under
    ``id(model)``.
    """
    cache_key = id(model)
    if cache_key in CLASS_NAME_CACHE:
        return CLASS_NAME_CACHE[cache_key]

    raw_names = getattr(model, 'names', None)
    if raw_names is None and hasattr(model, 'model'):
        raw_names = getattr(model.model, 'names', None)

    if isinstance(raw_names, dict):
        mapping = {int(class_id): str(label) for class_id, label in raw_names.items()}
    elif isinstance(raw_names, list):
        mapping = dict(enumerate(str(label) for label in raw_names))
    else:
        mapping = {}

    CLASS_NAME_CACHE[cache_key] = mapping
    return mapping


def _normalize_label(text: str) -> str:
    return text.strip().lower()


def _class_ids_from_names(model: YOLO, classes: Optional[List[str]]):
    """Translate class names (or numeric ids given as strings) into model class ids.

    Args:
        model: YOLO model whose name map is used for the lookup.
        classes: Class names such as ``'person'`` or digit strings such as ``'0'``.
            Matching ignores case and surrounding whitespace.

    Returns:
        List of unique class ids in first-seen order, or ``None`` when ``classes``
        is empty or nothing matched. Callers treat ``None`` as "no class filter",
        so unmatched names are reported instead of being dropped silently.
    """
    if not classes:
        return None
    name_map = _build_name_map(model)
    inv = {_normalize_label(v): k for k, v in name_map.items()}
    ids: List[int] = []
    unknown: List[str] = []
    for item in classes:
        key = _normalize_label(str(item))
        if key.isdigit():
            class_id = int(key)
        elif key in inv:
            class_id = inv[key]
        else:
            unknown.append(str(item))
            continue
        if class_id not in ids:
            ids.append(class_id)
    if unknown:
        # An all-unknown request falls back to None (= every class); make that visible.
        print(f'⚠️ Ignoring unknown class names: {unknown}')
    return ids or None


def encode_mask(mask: np.ndarray) -> Dict[str, Any]:
    """Run-length encode a 2-D mask (column-major order).

    The mask is binarised with a ``> 0.5`` threshold and flattened in Fortran
    (column-major) order. ``counts`` alternates run lengths of 0s and 1s and
    always starts with the length of the leading run of 0s, which is ``0`` when
    the first pixel is set. An empty mask encodes as ``[0]``.

    Args:
        mask: 2-D array; values above 0.5 count as foreground.

    Returns:
        ``{'size': [height, width], 'counts': [int, ...]}`` with plain Python
        ints so the result is JSON-serialisable.
    """
    mask = (np.asarray(mask) > 0.5).astype(np.uint8)
    pixels = mask.flatten(order='F')
    if pixels.size == 0:
        counts = [0]
    else:
        # Positions where the value flips mark the start of a new run.
        flips = np.flatnonzero(pixels[1:] != pixels[:-1]) + 1
        edges = np.concatenate(([0], flips, [pixels.size]))
        counts = np.diff(edges).tolist()
        if pixels[0] == 1:
            # Runs must start with zeros; emit exactly one empty zero-run.
            counts = [0] + counts
    return {'size': [int(mask.shape[0]), int(mask.shape[1])], 'counts': counts}


def decode_mask(rle: Dict[str, Any]) -> np.ndarray:
    """Expand a run-length encoding back into a ``uint8`` mask of 0s and 1s.

    ``rle['counts']`` is read as alternating run lengths starting with a run of
    0s; the flat result is reshaped to ``rle['size']`` (height, width) in
    column-major order.
    """
    height, width = rle['size']
    run_lengths = np.asarray(rle['counts'], dtype=np.int64)
    # Even-indexed runs are background (0), odd-indexed runs are foreground (1).
    run_values = np.arange(run_lengths.size) % 2
    flat = np.repeat(run_values, run_lengths).astype(np.uint8)
    return flat.reshape((height, width), order='F')


def detect_and_track(
    video_path: str,
    classes: Optional[List[str]] = None,
    tracker: str = 'bytetrack',
    conf: float = 0.25,
    iou: float = 0.45,
    use_seg: bool = False,
    frame_stride: int = 1,
    imgsz: int = 960,
    max_width: int = 960,
    save_json: bool = True
) -> Dict[str, Any]:
    """Run YOLO tracking over a video and return per-frame detections.

    Args:
        video_path: Video file to process.
        classes: Optional class names / numeric id strings to keep; ``None`` keeps all.
        tracker: Tracker config name; ``.yaml`` is appended when missing.
        conf: Detection confidence threshold passed to the model.
        iou: IoU threshold passed to the model.
        use_seg: Use the segmentation model and store an RLE mask per detection.
        frame_stride: Process every N-th frame (values below 1 are treated as 1).
        imgsz: Requested inference size.
        max_width: Upper bound applied to ``imgsz``.
        save_json: Also write the result to ``EXPORT_DIR / 'tracks.json'``.

    Returns:
        Dict with video metadata (``video_path``, ``fps``, ``size`` as
        ``[width, height]``, ``duration``, ``tracker``, ``use_seg``,
        ``frame_stride``) and ``frames``: a list of
        ``{'frame_index', 't', 'detections'}`` where each detection has
        ``id``, ``cls``, ``conf``, ``bbox_xyxy`` and ``mask_rle``.
        Only detections that carry a track id are recorded.

    Raises:
        IOError: If OpenCV cannot open ``video_path``.
    """
    video_path = str(video_path)
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Fail early with a clear message instead of tracking with bogus metadata.
            raise IOError(f'Could not open video: {video_path}')
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
    finally:
        cap.release()
    duration = total_frames / fps if fps else 0

    stride = max(frame_stride, 1)
    model = load_model(use_seg=use_seg)
    class_ids = _class_ids_from_names(model, classes)
    tracker_cfg = tracker if tracker.endswith('.yaml') else f'{tracker}.yaml'
    effective_imgsz = min(imgsz, max_width)

    print(f'Tracking {video_path} @ {fps:.2f} fps | {width}x{height} | stride={frame_stride}')
    # NOTE(review): persist=True presumably keeps tracker state across calls on the
    # same cached model — confirm this is intended when tracking several videos.
    stream = model.track(
        source=video_path,
        imgsz=effective_imgsz,
        tracker=tracker_cfg,
        stream=True,
        conf=conf,
        iou=iou,
        vid_stride=stride,
        device=DEVICE,
        classes=class_ids,
        verbose=False,
        persist=True
    )

    frames: List[Dict[str, Any]] = []
    frame_cursor = 0
    name_map = _build_name_map(model)

    # `or None` lets tqdm run without a total when the frame count is unknown (0).
    expected = math.ceil(total_frames / stride) or None
    for result in tqdm(stream, desc='YOLO tracking', total=expected):
        detections = []
        boxes = result.boxes
        if boxes is not None and boxes.id is not None:
            ids = boxes.id.int().cpu().tolist()
            xyxy = boxes.xyxy.cpu().tolist()
            confs = boxes.conf.cpu().tolist()
            clss = boxes.cls.int().cpu().tolist()
            mask_data = result.masks.data.cpu().numpy() if use_seg and result.masks is not None else None
            for i, track_id in enumerate(ids):
                bbox = [float(v) for v in xyxy[i]]
                cls_name = name_map.get(int(clss[i]), str(clss[i]))
                mask_rle = encode_mask(mask_data[i]) if mask_data is not None else None
                detections.append({
                    'id': int(track_id),
                    'cls': cls_name,
                    'conf': float(confs[i]),
                    'bbox_xyxy': bbox,
                    'mask_rle': mask_rle
                })
        frames.append({
            'frame_index': int(frame_cursor),
            't': float(frame_cursor / fps) if fps else 0.0,
            'detections': detections
        })
        frame_cursor += stride

    payload = {
        'video_path': video_path,
        'fps': fps,
        'size': [width, height],
        'duration': duration,
        'tracker': tracker_cfg,
        'use_seg': use_seg,
        'frame_stride': frame_stride,
        'frames': frames
    }
    if save_json:
        tracks_path = EXPORT_DIR / 'tracks.json'
        with open(tracks_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f)
        print('Saved tracks →', tracks_path)
    return payload


def load_tracks(json_path: Optional[Path] = None) -> Dict[str, Any]:
    """Read a tracks payload from JSON.

    Args:
        json_path: File to read; when falsy, ``EXPORT_DIR / 'tracks.json'`` is used.

    Returns:
        The decoded JSON document.
    """
    source = json_path if json_path else EXPORT_DIR / 'tracks.json'
    with open(source, 'r', encoding='utf-8') as handle:
        data = json.load(handle)
    return data


In [5]:

#@title Run detection on clipA (set force_run=True to re-run)
# Reuse the cached tracks.json when present; flip force_run to re-run the tracker.
# NOTE(review): the cache check only tests that the file exists, not which video or
# settings produced it — delete the file or set force_run=True after changing inputs.
force_run = False
tracks_path = EXPORT_DIR / 'tracks.json'
if tracks_path.exists() and not force_run:
    tracks_data = load_tracks(tracks_path)
    print('Loaded cached tracks.json')
else:
    tracks_data = detect_and_track(
        video_path=str(CLIP_PATHS['clipA']),
        classes=None,
        tracker='bytetrack',
        use_seg=True,
        frame_stride=1,
        imgsz=960,
        conf=0.25,
        iou=0.45
    )
# Quick sanity summary: how many processed frames carry at least one tracked detection.
summary = {
    'frames_with_detections': sum(1 for f in tracks_data['frames'] if f['detections']),
    'total_frames': len(tracks_data['frames'])
}
summary


Loading model yolo11n-seg.pt → cuda
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11n-seg.pt to 'yolo11n-seg.pt': 100% ━━━━━━━━━━━━ 5.9MB 92.3MB/s 0.1s
Tracking /content/clipA.mp4 @ 30.00 fps | 608x1080 | stride=1


YOLO tracking: 100%|██████████| 175/175 [02:18<00:00,  1.26it/s]


Saved tracks → /content/exports/tracks.json


{'frames_with_detections': 175, 'total_frames': 175}


## NL → Effect DSL
Rule-based parser with optional LLM fallback. The DSL covers:
* `ZoomFollow`
* `Spotlight`
* `BlurBackground`
* `PixelateObject`
* `AutoReframe`
* `Callout`
* `PiPMagnifier`
* `PathOverlay`


In [6]:
#@title Effect DSL parser
# Default parameters per effect; parse_nl_to_dsl copies these and overrides
# whichever values it manages to extract from the prompt.
EFFECT_DEFAULTS = {
    'ZoomFollow': {'margin': 0.10, 'easing': 'ease-in-out'},
    'Spotlight': {'strength': 0.7, 'feather': 45},
    'BlurBackground': {'ksize': 21},
    'PixelateObject': {'block': 20},
    'AutoReframe': {'aspect': '9:16', 'safe': 0.8},
    'Callout': {'label': 'object'},
    'PiPMagnifier': {'scale': 1.5, 'radius': 120},
    'PathOverlay': {}
}

# Trigger phrases per effect. _detect_effect does a plain substring test and walks
# this dict in insertion order, so the first effect with any matching keyword wins.
EFFECT_KEYWORDS = {
    'ZoomFollow': ['zoom', 'punch in', 'follow'],
    'Spotlight': ['spotlight', 'highlight'],
    'BlurBackground': ['blur background', 'background blur'],
    'PixelateObject': ['pixelate', 'pixelation'],
    'AutoReframe': ['auto-reframe', 'reframe', 'vertical'],
    'Callout': ['callout', 'label'],
    'PiPMagnifier': ['pip', 'magnifier', 'picture in picture'],
    'PathOverlay': ['path', 'trajectory', 'trace']
}

@dataclass
class EffectCommand:
    """One parsed effect instruction produced by ``parse_nl_to_dsl``."""
    # Effect name; one of the EFFECT_DEFAULTS keys when produced by the rule-based parser.
    effect: str
    # Target object phrase extracted from the prompt (matched against track class names).
    object: str
    # Start of the effect window, in seconds.
    t_in: float
    # End of the effect window, in seconds.
    t_out: float
    # Effect-specific parameters (defaults merged with values parsed from the prompt).
    params: Dict[str, Any] = field(default_factory=dict)


def _parse_time_window(text: str, video_duration: float) -> Tuple[float, float]:
    match = re.search(r'from\s*(\d+(?:\.\d+)?)\s*s?\s*(?:to|-)\s*(\d+(?:\.\d+)?)', text)
    if match:
        return float(match.group(1)), float(match.group(2))
    match = re.search(r'for\s*(\d+(?:\.\d+)?)\s*s', text)
    if match:
        dur = float(match.group(1))
        return 0.0, min(video_duration, dur)
    return 0.0, video_duration


def _extract_object(text: str) -> str:
    """Best-effort extraction of the target object phrase from a prompt.

    The text is lower-cased and every character outside ``[a-z0-9 _:%-]`` is
    replaced by a space. A cascade of patterns is then tried in order and the
    first hit wins:

      1. ``<effect verb> the <object>`` / ``around the <object>`` followed by a
         stop word, then ``keeping the …``, ``keep the …``, ``the … centered``;
      2. any ``the <object>`` followed by a stop word;
      3. ``<object> centered``;
      4. the text after the last `` the `` that precedes `` from ``;
      5. the last token that is not a stop word, or ``'object'`` if none remain.

    Returns:
        The matched phrase with surrounding whitespace stripped.
    """
    lowered = text.lower()
    sanitized = re.sub(r'[^a-z0-9 _:%-]', ' ', lowered)
    # The object phrase must be followed by whitespace and a stop word. Note that
    # `\s+` is also required before `$`, so a phrase at the very end of the text
    # (no trailing space) does not satisfy this lookahead and falls through to
    # the later fallbacks.
    lookahead = r'(?=\s+(?:from|to|for|with|at|in|centered|label|,|and|$))'
    verb_patterns = [
        r'(?:zoom|follow|track|spotlight|highlight|blur|pixelate|callout|label|keep|keeping|focus|reframe|auto-reframe|pip|magnifier|path|trace)[^a-z0-9]+the\s+([a-z0-9 _-]+?)' + lookahead,
        r'around\s+the\s+([a-z0-9 _-]+?)' + lookahead,
        r'keeping\s+the\s+([a-z0-9 _-]+)',
        r'keep\s+the\s+([a-z0-9 _-]+)',
        r'the\s+([a-z0-9 _-]+)\s+centered'
    ]
    for pattern in verb_patterns:
        match = re.search(pattern, sanitized)
        if match:
            return match.group(1).strip()
    # Generic "the <object>" followed by a stop word.
    fallback = re.search(r'the\s+([a-z0-9 _-]+?)' + lookahead, sanitized)
    if fallback:
        return fallback.group(1).strip()
    # "<object> centered" without a leading article.
    centered = re.search(r'([a-z0-9 _-]+)\s+centered', sanitized)
    if centered:
        return centered.group(1).strip()
    # Take whatever follows the last " the " before the time window.
    if ' from ' in sanitized and ' the ' in sanitized.split(' from ')[0]:
        chunk = sanitized.split(' from ')[0]
        return chunk.split(' the ')[-1].strip()
    # Last resort: final non-stop-word token.
    tokens = [tok for tok in sanitized.split() if tok not in {'from','to','with','and','for','centered'}]
    return tokens[-1] if tokens else 'object'


def _extract_numeric(text: str, keyword: str, scale: float = 1.0, default: Optional[float] = None) -> Optional[float]:
    pattern = rf'{keyword}[^0-9]*(\d+(?:\.\d+)?)'
    match = re.search(pattern, text)
    return float(match.group(1)) * scale if match else default


def _detect_effect(text: str) -> Optional[str]:
    """Return the first effect whose keyword occurs in ``text`` (case-insensitive).

    Effects are checked in ``EFFECT_KEYWORDS`` order using substring matching;
    ``None`` is returned when no keyword is present.
    """
    haystack = text.lower()
    return next(
        (
            name
            for name, needles in EFFECT_KEYWORDS.items()
            if any(needle in haystack for needle in needles)
        ),
        None,
    )


def parse_nl_to_dsl(command: str, video_duration: float, fallback_llm: bool = False, llm_fn=None) -> List[EffectCommand]:
    """Parse a natural-language prompt into a list of ``EffectCommand`` objects.

    The prompt is split on ``;`` and each non-empty chunk is parsed independently:
    effect type, time window, target object, then effect-specific parameters
    layered on top of ``EFFECT_DEFAULTS``.

    Args:
        command: One or more effect instructions separated by ``;``.
        video_duration: Clip length in seconds (default end of the time window).
        fallback_llm: When true and ``llm_fn`` is given, chunks whose effect
            cannot be detected are delegated to ``llm_fn``.
        llm_fn: Callable taking the chunk text and returning a list of
            ``EffectCommand``; its results are merged in prompt order.

    Returns:
        Commands in the order their chunks appear in ``command``.

    Raises:
        ValueError: If a chunk matches no known effect and no LLM fallback is enabled.
    """
    commands = []
    for chunk in re.split(r'[;]\s*\n*', command):
        chunk = chunk.strip()
        if not chunk:
            continue
        effect = _detect_effect(chunk)
        if effect is None:
            if fallback_llm and llm_fn:
                # Merge the fallback result and keep going; returning here would
                # drop commands already parsed and skip the remaining chunks.
                commands.extend(llm_fn(chunk))
                continue
            raise ValueError(f'Unknown effect: {chunk}')
        t_in, t_out = _parse_time_window(chunk, video_duration)
        obj = _extract_object(chunk)
        params = dict(EFFECT_DEFAULTS.get(effect, {}))
        if effect == 'ZoomFollow':
            # Margin is written as a percentage in prompts; store it as a fraction.
            margin = _extract_numeric(chunk, 'margin', scale=0.01)
            if margin is not None:
                params['margin'] = margin
        elif effect == 'Spotlight':
            strength = _extract_numeric(chunk, 'strength', default=None)
            if strength is not None:
                params['strength'] = min(max(strength, 0.1), 0.95)
            feather = _extract_numeric(chunk, 'feather', default=None)
            if feather is not None:
                params['feather'] = int(feather)
        elif effect == 'BlurBackground':
            ksize = _extract_numeric(chunk, 'blur', default=None)
            if ksize is not None:
                ksize = int(ksize)
                # Gaussian kernels must be odd; round even sizes up.
                params['ksize'] = ksize if ksize % 2 else ksize + 1
        elif effect == 'PixelateObject':
            block = _extract_numeric(chunk, 'block', default=None)
            if block is not None:
                params['block'] = max(4, int(block))
        elif effect == 'AutoReframe':
            aspect = re.search(r'(\d+:\d+)', chunk)
            if aspect:
                params['aspect'] = aspect.group(1)
            safe = _extract_numeric(chunk, 'safe', default=None)
            if safe is not None:
                params['safe'] = min(max(safe, 0.1), 0.95)
        elif effect == 'Callout':
            label = re.search(r'label\s*([a-z0-9 _-]+)', chunk)
            if label:
                params['label'] = label.group(1).strip()
        elif effect == 'PiPMagnifier':
            scale = _extract_numeric(chunk, 'scale', default=None)
            radius = _extract_numeric(chunk, 'radius', default=None)
            if scale is not None:
                params['scale'] = max(1.1, scale)
            if radius is not None:
                params['radius'] = int(radius)
        commands.append(EffectCommand(effect=effect, object=obj, t_in=t_in, t_out=t_out, params=params))
    return commands

# System prompt intended for an optional LLM fallback that emits the same DSL as JSON.
# It is only defined here; no code in this cell sends it anywhere.
LLM_PROMPT = """You are a function that converts a natural-language prompt into the following JSON DSL:
[{"effect":str, "object":str, "t_in":float, "t_out":float, "params":{...}}]
Only emit valid JSON.
Effects: ZoomFollow, Spotlight, BlurBackground, PixelateObject, AutoReframe, Callout, PiPMagnifier, PathOverlay.
"""

In [7]:

#@title Parser unit tests
import unittest

class DSLParserTests(unittest.TestCase):
    """Smoke tests for the rule-based prompt parser (effect, window, object, params)."""

    def test_zoom_follow(self):
        cmd = parse_nl_to_dsl('zoom on the person from 1.0s to 3.5s with 10% margin', video_duration=8.0)[0]
        self.assertEqual(cmd.effect, 'ZoomFollow')
        self.assertAlmostEqual(cmd.t_in, 1.0)
        self.assertAlmostEqual(cmd.t_out, 3.5)
        # NOTE(review): 0.10 is also the ZoomFollow default margin in EFFECT_DEFAULTS,
        # so this assertion passes whether or not "10% margin" was actually parsed.
        self.assertAlmostEqual(cmd.params['margin'], 0.10)

    def test_blur_background(self):
        cmd = parse_nl_to_dsl('blur background around the car from 0 to 5s', video_duration=8.0)[0]
        self.assertEqual(cmd.effect, 'BlurBackground')
        self.assertEqual(cmd.object, 'car')
        self.assertEqual(cmd.t_out, 5.0)

    def test_auto_reframe(self):
        cmd = parse_nl_to_dsl('auto-reframe 9:16 keeping the dog centered from 2s to 7s', video_duration=10.0)[0]
        self.assertEqual(cmd.effect, 'AutoReframe')
        self.assertEqual(cmd.params['aspect'], '9:16')
        self.assertEqual(cmd.object, 'dog')

# Run the suite inline; unittest.main() is not used in this cell.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DSLParserTests)
unittest.TextTestRunner(verbosity=2).run(suite)


test_auto_reframe (__main__.DSLParserTests.test_auto_reframe) ... ok
test_blur_background (__main__.DSLParserTests.test_blur_background) ... ok
test_zoom_follow (__main__.DSLParserTests.test_zoom_follow) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.010s

OK


<unittest.runner.TextTestResult run=3 errors=0 failures=0>


## Keyframe planner (dominant track + smoothing)
Pick the dominant track ID, smooth center/scale with SavGol (fallback EMA), interpolate timeline, and persist `/content/exports/keyframes.json`.


In [8]:

#@title Keyframe utilities
tracks_df_cached: Optional[pd.DataFrame] = None


def tracks_to_dataframe(tracks: Dict[str, Any]) -> pd.DataFrame:
    """Flatten a tracks payload into one DataFrame row per detection.

    Columns: ``frame_index``, ``t``, ``id``, ``cls`` (normalised label), ``conf``,
    ``x1``, ``y1``, ``x2``, ``y2`` and ``mask_rle`` (``None`` when absent).
    """
    def _row(frame: Dict[str, Any], det: Dict[str, Any]) -> Dict[str, Any]:
        left, top, right, bottom = det['bbox_xyxy']
        return {
            'frame_index': frame['frame_index'],
            't': frame['t'],
            'id': det['id'],
            'cls': _normalize_label(det['cls']),
            'conf': det['conf'],
            'x1': left,
            'y1': top,
            'x2': right,
            'y2': bottom,
            'mask_rle': det.get('mask_rle'),
        }

    rows = [
        _row(frame, det)
        for frame in tracks['frames']
        for det in frame['detections']
    ]
    return pd.DataFrame.from_records(rows)


def get_tracks_df(tracks: Dict[str, Any]) -> pd.DataFrame:
    """Return the detections DataFrame for ``tracks``, rebuilding it only when needed.

    The DataFrame is memoised in the module-level ``tracks_df_cached``. The cache
    is tied to the exact ``tracks`` object it was built from, so passing a
    different payload (e.g. after re-running detection) rebuilds the frame
    instead of silently returning stale data.
    """
    global tracks_df_cached
    # Keep a reference to the source payload so the identity check cannot be
    # fooled by a recycled object id.
    cached_source = getattr(get_tracks_df, '_source', None)
    if tracks_df_cached is None or cached_source is not tracks:
        tracks_df_cached = tracks_to_dataframe(tracks)
        get_tracks_df._source = tracks
    return tracks_df_cached


def choose_track_id(df: pd.DataFrame, cls_name: str, t_in: float, t_out: float) -> Optional[int]:
    """Pick the dominant track of class ``cls_name`` within ``[t_in, t_out]``.

    "Dominant" means the track with the most detections in the window, ties
    broken by the higher median confidence. Returns ``None`` when the frame is
    empty or no detection of that class falls inside the window.
    """
    if df.empty:
        return None

    wanted = _normalize_label(cls_name)
    is_class = df['cls'] == wanted
    in_window = (df['t'] >= t_in) & (df['t'] <= t_out)
    candidates = df[is_class & in_window]
    if candidates.empty:
        return None

    stats = candidates.groupby('id')['conf'].agg(['count', 'median']).reset_index()
    ranked = stats.sort_values(['count', 'median'], ascending=False)
    best = ranked.iloc[0]
    return int(best['id'])


def smooth_series(values: np.ndarray, window_length: int = 9, polyorder: int = 2) -> np.ndarray:
    """Smooth a 1-D signal.

    Series with at least ``window_length`` samples are filtered with a
    Savitzky-Golay filter. Shorter series fall back to an exponential moving
    average (alpha = 0.25) seeded with the first sample. Empty input is
    returned as an empty float array.

    Args:
        values: Samples to smooth.
        window_length: Savitzky-Golay window size.
        polyorder: Savitzky-Golay polynomial order.

    Returns:
        Array with the same length as ``values``.
    """
    values = np.asarray(values)
    if values.size == 0:
        # Nothing to smooth; avoid indexing values[0] below.
        return values.astype(float)
    if len(values) < window_length:
        alpha = 0.25
        smoothed = [values[0]]
        for val in values[1:]:
            smoothed.append(alpha * val + (1 - alpha) * smoothed[-1])
        return np.array(smoothed)
    return savgol_filter(values, window_length=window_length, polyorder=polyorder)


def plan_effect(cmd: EffectCommand, tracks: Dict[str, Any]) -> Dict[str, Any]:
    """Build the keyframe plan for one effect command.

    The dominant track for ``cmd.object`` inside ``[cmd.t_in, cmd.t_out]`` is
    selected, its box centre and relative scale (largest of width/frame-width
    and height/frame-height, padded by the ``margin`` param) are smoothed, and
    one timeline entry per detection is emitted.

    Returns:
        Dict with ``effect``, ``object``, ``track_id``, ``t_in``, ``t_out``,
        ``timeline``, ``frame_size``, ``fps``, ``video_path`` and ``params``.

    Raises:
        ValueError: If no matching track exists or the chosen track has no rows
            in the window.
    """
    df = get_tracks_df(tracks)
    track_id = choose_track_id(df, cmd.object, cmd.t_in, cmd.t_out)
    if track_id is None:
        raise ValueError(f'No track found for {cmd.object}')

    selector = (df['id'] == track_id) & (df['t'] >= cmd.t_in) & (df['t'] <= cmd.t_out)
    window = df[selector].copy()
    if window.empty:
        raise ValueError('Track data missing for chosen ID')

    left, top = window['x1'].values, window['y1'].values
    right, bottom = window['x2'].values, window['y2'].values
    frame_w, frame_h = tracks['size']
    margin = cmd.params.get('margin', 0.10)

    raw_cx = (left + right) / 2
    raw_cy = (top + bottom) / 2
    raw_scale = np.maximum((right - left) / frame_w, (bottom - top) / frame_h) * (1 + margin)

    smooth_cx = smooth_series(raw_cx, **SAVGOL_DEFAULT)
    smooth_cy = smooth_series(raw_cy, **SAVGOL_DEFAULT)
    smooth_scale = smooth_series(raw_scale, **SAVGOL_DEFAULT)

    timeline = [
        {
            't': float(row.t),
            'frame': int(row.frame_index),
            'center': [float(cx), float(cy)],
            'scale': float(sc),
            'bbox': [float(row.x1), float(row.y1), float(row.x2), float(row.y2)],
            'mask_rle': row.mask_rle,
        }
        for row, cx, cy, sc in zip(window.itertuples(index=False), smooth_cx, smooth_cy, smooth_scale)
    ]

    return {
        'effect': cmd.effect,
        'object': cmd.object,
        'track_id': track_id,
        't_in': cmd.t_in,
        't_out': cmd.t_out,
        'timeline': timeline,
        'frame_size': tracks['size'],
        'fps': tracks['fps'],
        'video_path': tracks['video_path'],
        'params': cmd.params,
    }


def save_keyframes(plans: List[Dict[str, Any]]):
    """Write the effect plans to ``EXPORT_DIR / 'keyframes.json'`` under the key ``effects``."""
    path = EXPORT_DIR / 'keyframes.json'
    document = {'effects': plans}
    with open(path, 'w', encoding='utf-8') as handle:
        json.dump(document, handle, indent=2)
    print('Saved keyframes →', path)

# Demo: parse three prompts, plan keyframes for each, and persist them.
video_duration = tracks_data['duration']
demo_commands = [
    'zoom on the person from 1.0s to 4.5s with 12% margin',
    'blur background around the person from 0s to 5s',
    'callout the person from 2s to 6s label speaker'
]
# Each prompt holds a single command, so only the first parsed entry is used.
parsed_cmds = [parse_nl_to_dsl(cmd, video_duration)[0] for cmd in demo_commands]
effect_plans = [plan_effect(cmd, tracks_data) for cmd in parsed_cmds]
save_keyframes(effect_plans)
len(effect_plans)


Saved keyframes → /content/exports/keyframes.json


3


## Renderers (MoviePy + OpenCV)
Implementations for all eight verbs; each writes `/content/exports/out_<effect>.mp4` and optionally previews inline.


In [9]:

#@title Rendering helpers
import base64


def decode_mask_rle(rle: Optional[Dict[str, Any]], frame_shape: Tuple[int, int]):
    """Decode an RLE mask and fit it to ``frame_shape`` (height, width).

    Returns ``None`` when ``rle`` is empty/``None``. A decoded mask whose shape
    differs from ``frame_shape`` is resized with nearest-neighbour interpolation.
    """
    if not rle:
        return None
    decoded = decode_mask(rle)
    if decoded.shape == frame_shape:
        return decoded
    target_h, target_w = frame_shape[0], frame_shape[1]
    # cv2.resize takes the destination size as (width, height).
    return cv2.resize(decoded, (target_w, target_h), interpolation=cv2.INTER_NEAREST)


def timeline_sampler(timeline: List[Dict[str, Any]]):
    """Create a ``sample(t)`` function that interpolates a keyframe timeline.

    ``sample(t)`` returns ``{'center', 'scale', 'bbox', 'mask'}``. Times before
    the first / after the last keyframe return that keyframe unchanged. In
    between, centre, scale and bbox are linearly interpolated and the mask is
    taken from the nearer keyframe (the earlier one on an exact tie).
    """
    times = np.array([kf['t'] for kf in timeline], dtype=np.float32)
    centers = np.array([kf['center'] for kf in timeline], dtype=np.float32)
    scales = np.array([kf['scale'] for kf in timeline], dtype=np.float32)
    bboxes = np.array([kf['bbox'] for kf in timeline], dtype=np.float32)
    masks = [kf.get('mask_rle') for kf in timeline]

    def _keyframe(i: int) -> Dict[str, Any]:
        return {'center': centers[i], 'scale': scales[i], 'bbox': bboxes[i], 'mask': masks[i]}

    def sample(t: float):
        if t <= times[0]:
            return _keyframe(0)
        if t >= times[-1]:
            return _keyframe(-1)
        upper = np.searchsorted(times, t, side='right')
        lo = max(upper - 1, 0)
        hi = min(upper, len(times) - 1)
        # Guard against duplicate timestamps (zero-length span).
        span = (times[hi] - times[lo]) or 1e-6
        alpha = (t - times[lo]) / span
        nearest = lo if alpha <= 0.5 else hi
        return {
            'center': centers[lo] * (1 - alpha) + centers[hi] * alpha,
            'scale': scales[lo] * (1 - alpha) + scales[hi] * alpha,
            'bbox': bboxes[lo] * (1 - alpha) + bboxes[hi] * alpha,
            'mask': masks[nearest],
        }

    return sample


def ensure_mask(state: Dict[str, Any], frame_shape: Tuple[int, int], feather: int = 25):
    """Return a soft alpha matte for the tracked object, shaped ``(H, W, 1)`` in ``[0, 1]``.

    The segmentation mask from ``state['mask']`` is used when available;
    otherwise a filled ellipse inscribed in ``state['bbox']`` stands in for it.
    Either way the matte is built on a 0–255 scale, optionally feathered with a
    Gaussian blur, and then normalised to ``[0, 1]``.

    Args:
        state: Sampled timeline state with ``bbox`` and optionally ``mask``.
        frame_shape: ``(height, width)`` of the frame.
        feather: Gaussian sigma in pixels; ``0`` or less disables feathering.
    """
    mask = decode_mask_rle(state.get('mask'), frame_shape)
    if mask is not None:
        # Decoded RLE masks hold 0/1; lift them to 0/255 so the /255 normalisation
        # below yields a full-strength matte (matching the ellipse fallback).
        mask = np.where(mask > 0, 255, 0).astype(np.uint8)
    else:
        x1, y1, x2, y2 = state['bbox']
        temp = np.zeros(frame_shape, dtype=np.uint8)
        cv2.ellipse(
            temp,
            center=(int((x1 + x2) / 2), int((y1 + y2) / 2)),
            axes=(int(max((x2 - x1) / 2, 1)), int(max((y2 - y1) / 2, 1))),
            angle=0,
            startAngle=0,
            endAngle=360,
            color=255,
            thickness=-1
        )
        mask = temp
    if feather > 0:
        mask = cv2.GaussianBlur(mask, (0, 0), sigmaX=feather)
    return np.clip(mask.astype(np.float32) / 255.0, 0, 1)[..., None]


def clamp_window(center, scale, frame_size):
    """Compute an integer crop rectangle of relative size ``scale`` around ``center``.

    The crop is at least 64 px per side (where the frame allows it), never
    larger than the frame, and is shifted as needed so it stays fully inside
    the frame.

    Args:
        center: ``(cx, cy)`` in pixels.
        scale: Crop size as a fraction of the frame size.
        frame_size: ``(width, height)`` of the frame.

    Returns:
        ``(x1, y1, x2, y2)`` as Python ints.
    """
    W, H = frame_size
    # Cap at the frame size: an oversized crop would make the upper clip bound
    # negative and push the origin outside the frame.
    crop_w = min(max(W * scale, 64), W)
    crop_h = min(max(H * scale, 64), H)
    x1 = np.clip(center[0] - crop_w / 2, 0, W - crop_w)
    y1 = np.clip(center[1] - crop_h / 2, 0, H - crop_h)
    x2 = x1 + crop_w
    y2 = y1 + crop_h
    return int(x1), int(y1), int(x2), int(y2)


def run_moviepy(plan: Dict[str, Any], frame_fn, output_path: Path, codec: str = 'libx264'):
    """Render a plan by applying ``frame_fn`` to every frame inside the effect window.

    Args:
        plan: Effect plan with ``video_path``, ``timeline``, ``frame_size``
            (``[width, height]``), ``t_in`` and ``t_out``.
        frame_fn: ``frame_fn(frame, state, (H, W)) -> frame`` called for frames
            whose time lies in ``[t_in, t_out]``; ``state`` is the sampled timeline.
        output_path: Destination video file.
        codec: Video codec passed to MoviePy.

    Returns:
        ``output_path``.
    """
    clip = VideoFileClip(plan['video_path'])
    processed = None
    try:
        sampler = timeline_sampler(plan['timeline'])
        H = int(plan['frame_size'][1])
        W = int(plan['frame_size'][0])

        def processor(get_frame, t):
            frame = get_frame(t)
            # Frames outside the effect window pass through untouched.
            # NOTE(review): a frame_fn that changes the resolution (e.g. a reframe
            # crop) therefore yields mixed frame sizes — confirm the writer copes.
            if t < plan['t_in'] or t > plan['t_out']:
                return frame
            state = sampler(t)
            return frame_fn(frame, state, (H, W))

        processed = clip.fl(processor)
        processed.write_videofile(str(output_path), codec=codec, audio=True, audio_codec='aac', fps=clip.fps, logger=None)
    finally:
        # Release readers even when sampling or encoding fails.
        clip.close()
        if processed is not None:
            processed.close()
    return output_path


def render_zoom_follow(plan: Dict[str, Any], output: Path):
    """Render a zoom that follows the tracked object.

    Each frame in the window is cropped around the smoothed track centre
    (crop scale limited to 0.2–1.0 of the frame) and resized back to the
    original frame size with bicubic interpolation.
    """
    frame_w, frame_h = plan['frame_size']

    def _zoom(frame, state, shape):
        cx, cy = state['center']
        zoom_scale = np.clip(state['scale'], 0.2, 1.0)
        left, top, right, bottom = clamp_window((cx, cy), zoom_scale, (frame_w, frame_h))
        region = frame[top:bottom, left:right]
        return cv2.resize(region, (frame_w, frame_h), interpolation=cv2.INTER_CUBIC)

    return run_moviepy(plan, _zoom, output)


def render_spotlight(plan: Dict[str, Any], output: Path):
    """Render a spotlight: the object stays at full brightness, the rest is dimmed.

    Params read from ``plan['params']``: ``feather`` (matte softness, default 45)
    and ``strength`` (fraction of brightness removed outside the matte, default 0.7).
    """
    params = plan['params']
    feather = int(params.get('feather', 45))
    strength = params.get('strength', 0.7)

    def _spotlight(frame, state, shape):
        matte = ensure_mask(state, shape, feather)
        darkened = (frame * (1 - strength)).astype(np.uint8)
        blended = frame * matte + darkened * (1 - matte)
        return blended.astype(np.uint8)

    return run_moviepy(plan, _spotlight, output)


def render_blur_background(plan: Dict[str, Any], output: Path):
    """Render a background blur that keeps the tracked object sharp.

    The Gaussian kernel size comes from ``plan['params']['ksize']`` (default 21)
    and is bumped to the next odd number when even.
    """
    kernel = int(plan['params'].get('ksize', 21))
    kernel |= 1  # force odd: even sizes gain 1, odd sizes are unchanged

    def _blur(frame, state, shape):
        matte = ensure_mask(state, shape, 25)
        soft = cv2.GaussianBlur(frame, (kernel, kernel), 0)
        composite = frame * matte + soft * (1 - matte)
        return composite.astype(np.uint8)

    return run_moviepy(plan, _blur, output)


def render_pixelate(plan: Dict[str, Any], output: Path):
    """Render a pixelation of the tracked object.

    The whole frame is downsampled by ``plan['params']['block']`` (default 20),
    upsampled with nearest-neighbour, and composited over the original through
    a lightly feathered object matte.
    """
    block = int(plan['params'].get('block', 20))

    def _pixelate(frame, state, shape):
        matte = ensure_mask(state, shape, 5)
        height, width, _ = frame.shape
        coarse_size = (max(1, width // block), max(1, height // block))
        coarse = cv2.resize(frame, coarse_size, interpolation=cv2.INTER_LINEAR)
        blocky = cv2.resize(coarse, (width, height), interpolation=cv2.INTER_NEAREST)
        composite = frame * (1 - matte) + blocky * matte
        return composite.astype(np.uint8)

    return run_moviepy(plan, _pixelate, output)


def render_autoreframe(plan: Dict[str, Any], output: Path):
    """Render a re-framed crop at the requested aspect ratio, centred on the object.

    ``plan['params']['aspect']`` is a ``'W:H'`` string (default ``'9:16'``). The
    largest crop of that aspect that fits the source frame is positioned on the
    track centre (kept inside the frame) and resized to 720 px tall.
    """
    ratio_w, ratio_h = (int(part) for part in plan['params'].get('aspect', '9:16').split(':'))
    target_aspect = ratio_w / ratio_h
    W, H = plan['frame_size']

    # Crop geometry depends only on the frame size and target aspect.
    if W / H > target_aspect:
        crop_w, crop_h = H * target_aspect, H
    else:
        crop_w, crop_h = W, W / target_aspect
    out_h = 720
    out_w = int(out_h * target_aspect)

    def _reframe(frame, state, shape):
        cx, cy = state['center']
        left = np.clip(cx - crop_w / 2, 0, W - crop_w)
        top = np.clip(cy - crop_h / 2, 0, H - crop_h)
        region = frame[int(top):int(top + crop_h), int(left):int(left + crop_w)]
        return cv2.resize(region, (out_w, out_h))

    return run_moviepy(plan, _reframe, output)


def render_callout(plan: Dict[str, Any], output: Path):
    """Draw a dot on the planned center plus a leader line and a label box.

    Args:
        plan: Effect plan. ``plan['params']['label']`` is the caption; when it
            is absent (or None) ``plan['object']`` is used instead.
        output: Destination path, handed to ``run_moviepy`` unchanged.

    Returns:
        Whatever ``run_moviepy`` returns.
    """
    # Only fall back to plan['object'] when no label was supplied, so a plan
    # with an explicit label does not also need an 'object' key.
    label = plan['params'].get('label')
    if label is None:
        label = plan['object']
    label = str(label)

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale, thickness, pad_x = 0.75, 2, 12
    # Keep the historical 160 px box for short captions but widen it when the
    # rendered text would otherwise spill past the border.
    (text_w, _text_h), _baseline = cv2.getTextSize(label, font, font_scale, thickness)
    box_w = max(160, text_w + 2 * pad_x)
    box_h = 60

    def fn(frame, state, shape):
        frame_out = frame.copy()
        h, w = frame_out.shape[:2]
        cx, cy = map(int, state['center'])
        # Plain Python ints throughout: np.clip returns NumPy scalars.
        anchor = (
            int(np.clip(cx + 100, 0, w - 1)),
            int(np.clip(cy - 100, 0, h - 1)),
        )
        cv2.line(frame_out, (cx, cy), anchor, (255, 255, 255), 2)
        cv2.circle(frame_out, (cx, cy), 6, (0, 255, 0), -1)
        # max(0, ...) keeps the upper clip bound valid when the frame is
        # smaller than the box.
        x1 = int(np.clip(anchor[0], 0, max(0, w - box_w - 1)))
        y1 = int(np.clip(anchor[1], 0, max(0, h - box_h - 1)))
        x2, y2 = x1 + box_w, y1 + box_h
        # Blend a filled black rectangle at 65% opacity as the label backdrop.
        overlay = frame_out.copy()
        cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.65, frame_out, 0.35, 0, frame_out)
        cv2.rectangle(frame_out, (x1, y1), (x2, y2), (255, 255, 255), 2)
        cv2.putText(frame_out, label, (x1 + pad_x, y1 + 35), font, font_scale, (255, 255, 255), thickness)
        return frame_out

    return run_moviepy(plan, fn, output)


def render_pip(plan: Dict[str, Any], output: Path):
    """Paste a magnified copy of the area around the planned center as an inset.

    Args:
        plan: Effect plan. ``plan['params']['scale']`` (default 1.5) is the
            magnification and ``plan['params']['radius']`` (default 120) is
            half the side of the sampled square, in pixels.
        output: Destination path, handed to ``run_moviepy`` unchanged.

    Returns:
        Whatever ``run_moviepy`` returns.
    """
    scale = float(plan['params'].get('scale', 1.5))
    radius = int(plan['params'].get('radius', 120))

    def fn(frame, state, shape):
        frame_out = frame.copy()
        h, w = frame_out.shape[:2]
        cx, cy = map(int, state['center'])
        # Clamp BOTH edges into [0, w] / [0, h]. A negative upper bound would
        # otherwise be read as a from-the-end slice index, and slice ends are
        # exclusive so the bound is w / h rather than w - 1 / h - 1.
        x1 = min(max(cx - radius, 0), w)
        x2 = min(max(cx + radius, 0), w)
        y1 = min(max(cy - radius, 0), h)
        y2 = min(max(cy + radius, 0), h)
        patch = frame[y1:y2, x1:x2]
        if patch.size == 0:
            # Center is entirely off-frame: nothing to magnify.
            return frame_out
        # Cap the magnification so the inset never outgrows the frame;
        # pasting a larger inset would fail with a shape mismatch.
        fit = min(scale, w / patch.shape[1], h / patch.shape[0])
        pip_w = min(w, max(1, int(patch.shape[1] * fit)))
        pip_h = min(h, max(1, int(patch.shape[0] * fit)))
        pip = cv2.resize(patch, (pip_w, pip_h))
        # Prefer a spot up and to the right of the subject with a 20 px
        # margin, then clamp so the inset stays fully inside the frame.
        target_x = int(np.clip(min(w - pip_w - 20, max(20, cx + radius)), 0, w - pip_w))
        target_y = int(np.clip(max(20, cy - radius - pip_h), 0, h - pip_h))
        frame_out[target_y:target_y + pip_h, target_x:target_x + pip_w] = pip
        cv2.rectangle(frame_out, (target_x, target_y), (target_x + pip_w, target_y + pip_h), (255, 255, 255), 2)
        return frame_out

    return run_moviepy(plan, fn, output)


def render_path(plan: Dict[str, Any], output: Path):
    """Overlay the plan's full center trajectory as an open polyline.

    The same polyline, built once from every ``center`` in
    ``plan['timeline']``, is drawn on each frame.

    Args:
        plan: Effect plan providing ``plan['timeline']``.
        output: Destination path, handed to ``run_moviepy`` unchanged.

    Returns:
        Whatever ``run_moviepy`` returns.
    """
    centers = [step['center'] for step in plan['timeline']]
    # int32 is the point type cv2.polylines is given here.
    trajectory = np.asarray(centers, dtype=np.int32)

    def fn(frame, state, shape):
        canvas = frame.copy()
        # False -> open curve (the last point is not joined back to the first).
        cv2.polylines(canvas, [trajectory], False, (0, 255, 255), 4)
        return canvas

    return run_moviepy(plan, fn, output)

# Registry mapping each effect name to the function that renders it; every
# entry is called as renderer(plan, output).
RENDERERS = dict(
    ZoomFollow=render_zoom_follow,
    Spotlight=render_spotlight,
    BlurBackground=render_blur_background,
    PixelateObject=render_pixelate,
    AutoReframe=render_autoreframe,
    Callout=render_callout,
    PiPMagnifier=render_pip,
    PathOverlay=render_path,
)


def render_effect(plan: Dict[str, Any], name: Optional[str] = None, preview: bool = True) -> Path:
    """Render ``plan`` with the chosen effect and return the output path.

    Args:
        plan: Effect plan; ``plan['effect']`` is used when ``name`` is falsy.
        name: Optional effect name overriding the plan's own effect.
        preview: When True, display the rendered video inline.

    Returns:
        ``EXPORT_DIR / 'out_<effect lowercased>.mp4'``.

    Raises:
        KeyError: If the effect is not registered in ``RENDERERS`` (the
            message lists the known effects), or if ``name`` is falsy and
            the plan has no ``'effect'`` key.
    """
    effect = name or plan['effect']
    try:
        renderer = RENDERERS[effect]
    except KeyError:
        known = ', '.join(sorted(RENDERERS))
        # Same exception type as a bare dict lookup, but with a usable message.
        raise KeyError(f'Unknown effect {effect!r}; expected one of: {known}') from None
    # Re-create the export folder in case it was removed since it was set up.
    EXPORT_DIR.mkdir(parents=True, exist_ok=True)
    output = EXPORT_DIR / f'out_{effect.lower()}.mp4'
    renderer(plan, output)
    if preview:
        # embed=True inlines the video data. Without it the player is given
        # the absolute file path as a URL, which a hosted notebook front end
        # such as Colab cannot load.
        display(Video(str(output), embed=True))
    return output


In [10]:

#@title Render demo effects (ZoomFollow, BlurBackground, Callout)
# The three demo plans are rendered in order, each with an explicitly named renderer.
zoom_plan, blur_plan, callout_plan = effect_plans
zoom_path, blur_path, callout_path = (
    render_effect(demo_plan, effect_name)
    for demo_plan, effect_name in (
        (zoom_plan, 'ZoomFollow'),
        (blur_plan, 'BlurBackground'),
        (callout_plan, 'Callout'),
    )
)
zoom_path, blur_path, callout_path


(PosixPath('/content/exports/out_zoomfollow.mp4'),
 PosixPath('/content/exports/out_blurbackground.mp4'),
 PosixPath('/content/exports/out_callout.mp4'))


## Visual sanity checks & analytics
Charts complement the video preview: class counts over time, plus the smoothed center/scale curves for each plan. The plots are saved under `/content/exports/`.


In [11]:

#@title Generate plots
class Plotter:
    """Writes diagnostic PNG charts for tracking results and effect plans."""

    def __init__(self, tracks: Dict[str, Any], plans: List[Dict[str, Any]]):
        """Store the inputs and build the detections table via ``get_tracks_df``."""
        self.tracks = tracks
        self.plans = plans
        self.df = get_tracks_df(tracks)

    @staticmethod
    def _write_png(fig, filename: str) -> Path:
        """Tighten ``fig``'s layout, save it under EXPORT_DIR, close it, return the path."""
        target = EXPORT_DIR / filename
        fig.tight_layout()
        fig.savefig(target)
        # Close explicitly so figures do not pile up across repeated calls.
        plt.close(fig)
        return target

    def class_counts(self) -> Path:
        """Plot detections per class over time and return the saved PNG path.

        Raises:
            ValueError: If the detections table is empty.
        """
        if self.df.empty:
            raise ValueError('No detections to plot')
        # 't' is rounded to one decimal place, so each bucket spans 0.1 units
        # of 't' (labelled as seconds on the axis) despite the column name.
        bucketed = self.df.assign(second=self.df['t'].round(1))
        counts = (
            bucketed
            .groupby(['second', 'cls'])
            .size()
            .reset_index(name='detections')
        )
        fig, ax = plt.subplots(figsize=(8, 4))
        for cls_name, rows in counts.groupby('cls'):
            ax.plot(rows['second'], rows['detections'], label=cls_name)
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Detections')
        ax.set_title('Class counts over time')
        ax.legend()
        return self._write_png(fig, 'plot_class_counts.png')

    def plan_curves(self, plan: Dict[str, Any]) -> Path:
        """Plot a plan's center X, center Y and scale against time; return the PNG path.

        The file name is derived from ``plan['effect']``, so two plans with
        the same effect write to the same file.
        """
        timeline = plan['timeline']
        times = [pt['t'] for pt in timeline]
        # One (values, y-label, extra line kwargs) entry per stacked panel.
        curves = [
            ([pt['center'][0] for pt in timeline], 'Center X', {}),
            ([pt['center'][1] for pt in timeline], 'Center Y', {'color': 'orange'}),
            ([pt['scale'] for pt in timeline], 'Scale %', {'color': 'green'}),
        ]
        fig, axes = plt.subplots(3, 1, figsize=(8, 8), sharex=True)
        for axis, (values, ylabel, line_kwargs) in zip(axes, curves):
            axis.plot(times, values, **line_kwargs)
            axis.set_ylabel(ylabel)
        axes[-1].set_xlabel('Time (s)')
        fig.suptitle(f"Plan curves · {plan['effect']} ({plan['object']})")
        return self._write_png(fig, f"plot_{plan['effect'].lower()}.png")

# Build every chart once; the trailing tuple shows the saved paths as the cell output.
plotter = Plotter(tracks_data, effect_plans)
class_plot = plotter.class_counts()
plan_plots = list(map(plotter.plan_curves, effect_plans))
class_plot, plan_plots


(PosixPath('/content/exports/plot_class_counts.png'),
 [PosixPath('/content/exports/plot_zoomfollow.png'),
  PosixPath('/content/exports/plot_blurbackground.png'),
  PosixPath('/content/exports/plot_callout.png')])


## Simple chat-like UI
ipywidgets controls for Analyze → Plan → Render plus preset prompts.


In [17]:

#@title Chat-style widgets
# Mutable UI session: the handlers below read and replace these two entries.
state = dict(tracks=tracks_data, plans=effect_plans)

video_dropdown = widgets.Dropdown(description='Clip', options=list(CLIP_PATHS), value='clipA')
effect_dropdown = widgets.Dropdown(description='Effect', options=list(RENDERERS), value='ZoomFollow')
prompt_box = widgets.Textarea(
    value=demo_commands[0],
    description='Command',
    layout=widgets.Layout(width='80%', height='80px'),
)
output_area = widgets.Output()

# One button per preset; button i fills the prompt with preset_texts[i].
preset_buttons = [
    widgets.Button(description=caption)
    for caption in ('Zoom follow person', 'Blur speaker', 'Callout speaker')
]

preset_texts = demo_commands


def on_preset_click(idx):
    """Copy the preset command at position ``idx`` into the prompt box."""
    chosen_text = preset_texts[idx]
    prompt_box.value = chosen_text

# Bind the index as a default argument so each button keeps its own position
# instead of sharing the loop variable's final value.
for i, btn in enumerate(preset_buttons):
    btn.on_click(lambda _clicked, idx=i: on_preset_click(idx))

analyze_btn, plan_btn, render_btn = (
    widgets.Button(description=caption, button_style=style)
    for caption, style in (
        ('Analyze', 'info'),
        ('Plan', 'warning'),
        ('Render', 'success'),
    )
)


def handle_analyze(_):
    """Re-run detection/tracking for the selected clip and store it in ``state['tracks']``.

    ``state['plans']`` is left untouched, so plans made earlier are not refreshed here.
    """
    output_area.clear_output()
    with output_area:
        clip_key = video_dropdown.value
        clip_path = str(CLIP_PATHS[clip_key])
        state['tracks'] = detect_and_track(clip_path, use_seg=True, save_json=True)
        print('Recomputed tracks for', clip_key)


def handle_plan(_):
    """Parse the prompt into commands, plan each against the current tracks, and persist them."""
    output_area.clear_output()
    with output_area:
        tracks = state['tracks']
        commands = parse_nl_to_dsl(prompt_box.value, tracks['duration'])
        planned = []
        for command in commands:
            planned.append(plan_effect(command, tracks))
        state['plans'] = planned
        save_keyframes(planned)
        print(f'Planned {len(planned)} effect(s)')


def handle_render(_):
    """Render the plan matching the selected effect, falling back to the first plan.

    When no plans exist (for example after a prompt that produced no
    commands) a hint is printed instead of raising IndexError.
    """
    output_area.clear_output()
    with output_area:
        plans = state.get('plans') or []
        if not plans:
            print('No plans available yet: click Plan first.')
            return
        wanted = effect_dropdown.value
        plan = next((p for p in plans if p['effect'] == wanted), plans[0])
        if plan['effect'] != wanted:
            # Make the fallback visible: the first plan is about to be drawn
            # with a renderer it was not planned for.
            print(f"No {wanted} plan found; rendering the {plan['effect']} plan with the {wanted} renderer")
        path = render_effect(plan, wanted)
        print('Rendered →', path)

# Hook each action button to its handler.
render_btn.on_click(handle_render)
plan_btn.on_click(handle_plan)
analyze_btn.on_click(handle_analyze)

# Layout, top to bottom: selectors, prompt, presets, actions, output log.
ui = widgets.VBox(children=(
    widgets.HBox(children=(video_dropdown, effect_dropdown)),
    prompt_box,
    widgets.HBox(children=tuple(preset_buttons)),
    widgets.HBox(children=(analyze_btn, plan_btn, render_btn)),
    output_area,
))
display(ui)


VBox(children=(HBox(children=(Dropdown(description='Clip', options=('clipA',), value='clipA'), Dropdown(descri…


## Optional FastAPI shim
A thin service layer showing how to reuse the notebook helpers over HTTP (Analyze → Plan → Render).


In [13]:

#@title FastAPI stub (service contract)
from fastapi import FastAPI
from pydantic import BaseModel

# Application object; the @app.post decorators below register the three routes on it.
app = FastAPI(title='Object Tracking Effects API')

# Request body for POST /analyze; every field is forwarded to detect_and_track.
class AnalyzeRequest(BaseModel):
    video_path: str  # passed as the first positional argument
    classes: Optional[List[str]] = None  # passed second; presumably a class-name filter - confirm
    tracker: str = 'bytetrack'  # passed third
    use_seg: bool = False  # passed as the use_seg keyword

# Request body for POST /plan.
class PlanRequest(BaseModel):
    command: str  # natural-language text handed to parse_nl_to_dsl
    video_duration: float  # second argument of parse_nl_to_dsl

# Request body for POST /render.
class RenderRequest(BaseModel):
    plan: Dict[str, Any]  # plan dict handed to render_effect as-is
    effect: Optional[str] = None  # optional renderer name override for render_effect

@app.post('/analyze')
def analyze(req: AnalyzeRequest):
    """Run detection/tracking with the request's settings and return the result as the response."""
    # NOTE(review): req.video_path comes straight from the HTTP client and is
    # used unchecked; restrict it to an allowed directory before exposing this
    # service beyond a trusted notebook session.
    return detect_and_track(
        req.video_path,
        req.classes,
        req.tracker,
        use_seg=req.use_seg,
    )

@app.post('/plan')
def plan(req: PlanRequest):
    """Parse the command, plan each resulting effect, save keyframes, and return the plans."""
    # NOTE(review): planning always uses the notebook-global tracks_data, not
    # the result of an earlier /analyze call; confirm this is intended.
    commands = parse_nl_to_dsl(req.command, req.video_duration)
    planned = []
    for command in commands:
        planned.append(plan_effect(command, tracks_data))
    save_keyframes(planned)
    return {'plans': planned}

@app.post('/render')
def render(req: RenderRequest):
    """Render the submitted plan without an inline preview and return the output path.

    Raises:
        HTTPException: 400 when rendering raises KeyError, i.e. the effect
            name is not registered or the plan lacks a key the renderer reads.
    """
    # Imported here so the handler carries its own dependency; the cell-level
    # import only brings in FastAPI.
    from fastapi import HTTPException

    try:
        path = render_effect(req.plan, req.effect, preview=False)
    except KeyError as exc:
        # A missing key means the client sent a bad plan or effect name, so
        # report it as a client error instead of an opaque 500.
        raise HTTPException(status_code=400, detail=f'Invalid plan or unknown effect: {exc}') from exc
    return {'output': str(path)}

# NOTE(review): a notebook kernel normally already runs an asyncio event loop,
# so calling uvicorn.run(app, ...) directly from a cell may fail - confirm, and
# document a background-thread launch if so.
print('FastAPI app ready → launch with: uvicorn.run(app, host="0.0.0.0", port=8000)')


FastAPI app ready → launch with: uvicorn.run(app, host="0.0.0.0", port=8000)
