# Raahi-v2 Transit RL Training on Google Colab

This notebook trains reinforcement learning agents for transit optimization using synthetic city data.
Run it on Google Colab with a GPU runtime for faster training.

In [None]:
# Select the training device: use the GPU when CUDA is present, else the CPU.
import torch

has_cuda = torch.cuda.is_available()
print(f"CUDA available: {has_cuda}")
device = torch.device("cuda" if has_cuda else "cpu")
if has_cuda:
    # Report which accelerator the runtime was assigned.
    print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Using device: {device}")

In [None]:
# Install required packages
!pip install stable-baselines3[extra]
!pip install tensorboard
!pip install gymnasium
!pip install matplotlib
!pip install numpy
!pip install torch

In [None]:
# Import libraries
import json
import random
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import List, Dict, Tuple
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import EvalCallback
import os

In [None]:
# City Builder class for generating training data
@dataclass
class Station:
    """Typed record for a single transit station.

    NOTE(review): nothing visible in this notebook instantiates this class;
    ``CityBuilder`` emits stations as plain dicts keyed ``"type"`` rather
    than ``station_type`` -- confirm whether the two are meant to line up.
    """
    id: str  # station identifier (CityBuilder's dicts use "station_<n>")
    x: int  # presumably the grid x coordinate -- TODO confirm
    y: int  # presumably the grid y coordinate -- TODO confirm
    station_type: str  # presumably one of "bus" / "tram" / "metro" -- TODO confirm
    is_transfer: bool = False  # presumably marks a transfer station; defaults to False

class CityBuilder:
    """Generate random synthetic cities on a square grid.

    A city is a plain dict with the keys ``grid_size``, ``zones``,
    ``stations``, ``connections`` and ``routes`` (see
    ``generate_random_city``). All randomness comes from the module-level
    ``random`` generator, so seed it beforehand for reproducible cities.

    Args:
        grid_size: Side length of the grid; coordinates run from 0 to
            ``grid_size - 1``.
        min_stations: Smallest number of stations a city may receive.
        max_stations: Largest number of stations a city may receive.
        max_walk_distance: Largest Manhattan distance (in cells) at which
            two stations are linked by a walking connection.
        walk_time_per_cell: ``walk_time`` added per cell of distance
            (60 by default; presumably seconds).
        max_route_stations: Maximum number of stations placed on one route.

    Raises:
        ValueError: If the station bounds are negative or inverted, or if
            ``max_route_stations`` is smaller than 2.
    """

    _HEX_DIGITS = "0123456789ABCDEF"

    def __init__(
        self,
        grid_size: int = 8,
        *,
        min_stations: int = 5,
        max_stations: int = 15,
        max_walk_distance: int = 3,
        walk_time_per_cell: int = 60,
        max_route_stations: int = 4,
    ):
        if min_stations < 0 or max_stations < min_stations:
            raise ValueError(
                "station bounds must satisfy 0 <= min_stations <= max_stations, "
                f"got min_stations={min_stations}, max_stations={max_stations}"
            )
        if max_route_stations < 2:
            raise ValueError(
                f"max_route_stations must be at least 2, got {max_route_stations}"
            )

        self.grid_size = grid_size
        self.min_stations = min_stations
        self.max_stations = max_stations
        self.max_walk_distance = max_walk_distance
        self.walk_time_per_cell = walk_time_per_cell
        self.max_route_stations = max_route_stations
        self.zone_types = ["residential", "commercial", "industrial"]
        self.station_types = ["bus", "tram", "metro"]

    def generate_random_city(self) -> Dict:
        """Build one random city.

        Returns:
            A dict with ``grid_size``, ``zones``, ``stations``,
            ``connections`` and ``routes``. Connections and routes are
            derived from the stations generated in the same call.
        """
        zones = self._generate_zones()
        stations = self._generate_stations()
        return {
            "grid_size": self.grid_size,
            "zones": zones,
            "stations": stations,
            "connections": self._generate_connections(stations),
            "routes": self._generate_routes(stations),
        }

    def _generate_zones(self) -> List[Dict]:
        """Assign a random zone type to every grid cell (x-major order)."""
        return [
            {"x": x, "y": y, "type": random.choice(self.zone_types)}
            for x in range(self.grid_size)
            for y in range(self.grid_size)
        ]

    def _generate_stations(self) -> List[Dict]:
        """Place a random number of stations at random cells.

        Cells are drawn independently, so several stations may share a cell.
        """
        num_stations = random.randint(self.min_stations, self.max_stations)
        last_coord = self.grid_size - 1

        stations = []
        for i in range(num_stations):
            x = random.randint(0, last_coord)
            y = random.randint(0, last_coord)
            station_type = random.choice(self.station_types)
            stations.append({
                "id": f"station_{i}",
                "x": x, "y": y,
                "type": station_type,
                "is_transfer": False,
            })
        return stations

    def _generate_connections(self, stations: List[Dict]) -> List[Dict]:
        """Link every unordered station pair within walking distance.

        Distance is Manhattan distance in cells; each pair appears once,
        oriented from the earlier station in ``stations`` to the later one.
        """
        connections = []
        for i, origin in enumerate(stations):
            for target in stations[i + 1:]:
                dist = abs(origin["x"] - target["x"]) + abs(origin["y"] - target["y"])
                if dist <= self.max_walk_distance:
                    connections.append({
                        "from": origin["id"],
                        "to": target["id"],
                        "walk_time": dist * self.walk_time_per_cell,
                    })
        return connections

    def _generate_routes(self, stations: List[Dict]) -> List[Dict]:
        """Create one route per transit mode that has at least two stations.

        A route visits the first ``max_route_stations`` stations of its mode
        (in generation order) and receives a random ``#RRGGBB`` color.
        """
        by_mode = defaultdict(list)
        for station in stations:
            by_mode[station["type"]].append(station)

        routes = []
        for mode, mode_stations in by_mode.items():
            if len(mode_stations) < 2:
                continue  # a single station cannot form a route
            routes.append({
                # Ids are consecutive over the routes actually created.
                "id": f"route_{len(routes)}",
                "mode": mode,
                "stations": [s["id"] for s in mode_stations[:self.max_route_stations]],
                "color": "#" + "".join(
                    random.choice(self._HEX_DIGITS) for _ in range(6)
                ),
            })
        return routes

print("City Builder ready")

In [None]:
# Transit Environment for RL training
class TransitEnv(gym.Env):
    """Gymnasium environment in which an agent sets transit route frequencies.

    Observation: a flat float32 vector of length ``grid_size * grid_size * 4``
    made of four one-hot planes, in order: residential zones, commercial
    zones, industrial zones, station presence. Routes are not encoded.

    Action: 10 floats in ``[0, 10]``. The reward sums over all 10 entries
    regardless of how many routes the current city has.

    Reward: ``0.1 * sum(a) + 0.02 * num_stations - 0.05 * sum(a ** 2)``.

    Episodes never terminate on their own; they are truncated after
    ``max_steps`` steps.

    Args:
        grid_size: Side length of the city grid.
        training_cities: Optional list of pre-generated city dicts that are
            cycled through on successive resets. When empty or ``None`` a
            fresh random city is generated on every reset.
        max_steps: Number of steps after which an episode is truncated.
    """

    # Plane index of each zone type inside the flat observation vector.
    _ZONE_PLANES = {"residential": 0, "commercial": 1, "industrial": 2}
    _STATION_PLANE = 3
    _NUM_PLANES = 4

    def __init__(self, grid_size=8, training_cities=None, max_steps=100):
        super().__init__()
        self.grid_size = grid_size
        self.city_builder = CityBuilder(grid_size)
        self.training_cities = training_cities or []
        self.city_index = 0
        self.max_steps = max_steps

        # Action space: one frequency value in [0, 10] for each of 10 slots.
        self.action_space = gym.spaces.Box(
            low=0, high=10, shape=(10,), dtype=np.float32
        )

        # Observation space: three zone-type planes plus one station plane.
        obs_size = grid_size * grid_size * self._NUM_PLANES
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(obs_size,), dtype=np.float32
        )

        # Kept from the original design: the constructor performs an initial
        # reset so ``city_data`` is available immediately. With a city pool
        # this consumes the first city.
        self.reset()

    def reset(self, seed=None, options=None):
        """Start a new episode on the next (or a freshly generated) city.

        Args:
            seed: Optional seed. It seeds ``self.np_random`` and, when a city
                has to be generated, the global ``random`` module used by
                ``CityBuilder`` so the generated city is reproducible.
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)

        if self.training_cities:
            # Cycle through the pre-generated pool.
            pool_size = len(self.training_cities)
            self.city_data = self.training_cities[self.city_index % pool_size]
            self.city_index += 1
        else:
            if seed is not None:
                # CityBuilder draws from the global ``random`` module, so it
                # must be seeded for reset(seed=...) to be reproducible.
                random.seed(int(seed))
            self.city_data = self.city_builder.generate_random_city()

        self.current_step = 0
        return self._get_observation(), {}

    def step(self, action):
        """Apply one set of route frequencies and score it.

        Args:
            action: Array-like of 10 frequencies; values outside the action
                space bounds are clipped before the reward is computed.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is always ``False``; ``truncated`` becomes ``True``
            once ``max_steps`` steps have been taken.
        """
        route_frequencies = np.clip(
            np.asarray(action, dtype=np.float32),
            self.action_space.low,
            self.action_space.high,
        )
        reward = self._calculate_reward(route_frequencies)

        self.current_step += 1
        # The step limit is a time limit, not a terminal state of the task,
        # so it is reported as truncation rather than termination.
        terminated = False
        truncated = self.current_step >= self.max_steps

        return self._get_observation(), reward, terminated, truncated, {}

    def _get_observation(self):
        """Encode the current city as the flat four-plane observation."""
        cells = self.grid_size * self.grid_size
        obs = np.zeros(cells * self._NUM_PLANES, dtype=np.float32)

        # Zones: one-hot across the three zone planes; unknown types are skipped.
        for zone in self.city_data["zones"]:
            plane = self._ZONE_PLANES.get(zone["type"])
            if plane is not None:
                cell = zone["x"] * self.grid_size + zone["y"]
                obs[plane * cells + cell] = 1.0

        # Stations: presence flag per cell (co-located stations collapse).
        for station in self.city_data["stations"]:
            cell = station["x"] * self.grid_size + station["y"]
            obs[self._STATION_PLANE * cells + cell] = 1.0

        return obs

    def _calculate_reward(self, route_frequencies):
        """Return linear efficiency plus station coverage minus quadratic cost."""
        freqs = np.asarray(route_frequencies, dtype=np.float32)
        efficiency = np.sum(freqs) * 0.1
        cost = np.sum(freqs ** 2) * 0.05
        coverage = len(self.city_data["stations"]) * 0.02

        return float(efficiency + coverage - cost)

print("Transit Environment ready")

In [None]:
# Build the fixed pool of 50 synthetic cities used for training.
print("Generating training cities...")
builder = CityBuilder(8)
training_cities = []

while len(training_cities) < 50:
    training_cities.append(builder.generate_random_city())
    generated = len(training_cities)
    # Progress line after the 1st, 11th, 21st, ... city.
    if generated % 10 == 1:
        print(f"Generated {generated} cities")

print(f"Generated {len(training_cities)} training cities")
first_city = training_cities[0]
print(f"Sample city has {len(first_city['stations'])} stations")

In [None]:
# Build the training environment on top of the pre-generated city pool.
env = TransitEnv(grid_size=8, training_cities=training_cities)
print(f"Environment created with {len(training_cities)} training cities")
for label, space in (
    ("Action space", env.action_space),
    ("Observation space", env.observation_space),
):
    print(f"{label}: {space}")

# Smoke test: one reset followed by a single randomly sampled action.
obs, info = env.reset()
print(f"Initial observation shape: {obs.shape}")
station_count = len(env.city_data['stations'])
print(f"Current city has {station_count} stations")

action = env.action_space.sample()
step_result = env.step(action)
obs, reward, done, truncated, info = step_result
print(f"Test step reward: {reward}")

In [None]:
# PPO hyperparameters, gathered in one place so they are easy to adjust.
ppo_settings = dict(
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    ent_coef=0.01,
    device=device,
    verbose=1,
    tensorboard_log="./logs/",
)

# MLP policy trained on the environment created above.
model = PPO("MlpPolicy", env, **ppo_settings)

print(f"PPO model created on {device}")
print(f"Policy network: {model.policy}")

In [None]:
# Training loop
# Imported here so this cell is self-contained; both come from the
# stable-baselines3 package the notebook already depends on.
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.monitor import Monitor

print("Starting training...")
total_timesteps = 100000
save_freq = 10000

# Evaluation runs on freshly generated cities (no fixed pool). The env is
# wrapped in Monitor so evaluation reports true episode returns/lengths.
eval_env = Monitor(TransitEnv(grid_size=8))
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path="./models/",
    log_path="./logs/",
    eval_freq=5000,
    deterministic=True,
    render=False
)

# Periodic checkpoints every `save_freq` steps. Previously `save_freq` was
# assigned but never used, so no intermediate checkpoints were written.
checkpoint_callback = CheckpointCallback(
    save_freq=save_freq,
    save_path="./models/checkpoints/",
    name_prefix="transit_ppo",
)

# Train the model; a list of callbacks is run in order at every step.
model.learn(
    total_timesteps=total_timesteps,
    callback=[eval_callback, checkpoint_callback],
    progress_bar=True
)

print("Training completed!")

In [None]:
# Persist the final policy to disk.
model.save("transit_ppo_final")
print("Model saved as 'transit_ppo_final'")

# Roll out one deterministic episode (capped at 100 steps) with the trained policy.
obs, info = env.reset()
total_reward = 0
steps_taken = 0
episode_over = False

while steps_taken < 100 and not episode_over:
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)
    total_reward += reward
    steps_taken += 1
    episode_over = done or truncated

print(f"Test episode reward: {total_reward}")
print(f"Average reward per step: {total_reward / steps_taken}")

In [None]:
# Evaluate the trained policy over several episodes and plot the returns.
import matplotlib.pyplot as plt
import pandas as pd

# Collect one total reward per evaluation episode.
num_eval_episodes = 20
rewards = []

for episode in range(num_eval_episodes):
    obs, info = env.reset()
    episode_reward = 0
    steps_left = 100

    # Each episode is capped at 100 steps, or ends earlier if the env says so.
    while steps_left > 0:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(action)
        episode_reward += reward
        steps_left -= 1

        if done or truncated:
            break

    rewards.append(episode_reward)
    if not episode % 5:
        print(f"Episode {episode}: {episode_reward:.2f}")

# Left panel: reward per episode. Right panel: histogram of the rewards.
fig, (trend_ax, hist_ax) = plt.subplots(1, 2, figsize=(10, 6))

trend_ax.plot(rewards)
trend_ax.set_title("Episode Rewards")
trend_ax.set_xlabel("Episode")
trend_ax.set_ylabel("Total Reward")

hist_ax.hist(rewards, bins=10)
hist_ax.set_title("Reward Distribution")
hist_ax.set_xlabel("Total Reward")
hist_ax.set_ylabel("Frequency")

fig.tight_layout()
plt.show()

# Summary statistics over all evaluation episodes.
mean_reward, std_reward = np.mean(rewards), np.std(rewards)
print(f"Average reward: {mean_reward:.2f} ± {std_reward:.2f}")
print(f"Min reward: {np.min(rewards):.2f}")
print(f"Max reward: {np.max(rewards):.2f}")

In [None]:
# Inspect the trained policy network and one sample decision.
print("Model Analysis:")
print(f"Policy network: {model.policy}")
param_count = 0
for p in model.policy.parameters():
    param_count += p.numel()
print(f"Total parameters: {param_count}")

# Deterministic action for the first observation of a fresh episode.
obs, info = env.reset()
action, _states = model.predict(obs, deterministic=True)

print(f"\nSample action (route frequencies): {action}")
print("Action statistics:")
for label, stat in (
    ("Mean", np.mean),
    ("Std", np.std),
    ("Min", np.min),
    ("Max", np.max),
):
    print(f"  {label}: {stat(action):.3f}")

In [None]:
# Download model files (for Colab)
from google.colab import files

try:
    # Zip model files
    !zip -r trained_models.zip transit_ppo_final.zip models/ logs/
    
    # Download
    files.download('trained_models.zip')
    print("Model files downloaded!")
except:
    print("Not running on Colab, skipping download")

## Training Complete!

The RL agent has been trained on synthetic transit data. Summary of the run:

- **Environment**: 8x8 grid cities with multiple transit modes
- **Algorithm**: PPO (Proximal Policy Optimization)
- **Training**: 100K timesteps, GPU-accelerated when a CUDA device is available
- **Objective**: Optimize route frequencies for efficiency

The trained model can now make decisions about transit route scheduling to maximize efficiency while minimizing costs.