# Live Navigation Demo

Interactive demonstration of a trained acoustic navigation agent.

## Features:
- Generate a new test cave
- Load trained model
- Simulate agent navigating using only acoustic signals
- Live visualization of agent movement
- Compare with optimal path

In [None]:
import sys
sys.path.append('../')

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
from matplotlib.animation import FuncAnimation
import torch
from pathlib import Path
from IPython.display import HTML, clear_output
import time

from data.audio_cave import AudioCave
from src.models import CompactAcousticNet, SpatialTemporalAcousticNet

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

## 1. Generate Test Cave

In [None]:
# Build a fresh cave for this demo (different from training)
print("Generating test cave...")
cave = AudioCave(
    width=60,
    height=60,
    min_corridor_width=3,
    wall_thickness_range=(1, 8),
)

# The grid mean is reported as the wall share; air is its complement
wall_fraction = cave.grid.mean()

print(f"Cave size: {cave.height}x{cave.width}")
print(f"Start: {cave.start}")
print(f"Goal: {cave.end}")
print(f"Walls: {100*wall_fraction:.1f}%")
print(f"Air: {100*(1-wall_fraction):.1f}%")

cave.visualize()

## 2. Load Trained Model

In [None]:
# Load best model checkpoint
CHECKPOINT_PATH = Path('../notebooks/checkpoints/best_model.pt')

if not CHECKPOINT_PATH.exists():
    raise FileNotFoundError(f"Checkpoint not found: {CHECKPOINT_PATH}\nTrain a model first using 03_Training.ipynb")

# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)

print("Checkpoint Info:")
print(f"  Epoch: {checkpoint['epoch']}")
print(f"  Val Loss: {checkpoint['val_loss']:.4f}")
print(f"  Val Accuracy: {checkpoint.get('val_acc', 0):.4f}")

# Create model (match architecture used in training)
MODEL_TYPE = 'compact'  # Change to 'spatial_temporal' if you trained that model
ACTION_NAMES = ['STOP', 'UP', 'DOWN', 'LEFT', 'RIGHT']

# One output class per action, so the head size cannot drift from ACTION_NAMES.
if MODEL_TYPE == 'compact':
    model = CompactAcousticNet(num_classes=len(ACTION_NAMES), dropout=0.3).to(device)
    print("Using CompactAcousticNet")
elif MODEL_TYPE == 'spatial_temporal':
    model = SpatialTemporalAcousticNet(num_classes=len(ACTION_NAMES), dropout=0.3).to(device)
    print("Using SpatialTemporalAcousticNet")
else:
    # Fail early: a typo here used to silently select SpatialTemporalAcousticNet
    # and only surface later as a confusing state-dict mismatch.
    raise ValueError(
        f"Unknown MODEL_TYPE {MODEL_TYPE!r}; expected 'compact' or 'spatial_temporal'"
    )

model.load_state_dict(checkpoint['model_state'])
model.eval()  # inference mode: disables dropout

print("\n✓ Model loaded successfully")

## 3. Simulate Acoustic Navigation

**Note:** This is a simplified simulation without actual k-Wave acoustic propagation.
In reality, you would:
1. Run k-Wave simulation at each position
2. Capture 8-mic pressure data
3. Feed to model for action prediction

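For this demo, we use a mock sensor that returns random noise plus a small position-dependent offset instead of simulated acoustics, so the agent's behaviour here only exercises the pipeline and is not physically meaningful until real k-Wave data is substituted.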

In [None]:
# Mock acoustic sensor (replace with real k-Wave simulation)
def mock_acoustic_sensor(position, cave_grid, timesteps=11434, num_mics=8, rng=None):
    """
    Generate pseudo-acoustic microphone data for a grid position.

    This is a stand-in for a k-Wave simulation: the output is Gaussian noise
    plus a constant offset that depends on the position.

    Args:
        position: (y, x) pair; only used to derive the offset.
        cave_grid: unused by the mock; kept so a real sensor can share the
            same call signature.
        timesteps: number of samples per microphone.
        num_mics: number of microphone channels (default 8).
        rng: optional NumPy random generator (anything exposing
            ``standard_normal``). When None, the global ``np.random`` state
            is used.

    Returns:
        float32 array of shape (num_mics, timesteps).
    """
    # Generate random acoustic-like signal
    # In reality: run k-Wave simulation at this position
    if rng is None:
        noise = np.random.randn(num_mics, timesteps)
    else:
        noise = rng.standard_normal((num_mics, timesteps))
    mic_data = noise.astype(np.float32) * 0.003

    # Add position-dependent features (mock spatial info)
    y, x = position
    spatial_bias = np.sin(y * 0.1) * np.cos(x * 0.1)
    mic_data += spatial_bias * 0.001

    return mic_data

print("Mock sensor ready (replace with real k-Wave for production)")

In [None]:
# Action index -> (row delta, col delta) applied to the agent position
action_to_delta = dict(enumerate([
    (0, 0),    # 0: STOP
    (-1, 0),   # 1: UP
    (1, 0),    # 2: DOWN
    (0, -1),   # 3: LEFT
    (0, 1),    # 4: RIGHT
]))

def is_valid_move(position, cave_grid, agent_radius=1):
    """Return whether the agent's square footprint fits at position.

    The footprint is the (2 * agent_radius + 1)-wide square centred on
    position (3x3 by default). It must lie entirely inside cave_grid and
    cover only cells equal to 0.
    """
    row, col = position

    # Reject centres whose footprint would extend past a grid edge
    if not (agent_radius <= row < cave_grid.shape[0] - agent_radius):
        return False
    if not (agent_radius <= col < cave_grid.shape[1] - agent_radius):
        return False

    row_span = slice(row - agent_radius, row + agent_radius + 1)
    col_span = slice(col - agent_radius, col + agent_radius + 1)
    return np.all(cave_grid[row_span, col_span] == 0)

def predict_action(model, mic_data, device):
    """Predict an action from one sample of microphone data.

    Args:
        model: callable network mapping a (1, ...) float32 tensor to logits
            with the class dimension at dim=1.
        mic_data: array-like microphone recording for a single position.
        device: torch device the input tensor is moved to.

    Returns:
        (action, probs): the argmax class index as an int, and the softmax
        probabilities for the sample as a 1-D NumPy array.
    """
    mic_data = np.asarray(mic_data)

    # Normalize per-sample
    mic_mean = mic_data.mean()
    mic_std = mic_data.std()
    if mic_std > 1e-8:
        mic_data_norm = (mic_data - mic_mean) / mic_std
    else:
        # (Near-)constant signal: only centre it to avoid dividing by ~0
        mic_data_norm = mic_data - mic_mean

    # torch.from_numpy keeps the NumPy dtype, so float64/integer input would
    # reach float32 model weights and raise a dtype error. Cast explicitly.
    mic_data_norm = np.ascontiguousarray(mic_data_norm, dtype=np.float32)

    # Convert to tensor and add the batch dimension
    mic_tensor = torch.from_numpy(mic_data_norm).unsqueeze(0).to(device)

    # Predict
    with torch.no_grad():
        logits = model(mic_tensor)
        probs = torch.softmax(logits, dim=1)
        action = torch.argmax(logits, dim=1).item()

    return action, probs[0].cpu().numpy()

print("Navigation functions ready")

## 4. Run Navigation Episode

In [None]:
# Navigation parameters
MAX_STEPS = 100

# Normalise positions to plain int tuples. next_pos below is always a tuple,
# and tuple == list is always False (array == tuple is ambiguous), so the goal
# test would otherwise fail if the cave stores its endpoints differently.
current_pos = tuple(int(v) for v in cave.start)
goal_pos = tuple(int(v) for v in cave.end)

# Track trajectory (includes the start, so len(trajectory) == steps + 1)
trajectory = [current_pos]
actions_taken = []
action_probs_history = []


def _optimal_path_length(action_grid, start, goal, max_moves):
    """Count the moves from start to goal obtained by following action_grid.

    Returns None if the walk reaches a cell without a movement label, leaves
    the grid, or does not arrive within max_moves.
    NOTE(review): assumes the grid's 'up'/'down'/'left'/'right' labels use the
    same (row, col) deltas as action_to_delta - confirm against AudioCave.
    """
    moves = {
        'up': action_to_delta[1],
        'down': action_to_delta[2],
        'left': action_to_delta[3],
        'right': action_to_delta[4],
    }
    pos = tuple(start)
    goal = tuple(goal)
    for n_moves in range(max_moves + 1):
        if pos == goal:
            return n_moves
        row, col = pos
        if not (0 <= row < len(action_grid) and 0 <= col < len(action_grid[row])):
            return None
        delta = moves.get(action_grid[row][col])
        if delta is None:
            return None
        pos = (row + delta[0], col + delta[1])
    return None


# Counting every labelled cell measures the whole action field, not the path,
# so the optimal length is obtained by walking the field from the start.
optimal_steps = _optimal_path_length(
    cave.action_grid, current_pos, goal_pos, cave.height * cave.width
)

print(f"Starting navigation from {current_pos} to {goal_pos}")
print(f"Optimal path length: {optimal_steps if optimal_steps is not None else 'unknown'}")
print("=" * 70)

# Navigation loop
for step in range(MAX_STEPS):
    # Stop as soon as the goal is reached
    if current_pos == goal_pos:
        break

    # Get acoustic data (mock)
    mic_data = mock_acoustic_sensor(current_pos, cave.grid)

    # Predict action
    action, probs = predict_action(model, mic_data, device)

    # Optimal action label at this cell, shown next to the prediction
    optimal_action = cave.action_grid[current_pos[0]][current_pos[1]]

    # Execute action
    delta = action_to_delta[action]
    next_pos = (current_pos[0] + delta[0], current_pos[1] + delta[1])

    # Validate move; an invalid move leaves the agent in place
    if is_valid_move(next_pos, cave.grid):
        current_pos = next_pos
        move_status = "✓"
    else:
        move_status = "✗ (invalid)"

    # Record
    trajectory.append(current_pos)
    actions_taken.append(action)
    action_probs_history.append(probs)

    # Progress update every 10 steps
    if (step + 1) % 10 == 0:
        dist = np.linalg.norm(np.array(current_pos) - np.array(goal_pos))
        print(f"Step {step+1}: pos={current_pos}, action={ACTION_NAMES[action]} (optimal={optimal_action}), dist={dist:.1f} {move_status}")

# Decide the outcome after the loop so that reaching the goal on the very last
# allowed step is reported as success rather than as a timeout.
if current_pos == goal_pos:
    print(f"\n✓ GOAL REACHED in {len(actions_taken)} steps!")
else:
    print(f"\n✗ Max steps reached ({MAX_STEPS})")

print("=" * 70)
print(f"Final position: {current_pos}")
# trajectory holds the start plus one entry per step
print(f"Total steps: {len(trajectory) - 1}")
print(f"Actions taken: {len(actions_taken)}")

## 5. Visualize Navigation

In [None]:
# Create visualization
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 7))


def _mark_endpoints(ax):
    """Draw the start (green circle) and goal (red star) markers on ax."""
    ax.scatter([cave.start[0]], [cave.start[1]], s=300, c='green', marker='o',
               edgecolors='black', linewidths=2, label='Start', zorder=10)
    ax.scatter([cave.end[0]], [cave.end[1]], s=300, c='red', marker='*',
               edgecolors='black', linewidths=2, label='Goal', zorder=10)


def _finish_axes(ax):
    """Apply the axis labels, legend and aspect shared by both panels."""
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.legend()
    ax.axis('image')


# Left: Cave with trajectory
ax1.imshow(cave.grid.T, origin='lower', cmap='binary')
_mark_endpoints(ax1)

# Plot trajectory; points are coloured by their index along the path
if len(trajectory) > 1:
    traj_array = np.array(trajectory)
    ax1.plot(traj_array[:, 0], traj_array[:, 1], 'b-', linewidth=2, alpha=0.7, label='Agent Path')
    ax1.scatter(traj_array[:, 0], traj_array[:, 1], c=range(len(trajectory)),
                cmap='coolwarm', s=50, zorder=5, edgecolors='black', linewidths=0.5)

# trajectory includes the starting position, so the step count is one less
ax1.set_title(f'Agent Navigation ({len(trajectory) - 1} steps)', fontsize=14, fontweight='bold')
_finish_axes(ax1)

# Right: Optimal path comparison (action labels mapped to integer colour codes)
symbol_map = {"up": 1, "down": 2, "left": 3, "right": 4, "stop": 5, "": 0}
action_numeric = np.vectorize(symbol_map.get)(cave.action_grid)
ax2.imshow(action_numeric.T, origin='lower', cmap='tab10', vmin=0, vmax=5, alpha=0.6)
# Outline the grid at the 0.5 level
ax2.contour(cave.grid.T, levels=[0.5], colors='black', linewidths=1)
_mark_endpoints(ax2)
ax2.set_title('Optimal Action Field (BFS)', fontsize=14, fontweight='bold')
_finish_axes(ax2)

plt.tight_layout()
plt.show()

## 6. Action Analysis

In [None]:
# Score each chosen action against the optimal action of the cell it was taken from
action_map_inv = {'stop': 0, 'up': 1, 'down': 2, 'left': 3, 'right': 4, '': 0}

# trajectory[:-1]: the final position has no action taken from it
optimal_actions = [
    action_map_inv.get(cave.action_grid[pos[0]][pos[1]], 0)
    for pos in trajectory[:-1]
]

actions_array = np.array(actions_taken)
optimal_array = np.array(optimal_actions)

if len(actions_array) == 0 or len(optimal_array) == 0:
    print("No actions taken")
else:
    n_actions = len(actions_array)
    matches = (actions_array == optimal_array).sum()
    accuracy = 100 * matches / n_actions
    print(f"Action Accuracy: {accuracy:.1f}% ({matches}/{n_actions})")

    # How often each action was chosen
    print("\nAction Distribution:")
    for action_idx, name in enumerate(ACTION_NAMES):
        count = (actions_array == action_idx).sum()
        print(f"  {name}: {count} ({100*count/n_actions:.1f}%)")

In [None]:
# Draw one probability trace per action across the episode
if action_probs_history:
    probs_array = np.array(action_probs_history)

    fig, ax = plt.subplots(figsize=(14, 5))
    for column, action_name in enumerate(ACTION_NAMES):
        trace = probs_array[:, column]
        ax.plot(trace, label=action_name, linewidth=2, alpha=0.7)

    ax.set_title('Model Confidence Over Time', fontsize=14, fontweight='bold')
    ax.set_xlabel('Step', fontsize=12)
    ax.set_ylabel('Action Probability', fontsize=12)
    ax.grid(True, alpha=0.3)
    ax.legend()
    plt.tight_layout()
    plt.show()

## 7. Performance Summary

In [None]:
# Calculate metrics
def _optimal_path_length(action_grid, start, goal, max_moves):
    """Count the moves from start to goal obtained by following action_grid.

    Returns None if the walk reaches a cell without a movement label, leaves
    the grid, or does not arrive within max_moves.
    NOTE(review): assumes the grid's 'up'/'down'/'left'/'right' labels use the
    same (row, col) deltas as action_to_delta - confirm against AudioCave.
    """
    moves = {
        'up': action_to_delta[1],
        'down': action_to_delta[2],
        'left': action_to_delta[3],
        'right': action_to_delta[4],
    }
    pos = tuple(start)
    goal = tuple(goal)
    for n_moves in range(max_moves + 1):
        if pos == goal:
            return n_moves
        row, col = pos
        if not (0 <= row < len(action_grid) and 0 <= col < len(action_grid[row])):
            return None
        delta = moves.get(action_grid[row][col])
        if delta is None:
            return None
        pos = (row + delta[0], col + delta[1])
    return None


# Compare as tuples so list/array-valued positions cannot break the equality
reached_goal = (tuple(trajectory[-1]) == tuple(goal_pos))
# trajectory includes the starting position, so steps = entries - 1
path_length = len(trajectory) - 1
# Counting every labelled cell measures the whole action field, not the path,
# so the optimal length is obtained by walking the field from the start.
optimal_length = _optimal_path_length(
    cave.action_grid, cave.start, cave.end, cave.height * cave.width
)

print("=" * 70)
print("NAVIGATION PERFORMANCE SUMMARY")
print("=" * 70)
print(f"\nEnvironment:")
print(f"  Cave size: {cave.height}x{cave.width}")
print(f"  Start: {cave.start}")
print(f"  Goal: {cave.end}")
print(f"  Euclidean distance: {np.linalg.norm(np.array(cave.start) - np.array(cave.end)):.1f}")

print(f"\nAgent Performance:")
print(f"  Goal reached: {'✓ YES' if reached_goal else '✗ NO'}")
print(f"  Steps taken: {path_length}")
print(f"  Max steps allowed: {MAX_STEPS}")

# accuracy is only defined by the action-analysis cell when actions were taken
if len(actions_array) > 0 and len(optimal_array) > 0:
    print(f"  Action accuracy: {accuracy:.1f}%")

if reached_goal:
    if optimal_length is None:
        print("  Path efficiency: n/a (optimal path could not be traced)")
    else:
        # A zero-step episode (start == goal) is trivially optimal; avoid 0/0
        efficiency = 100.0 if path_length == 0 else 100 * optimal_length / path_length
        print(f"  Path efficiency: {efficiency:.1f}%")
        print(f"  Extra steps: {path_length - optimal_length}")
else:
    final_dist = np.linalg.norm(np.array(trajectory[-1]) - np.array(goal_pos))
    print(f"  Final distance to goal: {final_dist:.1f}")

print(f"\nModel Info:")
print(f"  Architecture: {MODEL_TYPE}")
print(f"  Checkpoint epoch: {checkpoint['epoch']}")
print(f"  Validation loss: {checkpoint['val_loss']:.4f}")
print(f"  Validation accuracy: {checkpoint.get('val_acc', 0)*100:.1f}%")

print("\n" + "=" * 70)

## 8. Try Different Test Cases

Re-run cells from section 1 onwards to test on new caves!

In [None]:
# Closing message with suggested follow-ups, emitted as a single write
closing_lines = (
    "✓ Demo Complete!",
    "\nNext steps:",
    "  1. Re-run from Section 1 to test on a new cave",
    "  2. Try different model architectures (change MODEL_TYPE)",
    "  3. Integrate real k-Wave simulation (replace mock_acoustic_sensor)",
    "  4. Experiment with larger/smaller caves",
)
print("\n".join(closing_lines))