In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import copy
import os
import sys
from maps.SumoEnv import SumoEnv 

In [None]:
class SumoDQNAgent:
    """DQN agent that controls a ``SumoEnv`` simulation and learns via experience replay.

    The agent owns the environment, an online Q-network, a target Q-network
    (a deep copy of the online one, re-synchronised every ``config['sync_freq']``
    agent steps once training has started), an Adam optimizer and a bounded
    replay buffer.

    Args:
        action_space_n: Number of discrete actions (size of the network output).
        observation_space_n: Length of the flattened observation vector fed to
            the network. It has to equal
            ``config['state_queue_len'] * rows * cols`` of the stacked state
            matrices, because ``obs`` flattens the whole state queue.
        config: Dictionary of hyper-parameters. Keys read by this class:
            ``random_seed``, ``flow_on_HW``, ``flow_on_Ramp``,
            ``state_queue_len``, ``learning_rate``, ``mem_size``,
            ``data_points``, ``mu``, ``omega``, ``tau``,
            ``simulation_step_len``, ``epochs``, ``max_steps``, ``batch_size``,
            ``sync_freq``, ``gamma``, ``eps_dec_exp``, ``eps_start``,
            ``eps_min`` and ``eps_decay_factor``.
    """

    # Shape of the zero matrices used to pre-fill the state queue. They are
    # concatenated with the matrices returned by env.getStateMatrixV2(), so the
    # column count must agree with the environment (presumably the environment
    # returns 4 x 251 matrices -- TODO confirm against SumoEnv).
    STATE_MATRIX_SHAPE = (4, 251)

    def __init__(self, action_space_n, observation_space_n, config):
        self.action_space_n = action_space_n
        self.observation_space_n = observation_space_n
        self.config = config
        self.device = self.set_device()
        # Seed before building the network so weight initialisation is reproducible.
        self.set_random_seed(config['random_seed'])

        # Online network and its frozen-between-syncs copy used for TD targets.
        self.model = self.create_model()
        self.target_model = copy.deepcopy(self.model)

        # Environment plus the fixed-length queue of the most recent state matrices.
        self.env = SumoEnv(gui=False, flow_on_HW=config['flow_on_HW'], flow_on_Ramp=config['flow_on_Ramp'])
        self.state_matrices = deque(maxlen=config['state_queue_len'])
        self.initialize_state_matrices()

        self.optimizer = optim.Adam(self.model.parameters(), lr=config['learning_rate'])
        self.loss_fn = nn.MSELoss()
        self.replay = deque(maxlen=config['mem_size'])

        # Traffic flow data (convert times to steps assuming 1 step = 1 second)
        self.data_points = [(t * 60, hw, ramp) for t, hw, ramp in config['data_points']]

    def set_device(self):
        """Pick "mps" when available, otherwise "cpu", print the choice and return it.

        NOTE(review): the returned string is only stored on ``self.device``;
        nothing in this class moves the networks or tensors to that device.
        """
        device = "mps" if torch.backends.mps.is_available() else "cpu"
        print(f"Using device: {device}")
        return device

    def set_random_seed(self, seed):
        """Seed the torch, NumPy and ``random`` generators with ``seed``."""
        torch.manual_seed(seed)
        np.random.seed(seed)
        random.seed(seed)

    def create_model(self):
        """Build the Q-network: a ReLU MLP mapping observations to one value per action.

        Layer widths are ``observation_space_n -> 128 -> 64 -> 32 -> 8 -> action_space_n``
        with no activation after the final linear layer.
        """
        layers = [self.observation_space_n, 128, 64, 32, 8, self.action_space_n]
        model = nn.Sequential(
            nn.Linear(layers[0], layers[1]),
            nn.ReLU(),
            nn.Linear(layers[1], layers[2]),
            nn.ReLU(),
            nn.Linear(layers[2], layers[3]),
            nn.ReLU(),
            nn.Linear(layers[3], layers[4]),
            nn.ReLU(),
            nn.Linear(layers[4], layers[5])
        )
        return model

    def initialize_state_matrices(self):
        """Fill the state queue with zero matrices.

        Because the queue is bounded by ``state_queue_len``, pushing that many
        zero matrices also evicts every previously stored matrix, which is how
        ``reset`` clears the observation history.
        """
        for _ in range(self.config['state_queue_len']):
            self.state_matrices.appendleft(np.zeros(self.STATE_MATRIX_SHAPE))

    def obs(self):
        """Push the current environment state matrix and return the flattened queue.

        Returns:
            A 1-D float32 tensor made of all queued state matrices, newest first.
        """
        state_matrix = self.env.getStateMatrixV2()
        self.state_matrices.appendleft(state_matrix)
        flat_state_array = np.concatenate(self.state_matrices).flatten()
        return torch.from_numpy(flat_state_array).float()

    def rew(self):
        """Return the reward: a weighted sum of three environment readings.

        ``mu * env.getSpeedHW() + omega * env.getNumberVehicleWaitingTL()
        + tau * env.getSpeedRamp()`` with the weights taken from ``config``.
        """
        return (
            self.config['mu'] * self.env.getSpeedHW() +
            self.config['omega'] * self.env.getNumberVehicleWaitingTL() +
            self.config['tau'] * self.env.getSpeedRamp()
        )

    def reset(self):
        """Clear the observation history and reset the environment."""
        self.initialize_state_matrices()
        self.env.reset()

    def step(self, action):
        """Apply ``action`` for ``simulation_step_len`` simulation steps.

        Before every simulation step the highway and ramp flows are updated to
        the values interpolated from ``self.data_points`` at the environment's
        current step.
        """
        for _ in range(self.config['simulation_step_len']):
            # One interpolation per simulation step yields both flows.
            hw_flow, ramp_flow = self.interpolate_flow(self.env.getCurrentStep(), self.data_points)
            self.env.setFlowOnHW(hw_flow)
            self.env.setFlowOnRamp(ramp_flow)
            self.env.doSimulationStep(action)

    def interpolate_flow(self, step, data_points):
        """Linearly interpolate the highway and ramp flows at ``step``.

        Args:
            step: Simulation step to evaluate.
            data_points: Sequence of ``(time, hw_flow, ramp_flow)`` tuples whose
                times are increasing (required by ``np.interp``).

        Returns:
            ``(hw_flow, ramp_flow)`` truncated to ``int``. Outside the covered
            time range ``np.interp`` clamps to the first/last flow value.
        """
        times, hw_flows, ramp_flows = zip(*data_points)
        hw_flow = np.interp(step, times, hw_flows)
        ramp_flow = np.interp(step, times, ramp_flows)
        return int(hw_flow), int(ramp_flow)

    def train(self):
        """Train the online network with DQN and experience replay.

        Runs ``config['epochs']`` episodes of ``config['max_steps']`` agent
        steps each. Every step stores a transition; once the buffer holds more
        than ``batch_size`` transitions, each step also performs one gradient
        update, and the target network is synchronised whenever the global
        step count is a multiple of ``sync_freq``.

        Returns:
            ``(model, losses, rewards)``: the trained online network, an array
            with the loss of every gradient update and an array with the reward
            of every agent step.
        """
        total_loss, total_rewards = [], []
        total_steps = 0
        batch_size = self.config['batch_size']
        sync_freq = self.config['sync_freq']
        max_steps = self.config['max_steps']

        for epoch in range(self.config['epochs']):
            print("Epoch:", epoch)
            epsilon = self.update_epsilon(epoch)
            self.reset()
            state1 = self.obs()
            is_done = False
            steps = 0

            while not is_done:
                total_steps += 1
                steps += 1
                action = self.select_action(state1, epsilon)
                self.step(action)
                state2 = self.obs()
                reward = self.rew()
                total_rewards.append(reward)

                # Decide termination BEFORE storing the transition; otherwise the
                # stored flag is always False and the (1 - done) mask used for
                # the TD target never takes effect.
                is_done = steps >= max_steps
                self.replay.append((state1, action, reward, state2, is_done))
                state1 = state2

                if len(self.replay) > batch_size:
                    loss = self.replay_experience()
                    total_loss.append(loss.item())

                    if total_steps % sync_freq == 0:
                        self.target_model.load_state_dict(self.model.state_dict())

        return self.model, np.array(total_loss), np.array(total_rewards)

    def select_action(self, state, epsilon):
        """Return an epsilon-greedy action for ``state``.

        With probability ``epsilon`` a uniformly random action index is
        returned, otherwise the index of the largest Q-value predicted by the
        online network.
        """
        if random.random() < epsilon:
            return random.randint(0, self.action_space_n - 1)
        with torch.no_grad():
            return torch.argmax(self.model(state)).item()

    def replay_experience(self):
        """Run one gradient update on a random minibatch from the replay buffer.

        The target is ``r + gamma * (1 - done) * max_a Q_target(s2, a)`` and the
        loss is the MSE between that target and the online network's Q-value
        for the action actually taken.

        Returns:
            The loss tensor of this update.
        """
        minibatch = random.sample(self.replay, self.config['batch_size'])
        states, actions, rewards, next_states, dones = zip(*minibatch)

        state1_batch = torch.stack(states)
        action_batch = torch.tensor(actions, dtype=torch.long)
        reward_batch = torch.tensor(rewards, dtype=torch.float32)
        state2_batch = torch.stack(next_states)
        done_batch = torch.tensor(dones, dtype=torch.float32)

        Q1 = self.model(state1_batch)
        with torch.no_grad():
            Q2 = self.target_model(state2_batch).max(1)[0]

        Y = reward_batch + self.config['gamma'] * ((1 - done_batch) * Q2)
        # squeeze(1) keeps the batch dimension even when batch_size == 1, so the
        # prediction and the target always have the same shape.
        X = Q1.gather(dim=1, index=action_batch.unsqueeze(dim=1)).squeeze(1)

        loss = self.loss_fn(X, Y.detach())

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss

    def update_epsilon(self, epoch):
        """Return the exploration rate for ``epoch``.

        With ``config['eps_dec_exp']`` set, epsilon decays exponentially from
        ``eps_start`` towards ``eps_min`` at rate ``eps_decay_factor``;
        otherwise it decreases linearly so that it reaches ``eps_min`` after
        ``config['epochs']`` epochs, never dropping below ``eps_min``.
        """
        eps_start = self.config['eps_start']
        eps_min = self.config['eps_min']
        if self.config['eps_dec_exp']:
            return eps_min + (eps_start - eps_min) * np.exp(-self.config['eps_decay_factor'] * epoch)
        else:
            decay_rate = (eps_start - eps_min) / self.config['epochs']
            return max(eps_min, eps_start - decay_rate * epoch)

In [3]:
if __name__ == "__main__":
    # Dimensions of a single state matrix as used by SumoDQNAgent when it
    # pre-fills its state queue with np.zeros((4, 251)).
    STATE_ROWS, STATE_COLS = 4, 251

    config = {
        'random_seed': 33,
        # Initial flows handed to SumoEnv; during training they are overridden
        # every simulation step by values interpolated from 'data_points'.
        'flow_on_HW': 5000,
        'flow_on_Ramp': 2000,
        # Number of consecutive state matrices stacked into one observation.
        'state_queue_len': 3,
        # (time, highway flow, ramp flow); the agent multiplies each time by 60
        # to convert it to simulation steps (presumably minutes -> seconds).
        'data_points': [
            (0, 1000, 500), (10, 2000, 1300), (20, 3200, 1800),
            (30, 2500, 1500), (40, 1500, 1000), (50, 1000, 700), (60, 800, 500)
        ],
        # Simulation steps executed per agent action.
        'simulation_step_len': 2,
        # Reward weights: highway speed, vehicles waiting at the light, ramp speed.
        'mu': 0.05,
        'omega': -0.5,
        'tau': 0.2,
        'epochs': 70,
        'batch_size': 32,
        # 1800 agent steps * 2 simulation steps = 3600 simulation steps, i.e.
        # exactly the time of the last data point (60 * 60).
        'max_steps': 1800,
        'learning_rate': 5e-5,
        'gamma': 0.99,
        'eps_start': 0.8,
        'eps_min': 0.05,
        'eps_decay_factor': 0.05,
        'eps_dec_exp': True,
        'sync_freq': 5,
        'mem_size': 50000
    }

    # Derive the network input size from the configuration instead of
    # hard-coding it, so changing 'state_queue_len' cannot desynchronise the
    # observation length from the first layer (3 * 4 * 251 = 3012 here).
    observation_space_n = config['state_queue_len'] * STATE_ROWS * STATE_COLS

    agent = SumoDQNAgent(action_space_n=2, observation_space_n=observation_space_n, config=config)
    model, total_loss, total_rewards = agent.train()
    torch.save(model.state_dict(), "DynamicModel.pth")

Epoch: 1


KeyboardInterrupt: 

This code defines a DQN (Deep Q-Network) agent to train a neural network on a traffic simulation environment built with SUMO (Simulation of Urban MObility). The agent learns optimal actions to control traffic flow based on simulated traffic patterns.

The primary components are:

- Environment Interaction: Using a custom SumoEnv environment, the agent retrieves observations and performs actions.
- Neural Network: A feedforward network trained via Q-learning.
- Replay Buffer: Used to store and sample experience tuples.
- Training with DQN: Using experience replay and a target network to improve stability.