# 🚦 Traffic Signal RL Training (PPO)

This notebook trains a **city-agnostic** reinforcement learning agent for traffic signal control.

**Key Points:**
- Uses synthetic traffic patterns (NOT real data)
- PPO algorithm (stable, sample-efficient)
- Can run on Colab with GPU acceleration

---

## 1️⃣ Install Dependencies

Run this cell first (especially on Colab)

In [None]:
# Install required packages
!pip install stable-baselines3[extra] gymnasium numpy pandas -q
print("‚úÖ Dependencies installed!")

## 2️⃣ Traffic Environment (City-Agnostic)

In [None]:
"""
Generic Traffic Environment for RL Training
============================================
City-agnostic, 4-way intersection simulator.
NO city names, NO geography - pure abstract traffic control.
"""

import numpy as np
from enum import IntEnum
from typing import Tuple, Dict
import random


class Phase(IntEnum):
    """Signal phases of the 4-phase cycle, numbered 0 through 3.

    The integer values are significant: ``TrafficEnv`` uses them as one-hot
    indices, advances the cycle with ``(phase + 1) % 4`` and maps actions
    1-4 to ``Phase(action - 1)``.
    """

    # Declaration order fixes the values: NS_GREEN=0, NS_LEFT=1,
    # EW_GREEN=2, EW_LEFT=3.
    NS_GREEN, NS_LEFT, EW_GREEN, EW_LEFT = range(4)


class TrafficPatternGenerator:
    """Generate synthetic per-direction arrival rates for training.

    Every generator returns a dict with one entry per approach listed in
    ``DIRECTIONS``. ``TrafficEnv.step`` uses each value as the mean of a
    Poisson draw, i.e. the expected number of vehicles arriving per step.

    Randomness comes from the global ``numpy.random`` and ``random`` modules,
    so both have to be seeded to obtain reproducible patterns.
    """

    # Single source of truth for the approach names used as dict keys.
    DIRECTIONS = ('north', 'south', 'east', 'west')

    @staticmethod
    def _uniform_rates(low, high):
        """Draw one independent uniform rate in [low, high) per direction."""
        return {d: np.random.uniform(low, high)
                for d in TrafficPatternGenerator.DIRECTIONS}

    @staticmethod
    def uniform_random(min_rate=0.1, max_rate=1.5):
        """Return independent rates drawn uniformly from [min_rate, max_rate)."""
        return TrafficPatternGenerator._uniform_rates(min_rate, max_rate)

    @staticmethod
    def peak_hour(peak_direction=None):
        """Return light traffic everywhere except one heavily loaded approach.

        Args:
            peak_direction: Approach that receives the peak rate. A random
                approach is chosen when ``None``.

        Raises:
            ValueError: If ``peak_direction`` is not one of ``DIRECTIONS``.
                Without this check an unknown name would just add a fifth key
                that the environment never reads, silently dropping the peak.
        """
        directions = TrafficPatternGenerator.DIRECTIONS
        if peak_direction is None:
            peak_direction = random.choice(directions)
        elif peak_direction not in directions:
            raise ValueError(
                f"peak_direction must be one of {directions}, got {peak_direction!r}"
            )
        rates = TrafficPatternGenerator._uniform_rates(0.2, 0.5)
        rates[peak_direction] = np.random.uniform(1.0, 2.0)
        return rates

    @staticmethod
    def symmetric_flow():
        """Return equal rates for north/south and equal rates for east/west."""
        ns_rate = np.random.uniform(0.3, 1.5)
        ew_rate = np.random.uniform(0.3, 1.5)
        return {'north': ns_rate, 'south': ns_rate, 'east': ew_rate, 'west': ew_rate}

    @staticmethod
    def low_traffic():
        """Return uniformly low rates (0.05 to 0.3) on every approach."""
        return TrafficPatternGenerator._uniform_rates(0.05, 0.3)

    @staticmethod
    def heavy_traffic():
        """Return uniformly high rates (1.0 to 2.5) on every approach."""
        return TrafficPatternGenerator._uniform_rates(1.0, 2.5)

    @staticmethod
    def asymmetric_random():
        """Return rates where each approach is heavy with probability 0.3.

        Heavy approaches draw from 1.5 to 3.0, the others from 0.1 to 0.5.
        """
        rates = {}
        for d in TrafficPatternGenerator.DIRECTIONS:
            # The coin flip comes first, then exactly one uniform draw.
            if random.random() < 0.3:
                rates[d] = np.random.uniform(1.5, 3.0)
            else:
                rates[d] = np.random.uniform(0.1, 0.5)
        return rates

    @classmethod
    def get_random_pattern(cls):
        """Pick one of the six generators at random and return its rates."""
        patterns = [cls.uniform_random, cls.peak_hour, cls.symmetric_flow,
                    cls.low_traffic, cls.heavy_traffic, cls.asymmetric_random]
        return random.choice(patterns)()


class TrafficEnv:
    """Generic 4-way intersection environment.

    State (``state_dim`` = 9): the four queue lengths divided by
    ``max_queue``, a one-hot encoding of the current phase, and the phase
    timer divided by ``max_green_time``.

    Actions (``n_actions`` = 5): ``0`` keeps the current phase and adds one
    step to its timer (capped at ``max_green_time``); ``1``-``4`` request
    ``Phase(action - 1)``. The timer counts down by one every step, and a
    timer that has reached zero forces the next phase of the cycle.

    NOTE(review): ``yellow_time`` is stored but never read by the simulation,
    and ``min_green_time`` is only the timer value assigned after a switch,
    so an action can change phase before that timer has run out. Confirm
    whether this is intended.
    """

    def __init__(self, max_steps=3600, min_green_time=10, max_green_time=60,
                 yellow_time=3, saturation_flow=0.5, num_lanes=2, max_queue=100,
                 arrival_rates=None):
        """Store the configuration; call ``reset()`` before ``step()``.

        Args:
            max_steps: Number of steps after which ``step`` reports ``done``.
            min_green_time: Timer value assigned whenever the phase changes.
            max_green_time: Upper bound of the timer; also its normaliser.
            yellow_time: Stored only (not used by the simulation).
            saturation_flow: Departures per lane per step on a green approach.
            num_lanes: Lanes per approach; multiplies ``saturation_flow``.
            max_queue: Cap applied to every queue; also the queue normaliser.
            arrival_rates: Optional dict of per-direction Poisson means. When
                omitted, the first ``reset()`` without rates draws a random
                pattern, which is then kept for later resets.
        """
        self.max_steps = max_steps
        self.min_green_time = min_green_time
        self.max_green_time = max_green_time
        self.yellow_time = yellow_time
        self.saturation_flow = saturation_flow
        self.num_lanes = num_lanes
        self.max_queue = max_queue
        self.directions = ['north', 'south', 'east', 'west']

        # Both phases of an axis serve the same two approaches.
        self.phase_to_directions = {
            Phase.NS_GREEN: ['north', 'south'],
            Phase.NS_LEFT: ['north', 'south'],
            Phase.EW_GREEN: ['east', 'west'],
            Phase.EW_LEFT: ['east', 'west'],
        }

        # Per-episode state, populated by reset().
        self.queues = None
        self.current_phase = None
        self.phase_timer = None
        self.step_count = None
        self.total_waiting = None
        self.total_throughput = None
        # Fractional departure capacity carried between steps, per direction.
        self._discharge_credit = None
        self.arrival_rates = arrival_rates
        self.n_actions = 5
        self.state_dim = 9  # 4 queues + 4 phase one-hot + timer

    def reset(self, arrival_rates=None):
        """Start a new episode and return the initial state vector.

        Queues start at random values in 0-9, the phase is random, and the
        timer starts in ``[min_green_time, max_green_time)``.

        Args:
            arrival_rates: Optional dict of per-direction rates that replaces
                the stored ones. When ``None`` the stored rates are reused; a
                random pattern is generated only if none are stored yet.
        """
        self.queues = {d: np.random.randint(0, 10) for d in self.directions}
        self.current_phase = Phase(np.random.randint(0, 4))
        self.phase_timer = np.random.randint(self.min_green_time, self.max_green_time)
        self.step_count = 0
        self.total_waiting = 0
        self.total_throughput = 0
        self._discharge_credit = {d: 0.0 for d in self.directions}

        if arrival_rates is not None:
            self.arrival_rates = arrival_rates
        elif self.arrival_rates is None:
            self.arrival_rates = TrafficPatternGenerator.get_random_pattern()

        return self._get_state()

    def _get_state(self):
        """Return the 9-element state: queues, phase one-hot, timer."""
        queue_state = np.array([self.queues[d] / self.max_queue for d in self.directions])
        phase_one_hot = np.zeros(4)
        phase_one_hot[self.current_phase] = 1.0
        timer_norm = np.array([self.phase_timer / self.max_green_time])
        return np.concatenate([queue_state, phase_one_hot, timer_norm])

    def step(self, action):
        """Advance the simulation by one step.

        Args:
            action: ``0`` to hold/extend the current phase, ``1``-``4`` to
                request ``Phase(action - 1)``. Ignored on a step where the
                expired timer forces a phase change.

        Returns:
            Tuple ``(state, reward, done, info)``. The reward is the negative
            total queue scaled by ``4 * max_queue``, minus 0.1 times the queue
            standard deviation scaled by ``max_queue``, minus 0.2 when the
            phase changed this step. ``done`` becomes true once ``max_steps``
            steps have been taken.

        Raises:
            RuntimeError: If called before ``reset()``.
            ValueError: If ``action`` does not map to a valid phase.
        """
        if self.queues is None:
            raise RuntimeError("TrafficEnv.reset() must be called before step()")

        self.step_count += 1
        switched_phase = False

        if self.phase_timer == 0:
            # Timer expired: move to the next phase of the cycle. The
            # requested action is not applied on this step.
            self.current_phase = Phase((self.current_phase + 1) % 4)
            self.phase_timer = self.min_green_time
            switched_phase = True
        elif action == 0:
            # Extend the current phase, never beyond max_green_time.
            if self.phase_timer < self.max_green_time:
                self.phase_timer += 1
        else:
            new_phase = Phase(action - 1)
            if new_phase != self.current_phase:
                self.current_phase = new_phase
                self.phase_timer = self.min_green_time
                switched_phase = True

        # Vehicle arrivals (Poisson per approach, capped at max_queue)
        for direction in self.directions:
            arrivals = np.random.poisson(self.arrival_rates[direction])
            self.queues[direction] = min(self.queues[direction] + arrivals, self.max_queue)

        # Vehicle departures on the approaches served by the current phase.
        # Capacity may be fractional (e.g. 0.5 vehicles/step); the remainder
        # is carried over instead of being truncated away every step, which
        # would otherwise round a sub-unit capacity down to zero forever.
        capacity = self.saturation_flow * self.num_lanes
        for direction in self.phase_to_directions[self.current_phase]:
            credit = self._discharge_credit[direction] + capacity
            departures = int(credit)
            self._discharge_credit[direction] = credit - departures
            actual_departures = min(departures, self.queues[direction])
            self.queues[direction] -= actual_departures
            self.total_throughput += actual_departures

        self.phase_timer = max(0, self.phase_timer - 1)

        # Calculate reward
        total_queue = sum(self.queues.values())
        self.total_waiting += total_queue
        reward = -total_queue / (self.max_queue * 4)
        queue_std = np.std(list(self.queues.values()))
        reward -= 0.1 * (queue_std / self.max_queue)
        if switched_phase:
            reward -= 0.2

        done = self.step_count >= self.max_steps
        info = {
            'queues': self.queues.copy(),
            'phase': self.current_phase,
            'total_waiting': self.total_waiting,
            'throughput': self.total_throughput,
            'avg_queue': total_queue / 4,
            'reward': reward,
            'total_queue': total_queue,
            'switched': switched_phase,
        }

        return self._get_state(), reward, done, info

print("‚úÖ TrafficEnv defined!")

## 3️⃣ Gymnasium Wrapper (for Stable-Baselines3)

In [None]:
import gymnasium as gym
from gymnasium import spaces


class TrafficGymEnv(gym.Env):
    """Gymnasium-compatible wrapper for TrafficEnv.

    Observations are ``float32`` vectors of length ``TrafficEnv.state_dim``
    with every component in [0, 1]; actions are ``Discrete(n_actions)``.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, max_steps=3600, randomize_pattern=True):
        """Create the wrapped simulator.

        Args:
            max_steps: Episode length in steps, passed on to ``TrafficEnv``.
            randomize_pattern: When true, every ``reset`` draws a new random
                arrival pattern; otherwise the simulator keeps the pattern it
                already holds.
        """
        super().__init__()
        self.env = TrafficEnv(max_steps=max_steps)
        self.randomize_pattern = randomize_pattern

        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(self.env.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Discrete(self.env.n_actions)

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Optional seed. The simulator draws from the global
                ``numpy.random`` and ``random`` generators, so those are
                seeded too; otherwise the seed would not affect the episode.
            options: Accepted for API compatibility; not used.
        """
        super().reset(seed=seed)
        if seed is not None:
            # numpy's legacy seeding only accepts values below 2**32.
            np.random.seed(seed % (2 ** 32))
            random.seed(seed)
        pattern = TrafficPatternGenerator.get_random_pattern() if self.randomize_pattern else None
        obs = self.env.reset(arrival_rates=pattern)
        return obs.astype(np.float32), {"arrival_rates": self.env.arrival_rates}

    def step(self, action):
        """Run one simulator step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The
            simulator only ends an episode when its step limit is reached,
            which is a time-limit truncation rather than a terminal state, so
            ``terminated`` is always ``False`` and ``truncated`` carries the
            simulator's ``done`` flag.
        """
        obs, reward, done, info = self.env.step(action)
        return obs.astype(np.float32), reward, False, bool(done), info

    def render(self):
        """No-op; the simulator has no visual output."""
        pass

    def close(self):
        """No-op; the simulator holds no external resources."""
        pass


# Quick smoke test: build a short-episode environment, reset it once and
# print the observation shape and the action space.
# NOTE: `env`, `obs` and `info` remain defined as notebook globals afterwards.
env = TrafficGymEnv(max_steps=100)
obs, info = env.reset()
print(f"‚úÖ Gym wrapper ready!")
print(f"   Observation shape: {obs.shape}")
print(f"   Action space: {env.action_space}")

## 4️⃣ Training Configuration

In [None]:
# --------------------------------------------
# Training hyperparameters
# (edit the values here, then re-run the cells below)
# --------------------------------------------

# --- Environment ---
# Episode length in steps; the notebook treats this as a 1 hour simulation.
MAX_STEPS_PER_EPISODE = 3600
# Number of environments stepped side by side.
N_ENVS = 4

# --- PPO ---
LEARNING_RATE = 3e-4
# Steps collected per rollout.
N_STEPS = 2048
BATCH_SIZE = 64
# Optimisation epochs per PPO update.
N_EPOCHS = 10
# Discount factor.
GAMMA = 0.99
# GAE lambda.
GAE_LAMBDA = 0.95
# PPO clip range.
CLIP_RANGE = 0.2
# Entropy coefficient (exploration).
ENT_COEF = 0.01

# --- Training duration ---
# Adjust to the time you have; the original estimate for this value is
# roughly 15-30 minutes on a Colab GPU.
TOTAL_TIMESTEPS = 500_000

print("‚úÖ Config set!")
print(f"   Total timesteps: {TOTAL_TIMESTEPS:,}")
print(f"   Parallel envs: {N_ENVS}")

## 5️⃣ Create PPO Agent

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
import os

# Output folders: saved models and training/evaluation logs
for directory in ("models", "logs"):
    os.makedirs(directory, exist_ok=True)


def make_env():
    """Build one monitored environment with a random traffic pattern per episode."""
    return Monitor(
        TrafficGymEnv(max_steps=MAX_STEPS_PER_EPISODE, randomize_pattern=True)
    )


# One factory per environment: N_ENVS for training, a single one for evaluation
train_env = DummyVecEnv([make_env] * N_ENVS)
eval_env = DummyVecEnv([make_env])

print(f"‚úÖ Environments created!")
print(f"   Training envs: {N_ENVS}")
print(f"   Observation space: {train_env.observation_space}")
print(f"   Action space: {train_env.action_space}")

In [None]:
# Gather the PPO settings from the configuration cell, then build the agent
ppo_settings = dict(
    policy="MlpPolicy",
    env=train_env,
    learning_rate=LEARNING_RATE,
    n_steps=N_STEPS,
    batch_size=BATCH_SIZE,
    n_epochs=N_EPOCHS,
    gamma=GAMMA,
    gae_lambda=GAE_LAMBDA,
    clip_range=CLIP_RANGE,
    ent_coef=ENT_COEF,
    verbose=1,
    tensorboard_log="logs/",
)
model = PPO(**ppo_settings)

# Summary, including the number of policy parameters
print("\n‚úÖ PPO Agent created!")
print(f"   Policy: MlpPolicy")
print(f"   Parameters: {sum(p.numel() for p in model.policy.parameters()):,}")

## 6️⃣ Setup Callbacks

In [None]:
# Callback frequencies are counted in calls to the vectorized environment,
# and each call advances all N_ENVS environments. Dividing by N_ENVS keeps the
# intervals at roughly 10,000 and 50,000 environment steps. max(..., 1)
# prevents a zero frequency if N_ENVS ever exceeds the interval, which is not
# a usable callback frequency.
eval_interval = max(10000 // N_ENVS, 1)
checkpoint_interval = max(50000 // N_ENVS, 1)

# Evaluation callback - saves best model
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path="models/best_model",
    log_path="logs/eval",
    eval_freq=eval_interval,
    n_eval_episodes=5,
    deterministic=True,
    render=False,
)

# Checkpoint callback - periodic saves
checkpoint_callback = CheckpointCallback(
    save_freq=checkpoint_interval,
    save_path="models/checkpoints",
    name_prefix="ppo_traffic",
)

callbacks = [eval_callback, checkpoint_callback]

print("‚úÖ Callbacks ready!")

## 🚀 7️⃣ TRAIN THE MODEL

**This is the main training cell!**

⏱️ Expected time:
- 500K steps: ~15-30 min (Colab GPU)
- 1M steps: ~30-60 min (Colab GPU)

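You can interrupt training (Kernel → Interrupt, or `Ctrl+C` in a terminal) and the final model will still be saved. Note that `models/best_model` only exists once the first evaluation has run.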

In [None]:
# Banner announcing the run
banner_rule = "=" * 60
print(banner_rule)
print("üöÄ STARTING PPO TRAINING")
print(banner_rule)
print(f"Total timesteps: {TOTAL_TIMESTEPS:,}")
print(f"This may take 15-30 minutes...")
print("-" * 60)

# Train; an interrupt falls through to the save step below instead of
# aborting the cell.
try:
    model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=callbacks, progress_bar=True)
except KeyboardInterrupt:
    print("\n‚ö†Ô∏è Training interrupted!")

# Persist whatever the agent has learned so far
model.save("models/ppo_traffic_final")
print("\n" + banner_rule)
print("‚úÖ TRAINING COMPLETE!")
print("üíæ Model saved to: models/ppo_traffic_final.zip")
print(banner_rule)

## 8️⃣ Test the Trained Agent

In [None]:
# Load the best model saved during evaluation. If training was interrupted
# before the first evaluation, that file does not exist yet, so fall back to
# the final model written by the training cell instead of failing.
import os

from stable_baselines3 import PPO

BEST_MODEL_PATH = "models/best_model/best_model"
FINAL_MODEL_PATH = "models/ppo_traffic_final"

if os.path.exists(BEST_MODEL_PATH + ".zip"):
    best_model = PPO.load(BEST_MODEL_PATH)
    print("‚úÖ Best model loaded!")
else:
    best_model = PPO.load(FINAL_MODEL_PATH)
    print("Best model not found - loaded final model from: models/ppo_traffic_final.zip")

In [None]:
# Test on different traffic patterns.
# Each pattern is drawn once here and then held fixed for its whole episode.
EVAL_STEPS = 1000  # episode length used for every test pattern

test_patterns = [
    ("Low Traffic", TrafficPatternGenerator.low_traffic()),
    ("Heavy Traffic", TrafficPatternGenerator.heavy_traffic()),
    ("Peak Hour", TrafficPatternGenerator.peak_hour()),
    ("Symmetric", TrafficPatternGenerator.symmetric_flow()),
]

print("\n" + "=" * 60)
print("üìä TESTING TRAINED AGENT")
print("=" * 60)

for name, rates in test_patterns:
    env = TrafficGymEnv(max_steps=EVAL_STEPS)
    obs, _ = env.reset()
    env.env.arrival_rates = rates  # Override with test pattern

    total_reward = 0
    for _step in range(EVAL_STEPS):
        action, _ = best_model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        # An episode can end through either flag; honour both rather than
        # relying on the loop bound matching the environment's step limit.
        if terminated or truncated:
            break

    # NOTE: 'avg_queue' is the mean queue over the four approaches at the
    # last step taken, not an average over the episode.
    print(f"\n{name}:")
    print(f"  Total Reward: {total_reward:.2f}")
    print(f"  Avg Queue: {info['avg_queue']:.2f}")
    print(f"  Throughput: {info['throughput']} vehicles")

## 9️⃣ Download Trained Model

Run this to download your trained model (for Colab)

In [None]:
# For Google Colab - download the model.
# Only a failed import means "not on Colab"; a failure of the download itself
# (for example a missing file) is a real error and is allowed to surface
# instead of being reported as "Not running on Colab".
try:
    from google.colab import files
except ImportError:
    print("Not running on Colab. Model saved locally at: models/ppo_traffic_final.zip")
else:
    files.download('models/ppo_traffic_final.zip')
    print("‚úÖ Download started!")

---

## ✅ Done!

Your trained model is saved at:
- `models/ppo_traffic_final.zip` - Final model
- `models/best_model/best_model.zip` - Best during training

**Next steps:**
1. Download the model
2. Evaluate on Silk Board arrival rates
3. Compare with fixed-time baseline