# Phase 2 — Optical Flow Basics

Compute dense optical flow and derive obstacle risk and openness signals.

## Imports and setup

In [None]:
from pathlib import Path
import sys
import time

import cv2
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

from src.utils import load_config, ensure_demo_video, open_capture, iter_frames, release_capture
from src.perception_flow import (
    compute_optical_flow,
    compute_flow_magnitude,
    compute_obstacle_risk,
    compute_openness,
    visualize_flow,
    visualize_flow_hsv,
)

config = load_config("configs/default.yaml")
video_path = ensure_demo_video(config)
print(f"Video: {video_path}")

## Test optical flow on a pair of frames

In [None]:
# Read two consecutive frames
cap = open_capture(config)
ok1, frame1 = cap.read()
ok2, frame2 = cap.read()
release_capture(cap)

if not (ok1 and ok2):
    raise RuntimeError("Failed to read frames")

# Convert to grayscale for flow computation
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

# Compute optical flow
flow = compute_optical_flow(gray1, gray2, config)
print(f"Flow shape: {flow.shape}")
print(f"Flow range: [{flow.min():.2f}, {flow.max():.2f}]")

## Visualize optical flow

In [None]:
# Convert frames to RGB for visualization
frame1_rgb = cv2.cvtColor(frame1, cv2.COLOR_BGR2RGB)
frame2_rgb = cv2.cvtColor(frame2, cv2.COLOR_BGR2RGB)

# Visualize flow with arrows
flow_arrows = visualize_flow(frame2_rgb, flow, step=20)

# Visualize flow with HSV color coding
flow_hsv = visualize_flow_hsv(flow)

# Visualize flow magnitude
magnitude = compute_flow_magnitude(flow)

fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes[0, 0].imshow(frame1_rgb)
axes[0, 0].set_title("Frame 1")
axes[0, 0].axis("off")

axes[0, 1].imshow(frame2_rgb)
axes[0, 1].set_title("Frame 2")
axes[0, 1].axis("off")

axes[1, 0].imshow(flow_arrows)
axes[1, 0].set_title("Optical Flow (Arrows)")
axes[1, 0].axis("off")

im = axes[1, 1].imshow(magnitude, cmap="hot")
axes[1, 1].set_title("Flow Magnitude")
axes[1, 1].axis("off")
plt.colorbar(im, ax=axes[1, 1])

plt.tight_layout()
plt.show()

## Compute obstacle risk and openness signals

In [None]:
# Compute risk and openness
risk_score = compute_obstacle_risk(flow, config)
openness_score, preferred_direction = compute_openness(flow, config)

print(f"Obstacle Risk: {risk_score:.3f}")
print(f"Openness Score: {openness_score:.3f}")
print(f"Preferred Direction: {preferred_direction}")

# Visualize central region used for risk computation
h, w = magnitude.shape
central_region = config["risk"]["central_region"]
x_start = int(w * central_region[0])
x_end = int(w * central_region[1])

vis_frame = frame2_rgb.copy()
cv2.rectangle(vis_frame, (x_start, 0), (x_end, h), (255, 0, 0), 2)
cv2.line(vis_frame, (w // 2, 0), (w // 2, h), (0, 255, 0), 2)

plt.figure(figsize=(10, 6))
plt.imshow(vis_frame)
plt.title(f"Risk Region (blue) | Risk={risk_score:.3f} | Openness={openness_score:.3f} | Direction={preferred_direction}")
plt.axis("off")
plt.show()

## Test on full video sequence

In [None]:
# Process video and collect signals
cap = open_capture(config)
max_frames = config["video"].get("max_frames") or 150  # Limit for testing

risk_scores = []
openness_scores = []
preferred_directions = []

prev_gray = None
frame_idx = 0

for idx, frame in tqdm(iter_frames(cap, max_frames=max_frames, convert_rgb=False), total=max_frames):
    curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    if prev_gray is not None:
        flow = compute_optical_flow(prev_gray, curr_gray, config)
        risk = compute_obstacle_risk(flow, config)
        openness, direction = compute_openness(flow, config)
        
        risk_scores.append(risk)
        openness_scores.append(openness)
        preferred_directions.append(1 if direction == "right" else -1)
    
    prev_gray = curr_gray
    frame_idx += 1

release_capture(cap)

print(f"Processed {frame_idx} frames")
print(f"Risk range: [{min(risk_scores):.3f}, {max(risk_scores):.3f}]")
print(f"Openness range: [{min(openness_scores):.3f}, {max(openness_scores):.3f}]")

## Plot signals over time

In [None]:
fig, axes = plt.subplots(3, 1, figsize=(12, 8), sharex=True)

frames = np.arange(len(risk_scores))

# Risk scores
axes[0].plot(frames, risk_scores, label="Risk Score", color="red")
axes[0].axhline(config["risk"]["thresholds"]["avoid"], color="orange", linestyle="--", label="Avoid Threshold")
axes[0].axhline(config["risk"]["thresholds"]["stop"], color="darkred", linestyle="--", label="Stop Threshold")
axes[0].set_ylabel("Risk Score")
axes[0].set_ylim([0, 1])
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Openness scores
axes[1].plot(frames, openness_scores, label="Openness Score", color="blue")
axes[1].axhline(0, color="black", linestyle="-", linewidth=0.5)
axes[1].set_ylabel("Openness (L-R)")
axes[1].legend()
axes[1].grid(True, alpha=0.3)

# Preferred direction
axes[2].plot(frames, preferred_directions, label="Preferred Direction", color="green", drawstyle="steps-post")
axes[2].set_ylabel("Direction (±1)")
axes[2].set_xlabel("Frame")
axes[2].set_ylim([-1.5, 1.5])
axes[2].set_yticks([-1, 0, 1])
axes[2].set_yticklabels(["Left", "Center", "Right"])
axes[2].legend()
axes[2].grid(True, alpha=0.3)

plt.suptitle("Optical Flow-Based Perception Signals")
plt.tight_layout()
plt.show()

## Verification

Phase 2 verification:
- Optical flow computation works on frame pairs
- Flow magnitude is visualized correctly
- Risk and openness signals are computed and show non-trivial variation
- Signals are plotted over time and respond to video content