In [3]:
from stable_baselines3 import SAC
import gymnasium as gym
import pandas as pd
import numpy as np
from gymnasium.spaces import Box

# Example traffic data: one record per approach direction of the intersection.
# Columns are listed once; each tuple below is a full row in that column order.
data = pd.DataFrame(
    [
        ('North', 18.36, 43.18, 21.29, 0.79),
        ('South', 24.6, 42.98, 16.79, 1.47),
        ('East', 26.16, 21.33, 14.05, 1.86),
        ('West', 25.04, 32.51, 14.39, 1.77),
    ],
    columns=[
        'Direction',
        'Traffic_Volume',
        'Average_Speed_kmph',
        'Queue_Length_meters',
        'Traffic_Density_vehicles_per_meter',
    ],
)

# Define the Traffic Environment class
class TrafficEnv(gym.Env):
    """Toy traffic-signal environment that replays rows of a traffic DataFrame.

    Each observation is one row of ``data`` with the ``'Direction'`` column
    removed, returned as a ``float32`` vector. Each action is a vector of four
    green-signal durations in the range [10, 120].

    Args:
        data: DataFrame with a ``'Direction'`` column plus numeric feature
            columns. Rows are visited cyclically, one per step.
        max_steps: Number of steps after which an episode is truncated.
            ``None`` disables truncation (the episode then never ends, because
            this environment never terminates on its own).
    """

    # Fixed orange (amber) signal time added to every green time.
    ORANGE_TIME = 3.0

    def __init__(self, data, max_steps=100):
        super().__init__()
        self.data = data  # Use the actual data DataFrame
        self.max_steps = max_steps

        # Action space: green signal times for 4 directions (continuous).
        self.action_space = Box(low=10.0, high=120.0, shape=(4,), dtype=np.float32)

        # Observation space: every column of `data` except 'Direction'.
        self.observation_space = Box(
            low=0.0,
            high=np.inf,
            shape=(len(data.columns) - 1,),
            dtype=np.float32,
        )

        self.state = None
        self.current_step = 0    # index of the data row currently shown (wraps around)
        self._elapsed_steps = 0  # steps taken since the last reset (never wraps)
        self.reset()

    def _observation_at(self, row_index):
        """Return the numeric features of row ``row_index`` as a float32 vector."""
        # A row of a mixed-type frame is an object-dtype Series; cast explicitly
        # so the observation matches the dtype declared in `observation_space`.
        row = self.data.iloc[row_index].drop('Direction')
        return np.asarray(row.values, dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Reset the environment to the first data row.

        Args:
            seed: Optional seed for the environment's random generator.
            options: Unused; accepted for Gymnasium API compatibility.

        Returns:
            Tuple ``(observation, info)`` where ``info`` is an empty dict.
        """
        # Seeds `self.np_random` (the per-environment generator) instead of
        # touching NumPy's global random state.
        super().reset(seed=seed)

        self.current_step = 0
        self._elapsed_steps = 0
        self.state = self._observation_at(self.current_step)

        return self.state, {}

    def step(self, action):
        """Apply green-signal times and advance to the next data row.

        The reward is the negative of a mock waiting time: the current state's
        features from index 3 onward (traffic density in the example column
        layout) divided by each direction's green time plus the orange time,
        summed over directions.

        Args:
            action: Array of green signal times, one per direction.

        Returns:
            Tuple ``(state, reward, terminated, truncated, info)``.
            ``terminated`` is always False; ``truncated`` becomes True once
            ``max_steps`` steps have been taken since the last reset.
        """
        green_times = np.asarray(action, dtype=np.float32)

        # Simulate the waiting time based on action and traffic density.
        waiting_time = np.sum(self.state[3:] / (green_times + self.ORANGE_TIME))
        reward = -float(waiting_time)  # negative so that less waiting is better

        # Transition to the next row, looping through the data cyclically.
        self.current_step = (self.current_step + 1) % len(self.data)
        next_state = self._observation_at(self.current_step)

        # Mock traffic simulation: add small Gaussian noise, then clip at zero so
        # the observation stays inside the declared observation space.
        noise = self.np_random.normal(0, 0.1, size=next_state.shape)
        self.state = np.maximum(next_state + noise, 0.0).astype(np.float32)

        # `current_step` wraps modulo len(data) and so can never reach
        # `max_steps`; truncation is decided on the non-wrapping step counter.
        self._elapsed_steps += 1
        terminated = False
        truncated = self.max_steps is not None and self._elapsed_steps >= self.max_steps

        return self.state, reward, terminated, truncated, {}


# Build the environment from the example traffic DataFrame defined above.
# This module-level `env` is the instance the training cell inspects and trains on.
env = TrafficEnv(data)



In [4]:
# Training the RL agent
SEED = 42                                 # fixed seed so a Restart & Run All reproduces the run
MODEL_PATH = "models/traffic_signal_model"  # single source of truth for save and load

# Test the environment before training
print("Observation Space:", env.observation_space)
print("Action Space:", env.action_space)

# Reset the environment to get the initial observation
obs, info = env.reset(seed=SEED)
print("Initial Observation:", obs)

# Initialize and train the RL agent (SAC in this case).
# device="auto" uses the GPU when one is available and the CPU otherwise,
# rather than hard-coding "cuda" on a machine that may not have one.
model = SAC("MlpPolicy", env, verbose=1, device="auto", seed=SEED)
model.learn(total_timesteps=1000)

# Save the trained model
model.save(MODEL_PATH)

# Optionally, load and test the saved model
model = SAC.load(MODEL_PATH, env=env)
print("Model loaded and ready for testing.")

Observation Space: Box(0.0, inf, (4,), float32)
Action Space: Box(10.0, 120.0, (4,), float32)
Initial Observation: [18.36 43.18 21.29 0.79]
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Model loaded and ready for testing.


In [5]:
import gym
from stable_baselines3 import SAC 
from stable_baselines3.common.env_util import DummyVecEnv

# Import your custom environment
from traffic_env import TrafficEnv  # Replace `your_env_file` with the name of your environment file

# Load traffic data (make sure it matches the format used during training)
import pandas as pd

# Same example frame used for training: one row per direction, columns named once.
data = pd.DataFrame(
    [
        ('North', 18.36, 43.18, 21.29, 0.79),
        ('South', 24.6, 42.98, 16.79, 1.47),
        ('East', 26.16, 21.33, 14.05, 1.86),
        ('West', 25.04, 32.51, 14.39, 1.77),
    ],
    columns=[
        'Direction',
        'Traffic_Volume',
        'Average_Speed_kmph',
        'Queue_Length_meters',
        'Traffic_Density_vehicles_per_meter',
    ],
)

# Initialize the custom environment.
# The factory builds the env itself rather than closing over the name `env`,
# which is rebound to the vectorized wrapper on this very line.
env = DummyVecEnv([lambda: TrafficEnv(data)])  # Wrap it in DummyVecEnv for compatibility

# Load the trained model
model = SAC.load("models/traffic_signal_model.zip")  # Replace the path with the correct model file location

# Test the model
num_episodes = 5
max_steps_per_episode = 100  # hard cap: the loop ends even if the env never reports `done`

for episode in range(num_episodes):
    obs = env.reset()  # Reset the environment
    total_reward = 0.0

    for _ in range(max_steps_per_episode):
        # Get action from the trained model
        action, _states = model.predict(obs, deterministic=True)

        # Perform the action; a vectorized env returns one entry per sub-env
        obs, reward, done, info = env.step(action)

        # Accumulate the scalar reward of the single wrapped environment
        total_reward += float(reward[0])

        # Print intermediate results
        print("Action Taken:", action)
        print("Observation:", obs)
        print("Reward:", reward)

        if done[0]:
            break

    print(f"Episode {episode + 1} finished with total reward: {total_reward}")

# Close the environment
env.close()


Action Taken: [[59.03569  47.901073 76.533165 71.82783 ]]
Observation: [[24.490574  43.090008  16.89316    1.4942585]]
Reward: [-0.04874544]
Action Taken: [[56.63752  44.520294 77.983315 71.940315]]
Observation: [[26.07667   21.404726  14.133307   1.8779556]]
Reward: [-0.09489107]
Action Taken: [[63.20729  57.807747 73.43485  68.18747 ]]
Observation: [[24.925905  32.456375  14.6567335  1.6033313]]
Reward: [-0.11019806]
Action Taken: [[57.972736 50.63289  75.42023  69.88809 ]]
Observation: [[18.33137   43.130787  21.399836   0.6856859]]
Reward: [-0.09863298]
Action Taken: [[58.508495 47.995132 76.5704   71.60981 ]]
Observation: [[24.590063  43.01737   16.730227   1.4980224]]
Reward: [-0.04240157]
Action Taken: [[56.604717 44.2367   78.30936  71.8353  ]]
Observation: [[26.144676  21.420341  14.015857   1.8837081]]
Reward: [-0.09528705]
Action Taken: [[62.980484 57.474823 73.74415  68.127716]]
Observation: [[24.99923   32.60461   14.41367    1.9991854]]
Reward: [-0.11072687]
Action Taken:

KeyboardInterrupt: 