# Turn Signal Detection - Testing & Visualization Notebook

In [None]:
import sys
from pathlib import Path
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.gridspec import GridSpec
import numpy as np
import pandas as pd
from IPython.display import display, HTML, Video
import json
from collections import Counter
import logging

# Add src to path
sys.path.insert(0, str(Path.cwd() / 'src'))

from utils.config import load_config
from data import (
    load_dataset_from_config,
    create_image_loader,
    SequencePreprocessor
)
from models import load_model
from postprocess import create_postprocessor

# Setup plotting
%matplotlib inline
plt.rcParams['figure.figsize'] = (16, 8)
plt.rcParams['figure.dpi'] = 100

# Suppress some warnings
logging.getLogger('transformers').setLevel(logging.ERROR)

print("✓ Imports successful")

## 1. Configuration & Setup

In [None]:
# === CONFIGURATION ===
CONFIG_FILE = "configs/cosmos_reason1_video.yaml"

# Test sequences - add specific sequence IDs you want to test
TEST_SEQUENCES = [
    # Add your test sequence IDs here, or leave empty to use first N sequences
    # Example:
    # "2024-03-25-15-40-16_mapping_tartu/camera_fl_2_track_5",
    # "2024-03-25-15-40-16_mapping_tartu/camera_fl_2_track_10",
]

NUM_SEQUENCES = 5  # Number of sequences to load if TEST_SEQUENCES is empty

# Load config
config = load_config(CONFIG_FILE)

# Narrow the dataset: an explicit ID list wins and lifts the count cap;
# otherwise only the sequence count is capped.
data_cfg = config.data
if not TEST_SEQUENCES:
    data_cfg.max_sequences = NUM_SEQUENCES
else:
    data_cfg.sequence_filter = TEST_SEQUENCES
    data_cfg.max_sequences = None

# Echo the effective setup so the run is identifiable in the notebook output
model_cfg = config.model
print(f"Configuration: {CONFIG_FILE}")
print(f"Model: {model_cfg.type.value} ({model_cfg.inference_mode.value} mode)")
print(f"Prompt: {model_cfg.prompt_template_path}")

## 2. Load Data

In [None]:
# Load dataset
dataset = load_dataset_from_config(config.data)

print(f"Loaded {dataset.num_sequences} sequences")
print(f"Total frames: {dataset.total_frames}")
print("\nSequences:")
# One line per sequence: 1-based position, ID cut/padded to 60 chars,
# frame count and ground-truth label when the sequence has one.
for position, seq in enumerate(dataset.sequences, start=1):
    if seq.has_ground_truth:
        gt_label = seq.ground_truth_label
    else:
        gt_label = "N/A"
    short_id = seq.sequence_id[:60]
    print(f"  {position}. {short_id:60s} | {seq.num_frames:3d} frames | GT: {gt_label}")

In [None]:
# Setup image loader (lazy)
# NOTE(review): lazy=True presumably defers reading images until the loader is
# called on a sequence — confirm in data.create_image_loader.
image_loader = create_image_loader(config.data, lazy=True)
# The preprocessor is built from the `preprocessing` section of the same config.
preprocessor = SequencePreprocessor(config.preprocessing)

print("✓ Image loader and preprocessor ready")

## 3. Load Model & Test Prompt

In [None]:
# View current prompt
# Read with an explicit encoding so the displayed text does not depend on the
# platform's locale default (assumes the template file is UTF-8 — TODO confirm).
current_prompt = Path(config.model.prompt_template_path).read_text(encoding="utf-8")

print("Current Prompt:")
print("="*80)
print(current_prompt)
print("="*80)

In [None]:
# Optional: Test a new prompt without modifying files
# Uncomment and edit to experiment with prompt variations

# NEW_PROMPT = """
# You are analyzing a sequence of images showing a vehicle from behind.
# Your task is to determine if the vehicle's turn signals are active.
# 
# Analyze the entire sequence and respond with:
# {
#   "label": "left" | "right" | "both" | "none",
#   "confidence": 0.0 to 1.0,
#   "reasoning": "Brief explanation",
#   "start_frame": frame number where signal starts (or null),
#   "end_frame": frame number where signal ends (or null)
# }
# """
# 
# # This will override the prompt for testing
# config.model.prompt_template_path = "/tmp/test_prompt.txt"
# Path("/tmp/test_prompt.txt").write_text(NEW_PROMPT)

In [None]:
# Load model
print("Loading model... (this may take a minute)")
# NOTE(review): warmup=True presumably runs a throwaway inference so the first
# real prediction is not slowed by initialization — confirm in models.load_model.
model = load_model(config.model, warmup=True)
print("✓ Model loaded and ready")

## 4. Run Inference on Test Sequences

In [None]:
# Process all test sequences
#
# For every sequence: load its crops, run the model in the configured mode,
# record the predictions in `results`, then release the crops again.
results = []

# The inference mode is fixed for the whole run, so resolve it once.
is_video_mode = config.model.inference_mode.value == 'video'

for i, sequence in enumerate(dataset.sequences):
    print(f"\nProcessing {i+1}/{dataset.num_sequences}: {sequence.sequence_id}")

    # Load images
    image_loader(sequence, load_full_frame=False)
    loaded = sum(1 for f in sequence.frames if f.crop_image is not None)
    print(f"  Loaded {loaded}/{sequence.num_frames} images")

    if loaded == 0:
        print("  ⚠️  No images loaded, skipping")
        continue

    try:
        # Preprocess + predict
        if is_video_mode:
            video = preprocessor.preprocess_for_video(sequence)
            print(f"  Video shape: {video.shape}")

            # A single prediction covers the whole clip; keep the list shape
            # so both modes store `predictions` the same way.
            predictions = [model.predict_video(video)]
        else:
            samples = preprocessor.preprocess_for_single_images(sequence)
            images = [s[0] for s in samples]
            print(f"  Processing {len(images)} frames")

            predictions = model.predict_batch(images)

        # Store results
        results.append({
            'sequence': sequence,
            'sequence_id': sequence.sequence_id,
            'predictions': predictions,
            'ground_truth': sequence.ground_truth_label if sequence.has_ground_truth else None
        })

        # Show prediction
        if is_video_mode:
            pred = predictions[0]
            print(f"  ✓ Prediction: {pred['label']} (conf: {pred['confidence']:.2f})")
            print(f"    Latency: {pred['latency_ms']:.1f} ms")
            reasoning = pred.get('reasoning')
            if reasoning:
                # Only mark the text as truncated when it really was cut.
                tail = "..." if len(reasoning) > 100 else ""
                print(f"    Reasoning: {reasoning[:100]}{tail}")
    finally:
        # Clear memory - also when preprocessing/inference raised, so a failed
        # sequence does not keep its loaded crops alive in the kernel.
        if hasattr(image_loader, 'clear_sequence'):
            image_loader.clear_sequence(sequence)

print(f"\n✓ Processed {len(results)} sequences")

## 5. Visualize Results

In [None]:
# Summary table
#
# One row per processed sequence. Video mode has a single prediction per
# sequence; single-image mode is reduced to the most common label and the mean
# confidence across frames.
is_video_mode = config.model.inference_mode.value == 'video'

summary_data = []
for result in results:
    preds = result['predictions']
    ground_truth = result['ground_truth']

    if not preds:
        # No prediction at all (e.g. the model returned an empty batch):
        # report placeholders instead of failing on an empty Counter.
        predicted = None
        confidence_text = 'N/A'
    elif is_video_mode:
        predicted = preds[0]['label']
        confidence_text = f"{preds[0]['confidence']:.2f}"
    else:
        # For single-image mode, show most common prediction
        predicted = Counter(p['label'] for p in preds).most_common(1)[0][0]
        avg_conf = np.mean([p['confidence'] for p in preds])
        confidence_text = f"{avg_conf:.2f}"

    # '-' means "cannot be judged" (no ground truth); a missing prediction with
    # ground truth available counts as wrong.
    if not ground_truth:
        verdict = '-'
    elif predicted == ground_truth:
        verdict = '✓'
    else:
        verdict = '✗'

    summary_data.append({
        'Sequence': result['sequence_id'][:50],
        'Frames': result['sequence'].num_frames,
        'Predicted': predicted if preds else 'N/A',
        'Confidence': confidence_text,
        'Ground Truth': ground_truth or 'N/A',
        'Correct': verdict,
    })

summary_df = pd.DataFrame(summary_data)
display(summary_df)

In [None]:
# Function to visualize a sequence
def visualize_sequence(result_idx, show_frames=True, max_frames=20):
    """
    Visualize predictions for a sequence.

    Prints the stored prediction(s) for ``results[result_idx]`` and, depending
    on ``config.model.inference_mode``, shows either a grid of sample frames
    (video mode) or a label/confidence timeline (single-image mode). Images are
    reloaded through ``image_loader`` for the duration of the call and released
    again afterwards, even if plotting fails.

    Args:
        result_idx: Index in results list
        show_frames: Whether to show sample frames (video mode only)
        max_frames: Maximum number of frames to display (video mode only)

    Raises:
        IndexError: If ``result_idx`` is not a valid index into ``results``.
    """
    result = results[result_idx]
    sequence = result['sequence']
    predictions = result['predictions']

    print(f"Sequence: {result['sequence_id']}")
    print(f"Frames: {sequence.num_frames}")
    print(f"Ground Truth: {result['ground_truth'] or 'N/A'}")
    print()

    if not predictions:
        # Nothing to show; avoids indexing / reducing an empty list below.
        print("No predictions stored for this sequence")
        return

    # Reload images for visualization
    image_loader(sequence, load_full_frame=False)
    try:
        if config.model.inference_mode.value == 'video':
            # Video mode - single prediction
            _show_video_prediction(sequence, predictions[0], show_frames, max_frames)
        else:
            # Single-image mode - timeline
            _plot_prediction_timeline(result['sequence_id'], sequence, predictions)
    finally:
        # Clear memory
        if hasattr(image_loader, 'clear_sequence'):
            image_loader.clear_sequence(sequence)


def _show_video_prediction(sequence, pred, show_frames, max_frames):
    """Print one video-mode prediction and optionally show sample frames."""
    print(f"Prediction: {pred['label']} (confidence: {pred['confidence']:.2f})")
    print(f"Latency: {pred['latency_ms']:.1f} ms")

    if pred.get('reasoning'):
        print(f"\nReasoning:\n{pred['reasoning']}")

    raw_output = pred.get('raw_output')
    if raw_output:
        # Only mark the text as truncated when it really was cut.
        tail = "..." if len(raw_output) > 500 else ""
        print(f"\nRaw Output:\n{raw_output[:500]}{tail}")

    if show_frames:
        _plot_sample_frames(sequence, pred['label'], max_frames)


def _plot_sample_frames(sequence, label, max_frames, cols=5):
    """Show up to ``max_frames`` evenly spaced crops in a ``cols``-wide grid."""
    frames_to_show = min(max_frames, len(sequence.frames))
    if frames_to_show <= 0:
        # plt.subplots rejects a grid with zero rows.
        print("No frames to display")
        return

    indices = np.linspace(0, len(sequence.frames) - 1, frames_to_show, dtype=int)
    rows = (frames_to_show + cols - 1) // cols  # ceiling division

    # squeeze=False always returns a 2-D array of axes, so flattening works
    # for any grid size (including a single frame / single row).
    _, axes = plt.subplots(rows, cols, figsize=(16, rows * 3), squeeze=False)
    axes = axes.ravel()

    # Start with every panel hidden; panels without an image stay blank.
    for ax in axes:
        ax.axis('off')

    for ax, idx in zip(axes, indices):
        frame = sequence.frames[idx]
        if frame.crop_image is not None:
            ax.imshow(frame.crop_image)
            ax.set_title(f"Frame {frame.frame_id}\n{label}")

    plt.tight_layout()
    plt.show()


def _plot_prediction_timeline(sequence_id, sequence, predictions):
    """Plot per-prediction label and confidence, then print summary statistics."""
    labels = [p['label'] for p in predictions]
    confidences = [p['confidence'] for p in predictions]
    # NOTE(review): pairs prediction i with the i-th frame of the sequence;
    # if the preprocessor subsamples or drops frames the IDs will not line up
    # — confirm against SequencePreprocessor.preprocess_for_single_images.
    frame_ids = [f.frame_id for f in sequence.frames[:len(predictions)]]

    # Plot timeline
    fig = plt.figure(figsize=(16, 6))
    gs = GridSpec(2, 1, height_ratios=[2, 1], hspace=0.3)

    # Labels (unknown labels are drawn on the 'None' row)
    ax1 = fig.add_subplot(gs[0])
    label_map = {'none': 0, 'left': 1, 'right': 2, 'both': 3}
    label_nums = [label_map.get(l, 0) for l in labels]

    ax1.plot(frame_ids, label_nums, 'o-', linewidth=2, markersize=6)
    ax1.set_yticks([0, 1, 2, 3])
    ax1.set_yticklabels(['None', 'Left', 'Right', 'Both'])
    ax1.set_xlabel('Frame ID')
    ax1.set_ylabel('Prediction')
    ax1.set_title(f'Predictions over Time - {sequence_id}')
    ax1.grid(True, alpha=0.3)

    # Confidence
    ax2 = fig.add_subplot(gs[1])
    ax2.fill_between(frame_ids, confidences, alpha=0.5)
    ax2.plot(frame_ids, confidences, 'b-', linewidth=2)
    ax2.set_xlabel('Frame ID')
    ax2.set_ylabel('Confidence')
    ax2.set_ylim([0, 1])
    ax2.grid(True, alpha=0.3)

    plt.show()

    # Statistics
    print("\nLabel Distribution:")
    for label, count in Counter(labels).most_common():
        pct = count / len(labels) * 100
        print(f"  {label:8s}: {count:3d} ({pct:5.1f}%)")

    print(f"\nAverage Confidence: {np.mean(confidences):.2f}")
    print(f"Min Confidence: {np.min(confidences):.2f}")
    print(f"Max Confidence: {np.max(confidences):.2f}")


print("✓ Visualization function ready")
print("\nUsage: visualize_sequence(0)  # 0 = first sequence in results")

In [None]:
# Visualize first sequence
# Guarded because `results` stays empty when every sequence was skipped
# (no images loaded) or the dataset had no sequences.
if results:
    visualize_sequence(0)

In [None]:
# Visualize all sequences (use with caution for many sequences)
# for i in range(len(results)):
#     print(f"\n{'='*80}\n")
#     visualize_sequence(i, show_frames=False)  # Set show_frames=True to see images

## 6. Detailed Analysis

In [None]:
# Examine raw model outputs
def show_raw_output(result_idx):
    """Print the unparsed model output for ``results[result_idx]``.

    In video mode the single raw output is followed by its parsed label,
    confidence and reasoning; otherwise the first three per-frame outputs are
    listed, each with its parsed label and confidence.
    """
    entry = results[result_idx]
    rule = "="*80
    missing = 'No raw output available'

    print(f"Sequence: {entry['sequence_id']}")
    print(f"Ground Truth: {entry['ground_truth'] or 'N/A'}")
    print("\n" + rule)

    if config.model.inference_mode.value != 'video':
        print("Showing first 3 predictions:")
        for number, pred in enumerate(entry['predictions'][:3], start=1):
            print(f"\nFrame {number}:")
            print(pred.get('raw_output', missing))
            print(f"  -> Parsed: {pred['label']} ({pred['confidence']:.2f})")
        return

    pred = entry['predictions'][0]
    print("RAW MODEL OUTPUT:")
    print(rule)
    print(pred.get('raw_output', missing))
    print("\n" + rule)
    print("\nPARSED:")
    print(f"  Label: {pred['label']}")
    print(f"  Confidence: {pred['confidence']:.2f}")
    print(f"  Reasoning: {pred.get('reasoning', 'N/A')}")

# Example usage:
# show_raw_output(0)

In [None]:
# Model performance metrics
metrics = model.get_metrics()

print("Model Performance Metrics:")
print("="*80)
# Floats whose key mentions 'rate' or 'pct' are shown as percentages, other
# floats with two decimals, everything else as-is.
for name, val in metrics.items():
    if not isinstance(val, float):
        shown = f"{val}"
    elif 'rate' in name or 'pct' in name:
        shown = f"{val:.2%}"
    else:
        shown = f"{val:.2f}"
    print(f"  {name:30s}: {shown}")

In [None]:
# Calculate accuracy (if ground truth available)
#
# Video mode compares the single prediction with the ground truth; single-image
# mode compares the majority-vote label across frames.
is_video_mode = config.model.inference_mode.value == 'video'
results_with_gt = [r for r in results if r['ground_truth'] is not None]

if results_with_gt:
    correct = 0
    for r in results_with_gt:
        preds = r['predictions']
        if not preds:
            # No prediction to compare: counted as incorrect rather than
            # failing on an empty majority vote.
            continue
        if is_video_mode:
            predicted = preds[0]['label']
        else:
            # For single-image, use majority vote
            predicted = Counter(p['label'] for p in preds).most_common(1)[0][0]
        if predicted == r['ground_truth']:
            correct += 1

    accuracy = correct / len(results_with_gt)
    mode_note = "" if is_video_mode else " (majority vote)"
    print(f"\nAccuracy{mode_note}: {correct}/{len(results_with_gt)} = {accuracy:.1%}")
else:
    print("\nNo ground truth labels available for accuracy calculation")

## 7. Export Results

In [None]:
# Save results to file for later analysis
output_dir = Path("notebook_outputs")
output_dir.mkdir(exist_ok=True)

# Save summary
summary_path = output_dir / "test_summary.csv"
summary_df.to_csv(summary_path, index=False)

# Save detailed results: only the serializable parts of each result are kept
# (the in-memory 'sequence' object is left out).
exported_keys = ('sequence_id', 'ground_truth', 'predictions')
detailed_results = [
    {key: result[key] for key in exported_keys}
    for result in results
]

# default=str stringifies any value json cannot encode natively
with (output_dir / "test_results.json").open('w') as f:
    json.dump(detailed_results, f, indent=2, default=str)

print(f"✓ Results saved to {output_dir}/")