<a href="https://colab.research.google.com/github/RizanSM/zero_shot_llms_in_HIL_RL/blob/main/01_highway_env/04_edge_case_scenario_2/01_HF_DIRECT/01_Generate_Trajectories_for_Model_Testing_HF_DIRECT_Edge_Case_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the required libraries in your Google Colab environment.
# The leading `!` runs the line as a shell command from the notebook; `-q` keeps pip's output quiet.
!pip install stable-baselines3 gymnasium highway-env -q

In [None]:
# Import the necessary libraries
import gymnasium as gym
import highway_env
import numpy as np
import pandas as pd
import pickle
import os
import matplotlib.pyplot as plt

In [None]:
from highway_env.road.road import Road, RoadNetwork, LaneIndex
from highway_env.vehicle.controller import ControlledVehicle
from highway_env.vehicle.kinematics import Vehicle

# EDGE CASE SCENARIO 2:
A custom wrapper for the Highway Environment implementing the following: <br>
    a. Ego vehicle: Starts in the middle lane (2) at 25 m/s.<br>
    b. The middle lane (2) is set as the lane with congestion (slower vehicles).<br>
    c. The adjacent lanes are populated with faster vehicles (speeds between 25-30 m/s).

In [None]:
class VehicleGeneratorWrapper(gym.Wrapper):
    """Highway-env wrapper that rebuilds a congested-lane scenario on every reset.

    After the wrapped environment has been reset, the wrapper:
      a. leaves the environment's own ego vehicle in place (the notebook sets
         ``initial_lane_id`` so that it starts in ``lane_index``),
      b. adds ``traffic_density`` extra vehicles to ``lane_index`` with speeds
         drawn uniformly from ``[min_speed, max_speed]`` (the congested lane),
      c. re-draws the speed of every vehicle in the lanes directly next to
         ``lane_index`` from the 25-30 m/s range.
    """

    def __init__(self, env, lane_index, traffic_density, min_speed, max_speed, seed):
        """
        Wrapper for the highway environment to add new vehicles to a specific lane after reset.

        Parameters:
        env: The highway environment to be wrapped.
        lane_index: The index of the lane where vehicles will be added (0 to 3).
        traffic_density: Number of vehicles to be added to the lane.
        min_speed: Minimum speed (in m/s) of the new vehicles.
        max_speed: Maximum speed (in m/s) of the new vehicles.
        seed: Seed applied to NumPy's global RNG and passed to the wrapped
              environment on every reset.
        """
        super().__init__(env)

        self.seed_value = seed
        self.seed(self.seed_value)

        self.vehicles_count = env.unwrapped.config["vehicles_count"]
        self.lane_index = lane_index
        self.traffic_density = traffic_density
        self.min_speed = min_speed
        self.max_speed = max_speed
        self.added_vehicles = []  # Store added vehicles' details for display

    def seed(self, seed=None):
        """Store ``seed`` (when given) and re-seed NumPy's global RNG with the stored value.

        Returns:
        A one-element list containing the seed in use.
        """
        # Update the seed value if provided
        if seed is not None:
            self.seed_value = seed

        # Only NumPy's global RNG is seeded here; it drives the positions and
        # speeds drawn by this wrapper.
        np.random.seed(self.seed_value)

        # Return the seed(s) for consistency
        return [self.seed_value]

    def reset(self, **kwargs):
        """Reset the environment and add new vehicles to the specified lane.

        Parameters:
        **kwargs: A ``seed`` keyword, when given, replaces the stored seed.
                  Every other keyword (e.g. ``options``) is forwarded to the
                  wrapped environment's ``reset``.

        Returns:
        (obs, info): ``obs`` is recomputed after the traffic has been modified,
        so it describes the scene the agent actually starts in. ``info`` is the
        dictionary returned by the wrapped environment's reset.
        """
        # NOTE(review): the stored seed is re-applied on every reset, so repeated
        # resets of the same wrapper are expected to start from the same scene --
        # confirm this is intended when collecting many episodes.
        self.seed(kwargs.pop("seed", None))

        obs, info = super().reset(seed=self.seed_value, **kwargs)
        self._display_lane_traffic(before_reset=True)
        base_env = self.env.unwrapped
        vehicle = base_env.vehicle
        target_lane_id = self.lane_index

        # Log environment reset
        print(" Environment reset initiated")
        print(f" Ego vehicle starts in lane {target_lane_id} at speed 25 m/s")

        # a. Ego vehicle: a copy of the environment's ego state is kept on the
        #    wrapper. It is not appended to the road here; the environment keeps
        #    driving its own ego vehicle.
        self.ego_vehicle = ControlledVehicle(
            base_env.road,
            vehicle.position,
            vehicle.heading,
            vehicle.speed,
            target_lane_index=target_lane_id,  # Ego vehicle targets the congested lane
            target_speed=25,  # Initial speed of ego vehicle = 25 m/s
        )
        self.vehicles = [self.ego_vehicle]

        # b. Populate the target lane with the slower (congestion) vehicles.
        self._add_vehicles()

        # c. The lanes directly next to the congested lane get faster vehicles
        #    (speeds between 25-30 m/s). A lane index above the last lane simply
        #    matches no vehicle.
        adjacent_lanes = [
            lane for lane in (target_lane_id - 1, target_lane_id + 1) if lane >= 0
        ]
        self._set_lane_vehicle_speed(adjacent_lanes, 25, 30)
        self._display_lane_traffic(before_reset=False)

        # Display all vehicle details after reset
        print("All Vehicle details:")
        for v in base_env.road.vehicles:
            print(f"  Vehicle ID: {id(v)}, Lane: {v.lane_index[2]}, Position: {v.position}, Speed: {v.speed}")

        # The observation returned by the wrapped reset was computed before the
        # vehicles were added and the speeds changed; recompute it so the first
        # state handed to the agent matches the road.
        obs = base_env.observation_type.observe()

        return obs, info

    def _add_vehicles(self):
        """Add vehicles to the specified lane in the environment."""
        self.added_vehicles = []  # Reset the list of added vehicles
        road = self.env.unwrapped.road
        lane = self.lane_index
        for i in range(self.traffic_density):
            position = self._get_safe_position(lane)
            if position is None:
                print(f" Could not find a safe position for Vehicle {i + 1} in Lane {lane}")
                continue  # Skip adding this vehicle if no safe position is found

            speed = np.random.uniform(self.min_speed, self.max_speed)

            # Create and place the new vehicle
            new_vehicle = ControlledVehicle.create_random(
                road,
                lane_id=lane
            )
            # Keep ``position`` a 2-D vector: replace only the first coordinate
            # and keep the second one chosen by ``create_random``. Assigning the
            # bare scalar would corrupt the second coordinate as soon as the
            # simulation adds a velocity vector to it.
            # (assumes position[0] is the longitudinal axis, as in
            # _get_safe_position -- TODO confirm for non-straight roads)
            new_vehicle.position = np.array([position, new_vehicle.position[1]], dtype=float)
            # NOTE(review): only the current speed is overwritten; whatever
            # target speed create_random assigned is left untouched -- confirm
            # that is intended.
            new_vehicle.speed = speed
            road.vehicles.append(new_vehicle)

            # Store vehicle details for display
            self.added_vehicles.append({
                "id": i + 1,
                "position": position,
                "speed": speed})

            # Log added vehicle details
            print(f" Vehicle {i + 1} added to Lane {lane}: Position {position}, Speed {speed}")

    def _get_safe_position(self, lane):
        """Generate a safe longitudinal position for the new vehicle in the specified lane.

        Returns:
        ``0`` when the lane is empty, otherwise a value drawn uniformly from
        [0, 500) that is more than 8 away from the first coordinate of every
        vehicle already in the lane, or ``None`` when 100 draws all fail.
        """
        # First coordinate of every vehicle already in the lane; ``np.ravel``
        # accepts scalar, list and array positions alike.
        occupied = [
            float(np.ravel(v.position)[0])
            for v in self.env.unwrapped.road.vehicles
            if v.lane_index[2] == lane
        ]

        if not occupied:
            return 0  # Start of the lane if no vehicles exist

        max_retries = 100  # Maximum number of attempts to find a safe position
        for _ in range(max_retries):
            candidate = np.random.uniform(0, 500)  # Arbitrary range for position
            if all(abs(candidate - p) > 8 for p in occupied):  # Safe gap = 8
                return candidate

        return None  # If no valid position is found after max_retries

    def _set_lane_vehicle_speed(self, lane_indices, min_speed_adj_lane, max_speed_adj_lane):
        """
        Set the speed of vehicles in specified lanes to be within the given range.

        Parameters:
        lane_indices: List of lane indices where vehicle speeds will be modified.
        min_speed_adj_lane: Minimum speed (in m/s) for vehicles in the specified lanes.
        max_speed_adj_lane: Maximum speed (in m/s) for vehicles in the specified lanes.
        """
        for lane in lane_indices:
            lane_vehicles = [v for v in self.env.unwrapped.road.vehicles if v.lane_index[2] == lane]
            for vehicle in lane_vehicles:
                vehicle.speed = np.random.uniform(min_speed_adj_lane, max_speed_adj_lane)

                # Log speed adjustments for vehicles
                print(f" Vehicle in Lane {lane} updated to Speed: {vehicle.speed:.2f} m/s")
            print("")

    def _display_lane_traffic(self, before_reset):
        """Print vehicle counts before or after the extra traffic is added.

        Parameters:
        before_reset: When True, print the configured ``vehicles_count``;
                      otherwise list the vehicles added by this wrapper and the
                      number of vehicles currently on the road.
        """
        if before_reset:
            # Reports the configured count, not the number of vehicles on the road.
            print(f"Total Vehicle Count before reset: {self.vehicles_count}")
        else:
            total_vehicles = len(self.env.unwrapped.road.vehicles)
            print(f"After reset: Number of vehicles added = {len(self.added_vehicles)}")
            for vehicle in self.added_vehicles:
                print(f"Vehicle added to Lane {self.lane_index}: Vehicle {vehicle['id']}: Position {vehicle['position']:.0f}, Speed {vehicle['speed']:.0f} m/s in Lane {self.lane_index}")
            print(f"Total Vehicle Count after reset: {total_vehicles}")
            print("")

    def log_episode_info(self, episode_info):
        """Log episode-specific information in the specified format."""
        for episode, info in episode_info.items():
            print(f"Episode {episode} Info: {info}")
        print("")  # Insert a blank line for better visibility

In [None]:
# THE ENVIRONMENT
# Step 1.1: Build the base highway environment and configure its traffic:
# 50 vehicles requested from the environment, ego vehicle starting in lane 2.
env = gym.make("highway-v0")
env.unwrapped.config.update({"vehicles_count": 50, "initial_lane_id": 2})

# One wrapper per evaluation seed. All five share the same base environment and
# the same congested-lane settings (lane 2, 10 extra vehicles at 15-20 m/s).
(
    wrapped_env_1,
    wrapped_env_2,
    wrapped_env_3,
    wrapped_env_4,
    wrapped_env_5,
) = (
    VehicleGeneratorWrapper(
        env,
        lane_index=2,
        traffic_density=10,
        min_speed=15,
        max_speed=20,
        seed=seed_value,
    )
    for seed_value in (2, 10, 6, 20, 34)
)

Trajectory Collection with Additional information (Collision Flag and Lane Index)

In [None]:
# TRAJECTORY COLLECTION WITH ADDITIONAL INFORMATION
# Initialize a list to store trajectory data
# NOTE(review): `trajectories` is not referenced anywhere else in the visible cells -- confirm it is still needed.
trajectories = []

# FUNCTION TO COLLECT TRAJECTORY DATA (state-action-reward transitions).

def collect_trajectory_data(env, model, num_episodes):
    """
    Roll out ``model`` in ``env`` and record every transition.

    Parameters:
    env: Environment exposing ``reset()``, ``step(action)`` and
         ``unwrapped.vehicle.lane_index``.
    model: Policy exposing ``predict(state, deterministic=True)``.
    num_episodes: Number of episodes to run.

    Returns:
    A list with one entry per episode; each entry is a list of dicts with the
    keys ``state``, ``action``, ``reward``, ``next_state``, ``lane_indices``
    (ego lane after the step) and ``collision_flags`` (1 when the step's
    ``info`` reports ``crashed``, else 0).
    """
    all_episodes = []

    for _ in range(num_episodes):
        obs, _ = env.reset()
        transitions = []
        finished = False

        while not finished:
            # Deterministic action from the trained policy
            action, _ = model.predict(obs, deterministic=True)

            new_obs, reward, terminated, truncated, info = env.step(action)
            finished = terminated or truncated

            # Lane and crash status are read after the step has been applied
            transitions.append({
                "state": obs,
                "action": action,
                "reward": reward,
                "next_state": new_obs,
                "lane_indices": int(env.unwrapped.vehicle.lane_index[2]),
                "collision_flags": int(info.get('crashed', 0)),
            })

            obs = new_obs

        all_episodes.append(transitions)

    return all_episodes

In [None]:
# FUNCTION TO PREPROCESS TRAJECTORY DATA
def preprocess_trajectory_data(trajectory_data):
    """
    Preprocesses the trajectory data into a structured format for further analysis.

    Parameters:
    trajectory_data: List of episodes; each episode is a list of step dicts with
                     the keys ``state``, ``action``, ``reward``, ``next_state``,
                     ``collision_flags`` and ``lane_indices``.

    Returns:
    A DataFrame with one row per time step and the columns: episode, time_step,
    state, action, reward, next_state, collision_flag, lane_index. ``state`` and
    ``next_state`` are flattened to 1-D arrays. The columns are present even
    when ``trajectory_data`` is empty.
    """
    columns = [
        "episode",
        "time_step",
        "state",
        "action",
        "reward",
        "next_state",
        "collision_flag",
        "lane_index",
    ]

    processed_data = [
        {
            "episode": episode_num,
            "time_step": time_step,
            # Flatten multi-dimensional observations for easy interpretation
            "state": np.array(step['state']).flatten(),
            "action": step['action'],
            "reward": step['reward'],
            "next_state": np.array(step['next_state']).flatten(),
            "collision_flag": step['collision_flags'],
            "lane_index": step['lane_indices'],
        }
        for episode_num, episode_data in enumerate(trajectory_data)
        for time_step, step in enumerate(episode_data)
    ]

    # Passing the column list keeps the schema stable for empty input
    return pd.DataFrame(processed_data, columns=columns)

In [None]:
from stable_baselines3 import PPO
from google.colab import drive
from google.colab import data_table

# Mount Google Drive at /content/drive; the model and output paths used in the later cells live under this mount point
drive.mount('/content/drive')

Model Loading

In [None]:
# Step A.6.2: Load the saved PPO model from Google Drive
model = PPO.load('/content/drive/MyDrive/data_rp1/1_trained_models/1_ppo_highway_hf_direct_ideal')        # Update directory location 1

1. GENERATING TRAJECTORIES (HF-D-IDEAL-EDGE-CASE-2)



First HF-D-IDEAL-EDGE-CASE-2 data frame

In [None]:
# Collect data for 100 episodes with the loaded model on wrapped_env_1 (created with seed=2)
trajectory_data_1 = collect_trajectory_data(wrapped_env_1, model, num_episodes=100)        # change environment and model name

In [None]:
# Preprocess the trajectory data into a DataFrame with one row per time step
trajectory_df_1 = preprocess_trajectory_data(trajectory_data_1)

In [None]:
# Display the first DataFrame using Colab's data_table formatter
data_table.enable_dataframe_formatter()
data_table.DataTable(trajectory_df_1)

In [None]:
# Save the first DataFrame as a pickle file under the mounted Google Drive path
trajectory_df_1.to_pickle('/content/drive/MyDrive/data_rp1/3_test_trajectories/10_edge_case_2/1_hf_d_ideal_edge_2/1_trajectory_hf_d_ideal_edge_2_df.pkl')     # change data frame name      # Update directory location 2

Second HF-D-IDEAL-EDGE-CASE-2 data frame

In [None]:
# Collect data for 100 episodes with the loaded model on wrapped_env_2 (created with seed=10)
trajectory_data_2 = collect_trajectory_data(wrapped_env_2, model, num_episodes=100)        # change environment and model name

In [None]:
# Preprocess the trajectory data into a DataFrame with one row per time step
trajectory_df_2 = preprocess_trajectory_data(trajectory_data_2)

In [None]:
# Display the second DataFrame using Colab's data_table formatter
data_table.enable_dataframe_formatter()
data_table.DataTable(trajectory_df_2)

In [None]:
# Save the second DataFrame as a pickle file under the mounted Google Drive path
trajectory_df_2.to_pickle('/content/drive/MyDrive/data_rp1/3_test_trajectories/10_edge_case_2/1_hf_d_ideal_edge_2/2_trajectory_hf_d_ideal_edge_2_df.pkl')     # change data frame name      # Update directory location 3

Third HF-D-IDEAL-EDGE-CASE-2 data frame

In [None]:
# Collect data for 100 episodes with the loaded model on wrapped_env_3 (created with seed=6)
trajectory_data_3 = collect_trajectory_data(wrapped_env_3, model, num_episodes=100)        # change environment and model name

In [None]:
# Preprocess the trajectory data into a DataFrame with one row per time step
trajectory_df_3 = preprocess_trajectory_data(trajectory_data_3)

In [None]:
# Display the third DataFrame using Colab's data_table formatter
data_table.enable_dataframe_formatter()
data_table.DataTable(trajectory_df_3)

In [None]:
# Save the third DataFrame as a pickle file under the mounted Google Drive path
trajectory_df_3.to_pickle('/content/drive/MyDrive/data_rp1/3_test_trajectories/10_edge_case_2/1_hf_d_ideal_edge_2/3_trajectory_hf_d_ideal_edge_2_df.pkl')      # change data frame name     # Update directory location 4

Fourth HF-D-IDEAL-EDGE-CASE-2 data frame

In [None]:
# Collect data for 100 episodes with the loaded model on wrapped_env_4 (created with seed=20)
trajectory_data_4 = collect_trajectory_data(wrapped_env_4, model, num_episodes=100)        # change environment and model name

In [None]:
# Preprocess the trajectory data into a DataFrame with one row per time step
trajectory_df_4 = preprocess_trajectory_data(trajectory_data_4)

In [None]:
# Display the fourth DataFrame using Colab's data_table formatter
data_table.enable_dataframe_formatter()
data_table.DataTable(trajectory_df_4)

In [None]:
# Save the fourth DataFrame as a pickle file under the mounted Google Drive path
trajectory_df_4.to_pickle('/content/drive/MyDrive/data_rp1/3_test_trajectories/10_edge_case_2/1_hf_d_ideal_edge_2/4_trajectory_hf_d_ideal_edge_2_df.pkl')     # change data frame name       # Update directory location 5

Fifth HF-D-IDEAL-EDGE-CASE-2 data frame

In [None]:
# Collect data for 100 episodes with the loaded model on wrapped_env_5 (created with seed=34)
trajectory_data_5 = collect_trajectory_data(wrapped_env_5, model, num_episodes=100)        # change environment and model name

In [None]:
# Preprocess the trajectory data into a DataFrame with one row per time step
trajectory_df_5 = preprocess_trajectory_data(trajectory_data_5)

In [None]:
# Display the fifth DataFrame using Colab's data_table formatter
data_table.enable_dataframe_formatter()
data_table.DataTable(trajectory_df_5)

In [None]:
# Save the fifth DataFrame as a pickle file under the mounted Google Drive path
trajectory_df_5.to_pickle('/content/drive/MyDrive/data_rp1/3_test_trajectories/10_edge_case_2/1_hf_d_ideal_edge_2/5_trajectory_hf_d_ideal_edge_2_df.pkl')      # change data frame name      # Update directory location 6