In [None]:
import gymnasium as gym
from matplotlib import pyplot as plt
import pprint
from highway_env.envs import MergeEnv
from highway_env import utils
import time
import numpy as np
from stable_baselines3 import PPO
import os
%matplotlib inline

### The environment is designed to have only two cars on the road: the ego vehicle on the highway and the merging vehicle.

# Reward braking close to the merging car

In [None]:
# Personalized environment with custom reward function
class CustomMergeEnv(MergeEnv):
    """Two-vehicle merge scenario that rewards braking close to the merging car.

    The road contains only the ego vehicle (on the highway) and one merging
    vehicle (on the ramp). The reward is the parent ``MergeEnv`` reward plus
    two shaping terms computed from the longitudinal gap between the two cars.
    """

    def _make_vehicles(self) -> None:
        """Place the ego vehicle and the single merging vehicle on the road."""
        road = self.road

        # Ego vehicle: highway lane ("a", "b", 1), 30 m along the lane, 30 m/s.
        ego_vehicle = self.action_type.vehicle_class(
            road, road.network.get_lane(("a", "b", 1)).position(30, 0), speed=30
        )
        road.vehicles.append(ego_vehicle)

        other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])

        # Merging vehicle: ramp lane ("j", "k", 0), 110 m along the lane, 20 m/s.
        merging_v = other_vehicles_type(
            road, road.network.get_lane(("j", "k", 0)).position(110, 0), speed=20
        )
        merging_v.target_speed = 30
        road.vehicles.append(merging_v)

        # Set the ego vehicle as the primary (controlled) vehicle.
        self.vehicle = ego_vehicle
        # Keep a direct handle on the merging car. Looking it up by
        # lane_index == ("j", "k", 0) only works while it is on that first
        # ramp segment; its lane_index changes as it advances, which would
        # silently switch the shaping terms off before the merge happens.
        self._merging_vehicle = merging_v

    def _reward(self, action: int) -> float:
        """Return the parent reward plus braking and proximity shaping terms.

        Args:
            action: The action taken by the ego vehicle, forwarded unchanged
                to the parent reward.

        Returns:
            ``super()._reward(action)`` plus

            * a braking reward ``gamma * |a|`` when the ego car is within
              ``d_brake_threshold`` metres (along x) of the merging car, is
              faster than it, and its commanded acceleration ``a`` is negative;
            * a proximity reward ``exp(-beta * distance**2)``.

            If no merging vehicle is known, the parent reward is returned
            unchanged.
        """
        reward = super()._reward(action)

        ego_vehicle = self.vehicle
        merging_vehicle = getattr(self, "_merging_vehicle", None)
        if merging_vehicle is None or merging_vehicle not in self.road.vehicles:
            return reward

        # Longitudinal (x-axis) gap between the two cars.
        distance = abs(ego_vehicle.position[0] - merging_vehicle.position[0])

        # Braking incentive near the merging car
        d_brake_threshold = 30  # Braking threshold distance (in meters)
        gamma = 1.0  # Braking reward scaling factor

        braking_reward = 0.0
        if distance < d_brake_threshold and ego_vehicle.speed > merging_vehicle.speed:
            # highway-env vehicles expose the commanded acceleration through
            # the `action` dict; there is no `acceleration` attribute.
            acceleration = ego_vehicle.action.get("acceleration", 0.0)
            if acceleration < 0:  # Negative acceleration means actual braking
                braking_reward = gamma * abs(acceleration)

        # Distance-based reward (proximity to the merging car)
        beta = 0.1  # Controls how fast the reward decays as distance increases
        distance_reward = np.exp(-beta * (distance ** 2))

        return reward + braking_reward + distance_reward

In [None]:
# Training environment for the "brake close" reward variant.
env = CustomMergeEnv(
    render_mode="rgb_array",
    config={
        "high_speed_reward": 1,
        "lane_change_reward": -5,
        "right_lane_reward": 3,
    },
)

In [None]:
# Train a PPO agent on `env` and store the policy under env_brake_close/.
model = PPO(
    "MlpPolicy",
    env,
    policy_kwargs=dict(net_arch=[256, 256]),
    learning_rate=5e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.8,
    gae_lambda=0.95,
    clip_range=0.2,
    verbose=1,
    seed=0,  # Fixed seed so a Restart & Run All reproduces the same training run
    tensorboard_log="env_brake_close/",
)
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("env_brake_close/model")

# Reward braking far from the merging vehicle

In [None]:
# Personalized environment with custom reward function
class CustomMergeEnv(MergeEnv):
    """Two-vehicle merge scenario that rewards early braking and a safe gap.

    The road contains only the ego vehicle (on the highway) and one merging
    vehicle (on the ramp). The reward is the parent ``MergeEnv`` reward plus a
    safe-gap bonus, a too-close penalty and a braking incentive, all computed
    from the longitudinal gap between the two cars.
    """

    def _make_vehicles(self) -> None:
        """Place the ego vehicle and the single merging vehicle on the road."""
        road = self.road

        # Ego vehicle: highway lane ("a", "b", 1), 30 m along the lane, 30 m/s.
        ego_vehicle = self.action_type.vehicle_class(
            road, road.network.get_lane(("a", "b", 1)).position(30, 0), speed=30
        )
        road.vehicles.append(ego_vehicle)

        other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])

        # Merging vehicle: ramp lane ("j", "k", 0), 110 m along the lane, 20 m/s.
        merging_v = other_vehicles_type(
            road, road.network.get_lane(("j", "k", 0)).position(110, 0), speed=20
        )
        merging_v.target_speed = 30
        road.vehicles.append(merging_v)

        # Set the ego vehicle as the primary (controlled) vehicle.
        self.vehicle = ego_vehicle
        # Keep a direct handle on the merging car. Looking it up by
        # lane_index == ("j", "k", 0) only works while it is on that first
        # ramp segment; its lane_index changes as it advances, which would
        # silently switch the shaping terms off before the merge happens.
        self._merging_vehicle = merging_v

    def _reward(self, action: int) -> float:
        """Return the parent reward plus gap, penalty and braking terms.

        Args:
            action: The action taken by the ego vehicle, forwarded unchanged
                to the parent reward.

        Returns:
            ``super()._reward(action)`` plus

            * ``+1.0`` while the longitudinal gap is strictly between
              ``d_min`` and ``d_max`` metres;
            * ``-2.0`` while the gap is below ``close_distance_threshold``;
            * ``gamma * deceleration`` when the gap is below
              ``d_brake_threshold``, the ego car is faster than the merging
              car and its speed dropped since the previous reward call.

            If no merging vehicle is known, the parent reward is returned
            unchanged.
        """
        reward = super()._reward(action)

        ego_vehicle = self.vehicle

        # Speed lost since the previous reward evaluation (m/s per call).
        # The stored speed is refreshed on every call, not only inside the
        # braking zone, so the difference never spans more than one step.
        previous_speed = getattr(ego_vehicle, "previous_speed", ego_vehicle.speed)
        deceleration = previous_speed - ego_vehicle.speed
        ego_vehicle.previous_speed = ego_vehicle.speed

        merging_vehicle = getattr(self, "_merging_vehicle", None)
        if merging_vehicle is None or merging_vehicle not in self.road.vehicles:
            return reward  # No merging vehicle, no proximity or braking rewards

        # Longitudinal (x-axis) gap between the two cars.
        distance = abs(ego_vehicle.position[0] - merging_vehicle.position[0])

        ## Proximity reward ##
        d_min = 10  # Minimum safe distance (in meters)
        d_max = 30  # Maximum safe distance (in meters)
        proximity_reward = 1.0 if d_min < distance < d_max else 0.0

        ## Distance-based penalty if too close to the merging vehicle ##
        close_distance_threshold = 5  # Threshold for being "too close" (in meters)
        distance_penalty = -2.0 if distance < close_distance_threshold else 0.0

        ## Braking incentive near the merging car ##
        d_brake_threshold = 60  # Braking threshold distance (in meters)
        gamma = 1.0  # Scaling factor for braking reward

        braking_incentive = 0.0
        if (
            distance < d_brake_threshold
            and ego_vehicle.speed > merging_vehicle.speed
            and deceleration > 0  # Only reward an actual loss of speed
        ):
            braking_incentive = gamma * deceleration

        # Add proximity reward, distance penalty, and braking incentive to the original reward
        return reward + proximity_reward + braking_incentive + distance_penalty

In [None]:
# Training environment for the second reward variant.
env1 = CustomMergeEnv(
    render_mode="rgb_array",
    config={
        "high_speed_reward": 1,
        "lane_change_reward": -5,
        "right_lane_reward": 3,
    },
)

In [None]:
# Train a PPO agent on `env1` and store the policy under env1_brake_close/.
# NOTE(review): the output directory still says "brake_close" although this
# cell trains the second reward variant; the path is kept unchanged so that
# existing logs and checkpoints remain where they are.
model = PPO(
    "MlpPolicy",
    env1,
    policy_kwargs=dict(net_arch=[256, 256]),
    learning_rate=5e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.8,
    gae_lambda=0.95,
    clip_range=0.2,
    verbose=1,
    seed=0,  # Fixed seed so a Restart & Run All reproduces the same training run
    tensorboard_log="env1_brake_close/",
)
timesteps = 50000
model.learn(total_timesteps=timesteps)
model.save("env1_brake_close/model")

# Testing

In [None]:
def test_model(path, n_steps=100):
    """Roll out a saved PPO policy in ``CustomMergeEnv`` and show the last frame.

    Args:
        path: Directory that contains the ``model.zip`` checkpoint to load.
        n_steps: Number of environment steps to simulate. The environment is
            reset whenever an episode ends before the budget is used up.
    """
    # Create the environment
    env = CustomMergeEnv(render_mode='rgb_array', config={
        "real_time_rendering": True
    })

    # try/finally guarantees the environment is closed even if loading the
    # checkpoint or stepping the simulation raises.
    try:
        # Load the trained model
        model = PPO.load(os.path.join(path, "model.zip"))

        # reset() returns (observation, info); the info dict is not needed here
        obs, _ = env.reset()

        for _ in range(n_steps):
            action, _states = model.predict(obs)  # Use the model to predict actions
            obs, reward, terminated, truncated, info = env.step(action)
            env.render()  # Render the environment

            if terminated or truncated:
                # Start a new episode and keep consuming the step budget
                obs, _ = env.reset()

        # Display the final frame
        plt.imshow(env.render())
        plt.show()
    finally:
        # Close the environment
        env.close()

In [None]:
# Replay the policy saved by the first training run.
test_model(path="env_brake_close")