You are tasked with developing an optimal strategy for controlling traffic signals at an intersection to minimize traffic congestion and reduce waiting times for vehicles. The goal is to use Reinforcement Learning techniques to learn the best timing for traffic signal changes based on real-time traffic conditions.

Input:
Real-time traffic data, including the number of vehicles approaching the intersection from different directions, average vehicle speeds, and historical traffic patterns.

Output:
An optimal policy for controlling traffic signals at the intersection to maximize traffic flow efficiency and minimize congestion.


In [3]:
import numpy as np
import random
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class Intersection:
    def __init__(self, num_lanes):
        self.num_lanes = num_lanes
        self.traffic_signal = [0] * num_lanes  # Initial state of traffic signal for each lane
    
    def step(self, action):
        # Update traffic signal based on action
        self.traffic_signal = action
        
        # Simulate traffic flow and calculate reward
        # For simplicity, let's assume a fixed reward structure
        
        # Calculate reward based on traffic flow efficiency and congestion
        reward = np.sum(action) - np.mean(action)  # Example reward function
        
        # Sample next state (not implemented for simplicity, in real-world, this would be based on traffic simulation)
        next_state = np.zeros(self.num_lanes)  # Placeholder
        
        return next_state, reward
    
    def reset(self):
        # Reset traffic signal to initial state
        self.traffic_signal = [0] * self.num_lanes

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95    # Discount rate
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # Returns action using policy derived from Q-network

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Main loop for training
env = Intersection(num_lanes=4)  # Initialize intersection environment
state_size = env.num_lanes
action_size = 2  # For simplicity, let's assume two possible actions for each lane (green or red)
agent = DQNAgent(state_size, action_size)
batch_size = 32
episodes = 1000

for e in range(episodes):
    state = np.zeros(state_size)  # Initial state
    total_reward = 0
    for time_step in range(100):  # Fixed number of time steps per episode
        action = agent.act(state)
        next_state, reward = env.step(action)
        total_reward += reward
        agent.remember(state, action, reward, next_state, False)
        state = next_state
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)
    print("Episode {}: Total Reward: {}".format(e, total_reward))

# Once training is done, use the learned policy to control traffic signals in real-time



ValueError: in user code:

    File "c:\Users\hp\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 2169, in predict_function  *
        return step_function(self, iterator)
    File "c:\Users\hp\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 2155, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\hp\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 2143, in run_step  **
        outputs = model.predict_step(data)
    File "c:\Users\hp\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\training.py", line 2111, in predict_step
        return self(x, training=False)
    File "c:\Users\hp\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\hp\AppData\Local\Programs\Python\Python39\lib\site-packages\keras\engine\input_spec.py", line 253, in assert_input_compatibility
        raise ValueError(

    ValueError: Exception encountered when calling layer 'sequential_1' (type Sequential).
    
    Input 0 of layer "dense_3" is incompatible with the layer: expected min_ndim=2, found ndim=1. Full shape received: (None,)
    
    Call arguments received by layer 'sequential_1' (type Sequential):
      • inputs=tf.Tensor(shape=(None,), dtype=float32)
      • training=False
      • mask=None


In [2]:
from tensorflow.keras.optimizers import SGD

# create an SGD optimizer with a learning rate of 0.01
optimizer = SGD(learning_rate=0.01)