# Physical AI-Driven Action Sequence Analysis and MDP Modeling Using NVIDIA Cosmos Reason

---
# Part A: Video Recording

1) Only use one hand and manipulate one object at a time.
2) Do not try to follow the same order as other participants! Each person can choose a
different sequence of actions to complete the task.
3) Try to make your hand gestures obvious when grasping or releasing an object.

In [None]:
%pip install torch torchvision --index-url https://download.pytorch.org/whl/cu124 -q
%pip install transformers accelerate -q
%pip install git+https://github.com/facebookresearch/segment-anything-2.git -q
%pip install opencv-python supervision -q
%pip install matplotlib numpy Pillow tqdm -q

In [None]:
# ============================================================
# Import Libraries
# ============================================================
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from pathlib import Path
from collections import defaultdict
from tqdm import tqdm
import json
import warnings
import shutil
warnings.filterwarnings('ignore')

from sam2.build_sam import build_sam2_video_predictor, build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor

# Report the runtime environment up front so GPU/driver problems are visible
# before any model is loaded.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only device index 0 is queried here.
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    # total_memory is divided by 1e9 so the value is shown in (decimal) GB.
    print(f"VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

# Device string used by the model-loading and inference code below;
# falls back to CPU when CUDA is not available.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

In [None]:
# ============================================================
# Configuration
# ============================================================
VIDEO_DIR = "./demonstrations"
OUTPUT_DIR = "./demonstrations/objects_tracked"
FRAMES_DIR = os.path.join(OUTPUT_DIR, "frames")  # Temporary frames for SAM2 video
OUTPUT_VIDEOS_DIR = os.path.join(OUTPUT_DIR, "videos")
TRACKING_DATA_DIR = os.path.join(OUTPUT_DIR, "tracking_data")

# Lower-case file extensions that are treated as input videos.
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')

# OUTPUT_DIR lives inside VIDEO_DIR, so creating it also creates VIDEO_DIR
# and the os.listdir() call below cannot fail on a fresh checkout.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(FRAMES_DIR, exist_ok=True)
os.makedirs(OUTPUT_VIDEOS_DIR, exist_ok=True)
os.makedirs(TRACKING_DATA_DIR, exist_ok=True)

# Gather all video files. The extension is compared case-insensitively
# because cameras and phones commonly write upper-case names such as
# "IMG_0001.MOV" or "CLIP.MP4", which would otherwise be silently skipped.
video_files = sorted(
    f for f in os.listdir(VIDEO_DIR)
    if f.lower().endswith(VIDEO_EXTENSIONS)
)
print(f"Found {len(video_files)} videos:")
for v in video_files:
    print(f"  - {v}")

In [None]:
# ============================================================
# Load Grounding DINO for Initial Detection
# ============================================================
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection

# Hugging Face model id of the zero-shot (text-prompted) object detector.
GDINO_MODEL_ID = "IDEA-Research/grounding-dino-base"
# The processor builds the image+text model inputs and post-processes the raw
# model outputs (see its use in detect_colored_dice).
gdino_processor = AutoProcessor.from_pretrained(GDINO_MODEL_ID)
gdino_model = AutoModelForZeroShotObjectDetection.from_pretrained(GDINO_MODEL_ID).to(DEVICE)
# The model is only used for inference, so put it in evaluation mode.
gdino_model.eval()
print("Grounding DINO loaded")

In [None]:
# ============================================================
# Load SAM 2 for Video Segmentation & Tracking
# ============================================================

# SAM2 checkpoint file and the matching model config name
SAM2_CHECKPOINT = "sam2_hiera_large.pt"
SAM2_CONFIG = "sam2_hiera_l.yaml"
SAM2_CHECKPOINT_URL = "https://dl.fbaipublicfiles.com/segment_anything_2/072824/sam2_hiera_large.pt"

# Download if not present. The file is fetched under a temporary name and only
# renamed into place once the transfer finished, so an interrupted download
# cannot leave a truncated checkpoint behind that would pass the existence
# check (and then fail to load) on the next run.
if not os.path.exists(SAM2_CHECKPOINT):
    print("Downloading SAM2 checkpoint...")
    import urllib.request
    _partial_path = SAM2_CHECKPOINT + ".part"
    try:
        urllib.request.urlretrieve(SAM2_CHECKPOINT_URL, _partial_path)
        os.replace(_partial_path, SAM2_CHECKPOINT)
    finally:
        # Only still present when the download or rename failed.
        if os.path.exists(_partial_path):
            os.remove(_partial_path)

# Build video predictor for tracking
sam2_video_predictor = build_sam2_video_predictor(SAM2_CONFIG, SAM2_CHECKPOINT, device=DEVICE)

# Build image predictor for initial segmentation.
# NOTE(review): this loads the checkpoint a second time; the image predictor is
# not referenced by the tracking cells visible here - confirm it is needed.
sam2_model = build_sam2(SAM2_CONFIG, SAM2_CHECKPOINT, device=DEVICE)
sam2_image_predictor = SAM2ImagePredictor(sam2_model)

print("SAM 2 Video Predictor loaded")

In [None]:
# ============================================================
# Workspace/Tabletop Detection
# ============================================================

def detect_tabletop_region(image, method="color"):
    """
    Detect the tabletop/workspace region to filter out background objects.

    Methods:
    - "color": Detect table by dominant color (works for colored tables)
    - "lower_half": Simple heuristic - everything below the top 20% of the
      frame is treated as workspace
    - "full" (or any other value): No filtering, use entire frame

    Args:
        image: Image array of shape (H, W, ...). The "color" method converts
            it with cv2.COLOR_RGB2HSV, so it must be an RGB image there.
        method: One of the method names listed above.

    Returns: boolean (H, W) mask where True = workspace area
    """
    h, w = image.shape[:2]

    if method == "lower_half":
        # Exclude only the top 20% of the frame; the remaining 80% is workspace.
        mask = np.zeros((h, w), dtype=bool)
        mask[int(h * 0.2):, :] = True
        return mask

    if method == "color":
        # Minimum saturation / value for a pixel to count as "colored", and
        # the half-width of the accepted hue window (OpenCV hue is 0..179).
        min_sat, min_val = 30, 50
        hue_tolerance = 15

        # Convert to HSV for better color segmentation
        hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)

        # Find the dominant hue in the lower half of the frame. Pixels that
        # are too grey or too dark are masked out of the histogram: their hue
        # is meaningless and would otherwise dominate the vote on frames with
        # a white/grey/black background.
        lower_region = hsv[int(h * 0.5):, :]
        colored_pixels = cv2.inRange(
            lower_region,
            np.array([0, min_sat, min_val]),
            np.array([179, 255, 255]),
        )
        hist = cv2.calcHist([lower_region], [0], colored_pixels, [180], [0, 180])
        dominant_hue = int(np.argmax(hist))

        # Create mask for table color (with tolerance)
        hue_lo = dominant_hue - hue_tolerance
        hue_hi = dominant_hue + hue_tolerance
        color_mask = cv2.inRange(
            hsv,
            np.array([max(0, hue_lo), min_sat, min_val]),
            np.array([min(179, hue_hi), 255, 255]),
        )
        # Hue is circular: when the window crosses 0/179 (reddish tables) add
        # the part that wraps around instead of silently dropping it.
        if hue_lo < 0:
            wrapped = cv2.inRange(
                hsv,
                np.array([180 + hue_lo, min_sat, min_val]),
                np.array([179, 255, 255]),
            )
            color_mask = cv2.bitwise_or(color_mask, wrapped)
        elif hue_hi > 179:
            wrapped = cv2.inRange(
                hsv,
                np.array([0, min_sat, min_val]),
                np.array([hue_hi - 180, 255, 255]),
            )
            color_mask = cv2.bitwise_or(color_mask, wrapped)

        # Combine with spatial prior (lower portion more likely to be table):
        # the per-row weight rises linearly from 0.3 at the top to 1.0 at the
        # bottom, so matching pixels survive only where the weight is > 0.5.
        spatial_weight = np.linspace(0.3, 1.0, h).reshape(-1, 1)
        spatial_mask = np.tile(spatial_weight, (1, w))

        combined = (color_mask > 0).astype(float) * spatial_mask
        mask = combined > 0.5

        # Clean up with morphology: close small holes, then remove specks
        kernel = np.ones((20, 20), np.uint8)
        mask = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_CLOSE, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

        return mask.astype(bool)

    # "full" (and any unrecognized method): no filtering
    return np.ones((h, w), dtype=bool)


def is_in_workspace(box, workspace_mask, threshold=0.5):
    """
    Decide whether a detection lies inside the workspace.

    The box is truncated to integer pixel coordinates, clipped to the mask
    bounds, and the fraction of workspace pixels under it is compared with
    the threshold.

    Args:
        box: [x1, y1, x2, y2] bounding box
        workspace_mask: Boolean mask of workspace region
        threshold: Minimum fraction of the box that must be workspace

    Returns: True if the covered fraction reaches the threshold; False when
        it does not or when the clipped box is empty
    """
    mask_rows, mask_cols = workspace_mask.shape[0], workspace_mask.shape[1]

    left, top, right, bottom = (int(v) for v in box)
    left = max(left, 0)
    top = max(top, 0)
    right = min(right, mask_cols)
    bottom = min(bottom, mask_rows)

    # Nothing left after clipping -> the box cannot be in the workspace
    if right <= left or bottom <= top:
        return False

    coverage = np.mean(workspace_mask[top:bottom, left:right])
    return coverage >= threshold

In [None]:
# ============================================================
# Dice Detection with Grounding DINO
# ============================================================

def normalize_dice_label(raw_label):
    """
    Map a Grounding DINO label onto a canonical dice name.

    The label is lower-cased and stripped, then searched for a color word in
    the priority order green, red, blue. A label without a color word but
    mentioning "dice" or "cube" becomes "unknown_dice"; anything else is
    returned unchanged.
    """
    text = raw_label.lower().strip()

    matched_color = next(
        (name for name in ("green", "red", "blue") if name in text),
        None,
    )
    if matched_color is not None:
        return f"{matched_color}_dice"

    looks_like_dice = "dice" in text or "cube" in text
    return "unknown_dice" if looks_like_dice else raw_label


def classify_dice_by_color(image, box):
    """
    Name the color of a dice from the pixels inside its bounding box.

    The box is clipped to the image, the crop is converted from RGB to HSV,
    and for every known color the fraction of crop pixels falling inside its
    HSV range(s) is measured. The best-scoring color wins if more than 15% of
    the crop matches; otherwise "unknown_dice" is returned (also for an empty
    box). More reliable than relying solely on Grounding DINO labels.
    """
    img_h, img_w = image.shape[:2]
    left, top, right, bottom = (int(v) for v in box)
    left, top = max(left, 0), max(top, 0)
    right, bottom = min(right, img_w), min(bottom, img_h)

    if right <= left or bottom <= top:
        return "unknown_dice"

    crop_hsv = cv2.cvtColor(image[top:bottom, left:right], cv2.COLOR_RGB2HSV)

    # HSV (lower, upper) bands per color; red needs two bands because its hue
    # sits at both ends of OpenCV's hue axis.
    hsv_bands = {
        "red_dice": [
            (np.array([0, 100, 100]), np.array([10, 255, 255])),
            (np.array([160, 100, 100]), np.array([180, 255, 255])),
        ],
        "green_dice": [
            (np.array([35, 80, 80]), np.array([85, 255, 255])),
        ],
        "blue_dice": [
            (np.array([90, 80, 80]), np.array([130, 255, 255])),
        ],
    }

    match_fraction = {}
    for dice_name, bands in hsv_bands.items():
        combined = np.zeros(crop_hsv.shape[:2], dtype=np.uint8)
        for band_lo, band_hi in bands:
            combined = cv2.bitwise_or(combined, cv2.inRange(crop_hsv, band_lo, band_hi))
        # inRange marks hits with 255, so counting non-zero pixels gives the
        # same ratio as summing and dividing by 255.
        match_fraction[dice_name] = np.count_nonzero(combined) / combined.size

    winner = max(match_fraction, key=match_fraction.get)
    # Require more than 15% of the crop to match before trusting the result
    return winner if match_fraction[winner] > 0.15 else "unknown_dice"


def detect_colored_dice(image, workspace_mask=None, box_threshold=0.25, text_threshold=0.2):
    """
    Detect all colored dice in an image using Grounding DINO.

    Args:
        image: PIL Image or numpy array (RGB)
        workspace_mask: Optional mask to filter detections; boxes that fail
            is_in_workspace() are dropped
        box_threshold: Confidence threshold for boxes
        text_threshold: Confidence threshold for text matching

    Returns: List of dicts with {box, label, score, color}, where `label` is
        the normalized Grounding DINO label and `color` is the pixel-based
        classification of the box contents
    """
    if isinstance(image, np.ndarray):
        pil_image = Image.fromarray(image)
        np_image = image
    else:
        pil_image = image
        np_image = np.array(image)

    # Text prompt for dice detection
    text_prompt = "green dice . red dice . blue dice . green cube . red cube . blue cube ."

    inputs = gdino_processor(
        images=pil_image,
        text=text_prompt,
        return_tensors="pt"
    ).to(DEVICE)

    with torch.no_grad():
        outputs = gdino_model(**inputs)

    # The box threshold must be handed to the post-processor: if it is left
    # out, the post-processor applies its own default (0.25 per the Hugging
    # Face docs) and callers asking for a lower threshold never see the extra
    # boxes. It is passed positionally (third argument) because its keyword
    # was renamed from `box_threshold` to `threshold` between transformers
    # releases while its position stayed the same.
    results = gdino_processor.post_process_grounded_object_detection(
        outputs,
        inputs.input_ids,
        box_threshold,
        text_threshold=text_threshold,
        target_sizes=[pil_image.size[::-1]]  # PIL gives (W, H); (H, W) is expected
    )[0]

    boxes = results["boxes"].cpu().numpy()
    scores = results["scores"].cpu().numpy()
    # Newer transformers releases expose the matched phrases as "text_labels"
    # and deprecate reading them from "labels"; prefer the new key when present.
    labels = results["text_labels"] if "text_labels" in results else results["labels"]

    # Filter by confidence (kept as a safeguard on top of the post-processor)
    keep = scores >= box_threshold

    detections = []
    for box, score, label, is_kept in zip(boxes, scores, labels, keep):
        if not is_kept:
            continue

        # Filter by workspace if provided
        if workspace_mask is not None and not is_in_workspace(box, workspace_mask):
            continue

        # Classify color based on actual pixel values
        color = classify_dice_by_color(np_image, box)

        detections.append({
            "box": box,
            "score": float(score),
            "label": normalize_dice_label(label),
            "color": color
        })

    return detections


def apply_nms(detections, iou_threshold=0.5):
    """
    Greedy non-maximum suppression over detection dicts.

    Detections are visited from highest to lowest "score"; each kept
    detection removes every remaining one whose IoU with it is at or above
    `iou_threshold`. The survivors are returned in descending score order.
    An empty input is returned as-is.
    """
    if not len(detections):
        return detections

    coords = np.array([det["box"] for det in detections])
    confidences = np.array([det["score"] for det in detections])

    x_min, y_min, x_max, y_max = coords[:, 0], coords[:, 1], coords[:, 2], coords[:, 3]
    box_areas = (x_max - x_min) * (y_max - y_min)

    # Candidate indices, best score first
    candidates = confidences.argsort()[::-1]

    selected = []
    while candidates.size > 0:
        best, rest = candidates[0], candidates[1:]
        selected.append(best)

        if rest.size == 0:
            break

        # Intersection of the best box with every remaining candidate
        overlap_w = np.maximum(0, np.minimum(x_max[best], x_max[rest]) - np.maximum(x_min[best], x_min[rest]))
        overlap_h = np.maximum(0, np.minimum(y_max[best], y_max[rest]) - np.maximum(y_min[best], y_min[rest]))
        intersection = overlap_w * overlap_h

        # The small epsilon guards against division by zero for empty boxes
        iou = intersection / (box_areas[best] + box_areas[rest] - intersection + 1e-6)

        candidates = rest[iou < iou_threshold]

    return [detections[idx] for idx in selected]

In [None]:
# ============================================================
# Video Frame Extraction for SAM2
# ============================================================

def extract_frames_for_sam2(video_path, output_dir, sample_rate=1):
    """
    Extract frames from video for SAM2 video predictor.
    SAM2 requires JPEG frames in a directory.

    Any existing `output_dir` is deleted and recreated, so it only ever holds
    the frames of the most recent call. Frames are written as sequentially
    numbered files ("000000.jpg", "000001.jpg", ...).

    Args:
        video_path: Path to input video
        output_dir: Directory to save frames (its previous contents are removed)
        sample_rate: Extract every Nth frame (1 = all frames); must be >= 1

    Returns: (frame_paths, fps, total_frames, frame_indices, (width, height))
        frame_paths: paths of the written JPEGs, in order
        fps: frame rate reported by the container
        total_frames: frame count reported by the container
        frame_indices: original video frame index of each written JPEG
        (width, height): frame size reported by the container

    Raises:
        ValueError: if sample_rate is smaller than 1
        IOError: if the video cannot be opened or a frame cannot be written
    """
    # Validate before touching the filesystem: sample_rate == 0 would
    # otherwise fail with ZeroDivisionError after the directory was wiped.
    if sample_rate < 1:
        raise ValueError(f"sample_rate must be >= 1, got {sample_rate}")

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        # Clear output directory (only once the video is known to be readable)
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)
        os.makedirs(output_dir, exist_ok=True)

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        frame_paths = []
        frame_indices = []
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % sample_rate == 0:
                # SAM2 expects sequential naming
                frame_path = os.path.join(output_dir, f"{len(frame_paths):06d}.jpg")
                if not cv2.imwrite(frame_path, frame):
                    raise IOError(f"Could not write frame: {frame_path}")
                frame_paths.append(frame_path)
                frame_indices.append(frame_idx)

            frame_idx += 1
    finally:
        # Release the capture even when reading or writing fails
        cap.release()

    return frame_paths, fps, total_frames, frame_indices, (width, height)

In [None]:
# ============================================================
# SAM2 Video Tracking Core
# ============================================================

# Color mapping for visualization.
# Values are (R, G, B) tuples in the 0-255 range: create_tracking_video swaps
# them to BGR before drawing with OpenCV, and visualize_first_frame_detections
# divides them by 255 for matplotlib. Keys match the names returned by
# classify_dice_by_color.
DICE_COLORS = {
    "green_dice": (0, 255, 0),      # Green
    "red_dice": (255, 0, 0),        # Red  
    "blue_dice": (0, 0, 255),       # Blue
    "unknown_dice": (255, 255, 0),  # Yellow
}


def track_dice_in_video(video_path, sample_rate=2, workspace_method="lower_half"):
    """
    Track all colored dice cubes in a video using SAM2.

    Pipeline: extract sampled JPEG frames, detect dice on the first extracted
    frame with Grounding DINO, seed SAM2 with one box prompt per detection,
    then propagate the masks through all extracted frames.

    Args:
        video_path: Path to input video
        sample_rate: Process every Nth frame (lower = more accurate, slower)
        workspace_method: Method to detect tabletop ("lower_half", "color", "full")

    Returns: Dictionary with tracking results, or None when no frames could
        be extracted or no dice were found on the first frame. Keys:
        - "video_name", "video_path", "fps", "total_frames", "sample_rate"
        - "processed_frames": number of extracted frames
        - "frame_size": (width, height)
        - "frame_indices": original frame index of each extracted frame
        - "frame_paths": paths of the extracted JPEGs
        - "objects": {obj_id: {"color", "initial_box"}} (obj_id starts at 1)
        - "segments": {extracted_frame_idx: {obj_id: boolean mask}}
    """
    video_name = Path(video_path).stem
    print(f"\n{'='*60}")
    print(f"Processing: {video_name}")
    print(f"{'='*60}")

    # Create temporary frame directory for this video
    video_frames_dir = os.path.join(FRAMES_DIR, video_name)

    # Step 1: Extract frames
    print("Step 1: Extracting frames...")
    frame_paths, fps, total_frames, frame_indices, (width, height) = \
        extract_frames_for_sam2(video_path, video_frames_dir, sample_rate)
    print(f"  - Extracted {len(frame_paths)} frames (original: {total_frames} @ {fps:.1f} FPS)")

    # An unreadable or empty video yields no frames; bail out the same way as
    # for "no dice" instead of failing with IndexError on frame_paths[0].
    if not frame_paths:
        print(" No frames could be extracted, skipping video")
        return None

    # Step 2: Detect dice on first frame
    print("Step 2: Detecting dice on first frame...")
    first_frame = cv2.cvtColor(cv2.imread(frame_paths[0]), cv2.COLOR_BGR2RGB)

    # Detect workspace/tabletop
    workspace_mask = detect_tabletop_region(first_frame, method=workspace_method)

    # Detect dice
    detections = detect_colored_dice(first_frame, workspace_mask, box_threshold=0.20)
    detections = apply_nms(detections, iou_threshold=0.4)

    if len(detections) == 0:
        # Second attempt: drop the workspace filter and lower the threshold
        print(" No dice detected on first frame! Trying without workspace filter...")
        detections = detect_colored_dice(first_frame, None, box_threshold=0.15)
        detections = apply_nms(detections, iou_threshold=0.4)

    print(f"  - Found {len(detections)} dice:")
    for i, det in enumerate(detections):
        print(f"    [{i}] {det['color']} (conf: {det['score']:.2f})")

    if len(detections) == 0:
        print(" No dice found, skipping video")
        return None

    # Step 3: Initialize SAM2 video predictor
    print("Step 3: Initializing SAM2 video tracking...")

    with torch.inference_mode(), torch.autocast(DEVICE, dtype=torch.bfloat16):
        state = sam2_video_predictor.init_state(video_path=video_frames_dir)

        # Add each detected dice as a tracking object
        object_ids = []
        object_colors = {}

        for i, det in enumerate(detections):
            # Object ids start at 1, so detections[obj_id - 1] is the source
            # detection (relied on when building "objects" below).
            obj_id = i + 1

            # Add object with bounding box prompt on the first extracted frame
            sam2_video_predictor.add_new_points_or_box(
                inference_state=state,
                frame_idx=0,
                obj_id=obj_id,
                box=det["box"]
            )

            object_ids.append(obj_id)
            object_colors[obj_id] = det["color"]

        print(f"  - Initialized {len(object_ids)} objects for tracking")

        # Step 4: Propagate through video
        print("Step 4: Propagating masks through video...")

        # Collect all frame masks
        video_segments = {}  # {frame_idx: {obj_id: mask}}

        for frame_idx, obj_ids, mask_logits in sam2_video_predictor.propagate_in_video(state):
            # Positive logits mark pixels that belong to the object
            masks = (mask_logits > 0.0).cpu().numpy()
            video_segments[frame_idx] = {
                obj_id: masks[i, 0]  # [H, W] boolean mask
                for i, obj_id in enumerate(obj_ids)
            }

        print(f"  - Tracked across {len(video_segments)} frames")

    # Compile results
    results = {
        "video_name": video_name,
        "video_path": video_path,
        "fps": fps,
        "total_frames": total_frames,
        "processed_frames": len(frame_paths),
        "sample_rate": sample_rate,
        "frame_size": (width, height),
        "frame_indices": frame_indices,
        "objects": {
            obj_id: {
                "color": object_colors[obj_id],
                "initial_box": detections[obj_id - 1]["box"].tolist()
            }
            for obj_id in object_ids
        },
        "segments": video_segments,
        "frame_paths": frame_paths
    }

    return results

In [None]:
# ============================================================
# Visualization & Video Output
# ============================================================

def create_tracking_video(results, output_path):
    """
    Create output video with segmentation masks overlaid.

    Every extracted frame is re-read from disk; for each tracked object with
    a mask on that frame the mask is blended in at 30% opacity, outlined, and
    labeled with the object's color name. The output frame rate is the source
    rate divided by the sample rate so playback speed matches the original.

    Args:
        results: Dictionary returned by track_dice_in_video, or None (no-op)
        output_path: Path of the video file to write (mp4v codec)

    Returns: None
    """
    if results is None:
        return

    frame_paths = results["frame_paths"]
    segments = results["segments"]
    objects = results["objects"]
    fps = results["fps"] / results["sample_rate"]  # Adjust for sampled frames

    # Get frame size
    first_frame = cv2.imread(frame_paths[0])
    height, width = first_frame.shape[:2]

    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        # Without this check every write() is silently dropped and "Saved"
        # would be reported for a file that was never produced.
        out.release()
        print(f" Could not open video writer for: {output_path}")
        return

    print(f"Creating output video: {output_path}")

    try:
        for frame_idx, frame_path in enumerate(tqdm(frame_paths, desc="  Rendering")):
            frame = cv2.imread(frame_path)

            for obj_id, mask in segments.get(frame_idx, {}).items():
                if obj_id not in objects:
                    continue

                color_name = objects[obj_id]["color"]
                color_rgb = DICE_COLORS.get(color_name, (255, 255, 0))
                # Convert RGB to BGR for OpenCV
                color_bgr = (color_rgb[2], color_rgb[1], color_rgb[0])

                # Apply mask overlay: paint the mask solid on a copy, then
                # blend the copy back at 30% opacity
                overlay = frame.copy()
                overlay[mask] = color_bgr
                frame = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0)

                # Draw contour
                contours, _ = cv2.findContours(
                    mask.astype(np.uint8),
                    cv2.RETR_EXTERNAL,
                    cv2.CHAIN_APPROX_SIMPLE
                )
                cv2.drawContours(frame, contours, -1, color_bgr, 2)

                # Add label at the centroid of the largest contour, so a
                # stray mask fragment does not pull the label off the object
                if contours:
                    M = cv2.moments(max(contours, key=cv2.contourArea))
                    if M["m00"] > 0:
                        cx = int(M["m10"] / M["m00"])
                        cy = int(M["m01"] / M["m00"])
                        label = color_name.replace("_", " ").title()
                        cv2.putText(frame, label, (cx - 30, cy - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

            out.write(frame)
    finally:
        # Finalize the file even if rendering a frame fails
        out.release()

    print(f" Saved: {output_path}")


def visualize_first_frame_detections(results, save_path=None):
    """
    Show the first frame twice, side by side: once with the initial detection
    boxes and once with the frame-0 segmentation masks blended in.

    Args:
        results: Dictionary returned by track_dice_in_video, or None (no-op)
        save_path: Optional path; when given the figure is also saved there
    """
    if results is None:
        return

    tracked = results["objects"]
    first_masks = results["segments"].get(0, {})
    rgb_frame = cv2.cvtColor(cv2.imread(results["frame_paths"][0]), cv2.COLOR_BGR2RGB)
    fallback_rgb = (255, 255, 0)

    fig, (box_ax, mask_ax) = plt.subplots(1, 2, figsize=(16, 8))

    # Left panel: original frame with the initial boxes
    box_ax.imshow(rgb_frame)
    box_ax.set_title(f"Initial Detections - {results['video_name']}")

    for info in tracked.values():
        x1, y1, x2, y2 = info["initial_box"]
        # Matplotlib wants color components in 0-1
        edge = tuple(channel / 255 for channel in DICE_COLORS.get(info["color"], fallback_rgb))

        box_ax.add_patch(plt.Rectangle(
            (x1, y1), x2 - x1, y2 - y1,
            fill=False, edgecolor=edge, linewidth=2
        ))
        box_ax.text(x1, y1 - 5, info["color"].replace("_", " "),
                    color=edge, fontsize=10, fontweight='bold')
    box_ax.axis('off')

    # Right panel: 50/50 blend of the frame and the object color under each mask
    blended = rgb_frame.copy().astype(float)

    for obj_id, mask in first_masks.items():
        if obj_id not in tracked:
            continue
        tint = np.array(DICE_COLORS.get(tracked[obj_id]["color"], fallback_rgb), dtype=float)
        blended = np.where(mask[..., None], blended * 0.5 + tint * 0.5, blended)

    mask_ax.imshow(blended.astype(np.uint8))
    mask_ax.set_title(f"Segmentation Masks - {results['video_name']}")
    mask_ax.axis('off')

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.show()

In [None]:
# ============================================================
# Save Tracking Data
# ============================================================

def save_tracking_data(results, output_dir):
    """
    Write per-frame tracking data for one video to "<video_name>_tracking.json".

    For every tracked frame and every object with a non-empty mask, the mask's
    bounding box, centroid, pixel area and color are recorded. Frames are
    keyed by their original video frame index and objects by their id, both
    as strings.

    Args:
        results: Dictionary returned by track_dice_in_video, or None (no-op)
        output_dir: Directory in which the JSON file is created

    Returns: The dictionary that was written, or None when results is None
    """
    if results is None:
        return

    video_name = results["video_name"]
    object_table = results["objects"]
    original_index_of = results["frame_indices"]

    # Per-frame tracking
    per_frame = {}
    for sampled_idx, frame_masks in results["segments"].items():
        entries = {}
        for obj_id, mask in frame_masks.items():
            rows, cols = np.where(mask)
            # Object not visible on this frame
            if cols.size == 0:
                continue

            entries[str(obj_id)] = {
                "bbox": [int(cols.min()), int(rows.min()), int(cols.max()), int(rows.max())],
                "centroid": [int(np.mean(cols)), int(np.mean(rows))],
                "area": int(np.sum(mask)),
                "color": object_table[obj_id]["color"],
            }
        per_frame[str(original_index_of[sampled_idx])] = entries

    tracking_data = {
        "video_name": video_name,
        "fps": results["fps"],
        "total_frames": results["total_frames"],
        "sample_rate": results["sample_rate"],
        "objects": {
            str(obj_id): {"color": info["color"]}
            for obj_id, info in object_table.items()
        },
        "frames": per_frame,
    }

    # Save to JSON
    output_path = os.path.join(output_dir, f"{video_name}_tracking.json")
    with open(output_path, 'w') as f:
        json.dump(tracking_data, f, indent=2)

    print(f"Saved tracking data: {output_path}")
    return tracking_data

In [None]:
# ============================================================
# Process All Videos
# ============================================================

# Configuration
SAMPLE_RATE = 2  # Process every 2nd frame (balance speed/accuracy)
WORKSPACE_METHOD = "lower_half"  # Options: "lower_half", "color", "full"

# Successful tracking results keyed by video name (file stem); read by the
# summary cell further down.
all_results = {}

for video_file in video_files:
    video_path = os.path.join(VIDEO_DIR, video_file)
    video_name = Path(video_file).stem
    
    # Errors are caught per video so that one bad file does not abort the
    # whole batch; the traceback is still printed for debugging.
    try:
        # Track dice in video
        results = track_dice_in_video(
            video_path, 
            sample_rate=SAMPLE_RATE,
            workspace_method=WORKSPACE_METHOD
        )
        
        # None means the video was skipped (no dice found on the first frame)
        if results is not None:
            all_results[video_name] = results
            
            # Visualize first frame detections
            vis_path = os.path.join(OUTPUT_DIR, f"{video_name}_detections.png")
            visualize_first_frame_detections(results, save_path=vis_path)
            
            # Create output video with tracking
            output_video_path = os.path.join(OUTPUT_VIDEOS_DIR, f"{video_name}_tracked.mp4")
            create_tracking_video(results, output_video_path)
            
            # Save tracking data
            save_tracking_data(results, TRACKING_DATA_DIR)
            
    except Exception as e:
        print(f"Error processing {video_name}: {e}")
        import traceback
        traceback.print_exc()

# len(all_results) counts only videos that produced tracking results
print(f"\n{'='*60}")
print(f"Processing complete!")
print(f"{'='*60}")
print(f"Videos processed: {len(all_results)}/{len(video_files)}")
print(f"Output videos: {OUTPUT_VIDEOS_DIR}")
print(f"Tracking data: {TRACKING_DATA_DIR}")

In [None]:
# ============================================================
# Summary Statistics
# ============================================================

print("\n" + "="*70)
print("TRACKING SUMMARY")
print("="*70)

# One row per successfully processed video, reused for the table below
summary_data = []
for video_name, results in all_results.items():
    objects = results["objects"]

    # Count by color
    color_counts = {"green_dice": 0, "red_dice": 0, "blue_dice": 0, "unknown_dice": 0}
    for obj_info in objects.values():
        color = obj_info["color"]
        if color in color_counts:
            color_counts[color] += 1

    summary_data.append({
        "video": video_name,
        "green": color_counts["green_dice"],
        "red": color_counts["red_dice"],
        "blue": color_counts["blue_dice"],
        "total": len(objects),
        "frames": results["processed_frames"]
    })

    # The colored-circle emoji are written as escapes: the literal characters
    # had been corrupted into mojibake by an encoding round-trip.
    print(f"\n{video_name}:")
    print(f"  \U0001F7E2 Green dice: {color_counts['green_dice']}")
    print(f"  \U0001F534 Red dice:   {color_counts['red_dice']}")
    print(f"  \U0001F535 Blue dice:  {color_counts['blue_dice']}")
    print(f" Total tracked: {len(objects)} objects over {results['processed_frames']} frames")

# Create summary table
print("\n" + "="*70)
print("SUMMARY TABLE")
print("="*70)
print(f"{'Video':<20} {'Green':>8} {'Red':>8} {'Blue':>8} {'Total':>8} {'Frames':>8}")
print("-"*70)
for row in summary_data:
    print(f"{row['video']:<20} {row['green']:>8} {row['red']:>8} {row['blue']:>8} {row['total']:>8} {row['frames']:>8}")

In [None]:
# ============================================================
# Test on Single Video (run this first to verify setup)
# ============================================================

# Use the first discovered video (if there is one) as the smoke-test input
TEST_VIDEO = next(iter(video_files), None)

if not TEST_VIDEO:
    print("No videos found in demonstrations folder!")
else:
    test_video_path = os.path.join(VIDEO_DIR, TEST_VIDEO)

    # Sample only every 5th frame so the check finishes quickly
    test_results = track_dice_in_video(
        test_video_path,
        sample_rate=5,
        workspace_method="lower_half",
    )

    if test_results:
        # Show the initial detections and masks
        visualize_first_frame_detections(test_results)

        # Render a short annotated video next to the regular outputs
        test_output = os.path.join(OUTPUT_VIDEOS_DIR, f"{Path(TEST_VIDEO).stem}_test.mp4")
        create_tracking_video(test_results, test_output)

        print("\nTest complete! Check the output above to verify dice detection.")
        print("  If dice are detected correctly, run the 'Process All Videos' cell.")

---
# Part B: Object and Human Action Recognition


## Import Libraries and Suppress Warnings

In [9]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="torchvision")
warnings.filterwarnings("ignore", message=".*video metadata.*")

from pathlib import Path
from transformers import Qwen3VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch
import glob
import json
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import json
from datetime import datetime
import cv2

In [12]:
def get_video_frame_count(video_path, max_frames=500):
    """
    Get the number of frames to sample from a video, based on the frame count
    that OpenCV reports for it.

    Args:
        video_path: path to video
        max_frames: upper bound for the returned value

    Returns:
        int: the reported frame count minus one, capped at max_frames and
        never smaller than 1. A fallback of 60 is returned when the video
        cannot be opened or reports no frames.
    """
    fallback_count = 60

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            print(f"Warning: could not open video {video_path}")
            return fallback_count

        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        # Release the handle on every path, including the failure path
        cap.release()

    # Without a usable frame count the computation below would return 0 or a
    # negative number, which is not a valid number of frames to sample.
    if frame_count <= 0:
        print(f"Warning: no frame count reported for video {video_path}")
        return fallback_count

    duration = frame_count / fps if fps > 0 else 0

    # One frame of headroom below the reported count, capped at max_frames,
    # but at least 1 so a single-frame video does not yield 0.
    capped_count = max(1, min(frame_count - 1, max_frames))

    print(f" Video: {Path(video_path).name} | Frames: {frame_count} | FPS: {fps:.2f} | Duration: {duration:.2f} secs")
    return capped_count

## Setup Model Function

Load the Cosmos Reason model and processor from Hugging Face.

In [2]:
def setup_model(model_name="nvidia/Cosmos-Reason2-8B"):
    """
    Load the Cosmos Reason model together with its processor.

    The weights are loaded in bfloat16 with device_map="auto"; the processor
    is loaded from the same model name.

    Args:
        model_name: Name of the model on Hugging Face

    Returns:
        (model, processor) tuple
    """
    print(f"Loading model: {model_name}")
    print("This may take a few minutes on first run...")

    # Options for loading the weights
    load_options = {
        "torch_dtype": torch.bfloat16,
        "device_map": "auto",
    }
    model = Qwen3VLForConditionalGeneration.from_pretrained(model_name, **load_options)

    # Processor that belongs to the same checkpoint
    processor = AutoProcessor.from_pretrained(model_name)

    print("Model loaded successfully!")
    return model, processor

## Analyze Video Function

Process a video file with a question, handle system prompts for reasoning, and parse the output.

In [3]:
def _split_reasoning_and_answer(output_text):
    """
    Split raw model output into (reasoning, answer) using the <think>/<answer> tags.

    Args:
        output_text: Decoded text generated by the model

    Returns:
        (reasoning, answer) tuple of strings; reasoning is "" when no
        closing </think> tag is present
    """
    reasoning = ""
    if "</think>" in output_text:
        if "<think>" in output_text:
            reasoning = output_text.split("<think>")[1].split("</think>")[0].strip()
        else:
            # Opening tag missing (e.g. not part of the decoded text):
            # everything before </think> is still the reasoning
            reasoning = output_text.split("</think>")[0].strip()

    if "<answer>" in output_text and "</answer>" in output_text:
        answer = output_text.split("<answer>")[1].split("</answer>")[0].strip()
    elif "</think>" in output_text:
        # Sometimes the answer comes after </think> without tags
        answer = output_text.split("</think>")[1].strip()
    else:
        answer = output_text

    return reasoning, answer


def analyze_video(video_path, question, model, processor, nframes=60, enable_reasoning=True):
    """
    Analyze a video using Cosmos Reason
    
    Args:
        video_path: Path to the video file
        question: Question to ask about the video
        model: The loaded model
        processor: The loaded processor
        nframes: Number of frames to sample (default: 60, recommended)
        enable_reasoning: Whether to enable chain-of-thought reasoning
        
    Returns:
        dict with 'reasoning', 'answer' and 'full_output' keys
        ('reasoning' is "" when reasoning is disabled)
    """
    print(f"\nAnalyzing video: {video_path}")
    print(f"Question: {question}")
    print(f"Number of frames: {nframes}")
    print(f"Reasoning enabled: {enable_reasoning}")
    
    # Prepare the system prompt (with reasoning format if enabled)
    if enable_reasoning:
        system_prompt = """Answer the question in the following format:
<think>
your reasoning
</think>

<answer>
your answer
</answer>"""
    else:
        system_prompt = "You are a helpful assistant that analyzes videos."
    
    # Prepare the conversation messages
    messages = [
        {
            "role": "system",
            "content": system_prompt
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "video",
                    "video": str(video_path),
                    "nframes": nframes  # Explicit frame count to avoid metadata warning
                },
                {
                    "type": "text",
                    "text": question
                }
            ]
        }
    ]
    
    # Apply chat template
    text = processor.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    
    # Process the video and prepare inputs
    image_inputs, video_inputs = process_vision_info(messages)
    
    inputs = processor(
        text=[text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt"
    )
    
    # Move inputs to the same device as model
    inputs = inputs.to(model.device)
    
    print("\nGenerating response...")
    
    # Generate response
    with torch.no_grad():
        generated_ids = model.generate(
            **inputs,
            max_new_tokens=4096  # Recommended to avoid truncation
        )
    
    # Trim the input tokens from the generated output
    generated_ids_trimmed = [
        out_ids[len(in_ids):] 
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    
    # Decode the response (single-item batch, so take element 0)
    output_text = processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False
    )[0]
    
    # Without reasoning the whole output is the answer
    if not enable_reasoning:
        return {
            "reasoning": "",
            "answer": output_text,
            "full_output": output_text
        }

    # Parse reasoning and answer out of the tagged output
    reasoning, answer = _split_reasoning_and_answer(output_text)
    return {
        "reasoning": reasoning,
        "answer": answer,
        "full_output": output_text
    }

## Load the Model

Instantiate the model and processor using the Cosmos Reason 2 (8B) model, `nvidia/Cosmos-Reason2-8B`.

In [4]:
# Load the Cosmos-Reason2-8B checkpoint once; `model` and `processor` are passed to every analyze_video call below
model, processor = setup_model("nvidia/Cosmos-Reason2-8B")

Loading model: nvidia/Cosmos-Reason2-8B
This may take a few minutes on first run...


Fetching 4 files: 100%|██████████| 4/4 [00:00<00:00, 41221.66it/s]
Loading weights: 100%|██████████| 750/750 [00:04<00:00, 169.45it/s, Materializing param=model.visual.pos_embed.weight]


Model loaded successfully!


---

## B.1 Object Detection in All Videos

Using NVIDIA Cosmos Reason 2 (8B) to detect and identify objects in all demonstration videos. This model leverages vision-language understanding to provide detailed object detection through natural language descriptions.

In [7]:
# Collect every tracked demonstration video, ordered by path

demonstrations_path = Path("demonstrations/objects_tracked")
video_extensions = ["*.mp4", "*.mov", "*.avi", "*.mkv"]

# One glob per supported extension, flattened and sorted in a single pass
video_files = sorted(
    match
    for pattern in video_extensions
    for match in demonstrations_path.glob(pattern)
)

print(f"Found {len(video_files)} videos in demonstrations folder:")
for v in video_files:
    print(f"  - {v.name}")

Found 11 videos in demonstrations folder:
  - data10_tracked.mp4
  - data1_tracked.mp4
  - data2_tracked.mp4
  - data3_tracked.mp4
  - data4_tracked.mp4
  - data5_tracked.mp4
  - data6_tracked.mp4
  - data7_tracked.mp4
  - data8_tracked.mp4
  - data9_tracked.mp4
  - demonstration1_tracked.mp4


In [13]:
# Prompt asking the model for a per-colour dice inventory and scene description
OBJECT_DETECTION_PROMPT = """You are analyzing a video with colored dice on a table. Pay EXTREMELY CLOSE ATTENTION to counting.

CRITICAL INSTRUCTIONS:
1. Count EACH die individually by color (red, green, blue)
2. Watch the ENTIRE video to track ALL dice present
3. A die may be moved, stacked, or rearranged - count it only ONCE
4. State the EXACT NUMBER of each color

Analyze this video frame-by-frame and provide:

**DICE INVENTORY (Count carefully!):**
- Total number of GREEN dice: [count each green die you see]
- Total number of RED dice: [count each red die you see]  
- Total number of BLUE dice: [count each blue die you see]
- Total number of ALL dice: [sum]

**INITIAL ARRANGEMENT:**
Describe the starting positions of ALL dice from left to right or their spatial arrangement.

**OBJECTS IN SCENE:**
- Table description
- Human hands/body parts visible
- Any other objects

Be precise with your counts. If you see a die being moved, don't count it twice."""

# Per-video detection output, keyed by video file name
object_detection_results = {}

for i, video_path in enumerate(video_files):
    separator = "=" * 80
    print(f"\n{separator}")
    print(f"Processing video {i+1}/{len(video_files)}: {video_path.name}")
    print(separator)

    # Frame budget comes from the video's own metadata (capped by the helper)
    frame_count = get_video_frame_count(video_path)

    result = analyze_video(
        video_path=video_path,
        question=OBJECT_DETECTION_PROMPT,
        model=model,
        processor=processor,
        nframes=frame_count,
        enable_reasoning=True,
    )

    # Keep only the three fields the later cells read
    object_detection_results[video_path.name] = {
        field: result[field] for field in ("reasoning", "answer", "full_output")
    }

    print(f"\n Objects in {video_path.name}:")
    print("-" * 40)
    # Show at most 500 characters of the answer as a preview
    answer_preview = result["answer"]
    if len(answer_preview) > 500:
        answer_preview = answer_preview[:500] + "..."
    print(answer_preview)


Processing video 1/11: data10_tracked.mp4
 Video: data10_tracked.mp4 | Frames: 555 | FPS: 29.99 | Duration: 18.51 secs

Analyzing video: demonstrations/objects_tracked/data10_tracked.mp4
Question: You are analyzing a video with colored dice on a table. Pay EXTREMELY CLOSE ATTENTION to counting.

CRITICAL INSTRUCTIONS:
1. Count EACH die individually by color (red, green, blue)
2. Watch the ENTIRE video to track ALL dice present
3. A die may be moved, stacked, or rearranged - count it only ONCE
4. State the EXACT NUMBER of each color

Analyze this video frame-by-frame and provide:

**DICE INVENTORY (Count carefully!):**
- Total number of GREEN dice: [count each green die you see]
- Total number of RED dice: [count each red die you see]  
- Total number of BLUE dice: [count each blue die you see]
- Total number of ALL dice: [sum]

**INITIAL ARRANGEMENT:**
Describe the starting positions of ALL dice from left to right or their spatial arrangement.

**OBJECTS IN SCENE:**
- Table descriptio

In [14]:
# Print the stored reasoning (truncated) and full answer for every video
wide_rule = "=" * 80
narrow_rule = "=" * 60

print("\n" + wide_rule)
print("Results")
print(wide_rule)

for video_name, data in object_detection_results.items():
    print(f"\n{narrow_rule}")
    print(f"Video: {video_name}")
    print(narrow_rule)

    print("\nReasoning:")
    # Reasoning can be long, so cap the preview at 800 characters
    reasoning_preview = data["reasoning"]
    if len(reasoning_preview) > 800:
        reasoning_preview = reasoning_preview[:800] + "..."
    print(reasoning_preview)

    print("\nObjects Detected:")
    print(data["answer"])


Results

Video: data10_tracked.mp4

Reasoning:
Okay, let's break this down. The user provided  a video involving colored dice on a table and wants me to analyze it according to specific instructions. My task is to count the number of each colored die (green, red, blue) present in the video while paying close attention to details.

First, I need to parse through the video carefully. The key points shown are that there are three dice initially: Green Dice, Red Dice, and Blue Dice. Each has distinct colors and labels. The person interacts with them by moving them around but doesn't add or remove any dice during the process. 

The actions shown involve picking up the Red Dice, placing it back, then stacking it on the Blue Dice, followed by adding the Green Dice on top of both. Throughout these interactions, the video shows that no addition...

Objects Detected:
{
  "dice_inventory": {
    "total_number_of_green_dice": 1,
    "total_number_of_red_dice": 1,
    "total_number_of_blue_dice": 

## B.2 Human Action Recognition

Using NVIDIA Cosmos Reason 2 (8B) to recognize actions performed in the videos (grasping, moving, releasing, stacking, etc.).

In [None]:
# Action Recognition Prompt
# (wording fixed: "performed required to complete" -> "performed to complete",
#  "preformed" -> "performed")
ACTION_RECOGNITION_PROMPT = """Analyze this video and identify ALL human actions performed to complete the pattern of dice in this video. The dice will be lined up in a specific pattern.

For each action, provide:
1. Action name (e.g., grasping, picking up, moving, placing, releasing, stacking, pushing, pulling)
2. The object involved in the action
3. The state of the cube's pattern in a line, after each action is performed

Focus on fine-grained manipulation actions such as:
- Reaching/approaching
- Grasping/gripping
- Lifting/picking up
- Moving/transporting
- Placing/positioning
- Releasing/letting go
- Stacking/arranging
- Adjusting/fine-tuning position

Provide a chronological list of all actions observed."""

# Run action recognition on all videos; results are keyed by video file name
action_recognition_results = {}

for i, video_path in enumerate(video_files):
    print(f"\n{'='*80}")
    print(f"Processing video {i+1}/{len(video_files)}: {video_path.name}")
    print("="*80)
    
    # A fixed 60-frame sample is used for every video in this cell
    result = analyze_video(
        video_path=video_path,
        question=ACTION_RECOGNITION_PROMPT,
        model=model,
        processor=processor,
        nframes=60,
        enable_reasoning=True
    )
    
    action_recognition_results[video_path.name] = {
        "reasoning": result["reasoning"],
        "answer": result["answer"],
        "full_output": result["full_output"]
    }
    
    print(f"\n Actions in {video_path.name}:")
    print("-" * 40)
    # Preview only the first 500 characters of the answer
    print(result["answer"][:500] + "..." if len(result["answer"]) > 500 else result["answer"])


Processing video 1/11: data1.mov

Analyzing video: demonstrations/data1.mov
Question: Analyze this video and identify ALL human actions performed.

For each action, provide:
1. Action name (e.g., grasping, picking up, moving, placing, releasing, stacking, pushing, pulling)
2. The object involved in the action
3. Approximate timing (beginning, middle, end of video)
4. Hand used (left, right, or both)

Focus on fine-grained manipulation actions such as:
- Reaching/approaching
- Grasping/gripping
- Lifting/picking up
- Moving/transporting
- Placing/positioning
- Releasing/letting go
- Stacking/arranging
- Adjusting/fine-tuning position

Provide a chronological list of all actions observed.
Number of frames: 60
Reasoning enabled: True

Generating response...

 ACTIONS RECOGNIZED in data1.mov:
----------------------------------------
The individual stacks four dice (red, blue, green, and another green) vertically on the table using their right hand.

Processing video 2/11: data10.mov

Anal

KeyboardInterrupt: 

In [None]:
# Print the stored reasoning (truncated) and full action list for every video
wide_rule = "=" * 80
narrow_rule = "=" * 60

print("\n" + wide_rule)
print("Results")
print(wide_rule)

for video_name, data in action_recognition_results.items():
    print(f"\n{narrow_rule}")
    print(f"Video: {video_name}")
    print(narrow_rule)

    print("\nReasoning:")
    # Reasoning can be long, so cap the preview at 800 characters
    reasoning_preview = data["reasoning"]
    if len(reasoning_preview) > 800:
        reasoning_preview = reasoning_preview[:800] + "..."
    print(reasoning_preview)

    print("\nActions:")
    print(data["answer"])

---

# Part C: Automatic Generation of Sequence of Actions

## C.1 Action Sequence Generation

Using NVIDIA Cosmos Reason 2 (8B) to generate structured sequences of actions from all videos.

In [None]:
# Action Sequence Generation Prompt
ACTION_SEQUENCE_PROMPT = """Analyze this video and generate a STRUCTURED SEQUENCE of actions.

Output the sequence in the following JSON-like format for each action:
{
  "step": <step_number>,
  "action": "<action_verb>",
  "object": "<object_being_manipulated>",
  "start_state": "<state_before_action>",
  "end_state": "<state_after_action>",
  "preconditions": ["<required_conditions>"],
  "effects": ["<resulting_changes>"]
}

Use ONLY these standardized action verbs:
- REACH: Moving hand toward an object
- GRASP: Closing fingers around an object
- LIFT: Raising an object from a surface
- MOVE: Transporting an object through space
- PLACE: Positioning an object at a location
- RELEASE: Opening fingers to let go of object
- ADJUST: Fine-tuning object position
- STACK: Placing object on top of another

Generate the complete action sequence from start to finish."""

# Run action sequence generation on all videos; results are keyed by video file name
action_sequence_results = {}

for i, video_path in enumerate(video_files):
    print(f"\n{'='*80}")
    print(f"Processing video {i+1}/{len(video_files)}: {video_path.name}")
    print("="*80)
    
    # A fixed 60-frame sample is used for every video in this cell
    result = analyze_video(
        video_path=video_path,
        question=ACTION_SEQUENCE_PROMPT,
        model=model,
        processor=processor,
        nframes=60,
        enable_reasoning=True
    )
    
    action_sequence_results[video_path.name] = {
        "reasoning": result["reasoning"],
        "answer": result["answer"],
        "full_output": result["full_output"]
    }
    
    # Header emoji restored from its mis-encoded byte sequence
    print(f"\n📋 ACTION SEQUENCE for {video_path.name}:")
    print("-" * 40)
    # Preview only the first 600 characters of the answer
    print(result["answer"][:600] + "..." if len(result["answer"]) > 600 else result["answer"])

In [None]:
# Print the full generated action sequence for every video
wide_rule = "=" * 80
narrow_rule = "=" * 60

print("\n" + wide_rule)
print("Results")
print(wide_rule)

for video_name, data in action_sequence_results.items():
    print(f"\n{narrow_rule}")
    print(f"Video: {video_name}")
    print(narrow_rule)
    print("\nAction Sequence:")
    print(data["answer"])

## C.2 Markov Decision Process Design

Based on the observed action sequences, we design a Markov Decision Process (MDP) for the manipulation tasks.

In [None]:
# Define the MDP components based on observed actions

# States: Represent the configuration of objects and hand state
STATES = {
    "S0_IDLE": "Hand empty, objects on table (initial state)",
    "S1_REACHING": "Hand moving toward target object",
    "S2_GRASPING": "Hand closing around object",
    "S3_HOLDING": "Object grasped and held",
    "S4_MOVING": "Object being transported",
    "S5_POSITIONING": "Object at target location",
    "S6_RELEASING": "Hand opening to release object",
    "S7_STACKED": "Object placed on stack (goal state)",
    "S8_COMPLETE": "All objects stacked (terminal state)"
}

# Actions: Possible actions the agent can take
ACTIONS = {
    "A0_WAIT": "Do nothing, remain in current state",
    "A1_REACH": "Extend hand toward target object",
    "A2_GRASP": "Close fingers around object",
    "A3_LIFT": "Raise object from surface",
    "A4_MOVE": "Transport object to target location",
    "A5_LOWER": "Move object down toward surface",
    "A6_RELEASE": "Open fingers to let go",
    "A7_ADJUST": "Fine-tune object position"
}

# Transition Probabilities (estimated from video observations)
# Format: P(next_state | current_state, action)
# Each inner dict maps next_state -> probability and sums to 1.0
TRANSITION_PROBS = {
    ("S0_IDLE", "A1_REACH"): {"S1_REACHING": 0.95, "S0_IDLE": 0.05},
    ("S1_REACHING", "A2_GRASP"): {"S2_GRASPING": 0.90, "S1_REACHING": 0.10},
    ("S2_GRASPING", "A3_LIFT"): {"S3_HOLDING": 0.95, "S2_GRASPING": 0.05},
    ("S3_HOLDING", "A4_MOVE"): {"S4_MOVING": 0.90, "S3_HOLDING": 0.10},
    ("S4_MOVING", "A5_LOWER"): {"S5_POSITIONING": 0.85, "S4_MOVING": 0.15},
    ("S5_POSITIONING", "A6_RELEASE"): {"S6_RELEASING": 0.90, "S5_POSITIONING": 0.10},
    ("S6_RELEASING", "A0_WAIT"): {"S7_STACKED": 0.95, "S0_IDLE": 0.05},
    ("S5_POSITIONING", "A7_ADJUST"): {"S5_POSITIONING": 0.70, "S7_STACKED": 0.30},
}

# Rewards
REWARDS = {
    "S7_STACKED": 10.0,      # Successfully stacked one object
    "S8_COMPLETE": 100.0,    # All objects stacked (task complete)
    "S0_IDLE": -0.1,         # Small penalty for idle
    "FAILED_GRASP": -5.0,    # Failed grasp attempt
    "DROPPED": -10.0,        # Dropped object
    "DEFAULT": -0.5          # Step cost to encourage efficiency
}

print("=" * 80)
print("MARKOV DECISION PROCESS DEFINITION")
print("=" * 80)
print("\nStates:")
for state, desc in STATES.items():
    print(f"  {state}: {desc}")

print("\nActions:")
for action, desc in ACTIONS.items():
    print(f"  {action}: {desc}")

# Only the first five (state, action) entries are printed as a sample
print("\nTransition Probabilities (sample):")
for (state, action), probs in list(TRANSITION_PROBS.items())[:5]:
    # Middle dot and arrow restored from their mis-encoded byte sequences
    print(f"  P(·|{state}, {action}):")
    for next_state, prob in probs.items():
        print(f"    → {next_state}: {prob:.2f}")

print("\nRewards:")
for state, reward in REWARDS.items():
    print(f"  {state}: {reward:+.1f}")

In [None]:
# Generate MDP Diagram using graphviz-style text representation
# (Can be visualized with graphviz or mermaid)

# mdp_diagram = """
# MDP State Transition Diagram (Mermaid format - paste into mermaid.live):

# ```mermaid
# stateDiagram-v2
#     [*] --> S0_IDLE
    
#     S0_IDLE --> S1_REACHING : A1_REACH (0.95)
#     S0_IDLE --> S0_IDLE : A0_WAIT (1.0)
    
#     S1_REACHING --> S2_GRASPING : A2_GRASP (0.90)
#     S1_REACHING --> S1_REACHING : fail (0.10)
    
#     S2_GRASPING --> S3_HOLDING : A3_LIFT (0.95)
#     S2_GRASPING --> S0_IDLE : fail (0.05)
    
#     S3_HOLDING --> S4_MOVING : A4_MOVE (0.90)
#     S3_HOLDING --> S3_HOLDING : hold (0.10)
    
#     S4_MOVING --> S5_POSITIONING : A5_LOWER (0.85)
#     S4_MOVING --> S4_MOVING : adjust (0.15)
    
#     S5_POSITIONING --> S6_RELEASING : A6_RELEASE (0.90)
#     S5_POSITIONING --> S5_POSITIONING : A7_ADJUST (0.70)
#     S5_POSITIONING --> S7_STACKED : A7_ADJUST (0.30)
    
#     S6_RELEASING --> S7_STACKED : success (0.95)
#     S6_RELEASING --> S0_IDLE : dropped (0.05)
    
#     S7_STACKED --> S0_IDLE : next_object
#     S7_STACKED --> S8_COMPLETE : all_done
    
#     S8_COMPLETE --> [*]
    
#     note right of S7_STACKED : Reward: +10
#     note right of S8_COMPLETE : Reward: +100
# ```

# Rewards:
# - S7_STACKED (object placed): +10
# - S8_COMPLETE (all done): +100
# - Failed transitions: -5 to -10
# - Each step: -0.5 (encourages efficiency)
# """

# print(mdp_diagram)

In [None]:
# Create a visual representation of the MDP using matplotlib


fig, ax = plt.subplots(1, 1, figsize=(16, 10))

# State positions (arranged in a flow)
state_positions = {
    "S0_IDLE": (1, 5),
    "S1_REACHING": (3, 5),
    "S2_GRASPING": (5, 5),
    "S3_HOLDING": (7, 5),
    "S4_MOVING": (9, 5),
    "S5_POSITIONING": (11, 5),
    "S6_RELEASING": (13, 5),
    "S7_STACKED": (11, 2),
    "S8_COMPLETE": (13, 2),
}

# Draw states
for state, (x, y) in state_positions.items():
    if state == "S8_COMPLETE":
        color = 'lightgreen'
    elif state == "S7_STACKED":
        color = 'lightblue'
    elif state == "S0_IDLE":
        color = 'lightyellow'
    else:
        color = 'lightgray'
    
    # Use facecolor/edgecolor explicitly: passing `color=` together with an
    # edge colour makes matplotlib override the edge, losing the black outline
    circle = plt.Circle((x, y), 0.5, facecolor=color, edgecolor='black', linewidth=2)
    ax.add_patch(circle)
    
    # State label: drop only the leading "S" prefix (count=1) so names such as
    # GRASPING / POSITIONING / STACKED keep their inner "S" characters
    short_name = state.replace("S", "", 1).replace("_", "\n")
    ax.text(x, y, short_name, ha='center', va='center', fontsize=8, fontweight='bold')

# Draw transitions (arrows): (start_state, end_state, label)
transitions = [
    ("S0_IDLE", "S1_REACHING", "REACH"),
    ("S1_REACHING", "S2_GRASPING", "GRASP"),
    ("S2_GRASPING", "S3_HOLDING", "LIFT"),
    ("S3_HOLDING", "S4_MOVING", "MOVE"),
    ("S4_MOVING", "S5_POSITIONING", "LOWER"),
    ("S5_POSITIONING", "S6_RELEASING", "RELEASE"),
    ("S6_RELEASING", "S7_STACKED", "0.95"),
    ("S7_STACKED", "S0_IDLE", "next"),
    ("S7_STACKED", "S8_COMPLETE", "done"),
]

for start, end, label in transitions:
    x1, y1 = state_positions[start]
    x2, y2 = state_positions[end]
    
    # Calculate direction
    dx, dy = x2 - x1, y2 - y1
    dist = np.sqrt(dx**2 + dy**2)
    
    # Offset to start/end at circle edge (slightly larger than the 0.5 radius)
    offset = 0.55
    x1_adj = x1 + offset * dx / dist
    y1_adj = y1 + offset * dy / dist
    x2_adj = x2 - offset * dx / dist
    y2_adj = y2 - offset * dy / dist
    
    ax.annotate("", xy=(x2_adj, y2_adj), xytext=(x1_adj, y1_adj),
                arrowprops=dict(arrowstyle="->", color='darkblue', lw=1.5))
    
    # Label sits slightly above the midpoint between the two state centres
    mid_x, mid_y = (x1 + x2) / 2, (y1 + y2) / 2 + 0.3
    ax.text(mid_x, mid_y, label, ha='center', va='bottom', fontsize=7, color='darkred')

# Add legend
legend_elements = [
    mpatches.Patch(facecolor='lightyellow', edgecolor='black', label='Initial State'),
    mpatches.Patch(facecolor='lightgray', edgecolor='black', label='Intermediate State'),
    mpatches.Patch(facecolor='lightblue', edgecolor='black', label='Reward State (+10)'),
    mpatches.Patch(facecolor='lightgreen', edgecolor='black', label='Terminal State (+100)'),
]
ax.legend(handles=legend_elements, loc='lower left', fontsize=9)

ax.set_xlim(0, 15)
ax.set_ylim(0, 7)
ax.set_aspect('equal')
ax.axis('off')
ax.set_title('Markov Decision Process for Object Manipulation Task', fontsize=14, fontweight='bold')

plt.tight_layout()
plt.savefig('mdp_diagram.png', dpi=150, bbox_inches='tight')
plt.show()

print("\nSaved as 'mdp_diagram.png'")

## Limitations Discussion

### Limitations of the Action Sequence Generation Approach:

1. **Temporal Resolution**: The model only sees a fixed number of sampled frames per video (e.g. 60 frames for action recognition and sequence generation), which may miss rapid or subtle actions.

2. **Vocabulary Constraints**: Actions are limited to predefined verbs; novel or complex manipulations may not be accurately captured.

3. **Context Dependency**: The model relies on visual cues only; it cannot infer intent, force, or tactile feedback.

4. **Generalization**: The approach is trained on specific manipulation scenarios; extending to other domains (e.g., cooking, assembly) may require prompt engineering or fine-tuning.

5. **Occlusion Handling**: When hands or objects are occluded, the model may make incorrect inferences.

6. **Multi-object Tracking**: With many similar objects, the model may confuse object identities across frames.

7. **Real-time Performance**: The current approach is not suitable for real-time applications due to inference latency.

In [None]:
# Persist every result set and the MDP definition as timestamped JSON files


timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Object detection results
detection_path = f"object_detection_results_{timestamp}.json"
with open(detection_path, "w") as f:
    json.dump(object_detection_results, f, indent=2)
print(f"Object detection results saved to {detection_path}")

# Action recognition results
recognition_path = f"action_recognition_results_{timestamp}.json"
with open(recognition_path, "w") as f:
    json.dump(action_recognition_results, f, indent=2)
print(f"Action recognition results saved to {recognition_path}")

# Action sequence results
sequence_path = f"action_sequence_results_{timestamp}.json"
with open(sequence_path, "w") as f:
    json.dump(action_sequence_results, f, indent=2)
print(f"Action sequence results saved to {sequence_path}")

# MDP definition: JSON keys must be strings, so each (state, action) tuple
# key is flattened to "<state>_<action>"
flattened_transitions = {}
for (s, a), p in TRANSITION_PROBS.items():
    flattened_transitions[f"{s}_{a}"] = p

mdp_definition = {
    "states": STATES,
    "actions": ACTIONS,
    "transition_probabilities": flattened_transitions,
    "rewards": REWARDS,
}
mdp_path = f"mdp_definition_{timestamp}.json"
with open(mdp_path, "w") as f:
    json.dump(mdp_definition, f, indent=2)
print(f"MDP definition saved to {mdp_path}")

---

## Summary

This notebook demonstrates:

### Model Used
**NVIDIA Cosmos Reason 2 (8B)** (`nvidia/Cosmos-Reason2-8B`) - A vision-language model based on the Qwen3-VL architecture, fine-tuned for physical world understanding and reasoning.

### Part B: Object and Human Action Recognition
- **B.1**: Object detection using natural language prompting to identify all objects in each video
- **B.2**: Action recognition to identify manipulation actions (grasp, move, place, stack, etc.)

### Part C: Automatic Generation of Sequence of Actions
- **C.1**: Structured action sequence generation in JSON format with preconditions and effects
- **C.2**: Markov Decision Process design with states, actions, transition probabilities, and rewards

### Output Files Generated
- `object_detection_results_*.json` - Object detection for all videos
- `action_recognition_results_*.json` - Action recognition for all videos
- `action_sequence_results_*.json` - Structured action sequences
- `mdp_definition_*.json` - MDP formal definition
- `mdp_diagram.png` - Visual diagram of the MDP