# Traffic Tune - Optimizing Traffic Signals with Reinforcement Learning

## Introduction

Welcome to the Traffic Tune POC notebook. Our project focused on optimizing traffic signal control using reinforcement learning. Traffic congestion is a major problem in urban areas, leading to increased travel times, fuel consumption, and pollution. Traditional traffic signal control systems often struggle to adapt to dynamic traffic conditions, resulting in suboptimal traffic flow.

Traffic Tune is a recommendation system that leverages reinforcement learning to dynamically adjust traffic signals at intersections. By learning from traffic patterns in real-time, Traffic Tune aims to improve traffic flow, reduce congestion, and enhance overall transportation efficiency.

In this POC, we will demonstrate how to train a reinforcement learning agent to optimize traffic signal control in a simulated environment. We will use the SUMO (Simulation of Urban MObility) traffic simulation tool and the Stable Baselines3 library to train a Deep Q-Network (DQN) agent to learn an optimal traffic signal control policy.


# Setup and Installations

In [None]:
import env_manager as env_manager
from Old_Files import ppo_trainer as ppo_trainer

In [None]:
num_intersection_to_train = 4 # Choose which intersection you want to train

experiment_type = "SingleAgent" # Choose the experiment_type: SingleAgent | MultiAgent

env setup

In [None]:
manager = env_manager.EnvManager(f"{experiment_type}Environment", "env_config.json", json_id=f"intersection_{num_intersection_to_train}")
generator = manager.env_generator(f"Nets/intersection_{num_intersection_to_train}/route_xml_path_intersection_{num_intersection_to_train}.txt")
rou , csv = next(generator)
env_kwargs = manager.initialize_env(rou, csv)

print(f"\nEnv creat for intersection_{num_intersection_to_train}",
      "\nNet path:", manager.kwargs["net_file"],
      "\nRoute path:", rou,
      "\nCsv path:", csv)

agent setup

In [None]:
ppo_agent = ppo_trainer.PPOTrainer("ppo_config.json", manager, experiment_type=experiment_type)
ppo_agent.build_config()

agent training

In [None]:
results = ppo_agent.train()

In [None]:
print(results.get_best_result())

agent prediction

In [None]:
ppo_agent.evaluate(results=results, kwargs=env_kwargs)
best = results.get_best_result("env_runners/episode_reward_max", "max")
print(best)

In [None]:
rou , csv = next(generator)
print(rou)
print(csv)
env_kwargs = manager.initialize_env(rou, csv)
print(env_kwargs)

In [None]:
ppo_agent.config = best.config
ppo_agent.build_config()

In [None]:
result_2 = ppo_agent.train()

In [None]:
import numpy as np
from ray.rllib.algorithms.algorithm import Algorithm
best_result = result.get_best_result("env_runners/episode_reward_max", "max")
checkpoint_path = best_result.checkpoint.path
algo = Algorithm.from_checkpoint(checkpoint_path)
eval_env = algo.env_creator({})


# Set up evaluation parameters
num_episodes = 4

# Evaluation loop
episode_rewards = []

for _ in range(num_episodes):
    episode_reward = 0
    done = False
    obse, _ = eval_env.reset()

    
    while not done:
        action = algo.compute_single_action(obse)
        obs, reward, terminated, truncated, info = eval_env.step(action)
        done = terminated or truncated
        episode_reward += reward

    
    episode_rewards.append(episode_reward)

# Calculate and print evaluation metrics
mean_reward = np.mean(episode_rewards)
std_reward = np.std(episode_rewards)

print(f"Evaluation over {num_episodes} episodes:")
print(f"Mean episode reward: {mean_reward:.2f} +/- {std_reward:.2f}")


# Plot the results 
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(episode_rewards)
plt.title("Episode Rewards")
plt.xlabel("Episode")
plt.ylabel("Reward")

plt.tight_layout()
plt.show()

# 7. Clean up
eval_env.close()
