In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import traci
import sumolib
import time

environment = "environments/cross.sumocfg"
sumobin = sumolib.checkBinary('sumo-gui')

traci.start([sumobin, '-c', environment])  


print("Connected to TraCI")

Connected to TraCI


In [2]:
# Function to sum the delay of all vehicles affected by the traffic light
def get_delay(tls_id):
    delays = [get_lane_delay(lane_id) for lane_id in traci.trafficlight.getControlledLanes(tls_id)]
    return sum(delays)

# returns the sum of every vehicle's delay (1 - speed / max_speed) in a given lane
def get_lane_delay(lane_id):
    max_s = traci.lane.getMaxSpeed(lane_id)
    avg_s = traci.lane.getLastStepMeanSpeed(lane_id)
    num_veh = traci.lane.getLastStepVehicleNumber(lane_id)
    return max(num_veh * (1 - avg_s / max_s), 0) # for some reason this can return small negative values :\

# Function to get the number of vehicles currently waiting
def get_waiting_time(tls_id = '0'):
    waiting_times = [traci.lane.getLastStepHaltingNumber(lane_id) for lane_id in traci.trafficlight.getControlledLanes(tls_id)]
    return sum(waiting_times)

# Function that returns the number of emergency stops (acceleration < -4.5m/s^2) caused by the traffic light
def num_emergency_stops(tls_id = '0'):
    emergency_stops = [get_lane_emergency_stops(lane_id) for lane_id in traci.trafficlight.getControlledLanes(tls_id)]
    return sum(emergency_stops)

# returns the number of vehicles that had to emergency stop in the last time step (decelerated > 4.5 m/s/s)
def get_lane_emergency_stops(lane_id):
    emergency_stops = [veh_id for veh_id in traci.lane.getLastStepVehicleIDs(lane_id) if traci.vehicle.getAcceleration(veh_id) < -4.5]
    return len(emergency_stops)


In [3]:
class Environment:
    def __init__(self):
        self.previous_total_delay = 0.0

    # Function to reset the SUMO environment
    def reset_sumo_environment(self, environment):
        traci.load(['-c', environment, '--start', '--step-length', 0.2])
        traci.trafficlight.setProgram('0', '0')
        
        # Get initial state information (modify this based on your state representation)
        state = self.get_state()
        return state

    # Function to step through the SUMO simulation
    def step_in_sumo(self, action):
        # Apply the action
        self.apply_action(action)
        
        # Step the SUMO simulation forward
        traci.simulationStep()
        
        # Get the new state after taking the action
        next_state = self.get_state()
        
        # Calculate the reward with the specified tls_id
        reward = self.calculate_reward()
        
        # Check if the episode is done
        done = self.check_done_condition()
        
        return next_state, reward, done

    # Function to get the current state (modify this based on what information you need)
    def get_state(self):
        # example state, the total delay of each lane, takes into account cars being slowed, or too many cars stopped at a red
        state = []
        tls_ids = traci.trafficlight.getIDList()
        for tls_id in tls_ids:
            lanes = traci.trafficlight.getControlledLanes(tls_id)
            traffic_light_state = list(map(get_lane_delay, lanes))
            state.extend(traffic_light_state)

        return np.array(state)

    # Function to apply the action (modify based on your action space)
    def apply_action(self, action):
        # actions changes the phase of the traffic light program (0: do nothing, 1: next phase)
        try:
            tls_ids = traci.trafficlight.getIDList()
            for tls_id in tls_ids:
                traci.trafficlight.setPhase(tls_id, action)
        except traci.exceptions.FatalTraCIError as e:
            print("TraCI error:", e)
            traci.close()
            return

    # Function to calculate the reward (implement your logic)
    def calculate_reward(self):
        # Step the simulation with the specified phase applied across all traffic lights
        tls_ids = traci.trafficlight.getIDList()

        current_total_delay = 0
        for tls_id in tls_ids:
            # Get lanes controlled by this traffic light system
            controlled_lanes = traci.trafficlight.getControlledLanes(tls_id)
            
            # Sum the waiting times of all vehicles on the approaching lanes
            for lane_id in controlled_lanes:
                current_total_delay += traci.lane.getWaitingTime(lane_id)
        
        reward = self.previous_total_delay - current_total_delay

        self.previous_total_delay = current_total_delay

        print("reward:", reward)
        return reward

    # Function to check if the simulation should terminate
    def check_done_condition(self):
        # Example condition: terminate if simulation time exceeds a limit
        current_time = traci.simulation.getTime()
        return current_time > 250  # Change this threshold as necessary

In [4]:
# Define the neural network for the Q-function
class DQN(nn.Module):
    def __init__(self, n_state_params, n_actions):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(n_state_params, 24)
        self.fc2 = nn.Linear(24, 24)
        self.fc3 = nn.Linear(24, n_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

In [5]:
# Define the RL agent
class RLAgent:
    def __init__(self, n_state_params, n_actions):
        self.n_state_params = n_state_params
        self.n_actions = n_actions
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # discount rate
        self.epsilon = 0.2  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.model = DQN(n_state_params, n_actions)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.criterion = nn.MSELoss()

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.n_actions)
        state = torch.FloatTensor(state)
        q_values = self.model(state)
        return np.argmax(q_values.detach().numpy())

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target += self.gamma * np.amax(self.model(torch.FloatTensor(next_state)).detach().numpy())
            target_f = self.model(torch.FloatTensor(state)).detach().numpy()
            # Check if action index is valid
            if 0 <= action < self.n_actions:
                target_f[action] = target
            else:
                print(f"Invalid action: {action}")

            # Convert back to tensor for loss calculation
            target_f_tensor = torch.FloatTensor(target_f)
            self.model.zero_grad()
            loss = self.criterion(target_f_tensor, self.model(torch.FloatTensor(state)))
            loss.backward()
            self.optimizer.step()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

In [6]:
# Simulation interaction loop
def run_simulation(agent, env, num_episodes, batch_size):
    for e in range(num_episodes):
        state = env.reset_sumo_environment(environment)  # Reset the SUMO environment and get the initial state
        done = False
        total_reward = 0

        while not done:
            action = agent.act(state)
            next_state, reward, done = env.step_in_sumo(action)  # Step through the SUMO simulation
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward

        print(f"Episode: {e+1}/{num_episodes}, Total Reward: {total_reward}")
        agent.replay(batch_size)

In [7]:
# number of state parameters: parameter for each lane controlled by the traffic light, giving the total delay
env = Environment()
n_state_params = len(env.get_state())
print("Number of inputs:", n_state_params)
# Get the full phase program for the traffic light
program = traci.trafficlight.getCompleteRedYellowGreenDefinition('0')[0]

# Get the number of phases
n_actions = len(program.phases)
print("actions:", n_actions)

agent = RLAgent(n_state_params, n_actions)
run_simulation(agent, env, num_episodes=100, batch_size=32)

Number of inputs: 12
actions: 8


  program = traci.trafficlight.getCompleteRedYellowGreenDefinition('0')[0]


reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: 0.0
reward: -0.2
reward: -0.2
reward: -0.19999999999999996
reward: -0.20000000000000007
reward: -0.19999999999999996
reward: -0.19999999999999996
reward: -0.19999999999999996
reward: -0.20000000000000018
reward: -0.399999999