In [3]:

import numpy as np
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input

class TrafficIntersection:
    """Toy intersection simulator: one queue of waiting vehicles per lane.

    ``state`` is a list with one vehicle count per lane. Each action is
    converted to a green time of ``action * 3``, and that many vehicles are
    removed from the lane's queue (never dropping below zero).
    """

    def __init__(self, lanes):
        self.lanes = lanes
        self.state = self.reset()

    def reset(self):
        """Draw a fresh random queue (0-100 vehicles) per lane and return it."""
        queues = []
        for _ in range(self.lanes):
            queues.append(random.randint(0, 100))
        self.state = queues
        return self.state

    @staticmethod
    def _lane_reward(waiting, green_time):
        """Score one lane given its queue length and the chosen green time."""
        vehicle_left = waiting - green_time
        # Exact clearance earns the bonus; any mismatch costs 100 per vehicle.
        score = 10000 if vehicle_left == 0 else 0 - abs(vehicle_left) * 100
        # Overshooting the queue is charged a second time on top of the above.
        if green_time > waiting:
            score -= (green_time - waiting) * 100
        return score

    def step(self, actions):
        """Apply one action per lane.

        Returns a ``(new_state, rewards)`` pair of lists, both of length
        ``self.lanes``, and stores ``new_state`` as the current state.
        """
        green_times = [action * 3 for action in actions]
        remaining = []
        rewards = []

        for lane in range(self.lanes):
            waiting = self.state[lane]
            green = green_times[lane]
            remaining.append(max(waiting - green, 0))
            rewards.append(self._lane_reward(waiting, green))

        self.state = remaining
        return remaining, rewards

@tf.keras.utils.register_keras_serializable()
class QNetwork(Model):
    """Small fully connected network that outputs one Q-value per action.

    Two hidden ReLU layers of 24 units feed a linear output layer with
    ``action_size`` units. ``state_size`` is only stored (and serialized);
    it is not used to declare an input shape here.
    """

    def __init__(self, state_size, action_size, **kwargs):
        super().__init__(**kwargs)
        self.state_size = state_size
        self.action_size = action_size
        self.dense1 = Dense(24, activation='relu')
        self.dense2 = Dense(24, activation='relu')
        self.out = Dense(action_size, activation='linear')

    def call(self, inputs):
        """Forward pass: inputs -> dense1 -> dense2 -> linear output."""
        hidden = self.dense2(self.dense1(inputs))
        return self.out(hidden)

    def get_config(self):
        """Return the base config extended with the constructor arguments."""
        settings = super().get_config()
        settings.update(
            state_size=self.state_size,
            action_size=self.action_size,
        )
        return settings

    @classmethod
    def from_config(cls, config):
        """Rebuild the network by passing ``config`` as keyword arguments."""
        return cls(**config)

class CentralController:
    """Collects one action from each agent, pairing agent ``i`` with state ``i``."""

    def __init__(self, agents):
        self.agents = agents

    def get_joint_action(self, states):
        """Return a list with one action per agent, in agent order.

        Each agent receives its own entry of ``states`` reshaped to ``(1, 1)``.
        """
        return [
            agent.choose_action(np.reshape(states[idx], [1, 1]))
            for idx, agent in enumerate(self.agents)
        ]

class DQNAgent:
    """Deep Q-learning agent with a replay buffer and a target network.

    The online network (``model``) is trained from sampled transitions; the
    target network (``target_model``) supplies the bootstrap value and is
    refreshed from the online network via ``update_target_model``.
    """

    def __init__(self, state_size, action_size, gamma, epsilon, epsilon_min, epsilon_decay, learning_rate, batch_size, memory_size):
        """Store hyperparameters and build the online and target networks.

        Args:
            state_size: Width of the state vector fed to the networks.
            action_size: Number of discrete actions (network output width).
            gamma: Discount factor applied to the bootstrapped next-state value.
            epsilon: Initial exploration probability.
            epsilon_min: Floor for the exploration probability.
            epsilon_decay: Multiplicative decay applied to epsilon per replay.
            learning_rate: Adam learning rate.
            batch_size: Number of transitions sampled per replay.
            memory_size: Maximum number of transitions kept in the buffer.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=memory_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.model = self._build_model()
        self.target_model = self._build_model()
        # One forward pass on each network so their weights exist before the
        # first weight copy below.
        self.model(np.zeros((1, self.state_size)))
        self.target_model(np.zeros((1, self.state_size)))
        self.update_target_model()

    def _build_model(self):
        """Create and compile a QNetwork (Adam optimizer, MSE loss)."""
        model = QNetwork(self.state_size, self.action_size)
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
                      loss='mse')
        return model

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state):
        """Append one ``(state, action, reward, next_state)`` transition."""
        self.memory.append((state, action, reward, next_state))

    def _greedy_action(self, state):
        """Return the index of the highest predicted Q-value as a plain int."""
        act_values = self.model.predict(state, verbose=0)
        return int(np.argmax(act_values[0]))

    def choose_action(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise the greedy one. The result is always a built-in ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        return self._greedy_action(state)

    def replay(self):
        """Train on a random minibatch, one transition at a time.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. After training, epsilon is decayed but never allowed to
        fall below ``epsilon_min``.
        """
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state in minibatch:
            target = self.model.predict(state, verbose=0)
            target_f = self.target_model.predict(next_state, verbose=0)
            # Only the taken action's Q-value is moved toward the TD target;
            # the other outputs keep the network's own predictions.
            target[0][action] = reward + self.gamma * np.amax(target_f[0])
            self.model.fit(state, target, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            # Clamp so a decay step cannot undershoot the configured floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def train(self, env, controller, episodes):
        """Run ``episodes`` training episodes against ``env``.

        Actions for all lanes come from ``controller.get_joint_action``.
        Transitions of every lane that is not yet done are stored in THIS
        agent's memory. An episode ends once every lane has reached a queue
        of zero. ``replay`` runs on every 10th time step (including step 0)
        and the target network is refreshed at the end of each episode.

        Args:
            env: Object exposing ``reset()``, ``step(actions)`` and ``lanes``.
            controller: Object exposing ``get_joint_action(states)``.
            episodes: Number of episodes to run.
        """
        for e in range(episodes):
            state = env.reset()
            done = [False] * env.lanes
            time_step = 0
            print(f"\nTraining Episode {e+1}/{episodes}")
            while not all(done):
                actions = controller.get_joint_action(state)
                next_state, rewards = env.step(actions)
                for i in range(env.lanes):
                    if not done[i]:  # Skip lanes that are already done
                        self.remember(np.reshape(state[i], [1, 1]), actions[i], rewards[i], np.reshape(next_state[i], [1, 1]))
                        print(f"Time Step {time_step+1}, Lane {i+1}, Vehicle Waiting: {state[i]}, Green Time Chosen: {actions[i] * 3} seconds, Vehicle Left: {next_state[i]}, Reward: {rewards[i]}")
                        if next_state[i] == 0:
                            done[i] = True
                state = next_state
                if time_step % 10 == 0:
                    self.replay()
                time_step += 1
            self.update_target_model()
            print(f"Episode {e+1}/{episodes}, Epsilon: {self.epsilon:.2f}")

    def predict_green_signal(self, vehicle_count):
        """Return the green time (action * 3) for ``vehicle_count`` vehicles.

        Inference is always greedy: exploration noise is not applied, so the
        answer does not depend on the agent's current ``epsilon``.
        """
        vehicle_count = int(vehicle_count)
        return self._greedy_action(np.reshape(vehicle_count, [1, 1])) * 3

    def save_model(self, path):
        """Save the online network to ``path``."""
        self.model.save(path)

    def load_model(self, path):
        """Load the online network from ``path`` and re-sync the target network.

        The saved model is expected to have the same architecture as this
        agent's networks; otherwise the weight copy raises.
        """
        self.model = tf.keras.models.load_model(path)
        # Mirror __init__: make sure the weights exist, then refresh the
        # target so it does not keep the pre-load weights.
        self.model(np.zeros((1, self.state_size)))
        self.update_target_model()

# --- Q-learning hyperparameters ---
gamma = 0.95
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
learning_rate = 0.001
batch_size = 32
memory_size = 2000

# --- Problem dimensions ---
state_size = 1
action_size = 11
lanes = 4

# One environment, one agent per lane, and a controller that queries them all.
env = TrafficIntersection(lanes)
agents = [
    DQNAgent(
        state_size=state_size,
        action_size=action_size,
        gamma=gamma,
        epsilon=epsilon,
        epsilon_min=epsilon_min,
        epsilon_decay=epsilon_decay,
        learning_rate=learning_rate,
        batch_size=batch_size,
        memory_size=memory_size,
    )
    for _ in range(lanes)
]
controller = CentralController(agents)

# Each agent in turn runs its own full set of training episodes.
episodes = 500
for agent in agents:
    agent.train(env=env, controller=controller, episodes=episodes)

def test_agents(environment, agents, controller, num_episodes):
    """Run evaluation episodes and print a per-lane trace of each time step.

    Actions come from ``controller.get_joint_action``; an episode ends once
    every lane has reported a queue of zero. ``agents`` is accepted but not
    used directly inside this function.
    """
    for e in range(num_episodes):
        states = environment.reset()
        finished = [False] * environment.lanes
        time_step = 0
        print(f"\nTesting Episode {e+1}/{num_episodes}")
        while not all(finished):
            actions = controller.get_joint_action(states)
            next_states, rewards = environment.step(actions)

            for lane in range(environment.lanes):
                if finished[lane]:
                    # Lanes that already cleared are no longer reported.
                    continue
                finished[lane] = next_states[lane] == 0
                print(f"Time Step {time_step+1}, Lane {lane+1}, Vehicle Waiting: {states[lane]}, Green Time Chosen: {actions[lane] * 3} seconds, Vehicle Left: {next_states[lane]}, Reward: {rewards[lane]}")
                states[lane] = next_states[lane]

            time_step += 1

# Evaluate the trained agents for a handful of episodes.
test_episodes = 5
test_agents(
    environment=env,
    agents=agents,
    controller=controller,
    num_episodes=test_episodes,
)



Training Episode 1/500
Time Step 1, Lane 1, Vehicle Waiting: 88, Green Time Chosen: 21 seconds, Vehicle Left: 67, Reward: -6700
Time Step 1, Lane 2, Vehicle Waiting: 9, Green Time Chosen: 12 seconds, Vehicle Left: 0, Reward: -600
Time Step 1, Lane 3, Vehicle Waiting: 28, Green Time Chosen: 15 seconds, Vehicle Left: 13, Reward: -1300
Time Step 1, Lane 4, Vehicle Waiting: 79, Green Time Chosen: 3 seconds, Vehicle Left: 76, Reward: -7600
Time Step 2, Lane 1, Vehicle Waiting: 67, Green Time Chosen: 27 seconds, Vehicle Left: 40, Reward: -4000
Time Step 2, Lane 3, Vehicle Waiting: 13, Green Time Chosen: 21 seconds, Vehicle Left: 0, Reward: -1600
Time Step 2, Lane 4, Vehicle Waiting: 76, Green Time Chosen: 15 seconds, Vehicle Left: 61, Reward: -6100
Time Step 3, Lane 1, Vehicle Waiting: 40, Green Time Chosen: 6 seconds, Vehicle Left: 34, Reward: -3400
Time Step 3, Lane 4, Vehicle Waiting: 61, Green Time Chosen: 30 seconds, Vehicle Left: 31, Reward: -3100
Time Step 4, Lane 1, Vehicle Waiting: