
# Track B – Temporal Reasoning (Notebook)

This notebook uses short, domain‑specific **synthetic clips** (and optional real videos) to evaluate **temporal reasoning**:
- **Step‑order** recognition & verification
- **Duration** measurement / hold‑time estimation
- **Repetition / frequency** counting
- **Next‑action prediction** (simple motion extrapolation)

It produces a summary table of **sequence accuracy**, **timings**, and **example reasoning outputs**.


In [1]:

# --- Setup
import os, math, time, json, itertools, numpy as np, pandas as pd, cv2
from dataclasses import dataclass
from typing import List, Tuple, Dict

# Display utilities
from IPython.display import Video, display, HTML

# Ensure deterministic behavior
np.random.seed(7)

print("OpenCV", cv2.__version__)


OpenCV 4.12.0


## 1) Generate a tiny synthetic test video (steps, durations, repetitions)

In [2]:

def make_synthetic_video(path="synthetic_trackB.mp4", fps=20, size=(320,240)):
    W,H = size
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(path, fourcc, fps, (W,H))

    # Scenario:
    # Step A (blue bg)  -> 1.5s
    # Step B (green bg) + moving box repeating 3 cycles (left<->right), total 2.5s
    # Step C (red bg) hold 1.0s
    frames = 0
    # Step A
    for i in range(int(1.5*fps)):
        frame = np.zeros((H,W,3), np.uint8); frame[:]= (255,0,0)[::-1]  # blue
        cv2.putText(frame,"STEP A",(10,30),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2,cv2.LINE_AA)
        out.write(frame); frames+=1

    # Step B with oscillating box (3 cycles)
    period = int(0.8*fps)  # frames per half-cycle
    total_B = int(2.5*fps)
    for i in range(total_B):
        frame = np.zeros((H,W,3), np.uint8); frame[:]= (0,255,0)[::-1]  # green
        t = i
        # horizontal oscillation using triangle wave
        x = int((W-60) * abs(((t/period)%2)-1))
        cv2.rectangle(frame,(x,80),(x+60,140),(0,0,0),-1)
        cv2.putText(frame,"STEP B",(10,30),cv2.FONT_HERSHEY_SIMPLEX,1,(0,0,0),2,cv2.LINE_AA)
        out.write(frame); frames+=1

    # Step C
    for i in range(int(1.0*fps)):
        frame = np.zeros((H,W,3), np.uint8); frame[:]= (0,0,255)[::-1]  # red
        cv2.putText(frame,"STEP C",(10,30),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2,cv2.LINE_AA)
        out.write(frame); frames+=1

    out.release()
    return path, fps, frames

vid_path, fps, nframes = make_synthetic_video()
display(Video(vid_path, embed=True, width=480))
print("Saved:", vid_path, "fps:", fps, "frames:", nframes)


Saved: synthetic_trackB.mp4 fps: 20 frames: 100


## 2) Frame sampling & basic temporal signals

In [3]:

@dataclass
class TemporalSignals:
    mean_rgb: np.ndarray        # (N,3)
    gray_mean: np.ndarray       # (N,)
    gray_hist: np.ndarray       # (N,32)
    motion_mag: np.ndarray      # (N,)

def sample_video_frames(path, stride=1):
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {path}")
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    W  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frames = []
    grays  = []
    means  = []
    hists  = []
    motion = []
    prev_gray = None

    idx = -1
    while True:
        ret, frame = cap.read()
        if not ret: break
        idx += 1
        if idx % stride != 0: continue

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        grays.append(gray)
        means.append(np.mean(frame.reshape(-1,3), axis=0))
        hist = cv2.calcHist([gray],[0],None,[32],[0,256]); cv2.normalize(hist, hist)
        hists.append(hist.squeeze())

        if prev_gray is None:
            motion.append(0.0)
        else:
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 21, 3, 5, 1.2, 0)
            mag,_ = cv2.cartToPolar(flow[...,0], flow[...,1])
            motion.append(float(np.mean(mag)))
        prev_gray = gray

        frames.append(frame)

    cap.release()
    means = np.array(means)
    gray_mean = np.array([g.mean() for g in grays])
    gray_hist = np.array(hists)
    motion_mag = np.array(motion)
    return frames, fps/stride, TemporalSignals(means, gray_mean, gray_hist, motion_mag)
    
frames, eff_fps, sig = sample_video_frames(vid_path, stride=1)
print("Sampled", len(frames), "frames at effective fps:", eff_fps)


Sampled 100 frames at effective fps: 20.0


## 3) Step segmentation (scene/step changes)

In [4]:

def detect_step_boundaries(mean_rgb, smooth=5, thr=25.0):
    # Smooth channel-wise then compute L2 change
    import scipy.ndimage as ndi
    mr = ndi.uniform_filter1d(mean_rgb[:,0], size=smooth, mode="nearest")
    mg = ndi.uniform_filter1d(mean_rgb[:,1], size=smooth, mode="nearest")
    mb = ndi.uniform_filter1d(mean_rgb[:,2], size=smooth, mode="nearest")
    dif = np.sqrt(np.diff(mr)**2 + np.diff(mg)**2 + np.diff(mb)**2)
    # Threshold & non-maximum suppression
    peaks = np.where(dif > thr)[0] + 1
    # De-duplicate close boundaries
    pruned = []
    for p in peaks:
        if not pruned or (p - pruned[-1]) > 5:
            pruned.append(p)
    boundaries = [0] + pruned + [len(mean_rgb)]
    return boundaries, dif

# Install dependency for smoothing (scipy) if missing
try:
    import scipy
except Exception:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "scipy", "-q"])

bounds, dif = detect_step_boundaries(sig.mean_rgb, smooth=5, thr=20.0)
print("Boundaries (frame indices):", bounds)
durations = np.diff(bounds) / eff_fps  # seconds
print("Durations (s):", durations, "sum:", durations.sum())


Boundaries (frame indices): [0, np.int64(28), np.int64(78), 100]
Durations (s): [1.4 2.5 1.1] sum: 5.0


## 4) Step order recognition & verification

In [5]:

# Heuristic step label via dominant color channel
def label_step(mean_rgb_slice):
    m = mean_rgb_slice.mean(axis=0)  # average color over slice
    idx = int(np.argmax(m))          # 0=R,1=G,2=B in BGR order? OpenCV uses BGR
    # OpenCV means were BGR; convert to RGB-like labels
    # Channels correspond to: [B,G,R]
    lab = ["BLUE","GREEN","RED"][idx]
    return lab

labels = []
for s,e in zip(bounds[:-1], bounds[1:]):
    labels.append(label_step(sig.mean_rgb[s:e]))

print("Detected labels:", labels)

# Compare with expected ["BLUE","GREEN","RED"]
expected = ["BLUE","GREEN","RED"]
order_correct = labels == expected
print("Order correct?", order_correct)


Detected labels: ['RED', 'GREEN', 'BLUE']
Order correct? False


## 5) Duration measurement / hold‑time estimation

In [6]:

durations_dict = {lab: dur for lab, dur in zip(labels, durations)}
df_durations = pd.DataFrame({
    "step": labels,
    "duration_sec": durations
})
display(df_durations)

# Expected durations: A=1.5s (BLUE), B=2.5s (GREEN), C=1.0s (RED)
expected_durations = {"BLUE":1.5, "GREEN":2.5, "RED":1.0}
abs_err = {k: abs(durations_dict.get(k, np.nan) - v) for k,v in expected_durations.items()}
mae = np.nanmean(list(abs_err.values()))
print("Absolute duration errors (s):", abs_err, "MAE:", mae)


Unnamed: 0,step,duration_sec
0,RED,1.4
1,GREEN,2.5
2,BLUE,1.1


Absolute duration errors (s): {'BLUE': np.float64(0.3999999999999999), 'GREEN': np.float64(0.0), 'RED': np.float64(0.3999999999999999)} MAE: 0.2666666666666666


## 6) Repetition / frequency counting (motion cycles)

In [7]:

from scipy.signal import find_peaks

# Focus on Step B region for repetition counting
idx_B = labels.index("GREEN")
sB, eB = bounds[idx_B], bounds[idx_B+1]
motion_B = sig.motion_mag[sB:eB]
tB = np.arange(len(motion_B)) / eff_fps

# Peak counting on smoothed motion magnitude
motion_smooth = cv2.GaussianBlur(motion_B.reshape(-1,1), (9,1), 0).ravel()
peaks, _ = find_peaks(motion_smooth, distance=int(0.2*eff_fps), prominence=np.percentile(motion_smooth,60))
reps = len(peaks)//2  # two peaks per left-right cycle (roughly)
print("Estimated cycles (repetitions) in Step B:", reps)
print("Expected ~3 cycles")


Estimated cycles (repetitions) in Step B: 0
Expected ~3 cycles


## 7) Predict upcoming action/outcome (simple extrapolation)

In [8]:

# Track box x-position by template matching a black rectangle
def estimate_box_x(frames_slice):
    xs = []
    for fr in frames_slice:
        gray = cv2.cvtColor(fr, cv2.COLOR_BGR2GRAY)
        # find darkest stripe row (rectangle area)
        row = gray[80:140].mean(axis=0)
        x = int(np.argmin(row))  # leftmost dark area
        xs.append(x)
    return np.array(xs)

xs = estimate_box_x(frames[sB:eB])
# Predict next direction by last velocity sign
if len(xs) >= 3:
    v = np.diff(xs)
    pred = "moving right" if v[-1] > 0 else "moving left"
else:
    pred = "unknown"

print("Predicted next motion:", pred)


Predicted next motion: moving left


## 8) (Optional) VLM reasoning stub over a sampled batch

In [9]:

# You can replace this stub with your multi-backend VLM from Track A.
def vlm_stub_reasoning(frames_batch):
    # Simple descriptor to mimic a caption
    mean_gray = float(np.mean([cv2.cvtColor(f, cv2.COLOR_BGR2GRAY).mean() for f in frames_batch]))
    return f"Stub: observed color step changes; mean brightness {mean_gray:.1f}. Likely middle step includes repetitive lateral motion."

batch = frames[::max(1, len(frames)//8)][:8]
vlm_caption = vlm_stub_reasoning(batch)
print("VLM (stub) output:", vlm_caption)


VLM (stub) output: Stub: observed color step changes; mean brightness 102.7. Likely middle step includes repetitive lateral motion.


## 9) Scoring & summary table

In [10]:

summary = {
    "order_correct": bool(order_correct),
    "BLUE_duration_est_s": float(durations_dict.get("BLUE", np.nan)),
    "GREEN_duration_est_s": float(durations_dict.get("GREEN", np.nan)),
    "RED_duration_est_s": float(durations_dict.get("RED", np.nan)),
    "duration_MAE_s": float(mae),
    "reps_est_GREEN": int(reps),
    "pred_next_motion": pred,
    "vlm_stub_caption": vlm_caption
}
df_summary = pd.DataFrame([summary])
display(df_summary)
df_summary.to_csv("trackB_temporal_summary.csv", index=False)
print("Saved: trackB_temporal_summary.csv")


Unnamed: 0,order_correct,BLUE_duration_est_s,GREEN_duration_est_s,RED_duration_est_s,duration_MAE_s,reps_est_GREEN,pred_next_motion,vlm_stub_caption
0,False,1.1,2.5,1.4,0.266667,0,moving left,Stub: observed color step changes; mean bright...


Saved: trackB_temporal_summary.csv
