# CS272 Autonomous Driving - Emergency Vehicle Yielding Training

This notebook trains a PPO agent on a custom emergency vehicle yielding environment.

**Setup Steps:**
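1. Upload `emergency_env.py` to your Google Drive project folder (by default `MyDrive/CS272_Project`; it must match `PROJECT_FOLDER` in the setup cell below)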
2. Set Runtime → Change runtime type → GPU
3. Run cells in order

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Install required packages
!pip install gymnasium highway-env stable-baselines3[extra] pandas matplotlib tqdm -q

# Verify GPU is available
import torch
print(f"GPU Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")
else:
    print("WARNING: No GPU detected. Training will be slow!")

In [None]:
import sys
import os
import shutil

# IMPORTANT: Update this path to match your Google Drive folder structure
PROJECT_FOLDER = "/content/drive/MyDrive/CS272_Project"

# Local directory that becomes the importable `custom_env` package
CUSTOM_ENV_DIR = '/content/custom_env'
os.makedirs(CUSTOM_ENV_DIR, exist_ok=True)

# Copy emergency_env.py from Drive.
# shutil.copy replaces the `!cp` shell call so that folder names containing
# spaces work, and a missing source file stops the cell right here with a
# clear message instead of leaving a stale (or absent) module to be imported.
env_source = os.path.join(PROJECT_FOLDER, 'emergency_env.py')
if not os.path.isfile(env_source):
    raise FileNotFoundError(
        f"emergency_env.py not found at {env_source}. "
        "Upload it to Google Drive or update PROJECT_FOLDER."
    )
shutil.copy(env_source, CUSTOM_ENV_DIR)

# Create an empty __init__.py to make the directory a package
with open(os.path.join(CUSTOM_ENV_DIR, '__init__.py'), 'w') as f:
    f.write('')

# Add to Python path (guarded so re-running the cell does not add duplicates)
if '/content' not in sys.path:
    sys.path.insert(0, '/content')

# Verify import works
import custom_env.emergency_env
print("✓ Custom environment imported successfully!")

In [None]:
import gymnasium as gym
import highway_env
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# Output locations live inside the Drive project folder so they persist
# after the Colab session ends
SAVE_DIR = f"{PROJECT_FOLDER}/models"
LOG_DIR = f"{PROJECT_FOLDER}/logs"

# Create both folders up front (no-op when they already exist)
for output_dir in (SAVE_DIR, LOG_DIR):
    os.makedirs(output_dir, exist_ok=True)

print(f"Models will be saved to: {SAVE_DIR}")
print(f"Logs will be saved to: {LOG_DIR}")

In [None]:
# Environment configuration: LiDAR observation with 64 cells and the
# discrete meta-action type
config = dict(
    observation=dict(type="LidarObservation", cells=64),
    action=dict(type="DiscreteMetaAction"),
)

def make_env(monitor_file=None):
    """Build one ``EmergencyHighwayEnv-v0`` instance wrapped in ``Monitor``.

    Args:
        monitor_file: Path of the CSV file ``Monitor`` writes episode
            statistics to. When omitted, the training log
            ``{LOG_DIR}/monitor_emergency_lidar.csv`` is used, which is the
            file the learning-curve cell reads.

    Returns:
        The monitored environment.

    Note:
        Every call opens a new ``Monitor`` on ``monitor_file``. With
        ``Monitor``'s default settings an existing file at that path is
        overwritten, so environments that are not the training environment
        should be given their own ``monitor_file``.
    """
    if monitor_file is None:
        monitor_file = f"{LOG_DIR}/monitor_emergency_lidar.csv"
    env = gym.make("EmergencyHighwayEnv-v0", config=config, render_mode=None)
    return Monitor(env, filename=monitor_file)

# Smoke test: build one environment, reset it once, report its spaces,
# then release it
test_env = make_env()
obs, info = test_env.reset()
print(
    "✓ Environment created successfully!",
    f"Observation shape: {obs.shape}",
    f"Action space: {test_env.action_space}",
    sep="\n",
)
test_env.close()

In [None]:
# Create vectorized training environment (make_env attaches the Monitor that
# writes the training log CSV)
venv = DummyVecEnv([make_env])

# Checkpoint callback - saves model every 20k steps
checkpoint_callback = CheckpointCallback(
    save_freq=20_000,
    save_path=SAVE_DIR,
    name_prefix="ppo_emergency_lidar_checkpoint"
)


def _make_eval_env():
    """Build the evaluation environment with a Monitor that writes no file.

    The evaluation env must not be created with make_env: that would open a
    second Monitor on the same CSV as the training env, truncating the
    training log and mixing evaluation episodes into the learning curve.
    EvalCallback stores its own results under ``log_path``.
    """
    env = gym.make("EmergencyHighwayEnv-v0", config=config, render_mode=None)
    return Monitor(env)


# Evaluation callback - evaluates and saves best model every 25k steps
eval_env = DummyVecEnv([_make_eval_env])
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=SAVE_DIR,
    log_path=LOG_DIR,
    eval_freq=25_000,
    deterministic=True,
    render=False
)

# Create PPO model - will use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Training device: {device}")

model = PPO(
    "MlpPolicy",
    venv,
    learning_rate=2e-4,
    n_steps=2048,
    batch_size=256,
    n_epochs=5,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.1,
    ent_coef=0.001,
    vf_coef=0.5,
    max_grad_norm=0.5,
    verbose=1,
    device=device,
    tensorboard_log=f"{LOG_DIR}/tb/"
)

print("✓ Model created successfully!")

In [None]:
# Announce the run and its expected duration
print(
    "Starting training for Emergency Vehicle Yielding Environment (LiDAR)...",
    "Training for ~4000 episodes (500,000 timesteps)...",
    "This will take approximately 1-2 hours on GPU, 10-20 hours on CPU.\n",
    sep="\n",
)

# Run PPO with periodic checkpointing and evaluation
training_callbacks = [checkpoint_callback, eval_callback]
model.learn(
    total_timesteps=500_000,
    tb_log_name="run_emergency_lidar",
    callback=training_callbacks,
    progress_bar=True,
)

# Persist the weights reached at the end of training
final_path = f"{SAVE_DIR}/ppo_emergency_lidar_final"
model.save(final_path)
print(f"\n✓ Training complete! Model saved to: {final_path}")

In [None]:
# Plot learning curve
def plot_learning_curve(
    log_path,
    output_path,
    window=20,
    title="Learning Curve - Emergency Yielding (LiDAR Observation)",
):
    """Plot raw and smoothed episodic reward from a monitor CSV and save it.

    Args:
        log_path: Path of the CSV log to read. The first line of the file is
            skipped and the reward is taken from the ``r`` column.
        output_path: Path the figure is written to (300 dpi).
        window: Number of episodes in the rolling mean. The smoothed curve
            is undefined (NaN) for the first ``window - 1`` episodes.
        title: Title shown above the plot.
    """
    # skiprows=1 drops the first line of the file before the column header
    # (presumably the Monitor metadata line - confirm against the log format)
    df = pd.read_csv(log_path, skiprows=1)
    rewards = df["r"].values
    smoothed = pd.Series(rewards).rolling(window).mean()

    # Explicit figure/axes objects keep this plot independent of any other
    # figure that may be active in the notebook
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(rewards, alpha=0.3, label="Raw episodic reward", color='blue')
    ax.plot(smoothed, linewidth=2, label=f"Smoothed (window={window})", color='orange')
    ax.set_xlabel("Episode")
    ax.set_ylabel("Reward")
    ax.set_title(title)
    ax.legend()
    ax.grid()
    fig.tight_layout()
    fig.savefig(output_path, dpi=300)
    print(f"Learning curve saved to: {output_path}")
    plt.show()

# Read the training monitor log and write the learning-curve image next to it
monitor_csv_path = f"{LOG_DIR}/monitor_emergency_lidar.csv"
learning_curve_path = f"{LOG_DIR}/emergency_lidar_learning_curve.png"
plot_learning_curve(monitor_csv_path, learning_curve_path)

In [None]:
# Replace the in-memory model with the best model saved during training
best_model_path = f"{SAVE_DIR}/best_model"
print("Loading best model for evaluation...")
model = PPO.load(best_model_path)

def evaluate_agent(model, make_env_fn, episodes=500):
    """Roll out ``model`` deterministically and collect per-episode returns.

    Args:
        model: Object exposing ``predict(obs, deterministic=True)`` that
            returns an ``(action, state)`` pair.
        make_env_fn: Zero-argument callable returning an environment with
            ``reset() -> (obs, info)``,
            ``step(action) -> (obs, reward, terminated, truncated, info)``
            and ``close()``.
        episodes: Number of episodes to run.

    Returns:
        List with the summed reward of each episode, in execution order.
    """
    returns = []
    env = make_env_fn()

    # try/finally guarantees the environment is released even when a
    # rollout step raises part-way through the evaluation
    try:
        for _ in tqdm(range(episodes), desc="Evaluating"):
            obs, info = env.reset()
            terminated = truncated = False
            total_reward = 0

            # An episode ends on either termination or truncation
            while not (terminated or truncated):
                action, _state = model.predict(obs, deterministic=True)
                obs, reward, terminated, truncated, info = env.step(action)
                total_reward += reward

            returns.append(total_reward)
    finally:
        env.close()

    return returns

print("Running 500-episode deterministic evaluation...")


def _make_unlogged_env():
    """Build the evaluation environment without a file-backed Monitor.

    make_env is deliberately not used here: it attaches a Monitor that
    writes to the training log CSV, so creating another environment through
    it would overwrite the training history with evaluation episodes.
    """
    return gym.make("EmergencyHighwayEnv-v0", config=config, render_mode=None)


returns = evaluate_agent(model, _make_unlogged_env)

# Summary statistics over the per-episode returns
print("\n=== Evaluation Results ===")
print(f"Mean return: {np.mean(returns):.2f}")
print(f"Std return: {np.std(returns):.2f}")
print(f"Min return: {np.min(returns):.2f}")
print(f"Max return: {np.max(returns):.2f}")

In [None]:
# Distribution of evaluation returns, drawn as a single violin
fig, ax = plt.subplots(figsize=(7, 6))
parts = ax.violinplot([returns], showmeans=True, showextrema=True)
ax.set_xticks([1])
ax.set_xticklabels(["PPO (LiDAR)"])
ax.set_ylabel("Episodic Return")
ax.set_title("Performance Test - Emergency Yielding (LiDAR, 500 episodes)")
ax.grid(axis="y")
fig.tight_layout()

# Write the figure to the log folder before showing it
performance_path = f"{LOG_DIR}/emergency_lidar_performance_test.png"
fig.savefig(performance_path, dpi=300)
print(f"Performance test plot saved to: {performance_path}")
plt.show()

print(f"\n✓ All results saved to Google Drive in: {PROJECT_FOLDER}")

## Optional: Monitor Training with TensorBoard

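Run this cell to visualize training metrics in TensorBoard. To follow progress in real time, run it before starting the training cell; it can also be run afterwards to inspect a finished run: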

In [None]:
# Load the TensorBoard notebook extension, then open the dashboard on the
# directory that was passed as `tensorboard_log` when the PPO model was created.
%load_ext tensorboard
%tensorboard --logdir {LOG_DIR}/tb/

## Optional: Resume Training from Checkpoint

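If your session times out, you can resume training from a saved checkpoint. In a fresh session, first re-run the setup cells above (Drive mount, imports, environment definition, and the cell that creates `venv` and the callbacks), skip the training cell, then run the cell below: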

In [None]:
# List available checkpoints
import glob
import re

# Total number of environment steps the full training run should reach
TARGET_TIMESTEPS = 500_000


def _checkpoint_steps(path):
    """Return the step count embedded in a checkpoint file name, or -1 if absent."""
    match = re.search(r"_(\d+)_steps\.zip$", os.path.basename(path))
    return int(match.group(1)) if match else -1


# Sort numerically by step count. A plain string sort would place
# "..._80000_steps.zip" after "..._100000_steps.zip" and therefore pick an
# early checkpoint as the "latest" one.
checkpoints = sorted(
    glob.glob(f"{SAVE_DIR}/ppo_emergency_lidar_checkpoint_*.zip"),
    key=_checkpoint_steps,
)
print("Available checkpoints:")
for cp in checkpoints:
    print(f"  {os.path.basename(cp)}")

# Load the latest checkpoint (or specify a specific one)
if checkpoints:
    latest_checkpoint = checkpoints[-1]
    print(f"\nLoading: {os.path.basename(latest_checkpoint)}")
    model = PPO.load(latest_checkpoint, env=venv)

    # With reset_num_timesteps=False, learn() counts total_timesteps on top
    # of the steps the loaded model has already taken, so only the remainder
    # of the run is requested here.
    remaining_timesteps = max(0, TARGET_TIMESTEPS - model.num_timesteps)

    if remaining_timesteps > 0:
        # Continue training
        model.learn(
            total_timesteps=remaining_timesteps,
            reset_num_timesteps=False,  # Keep existing timestep count
            tb_log_name="run_emergency_lidar",  # same log name as the initial run
            callback=[checkpoint_callback, eval_callback],
            progress_bar=True
        )

        # Save final model, mirroring the main training cell
        final_path = f"{SAVE_DIR}/ppo_emergency_lidar_final"
        model.save(final_path)
        print(f"\n✓ Training complete! Model saved to: {final_path}")
    else:
        print(f"Checkpoint already reached {model.num_timesteps} timesteps; nothing left to train.")
else:
    print("No checkpoints found!")