In [1]:
import numpy as np
import random
import traci  # SUMO Traffic Control Interface
import matplotlib.pyplot as plt

In [3]:
# Q-learning hyperparameters, read as module-level globals by choose_action and update_q_table.
alpha = 0.1   # Learning rate: step size of each Q-value update in update_q_table
gamma = 0.9   # Discount factor applied to the best next-state Q-value in update_q_table
epsilon = 0.1  # Probability that choose_action picks a random action (exploration)

In [7]:
# Problem dimensions; together they fix the shape of the Q-table
# (one row per intersection, one column per action).
NUM_INTERSECTIONS = 6  # q_learning addresses traffic lights 'tl_0' .. 'tl_<NUM_INTERSECTIONS - 1>'; adjust to the network
NUM_ACTIONS = 3  # Actions per intersection. NOTE(review): q_learning passes the action index
                 # straight to traci.trafficlight.setPhase, i.e. it selects a phase index,
                 # not a green-time duration (30s/60s/90s) -- confirm which is intended.

In [9]:
# Q-table of estimated action values, initialized with zeros.
# Rows are indexed by state (q_learning uses the intersection index as the state),
# columns by action index.
Q_table = np.zeros((NUM_INTERSECTIONS, NUM_ACTIONS))

In [11]:
# Total reward of each episode: q_learning appends one entry per episode and
# plot_rewards plots the list. Nothing in this section clears it, so repeated
# q_learning calls keep extending the same list.
total_rewards = []

In [13]:
# Function to select an action using epsilon-greedy policy
def choose_action(state):
    """Pick an action index for ``state`` with an epsilon-greedy policy.

    With probability ``epsilon`` a random action in ``[0, NUM_ACTIONS)`` is
    returned (exploration); otherwise the action with the highest Q-value in
    ``Q_table[state]`` is returned (exploitation). ``np.argmax`` resolves ties
    in favour of the lowest index.

    Args:
        state: Row index into ``Q_table``.

    Returns:
        int: The chosen action index, always a built-in ``int``.
    """
    if random.random() < epsilon:
        return random.randrange(NUM_ACTIONS)  # Explore: random action
    # Exploit: best known action. np.argmax yields a NumPy integer; convert it
    # so both branches hand callers the same plain-int type.
    return int(np.argmax(Q_table[state]))

In [15]:
# Function to calculate the reward based on traffic conditions
def get_reward(intersection_id):
    """Return the reward for one intersection: the negated queue length.

    The queue is measured through TraCI as the number of halting vehicles on
    the edge ``f'intersection_{intersection_id}'`` during the last simulation
    step. The halting count is used instead of ``getLastStepVehicleNumber``
    because the latter also counts vehicles that are moving freely along the
    edge, which is occupancy rather than a queue.

    NOTE(review): assumes an edge with that ID exists in the loaded network.

    Args:
        intersection_id: Value interpolated into the edge ID.

    Returns:
        The negated halting-vehicle count, so a shorter queue gives a larger
        (less negative) reward.
    """
    queue_length = traci.edge.getLastStepHaltingNumber(f'intersection_{intersection_id}')
    return -queue_length  # Negative because we want to minimize the queue

In [17]:
# Function to update the Q-table based on the Q-learning formula
def update_q_table(state, action, reward, next_state):
    """Apply one Q-learning update to ``Q_table[state, action]`` in place.

    The new value is ``Q + alpha * (reward + gamma * max_a' Q[next_state, a'] - Q)``
    where ``Q`` is the current entry for ``(state, action)``.

    Args:
        state: Row index of the state the action was taken in.
        action: Column index of the action taken.
        reward: Reward observed after taking the action.
        next_state: Row index of the state reached afterwards.
    """
    next_values = Q_table[next_state]
    # Value of the greedy action in the next state (the bootstrap term).
    bootstrap = next_values[np.argmax(next_values)]
    old_value = Q_table[state, action]
    td_error = reward + gamma * bootstrap - old_value
    Q_table[state, action] = old_value + alpha * td_error

In [19]:
# Main Q-learning loop
def q_learning(num_episodes):
    """Run ``num_episodes`` training episodes against the SUMO simulation.

    In every episode each of the ``NUM_INTERSECTIONS`` intersections is visited
    once, in index order. The summed reward of the episode is appended to
    ``total_rewards``; every 100th episode (starting with episode 0) is printed.

    NOTE(review): presumably a TraCI connection must already be open when this
    is called; nothing in this function starts or resets the simulation.

    Args:
        num_episodes: Number of episodes to run.
    """

    def _step(intersection_id):
        """Act on one intersection, advance SUMO one step, learn, return the reward."""
        # The intersection index doubles as the state, before and after the step.
        state = intersection_id
        action = choose_action(state)

        # Apply the chosen phase index, then let the simulation react for one step.
        traci.trafficlight.setPhase(f'tl_{intersection_id}', action)
        traci.simulationStep()

        reward = get_reward(intersection_id)
        update_q_table(state, action, reward, state)
        return reward

    for episode in range(num_episodes):
        # Intersections are processed lazily in index order, one step each.
        episode_reward = sum(
            _step(intersection_id) for intersection_id in range(NUM_INTERSECTIONS)
        )
        total_rewards.append(episode_reward)

        # Periodic progress report.
        if episode % 100 == 0:
            print(f'Episode {episode}, Total Reward: {episode_reward}')

    print('Training complete!')


In [21]:
# Plotting function to show total rewards per episode
def plot_rewards():
    """Plot the per-episode totals stored in ``total_rewards`` and show the figure.

    The curve is drawn on the current Axes (one is created if none exists),
    with the episode index on the x-axis.
    """
    axes = plt.gca()
    axes.plot(total_rewards)
    axes.set_xlabel('Episode')
    axes.set_ylabel('Total Reward (Negative Queue Length)')
    axes.set_title('Q-learning: Total Reward per Episode')
    plt.show()