# 🚗 AV Simulation Runner - Google Colab

This notebook demonstrates how to run the Autonomous Vehicle Simulation on Google Colab with data collection and visualization.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aanshshah/av-simulation/blob/main/examples/notebooks/02_simulation_runner.ipynb)

## Setup Environment

### Runtime Toggle

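Set `RUN_SIMULATION` to `False` if you only want to load the bundled sample data and skip the pygame loop. The live run is only attempted on a Colab runtime; anywhere else the simulation cell skips it and falls back to the bundled sample data.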


In [ ]:
# Toggle read by the "Run Simulation" cell. That cell additionally requires a
# Colab runtime (base path '/content') before it starts the live simulation.
RUN_SIMULATION = True  # Set to False to skip the live simulation run


In [None]:
# Enhanced setup with error handling and dynamic paths
import os
import sys

# Ensure repository paths are available
base_path = '/content' if ('/content' in os.getcwd() or 'COLAB_GPU' in os.environ) else os.getcwd()
repo_path = os.path.join(base_path, 'av-simulation')

if repo_path not in sys.path:
    sys.path.insert(0, repo_path)
    print(f"📁 Added to Python path: {repo_path}")


def start_basic_display(size=(1200, 800)):
    """Start a headless virtual display and point DISPLAY at it.

    Args:
        size: (width, height) tuple handed to ``pyvirtualdisplay.Display``.

    Returns:
        The started ``Display`` object, or ``None`` when pyvirtualdisplay is
        not importable or the display fails to start. Failures are reported
        with a printed hint rather than raised, so the cell keeps running.
    """
    try:
        # Imported lazily: pyvirtualdisplay is only needed on this fallback path.
        from pyvirtualdisplay import Display
        virtual_display = Display(visible=0, size=size)
        virtual_display.start()
    except Exception as setup_error:
        print(f'❌ Display setup failed: {setup_error}')
        print('💡 Please run the 01_colab_setup.ipynb notebook first')
        return None

    os.environ['DISPLAY'] = ':' + str(virtual_display.display)
    print('✅ Basic virtual display ready')
    return virtual_display


# Both failure modes below (enhanced setup reports no display, or the enhanced
# utilities cannot be imported) funnel into the same guarded fallback.
needs_basic_display = False

try:
    from examples.utils.colab_setup_utils import ColabSetupCoordinator
    from examples.utils.colab_helpers import quick_colab_setup

    print('🚀 Setting up enhanced Colab environment...')

    # Run comprehensive setup
    coordinator = ColabSetupCoordinator()
    results = coordinator.run_complete_setup(
        install_dependencies=False,  # Assume already done in setup notebook
        setup_display=True,
        download_code=False  # Assume files already available
    )

    if results.get('display_setup', False):
        print('✅ Enhanced setup completed successfully')
    else:
        print('⚠️ Setup issues detected, falling back to basic setup')
        needs_basic_display = True

except ImportError as e:
    print(f'⚠️ Enhanced utilities not available: {e}')
    print('🔄 Using basic setup...')
    needs_basic_display = True

if needs_basic_display:
    basic_display = start_basic_display()
    if basic_display is not None:
        # Kept under the name this cell has always used so later cells that
        # reference it keep working.
        # NOTE(review): this shadows IPython's display() helper in the
        # notebook namespace; consider renaming once callers are checked.
        display = basic_display


## Run Simulation

In [None]:
# Run headless simulation and prepare analysis-friendly data
import os
import sys
import numpy as np
import pandas as pd

# Results handed to the "View Results" cell; both stay None until data is found.
analysis_data = None
latest_run_id = None

base_path = '/content' if ('/content' in os.getcwd() or 'COLAB_GPU' in os.environ) else os.getcwd()
repo_path = os.path.join(base_path, 'av-simulation')
src_path = os.path.join(repo_path, 'src')
is_colab = '/content' in base_path

for path_entry in (repo_path, src_path):
    if path_entry not in sys.path and os.path.exists(path_entry):
        sys.path.insert(0, path_entry)
        print(f'📁 Added to Python path: {path_entry}')


def step_distances(xs, ys):
    """Return the straight-line distance between consecutive (x, y) points.

    Args:
        xs: Sequence (list, array or Series) of x coordinates.
        ys: Sequence of y coordinates, same length as ``xs``.

    Returns:
        A list with ``len(xs) - 1`` floats, or ``[]`` when fewer than two
        points are given.
    """
    if len(xs) < 2:
        return []
    return np.sqrt(np.diff(xs) ** 2 + np.diff(ys) ** 2).tolist()


# The live run needs both the user toggle and a Colab runtime.
run_requested = bool(globals().get('RUN_SIMULATION', True)) and is_colab

if run_requested:
    try:
        from examples.utils.colab_helpers import ColabSimulationRunner
        from av_simulation.data.repository import SimulationData
        print('✅ Simulation utilities imported successfully')
    except ImportError as e:
        print(f'❌ Cannot import simulation utilities: {e}')
        print('💡 Solutions:')
        print('  1. Run 01_colab_setup.ipynb first')
        print('  2. Upload or clone the repository into /content/av-simulation')
        print('  3. Verify that src/ is on the Python path')
        run_requested = False
    else:
        # The directory is created here; it is not passed to the runner in this cell.
        data_path = os.path.join(base_path, 'simulation_data')
        os.makedirs(data_path, exist_ok=True)
        print(f'📂 Data directory ready at: {data_path}')

        runner = ColabSimulationRunner()
        config = {
            'environment': 'highway',
            'duration': 30,
            'screenshot_interval': 5.0,
            'collect_data': True
        }

        # A failure inside the run must not abort the cell: report it and let
        # the sample-data fallback below take over.
        try:
            simulation_result = runner.run_headless_simulation(config)
        except Exception as run_error:
            print(f'❌ Simulation run failed: {type(run_error).__name__}: {run_error}')
            simulation_result = None

        if simulation_result and simulation_result.get('repository') and simulation_result.get('run_id'):
            repo = simulation_result['repository']
            latest_run_id = simulation_result['run_id']
            data_helper = SimulationData(repo)
            ego_snapshots = data_helper.get_ego_vehicle_data(latest_run_id)

            if ego_snapshots:
                # Order by time so consecutive differences are meaningful.
                ego_snapshots = sorted(ego_snapshots, key=lambda snap: snap.timestamp)
                timestamps = [snap.timestamp for snap in ego_snapshots]
                speeds = [snap.speed for snap in ego_snapshots]
                accelerations = [snap.acceleration for snap in ego_snapshots]
                positions_x = [snap.position_x for snap in ego_snapshots]
                positions_y = [snap.position_y for snap in ego_snapshots]
                distances = step_distances(positions_x, positions_y)

                analysis_data = {
                    'timestamps': timestamps,
                    'speeds': speeds,
                    'accelerations': accelerations,
                    'positions_x': positions_x,
                    'positions_y': positions_y,
                    'distances': distances
                }

                print(f'✅ Simulation completed. Run ID: {latest_run_id[:8]}')
                print(f'📊 Collected {len(ego_snapshots)} ego vehicle snapshots')
            else:
                print('⚠️ Simulation finished but no ego vehicle data was recorded')
        else:
            print('❌ Simulation run did not produce analyzable data')

if not run_requested:
    print('ℹ️ Live simulation skipped; loading bundled sample data instead.')
elif analysis_data is None:
    # Make it explicit why sample data shows up after an attempted live run.
    print('ℹ️ Live run produced no usable data; loading bundled sample data instead.')

if analysis_data is None:
    # First existing candidate wins; the last entry is relative to the working directory.
    sample_candidates = [
        os.path.join(repo_path, 'examples', 'notebooks', 'data', 'sample_ego_run.csv'),
        os.path.join(base_path, 'examples', 'notebooks', 'data', 'sample_ego_run.csv'),
        'examples/notebooks/data/sample_ego_run.csv'
    ]
    sample_path = next((p for p in sample_candidates if os.path.exists(p)), None)
    if sample_path:
        df = pd.read_csv(sample_path)
        if {'timestamp', 'speed', 'acceleration', 'pos_x', 'pos_y'} <= set(df.columns):
            analysis_data = {
                'timestamps': df['timestamp'].tolist(),
                'speeds': df['speed'].tolist(),
                'accelerations': df['acceleration'].tolist(),
                'positions_x': df['pos_x'].tolist(),
                'positions_y': df['pos_y'].tolist(),
                'distances': step_distances(df['pos_x'], df['pos_y'])
            }
            print(f'✅ Loaded sample data from {sample_path}')
        else:
            print(f'⚠️ Sample file missing required columns: {sample_path}')
    else:
        print('❌ No sample data available — please run the live simulation.')

# Explicitly publish the results into the notebook namespace for later cells.
globals()['analysis_data'] = analysis_data
globals()['latest_run_id'] = latest_run_id


## View Results

In [None]:
# Display simulation results with enhanced error handling
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

print('📈 Processing simulation results...')


def build_results_frame(data):
    """Turn the analysis-data mapping into a DataFrame ready for plotting.

    Args:
        data: Mapping with the list-valued keys 'timestamps', 'speeds',
            'accelerations', 'positions_x' and 'positions_y'. Missing keys
            are treated as empty lists.

    Returns:
        A DataFrame with columns timestamp, speed, acceleration, pos_x and
        pos_y. When it has more than one row, a 'step_distance' column is
        added: NaN in the first row, then the distance from the previous row.
    """
    frame = pd.DataFrame({
        'timestamp': data.get('timestamps', []),
        'speed': data.get('speeds', []),
        'acceleration': data.get('accelerations', []),
        'pos_x': data.get('positions_x', []),
        'pos_y': data.get('positions_y', []),
    })
    if len(frame) > 1:
        hops = np.sqrt(np.diff(frame['pos_x']) ** 2 + np.diff(frame['pos_y']) ** 2)
        # The first sample has no predecessor, so its step distance is NaN.
        frame['step_distance'] = np.concatenate(([np.nan], hops))
    return frame


def derive_acceleration(timestamps, speeds):
    """Finite-difference acceleration between consecutive samples.

    Args:
        timestamps: Sequence of sample times.
        speeds: Sequence of speeds, same length as ``timestamps``.

    Returns:
        A float array of length ``len(timestamps) - 1``. Entries whose time
        step is zero are NaN instead of triggering a division by zero.
    """
    dt = np.diff(np.asarray(timestamps, dtype=float))
    dv = np.diff(np.asarray(speeds, dtype=float))
    derived = np.full(dt.shape, np.nan)
    usable = dt != 0
    derived[usable] = dv[usable] / dt[usable]
    return derived


df = None
# NOTE(review): 'analysis_df' is not defined by the cells visible here; it is
# presumably an optional pre-built frame from elsewhere and takes priority when set.
if 'analysis_df' in globals() and globals()['analysis_df'] is not None:
    df = globals()['analysis_df'].copy()
elif 'analysis_data' in globals() and globals()['analysis_data']:
    data = globals()['analysis_data']
    df = build_results_frame(data)

if df is not None and not df.empty:
    fig = None  # remembered so a half-drawn dashboard can be closed on failure
    try:
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        # No emoji in the title: matplotlib's default font has no glyph for it,
        # which produces a missing-glyph warning and an empty box in the figure.
        fig.suptitle('AV Simulation Results', fontsize=16, fontweight='bold')

        # Top-left: speed over time
        if {'timestamp', 'speed'} <= set(df.columns) and df['speed'].notna().any():
            axes[0, 0].plot(df['timestamp'], df['speed'], 'b-', linewidth=2)
            axes[0, 0].set_title('Vehicle Speed Over Time')
            axes[0, 0].set_xlabel('Time (s)')
            axes[0, 0].set_ylabel('Speed (m/s)')
            axes[0, 0].grid(True, alpha=0.3)
        else:
            axes[0, 0].text(0.5, 0.5, 'Speed data not available',
                            ha='center', va='center', transform=axes[0, 0].transAxes)
            axes[0, 0].set_title('Vehicle Speed Over Time')

        # Top-right: trajectory in the x/y plane
        if {'pos_x', 'pos_y'} <= set(df.columns) and df['pos_x'].notna().any():
            axes[0, 1].plot(df['pos_x'], df['pos_y'], 'r-', linewidth=2)
            axes[0, 1].set_title('Vehicle Trajectory')
            axes[0, 1].set_xlabel('X Position (m)')
            axes[0, 1].set_ylabel('Y Position (m)')
            axes[0, 1].grid(True, alpha=0.3)
            axes[0, 1].axis('equal')
        else:
            axes[0, 1].text(0.5, 0.5, 'Position data not available',
                            ha='center', va='center', transform=axes[0, 1].transAxes)
            axes[0, 1].set_title('Vehicle Trajectory')

        # Bottom-left: recorded acceleration, else derived from speed.
        # The timestamp column is required here because it is the x axis.
        if {'timestamp', 'acceleration'} <= set(df.columns) and df['acceleration'].notna().any():
            axes[1, 0].plot(df['timestamp'], df['acceleration'], 'g-', linewidth=2)
            axes[1, 0].set_title('Vehicle Acceleration')
            axes[1, 0].set_xlabel('Time (s)')
            axes[1, 0].set_ylabel('Acceleration (m/s²)')
            axes[1, 0].grid(True, alpha=0.3)
        elif {'timestamp', 'speed'} <= set(df.columns) and len(df) > 1:
            derived = derive_acceleration(df['timestamp'], df['speed'])
            axes[1, 0].plot(df['timestamp'].iloc[1:], derived, 'g-', linewidth=2)
            axes[1, 0].set_title('Derived Acceleration')
            axes[1, 0].set_xlabel('Time (s)')
            axes[1, 0].set_ylabel('Acceleration (m/s²)')
            axes[1, 0].grid(True, alpha=0.3)
        else:
            axes[1, 0].text(0.5, 0.5, 'Acceleration data not available',
                            ha='center', va='center', transform=axes[1, 0].transAxes)
            axes[1, 0].set_title('Vehicle Acceleration')

        # Bottom-right: text summary built from whichever columns are present
        stats_lines = ['Simulation Summary:']
        if 'timestamp' in df and len(df['timestamp']) > 1:
            duration = df['timestamp'].iloc[-1] - df['timestamp'].iloc[0]
            stats_lines.append(f"Duration: {duration:.1f}s")
        if 'speed' in df and df['speed'].notna().any():
            stats_lines.append(f"Max Speed: {df['speed'].max():.1f} m/s")
            stats_lines.append(f"Avg Speed: {df['speed'].mean():.1f} m/s")
        if 'step_distance' in df and df['step_distance'].notna().any():
            stats_lines.append(f"Total Distance: {df['step_distance'].fillna(0).sum():.1f} m")

        axes[1, 1].text(0.1, 0.9, '\n'.join(stats_lines), transform=axes[1, 1].transAxes,
                         fontsize=12, verticalalignment='top',
                         bbox=dict(boxstyle='round,pad=0.5', facecolor='lightblue', alpha=0.7))
        axes[1, 1].set_xlim(0, 1)
        axes[1, 1].set_ylim(0, 1)
        axes[1, 1].axis('off')
        axes[1, 1].set_title('Statistics')

        plt.tight_layout()
        plt.show()

        print('✅ Results plotted successfully')

    except Exception as plot_error:
        print(f'⚠️ Plotting error: {plot_error}')
        print('📊 Falling back to basic plot...')
        # plt.show() renders every open figure, so drop the partially drawn
        # dashboard before building the fallback.
        if fig is not None:
            plt.close(fig)
        try:
            plt.figure(figsize=(10, 6))
            if {'timestamp', 'speed'} <= set(df.columns):
                plt.plot(df['timestamp'], df['speed'])
                plt.title('Vehicle Speed Over Time')
                plt.xlabel('Time (s)')
                plt.ylabel('Speed (m/s)')
            else:
                plt.text(0.5, 0.5, 'Data format not recognized',
                         ha='center', va='center', transform=plt.gca().transAxes)
                plt.title('Simulation Data')
            plt.show()
        except Exception as fallback_error:
            print('❌ Unable to create any plots')
            print(f'   Reason: {fallback_error}')
else:
    print('❌ No data collected - simulation may have failed')
    print('💡 Check the simulation cell above for errors or load sample data from 01_colab_setup.ipynb')
