In [2]:
#all imports
import carla #the sim library itself
import random #to pick random spawn point
import cv2 #to work with images from cameras
import numpy as np #in this example to change image representation - re-shaping
import math
import time
from keras.models import load_model
import pandas as pd
import sys
sys.path.append("D:\WindowsNoEditor\PythonAPI\carla")  # Adjust this path
import carla
import pygame
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.behavior_agent import BehaviorAgent
from agents.navigation.local_planner import RoadOption
from agents.navigation.local_planner import LocalPlanner
from agents.navigation.basic_agent  import BasicAgent



pygame 2.6.1 (SDL 2.28.4, Python 3.7.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [1]:
# Initialize CARLA environment
# Constants
eta_ch = 0.92  # Battery charging efficiency
P_ev_max = 80  # Maximum Power Transfer (40 kW)
Q1 = 90  # Quality factor of primary coil
Q2 = 90 # Quality factor of secondary coil
k0 = 0.2  # Nominal coupling coefficient (perfect alignment)
d_max = 0.5  # Maximum lateral misalignment before no power transfer (meters)

# Vehicle constants
m = 1680  # kg, Mass
g = 9.81  # m/s^2, Gravity
c_rr = 0.01  # Rolling resistance coefficient
c_d = 0.28  # Drag coefficient
A = 1.93  # m^2, Frontal area
rho = 1.20  # kg/m^3, Air density
C_bat = 24  # kWh, Battery capacity
alpha = 0.90  # Transmission efficiency
beta = 0.75  # Regeneration efficiency
P_aux = 0  # kW, Auxiliary power consumption
max_speed = 70 / 3.6  # m/s
avg_speed = 35 / 3.6
avg_slop = 0.002
avg_acc = 0.28

In [3]:


class FakeWaypoint:
    """
    A custom 'fake' waypoint that overrides:
      - is_junction => always False
      - lane_change => always Both
    while forwarding all other attributes to the original waypoint.
    """

    def __init__(self, original_wp: carla.Waypoint):
        self._wp = original_wp  # store the real waypoint internally

    @property
    def is_junction(self):
        # üö¶ Always return False to ignore intersections
        return False

    @property
    def lane_change(self):
        # üîÑ Always allow both left & right lane changes
        return carla.LaneChange.Both

    # Forward all other attributes / methods to the original waypoint
    def __getattr__(self, name):
        return getattr(self._wp, name)


class PatchedMap:
    """
    Wraps the real carla.Map to ensure that every call to get_waypoint
    or get_waypoint_xodr returns a FakeWaypoint instead of a real one.
    """

    def __init__(self, original_map: carla.Map):
        self._original_map = original_map

    def get_waypoint(self, location, project_to_road=True, lane_type=carla.LaneType.Driving):
        wp = self._original_map.get_waypoint(location, project_to_road, lane_type)
        if wp:
            return FakeWaypoint(wp)  # wrap the real waypoint
        return None

    def get_waypoint_xodr(self, road_id, lane_id, s):
        wp = self._original_map.get_waypoint_xodr(road_id, lane_id, s)
        if wp:
            return FakeWaypoint(wp)
        return None

    def __getattr__(self, name):
        # Forward all other map methods (like get_spawn_points, get_topology, etc.)
        return getattr(self._original_map, name)


def patch_carla_map(world: carla.World):
    """
    Replaces world.get_map() with a lambda returning our PatchedMap,
    so all code in all scripts see FakeWaypoint objects with:
      - is_junction=False
      - lane_change=Both
    """
    original_map = world.get_map()
    patched_map = PatchedMap(original_map)

    def get_map_override():
        return patched_map

    # Monkey-patch the world object
    world.get_map = get_map_override

    print("‚úÖ Global patch applied: 'is_junction' = False, 'lane_change' = Both for all waypoints!")


In [4]:
# connect to the sim 
client = carla.Client('localhost', 2000)
# optional to load different towns
client.set_timeout(15)




In [11]:
def create_sim_env():

    global carla_map 
    global all_corners
    global length
    global vehicle
    global start_point
    global new_start_point
    global start_location
    global world

    world = client.load_world('Town06')

    settings = world.get_settings()

    # ‚úÖ Disable rendering mode
    settings.no_rendering_mode = True  # ‚úÖ Turns off all 3D rendering
    world.apply_settings(settings)  # ‚úÖ Apply new settings

    #define environment/world and get possible places to spawn a car
   
    
    
    #look for a blueprint of Mini car
    vehicle_bp = world.get_blueprint_library().filter('nissan')
    #spawn a car in a random location
    
    # Define the target location (x, y, z)
    target_location = carla.Location(x=22.6, y=251, z=0.0)  # Modify with actual coordinates

    # Get the map object
    carla_map = world.get_map()

    # Find the nearest waypoint to the target location
    current_wp = carla_map.get_waypoint(target_location, project_to_road=True, lane_type=carla.LaneType.Driving)

    start_point = current_wp.transform
    
    
    new_start_point = current_wp.get_left_lane().transform
    
    vehicle = world.try_spawn_actor(vehicle_bp[0], new_start_point)

    # move simulator view to the car
    spectator = world.get_spectator()
    start_point.location.z = start_point.location.z+1 #start_point was used to spawn the car but we move 1m up to avoid being on the floor
    spectator.set_transform(start_point)

    # Get the map object
    carla_map = world.get_map()
    all_corners = list()

    # Get the waypoint for the starting position (assumes the car spawns in the leftmost lane)
    start_waypoint = carla_map.get_waypoint(
        start_point.location, 
        project_to_road=True, 
        lane_type=carla.LaneType.Driving
    )

    # Shift 4 lanes to the right
    target_waypoint = start_waypoint
    #for _ in range(4):
     #   if target_waypoint.get_right_lane():
      #      target_waypoint = target_waypoint.get_right_lane()
       # else:
        #    print("The road does not have 4 lanes to the right!")
         #   target_waypoint = None
          #  break

    # Ensure the target lane exists before proceeding
    if target_waypoint:
        # Define the starting location in the shifted lane
        start_location = target_waypoint.transform.location

        # Define the size of the "plane" (length, width)
        length = 569  # Length of the "plane" (this determines how far ahead the lines go)
        width = 3.5   # Width of the "plane" (lane width)

        # Number of lines to draw, to cover most of the center of the lane
        num_lines = 200  # You can increase this number for more lines

        # Define the spacing between lines
        line_spacing = width / num_lines

        # Draw multiple lines in the center of the lane to simulate a "plane"
        for i in range(num_lines):
            # Calculate the y-coordinate for each line to simulate a series of parallel lines
            y_offset = (i - num_lines / 2) * line_spacing  # Center the lines in the lane

            # Define the start and end location of each line
            start_line_location = carla.Location(
                x=start_location.x, 
                y=start_location.y + y_offset, 
                z=start_location.z
            )
            
            # End point 25 meters ahead
            end_line_location = carla.Location(
                x=start_line_location.x + length, 
                y=start_line_location.y, 
                z=start_line_location.z
            )

            # Define a color (e.g., Yellow)
            color = carla.Color(r=0, g=1, b=0, a=0)

            # Draw the line on the road
            world.debug.draw_line(
                start_line_location, 
                end_line_location, 
                thickness=0.05, 
                color=color, 
                life_time=0
            )
    else:
        print("Could not draw the lines. Target lane does not exist!")


    # Ensure the target lane exists before proceeding
    if target_waypoint:
        start_location = target_waypoint.transform.location


        # Define coil size
        coil_width = 6 # Width of each coil (adjust as needed)
        coil_height = 1  # Height (adjust as needed)

        # Define coil positions based on the image (5m and 8m spacing, total 39m)
        coil_positions = [ i for i in range(5,int(length),2+coil_width)]
        
    

        
        coil_color1 = carla.Color(r=1, g=1, b=1)
        coil_color2 = carla.Color(r=1, g=1, b=0)
        coil_color = coil_color2

        # Draw each coil as a rectangle
        for i,pos in enumerate(coil_positions):

            if (i+1)%3==0 and coil_color == coil_color2:
                coil_color = coil_color1
            elif (i+1)%3==0 and coil_color == coil_color1:
                coil_color = coil_color2


            # Define coil color (Yellow)
            
            coil_center_x = start_location.x + pos  # Adjust X position
            coil_center_y = start_location.y  # Keep Y position in lane center

            # Define 4 corners of the rectangle
            corners = [
                carla.Location(x=coil_center_x - coil_width/2, y=coil_center_y - coil_height/2, z=start_location.z),
                carla.Location(x=coil_center_x + coil_width/2, y=coil_center_y - coil_height/2, z=start_location.z),
                carla.Location(x=coil_center_x + coil_width/2, y=coil_center_y + coil_height/2, z=start_location.z),
                carla.Location(x=coil_center_x - coil_width/2, y=coil_center_y + coil_height/2, z=start_location.z)
            ]
            all_corners.append(corners)

            # Draw the rectangle by connecting its corners
            for i in range(4):
                world.debug.draw_line(
                    corners[i], 
                    corners[(i+1) % 4], 
                    thickness=0.1, 
                    color=coil_color, 
                    life_time=0
                )
    else:
        print("Could not draw the coils. Target lane does not exist!")


In [6]:

class BatteryManager:
    def __init__(self, vehicle, destination, world, power_model, C_bat, P_aux):
        self.vehicle = vehicle
        self.destination = destination  # Renamed from point_b
        self.world = world
        self.power_model = power_model
        self.C_bat = C_bat
        self.P_aux = P_aux
        self.last_update_time = self.get_simulation_time()
        self.last_vehicle_location = self.vehicle.get_location()
        self.remaining_distance = self.compute_remaining_distance()
        self.soc = 19  # State of Charge in percentage
        self.total_energy = 0  # Total energy consumed (kWh)
        self.prev_velocity = self.vehicle.get_velocity()  # Initialize in constructor
        self.eta = None
        self.soc_required = None
        self.initial_eta = self.remaining_distance / avg_speed  
        self.in_dwcl = self.is_on_charging_lane()
        

    
    def get_simulation_time(self):
        return self.world.get_snapshot().timestamp.elapsed_seconds

    def compute_remaining_distance(self):
        carla_map = self.world.get_map()
        grp = GlobalRoutePlanner(carla_map, 2.0)
        route = grp.trace_route(self.vehicle.get_location(), self.destination)
        return sum(route[i][0].transform.location.distance(route[i + 1][0].transform.location) for i in range(len(route) - 1))

    def update_remaining_distance(self):
        current_time = self.get_simulation_time()
        vehicle_location = self.vehicle.get_location()
        
        self.remaining_distance = self.compute_remaining_distance()
        self.last_update_time = current_time
        self.last_vehicle_location = vehicle_location

    def get_slope(self):
        rotation = self.vehicle.get_transform().rotation
        return math.radians(rotation.pitch)

    def compute_forces(self, speed_m_s, acceleration):
        slope = self.get_slope()
        speed_vector = np.array([speed_m_s, avg_speed])
        acc_vector = np.array([acceleration, avg_acc])
        slope_vector = np.array([slope, avg_slop])
        
        F_roll = c_rr * m * g * speed_vector / 1000
        F_climb = m * g * np.sin(slope_vector) * speed_vector / 1000
        F_aero = 0.5 * c_d * A * rho * speed_vector**2 * speed_vector / 1000
        F_acc = m * acc_vector * speed_vector  / 1000
        
        return F_roll, F_climb, F_aero, F_acc

    def get_power_transfer_efficiency(self, k):
        return (k**2 * Q1 * Q2) / (1 + k**2 * Q1 * Q2)

    def get_coil_center_and_orientation(self, corners):
        center_x = sum(c.x for c in corners) / 4
        center_y = sum(c.y for c in corners) / 4
        dx = corners[1].x - corners[0].x
        dy = corners[1].y - corners[0].y
        coil_length = math.sqrt(dx**2 + dy**2)
        yaw_angle = math.degrees(math.atan2(dy, dx))
        return carla.Location(center_x, center_y, corners[0].z), yaw_angle, coil_length

    def is_on_charging_coil(self, all_corners):
        epsilon = 0.25
        vehicle_location = self.vehicle.get_location()
        vehicle_x, vehicle_y = vehicle_location.x, vehicle_location.y
        for corners in all_corners:
            coil_center, coil_orientation, coil_length = self.get_coil_center_and_orientation(corners)
            x_min, x_max = min(c.x for c in corners), max(c.x for c in corners)
            y_min, y_max = min(c.y for c in corners), max(c.y for c in corners)
            if x_min - epsilon <= vehicle_x <= x_max + epsilon and y_min - epsilon <= vehicle_y <= y_max + epsilon:
                return True, coil_center, coil_orientation, coil_length
        return False, None, None, None
    




    def get_plane_corners(self,length=569, width=3.5):
        """
        Given the starting location and plane dimensions,
        returns the 4 corner points of the plane.
        """

        # Compute Y offsets
        half_width = width / 2  # Half width to get the left and right bounds

        # ‚úÖ 4 Corner Points
        top_left = carla.Location(
            x=start_location.x, 
            y=start_location.y + half_width, 
            z=start_location.z
        )

        top_right = carla.Location(
            x=start_location.x + length,  # Move `length` meters forward
            y=start_location.y + half_width, 
            z=start_location.z
        )

        bottom_left = carla.Location(
            x=start_location.x, 
            y=start_location.y - half_width, 
            z=start_location.z
        )

        bottom_right = carla.Location(
            x=start_location.x + length,  # Move `length` meters forward
            y=start_location.y - half_width, 
            z=start_location.z
        )

        return top_left, top_right, bottom_left, bottom_right

    


    def is_on_charging_lane(self):
        """
        Checks if the vehicle is inside the rectangular plane formed by the given corners.
        
        :param vehicle: The CARLA vehicle actor.
        :param plane_corners: A tuple of (top_left, top_right, bottom_left, bottom_right) carla.Location
        :return: True if the vehicle is inside the plane, False otherwise.
        """
        
        # üöó Get vehicle position
        vehicle_location = self.vehicle.get_location()
        vehicle_x, vehicle_y = vehicle_location.x, vehicle_location.y

       

        # üéØ Unpack the plane's 4 corners
        top_left, top_right, bottom_left, bottom_right = self.get_plane_corners()

        # üìå Define the rectangular bounds
        min_x = min(top_left.x, top_right.x, bottom_left.x, bottom_right.x)  # Smallest x
        max_x = max(top_left.x, top_right.x, bottom_left.x, bottom_right.x)  # Largest x
        min_y = min(top_left.y, top_right.y, bottom_left.y, bottom_right.y)  # Smallest y
        max_y = max(top_left.y, top_right.y, bottom_left.y, bottom_right.y)  # Largest y

        # ‚úÖ Check if the vehicle is inside the bounds
        inside_x = min_x <= vehicle_x <= max_x
        inside_y = min_y <= vehicle_y <= max_y

        return inside_x and inside_y  # True if inside both x & y bounds


    def get_alignment_factor(self, coil_center, coil_orientation, coil_length):
        l_max = coil_length
        vehicle_location = self.vehicle.get_location()
        vehicle_x, vehicle_y = vehicle_location.x, vehicle_location.y
        d_y = abs(vehicle_y - coil_center.y)
        f_lat = np.exp(- (d_y / d_max) ** 2)
        d_x = abs(vehicle_x - coil_center.x)
        f_long = np.exp(- (d_x / l_max) ** 2)
        vehicle_rotation = self.vehicle.get_transform().rotation.yaw
        theta_misalign = abs(vehicle_rotation - coil_orientation)
        theta_misalign_rad = math.radians(theta_misalign)
        f_ang = abs(np.cos(theta_misalign_rad) + 0.5 * np.sin(theta_misalign_rad))
        f_align = f_lat * f_long * f_ang
        k = k0 * f_align
        return f_align, k
    

    def get_acceleration(self,delta_time):
       
        
        if delta_time <= 0:
            return 0  # Avoid division by zero or negative time steps

        velocity = self.vehicle.get_velocity()
        speed_m_s = math.sqrt(velocity.x**2 + velocity.y**2)

        if hasattr(self, "prev_velocity") and self.prev_velocity is not None:
            prev_speed_m_s = math.sqrt(self.prev_velocity.x**2 + self.prev_velocity.y**2)
            acceleration = (speed_m_s - prev_speed_m_s) / delta_time
        else:
            acceleration = 0  # First time, assume no acceleration

        # Update for next calculation
        self.prev_velocity = velocity
        
        

        return acceleration
    


    def get_target_dwcl_lane_id(self):
        """üîç Find the lane ID of the nearest DWCL."""
        min_distance = float('inf')
        target_lane_id = None

        for corners in all_corners:
            dwcl_wp = self.vehicle.get_world().get_map().get_waypoint(corners[0])
            distance = self.vehicle.get_location().distance(dwcl_wp.transform.location)

            if distance < min_distance:
                min_distance = distance
                target_lane_id = dwcl_wp.lane_id

        return target_lane_id  # Returns the lane ID of the closest DWCL
    








    


    def update_battery(self, delta_time, all_corners):
        location = self.vehicle.get_location()
        velocity = self.vehicle.get_velocity()
        speed_m_s = np.sqrt(velocity.x**2 + velocity.y**2)
        acceleration = self.get_acceleration(delta_time)
        slope = self.get_slope() * 57.2958
        F_roll, F_climb, F_aero, F_acc = self.compute_forces(speed_m_s, acceleration)
        df = pd.DataFrame({
            'aeroF_P': F_aero,
            'climbF_P': F_climb,
            'rollF_P': F_roll,
            'accF_P': F_acc
        })
        P_net, req_P = -power_model.predict(df, verbose=0).flatten() #in kW
        P_net = P_net  * 0 if speed_m_s <=0 else P_net
        P_total = delta_time * (P_aux + P_net)  # Convert to kWh
        T_req_P = (P_aux + req_P)

        # Compute remaining distance
        #remaining_distance = abs(point_b.x - location.x)
        self.update_remaining_distance()
        remaining_distance = self.compute_remaining_distance()

        # Compute energy consumption
        energy_consumed = (1 / 3600) * P_total  # Convert W to kWh
        energy_required = (remaining_distance / avg_speed) / 3600 * T_req_P
        soc_required = (energy_required / C_bat) * 100
        on_charging_lane, coil_center, coil_orientation, coil_length = self.is_on_charging_coil(all_corners)
        if on_charging_lane:
            f_align, k = self.get_alignment_factor(coil_center, coil_orientation, coil_length)
            eta_transfer = self.get_power_transfer_efficiency(k)
            P_ev = P_ev_max * eta_transfer * eta_ch
            energy_charged = (P_ev * delta_time) / 3600
        else:
            energy_charged, P_ev, f_align, k, eta_transfer = 0, 0, 0, 0, 0
        self.total_energy += (energy_consumed - energy_charged)
        self.soc -= ((energy_consumed - energy_charged) / C_bat) * 100
        self.soc = min(max(self.soc, 0), 100)
        self.total_energy = max(0,self.total_energy)
        
        self.eta = remaining_distance / (avg_speed + 0.01)
        self.soc_required = soc_required
        self.remaining_distance = remaining_distance
        #print('eta inside battery_manager:',self.eta)
        
        return self.soc, soc_required, self.total_energy, self.eta, P_ev, f_align, k, eta_transfer, remaining_distance,acceleration,slope


In [7]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import collections
import carla
import time
from gym import spaces






global lane_switch_counter
lane_switch_counter = 0
global invalid_action_counter
invalid_action_counter = 0



class CustomBehaviorAgent(BehaviorAgent):
    """üöó Custom Behavior Agent that ignores red lights."""


    

    


    def run_step(self, debug=False):
        """üöÄ Runs a step but prevents crashes when waypoints are missing."""
        
        try:
            # ‚úÖ Try running the normal Behavior Agent step
            return super().run_step(debug)
        
        except AttributeError as e:
            #print(f"üö® Warning: Missing waypoint! Error: {e}")
            return carla.VehicleControl()  # Return empty control to avoid crashing



    def _update_information(self):
        """Overrides the default function to skip red light checks."""
        super()._update_information()

        # ‚úÖ Ignore Traffic Light States (Override the check)
        if self._vehicle.get_traffic_light_state() == carla.TrafficLightState.Red:
            #print("üö¶ Ignoring red light...")
            self._ignore_traffic_light = True  # Skip red light checks




# üöó CARLA ENVIRONMENT
class CarlaEnv(gym.Env):
    def __init__(self, vehicle, battery_manager, all_corners, destination):
        super(CarlaEnv, self).__init__()
        
        self.vehicle = vehicle
        self.battery_manager = battery_manager
        self.all_corners = all_corners
        self.destination = destination
        self.start_time = self.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds
        #self.global_planner,self.local_planner = self.setup_route_planner()
        self.tm = self.set_up_traffic_manager()
        #self.agent = self.initialize_behavior_agent()
        self.target_speed = 40  # Set speed to 40 km/h

        # üîπ State Space: [SoC, Required SoC, ETA, Distance to DWCL End, Lane Type, Target_speed]
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0, 0, 5]),
            high=np.array([100, 100, 1000, 1000, 1, 100]),
            dtype=np.float32
        )

        # üîπ Action Space: [Go to DWCL, Leave DWCL, Accelerate, Decelerate, Maintain Speed, Stay Out DWCL]
        self.action_space = spaces.Discrete(6)

    def get_current_lane_id(self):
        """üîç Retrieves the current lane ID of the vehicle."""
        map = self.vehicle.get_world().get_map()
        waypoint = map.get_waypoint(self.vehicle.get_location())
        return waypoint.lane_id
    

    
    



    

    def initialize_behavior_agent(self):
        """
        Initializes a Behavior Agent for autonomous navigation to a given destination.

        Args:
            vehicle (carla.Vehicle): The vehicle that will be controlled by the agent.
            destination_location (carla.Location): The target destination.

        Returns:
            BehaviorAgent: The initialized agent ready to navigate.
        """

        # ‚úÖ Step 1: Create the Behavior Agent
        agent = CustomBehaviorAgent(self.vehicle, behavior="normal",opt_dict={'ignore_traffic_lights': True})  # Options: "normal", "aggressive", "cautious"

        # ‚úÖ Step 2: Set the destination
        agent.set_destination(self.destination)

        agent._local_planner._min_waypoint_queue_length = 10
        agent._local_planner._buffer_size = 20
        agent.target_speed = 40  # Set speed to 40 km/h

        # ‚úÖ Disable Traffic Manager interference
        client = carla.Client("localhost", 2000)  # Ensure correct connection
        traffic_manager = client.get_trafficmanager(8000)
        
        traffic_manager.auto_lane_change(vehicle, False)
        agent._steering_gain = 0.5

        #print(f"‚úÖ Behavior Agent initialized. Navigating to: {self.destination}")

        # ‚úÖ Step 3: Return the agent for further use
        return agent

    
    


    def get_nearest_dwcl_location(self):
        """üîç Find the lane ID of the nearest DWCL."""
        min_distance = float('inf')
        target_lane_id = None

        for corners in self.all_corners:
            dwcl_wp = self.vehicle.get_world().get_map().get_waypoint(corners[0])
            distance = self.vehicle.get_location().distance(dwcl_wp.transform.location)

            if distance < min_distance:
                min_distance = distance
                target_lane_id = dwcl_wp.lane_id  # ‚úÖ Store correct DWCL lane
                target_lane_location = dwcl_wp.transform.location
                target_lane_transform = dwcl_wp.transform
                velocity = vehicle.get_velocity()

                # Speed in m/s
                speed_m_s = math.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)

                # Speed in km/h
                speed_kmh = speed_m_s * 3.6
                brake_safe_dist = (speed_kmh/8)**2
                if speed_kmh < 15 and brake_safe_dist < 30:
                    brake_safe_dist = 30
                target_lane_transform.location.x += brake_safe_dist  
                #target_lane_location.y+=10
                

        # ‚úÖ Get a pedestrian blueprint
        blueprint_library = world.get_blueprint_library()
        pedestrian_bp = random.choice(blueprint_library.filter('walker.pedestrian.*'))

        # ‚úÖ Define spawn location
        #pedestrian_spawn = carla.Transform(carla.Location(x=50, y=-10, z=1))

        # ‚úÖ Spawn pedestrian
        #pedestrian = world.try_spawn_actor(pedestrian_bp, carla.Transform(target_lane_location))

       


        #print(f"üìå Target DWCL Lane Location: {target_lane_location}")  # ‚úÖ Debugging output
        return target_lane_transform  # ‚úÖ Returns correct DWCL lane ID
    


    def get_nearest_NOdwcl_location(self):
        """üîç Find the lane ID of the nearest DWCL."""
        min_distance = float('inf')
        target_lane_id = None
        # Get the current vehicle location
        vehicle_location = self.vehicle.get_location()

        # Get the map object from the world
        carla_map = self.vehicle.get_world().get_map()

        # Get the closest waypoint to the vehicle location
        current_wp = carla_map.get_waypoint(vehicle_location, project_to_road=True, lane_type=carla.LaneType.Driving)

        

        
        left_wp = current_wp.get_left_lane() 
       
        target_lane_transform = left_wp.transform
        velocity = vehicle.get_velocity()

        # Speed in m/s
        speed_m_s = math.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)

        # Speed in km/h
        speed_kmh = speed_m_s * 3.6
        brake_safe_dist = (speed_kmh/8)**2
        if speed_kmh < 15 and brake_safe_dist < 20:
            brake_safe_dist = 20
        target_lane_transform.location.x += brake_safe_dist  
        #target_lane_location.y+=10
                

        # ‚úÖ Get a pedestrian blueprint
        blueprint_library = world.get_blueprint_library()
        pedestrian_bp = random.choice(blueprint_library.filter('walker.pedestrian.*'))

        # ‚úÖ Define spawn location
        #pedestrian_spawn = carla.Transform(carla.Location(x=50, y=-10, z=1))

        # ‚úÖ Spawn pedestrian
        #pedestrian = world.try_spawn_actor(pedestrian_bp, carla.Transform(target_lane_location))

       


        #print(f"üìå Target DWCL Lane Location: {target_lane_location}")  # ‚úÖ Debugging output
        return target_lane_transform  # ‚úÖ Returns correct DWCL lane ID
    
    def stop_car(self):

        self.vehicle.set_autopilot(False)
        control = carla.VehicleControl()
        control.throttle = 0.0
        control.brake = 1.0  # Full brake
        control.hand_brake = False
        control.steer = 0.0
        

        # ‚úÖ Apply stopping control
        self.vehicle.apply_control(control)
        #for _ in range(26):
        #     self.vehicle.get_world().tick()


    

    def setup_route_planner(self):
        """üó∫Ô∏è Initializes the CARLA Global Route Planner"""
        world_map = self.vehicle.get_world().get_map()
        #print("üó∫Ô∏è Route Planner Initialized")
        return GlobalRoutePlanner(world_map, 0.5), LocalPlanner(self.vehicle, opt_dict={"target_speed": 30})  # 2.0 meters precision
    
    def set_up_traffic_manager(self,reduce_speed_by=0):
        """üö¶ Configures CARLA's Traffic Manager to ensure smooth and safe driving to the DWCL."""
        client = carla.Client("localhost", 2000)  # Ensure correct connection
        self.tm = client.get_trafficmanager(8000)  # ‚úÖ Use correct API

        # ‚úÖ Attach Traffic Manager to the vehicle
        self.vehicle.set_autopilot(False, self.tm.get_port())

        # ‚úÖ Ensure collision avoidance
        self.tm.ignore_lights_percentage(self.vehicle, 100)  # Follow traffic lights
        self.tm.ignore_signs_percentage(vehicle, 100)
        self.tm.auto_lane_change(self.vehicle, True)  # Initially disable lane changing
        self.tm.set_global_distance_to_leading_vehicle(5)  # Avoid collisions
        self.tm.vehicle_percentage_speed_difference(self.vehicle, reduce_speed_by)  # 0 speed
        self.tm.set_respawn_dormant_vehicles(True)  # Prevent blocked traffic
        
        #print("‚úÖ Traffic Manager Initialized with collision avoidance.")
        return self.tm  # ‚úÖ Return the Traffic Manager instance



    
    







    def get_route_to_destination(self, new_destination = None):
        """üó∫Ô∏è Computes a route from the vehicle's location to the final destination."""
        current_location = self.vehicle.get_location()  # ‚úÖ Ensure it's a `Location`
        
        # ‚úÖ Ensure destination is a `Location`
        if new_destination is None :
            destination_location = self.destination
        
        
        # ‚úÖ Compute route using CARLA‚Äôs Global Route Planner
        route = self.route_planner.trace_route(current_location, destination_location)

        if not route or len(route) == 0:
            #print("üö® No route generated! Check destination location.")
            return []

        #print(f"üõ£Ô∏è Generated Route with {len(route)} waypoints to Destination")

        # ‚úÖ Convert Waypoints to Locations
        return [wp[0].transform.location for wp in route]  # ‚úÖ Extract `Location`






        

    
    


    def draw_waypoints(self, waypoints, color=carla.Color(0, 255, 0)):
        """üó∫Ô∏è Draws waypoints on the CARLA map (Green Dots üü¢)."""
        
        world = self.vehicle.get_world()

        for wp in waypoints:
            world.debug.draw_point(
                wp[0].transform.location,
                size=0.3,
                color=color,
                life_time=10  # Stay visible for 10 seconds
            )





    def draw_route(self, waypoints, color=carla.Color(255, 255, 0)):
        """üìç Draws lines between waypoints to visualize the full route (Yellow Lines üü°)."""
        
        world = self.vehicle.get_world()

        for i in range(len(waypoints) - 1):
            wp1 = waypoints[i]
            wp2 = waypoints[i + 1]

            world.debug.draw_line(
                wp1, wp2,
                thickness=0.1,
                color=color,
                life_time=0  # Stays visible for 10 seconds
            )


    





    def set_speed(self, desired_speed_kmh):
        """
        Reads the current lane's speed limit, calculates the percentage speed difference,
        and applies it to the Traffic Manager so the vehicle aims for `desired_speed_kmh`.
        """

        # 1) Get the world and traffic manager
        world = self.vehicle.get_world()
        

        # 2) Enable autopilot on this vehicle under the TM's port (if not done yet)
        self.vehicle.set_autopilot(True, self.tm.get_port())

        # 3) Get the vehicle's current waypoint to find the speed limit
        waypoint = world.get_map().get_waypoint(
            self.vehicle.get_location(),
            project_to_road=True,
            lane_type=carla.LaneType.Driving
        )

        # 4) Read the base speed limit in km/h
        base_speed_limit_kmh = self.vehicle.get_speed_limit()  # a float, e.g. 50.0 km/h

        # 5) If the base speed limit is zero or negative (rare), just skip
        if base_speed_limit_kmh <= 0:
            #print("üö® Invalid speed limit on this road; cannot compute percentage difference.")
            return

        # 6) Compute the percentage difference for the Traffic Manager formula:
        #    speed = base_speed_limit_kmh * (1 - percentage / 100)
        #
        # We rearrange:  desired_speed_kmh = base_speed_limit_kmh * (1 - (percentage/100))
        # => (percentage/100) = 1 - (desired_speed_kmh / base_speed_limit_kmh)
        # => percentage = 100 * (1 - desired_speed_kmh / base_speed_limit_kmh)
        #
        # Note: if desired_speed_kmh < base_speed_limit_kmh => percentage is positive (slower).
        #       if desired_speed_kmh > base_speed_limit_kmh => percentage is negative (faster).
        #
        ratio = desired_speed_kmh / base_speed_limit_kmh
        percentage = 100.0 * (1.0 - ratio)

        # 7) Apply the computed percentage to the vehicle
        self.tm.vehicle_percentage_speed_difference(self.vehicle, percentage)

        # 8) Print debug info
        #print(f"‚úÖ Base Speed Limit: {base_speed_limit_kmh:.1f} km/h")
        #print(f"‚úÖ Desired Speed:    {desired_speed_kmh:.1f} km/h")
        #print(f"‚úÖ Computed Pct:     {percentage:.1f}% (positive = slower, negative = faster)")

        # Vehicle will now aim for `desired_speed_kmh` under normal conditions.



    def move_to_dwcl(self):
        """
        Moves self.vehicle to the 'dwcl' location using BehaviorAgent and a global route.
        """
        self.vehicle.set_autopilot(False,self.tm.get_port())
        #self.tm.vehicle_percentage_speed_difference(self.vehicle, 100)
        
        agent = CustomBehaviorAgent(self.vehicle, behavior="normal",opt_dict={'ignore_traffic_lights': True})
        
        #self.disable_autopilot_completely()
        self.stop_car()
        

        # 2. Get the current map and the starting waypoint from the vehicle's position
        world_map = self.vehicle.get_world().get_map()
        
        #time.sleep(0.5)  # Wait for the vehicle to stop

        
        # 1. Create the BehaviorAgent for your vehicle
        #agent = BehaviorAgent(self.vehicle, behavior='normal')  # 'aggressive' or 'cautious' also possible
        
        

        # 3. Retrieve your destination transform
        destination_transform = self.get_nearest_dwcl_location()  # Example method returning carla.Transform
        if not destination_transform:
            #print("No destination transform was returned. Aborting.")
            return

        # 4. Snap the destination to a lane waypoint so the agent can properly route
        end_waypoint = world_map.get_waypoint(
            destination_transform.location,
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
        
        if not end_waypoint:
            #print("No valid road waypoint found near destination. Aborting.")
            return
        self.vehicle.set_autopilot(False,self.tm.get_port())  # Disable autopilot for manual control
        start_waypoint = world_map.get_waypoint(
            self.vehicle.get_location(),
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
        
        
        # 5. Compute a route (list of waypoints) from start to end
        route_trace = agent.trace_route(start_waypoint, end_waypoint)
        #print('route_trace:',route_trace)
        self.draw_waypoints(route_trace)
        if len(route_trace) >200:
            end_waypoint.transform.location.x = end_waypoint.transform.location.x - end_waypoint.transform.location.x/3
            self.vehicle.set_transform(end_waypoint.transform)

        # 6. Pass that route to the agent's global planner
        agent.set_global_plan(route_trace)
        #agent.set_destination(destination_transform.location)

        #print(f"Moving vehicle id={self.vehicle.id} to {destination_transform.location}")

        # 7. Main loop: step the agent until done or timeout
        start_time = time.time()
        timeout_seconds = 8

        while True:
            # If you're in asynchronous mode, you can let the simulator run:
            #time.sleep(0.05)

            # If you're in synchronous mode, use:
            #self.vehicle.get_world().tick()
            current_time = self.battery_manager.get_simulation_time()
            delta_time = current_time - self.battery_manager.last_update_time
            self.battery_manager.last_update_time = current_time

            self.battery_manager.update_battery(delta_time, self.all_corners)

            # 7a. Get the next control from the agent
            
            control = agent.run_step()
            self.vehicle.apply_control(control)
            in_dwcl = self.battery_manager.is_on_charging_lane()
            current_waypoint = world_map.get_waypoint(
            self.vehicle.get_location(),
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
            
           
            
            
            # 7b. Check if the agent has completed the route
            if in_dwcl and current_waypoint.lane_id == end_waypoint.lane_id:
                # 1. Get the vehicle's current location
                
                vehicle_location = self.vehicle.get_location()
                
                # 2. Find the nearest drivable lane waypoint
                
                nearest_wp = world_map.get_waypoint(
                    vehicle_location,
                    project_to_road=True, 
                    lane_type=carla.LaneType.Driving
                )
                self.stop_car()
                # 3. If a valid waypoint is found, move the vehicle to that transform
                if nearest_wp is not None:
                    
                    #print("Vehicle teleported to the nearest waypoint on the road.")
                    #for _ in range(26):
                     #   self.vehicle.get_world().tick()
                    self.stop_car()
                    self.vehicle.set_transform(nearest_wp.transform)
                else:
                    print("No valid waypoint found near the vehicle‚Äôs location.")
                
                # Break out of the driving loop, etc.
                break

            if self.vehicle.get_location().x > self.destination.x - 5:
                self.stop_car()
                break
                
            

            # 7c. Check for timeout
            if (time.time() - start_time) > timeout_seconds:
             #   #print("Timeout: Destination not reached within given time.")
                self.vehicle.set_transform(end_waypoint.transform)
                break
        self.vehicle.set_autopilot(True, self.tm.get_port())




    def disable_autopilot_completely(self):
        """
        Fully removes the vehicle from Traffic Manager (TM) 
        and ensures autopilot is off, even near intersections.
        """

        
        

        # 2) Unregister (remove) the vehicle from TM‚Äôs control
        try:
            self.tm.unregister_vehicle(self.vehicle)
            #print("‚úÖ Vehicle unregistered from Traffic Manager.")
        except Exception as e:
            print(f"üö® Could not unregister vehicle from TM: {e}")

        # 3) Disable autopilot (in case it was set before)
        self.vehicle.set_autopilot(False)
        #print("üö¶ Autopilot disabled!")



    def exit_dwcl(self):
        """
        Moves self.vehicle to the 'dwcl' location using BehaviorAgent and a global route.
        """
        self.vehicle.set_autopilot(False,self.tm.get_port())
        #self.tm.vehicle_percentage_speed_difference(self.vehicle, 100)
        
        agent = CustomBehaviorAgent(self.vehicle, behavior="normal",opt_dict={'ignore_traffic_lights': True})
        
        #self.disable_autopilot_completely()
        self.stop_car()
        

        # 2. Get the current map and the starting waypoint from the vehicle's position
        world_map = self.vehicle.get_world().get_map()
        
        #time.sleep(0.5)  # Wait for the vehicle to stop

        
        # 1. Create the BehaviorAgent for your vehicle
        #agent = BehaviorAgent(self.vehicle, behavior='normal')  # 'aggressive' or 'cautious' also possible
        
        

        # 3. Retrieve your destination transform
        destination_transform = self.get_nearest_NOdwcl_location()  # Example method returning carla.Transform
        if not destination_transform:
            #print("No destination transform was returned. Aborting.")
            return

        # 4. Snap the destination to a lane waypoint so the agent can properly route
        end_waypoint = world_map.get_waypoint(
            destination_transform.location,
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
        
        if not end_waypoint:
            #print("No valid road waypoint found near destination. Aborting.")
            return
        self.vehicle.set_autopilot(False,self.tm.get_port())  # Disable autopilot for manual control
        start_waypoint = world_map.get_waypoint(
            self.vehicle.get_location(),
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
        
        
        # 5. Compute a route (list of waypoints) from start to end
        route_trace = agent.trace_route(start_waypoint, end_waypoint)
        #print('route_trace:',route_trace)
        self.draw_waypoints(route_trace)
        if len(route_trace) >200:
            end_waypoint.transform.location.x = end_waypoint.transform.location.x - end_waypoint.transform.location.x/3
            self.vehicle.set_transform(end_waypoint.transform)

        # 6. Pass that route to the agent's global planner
        agent.set_global_plan(route_trace)
        #agent.set_destination(destination_transform.location)

        #print(f"Moving vehicle id={self.vehicle.id} to {destination_transform.location}")

        # 7. Main loop: step the agent until done or timeout
        start_time = time.time()
        timeout_seconds = 8
        self.tm.vehicle_percentage_speed_difference(self.vehicle, 0)
        while True:
            # If you're in asynchronous mode, you can let the simulator run:
            #time.sleep(0.05)

            # If you're in synchronous mode, use:
            #self.vehicle.get_world().tick()
            current_time = self.battery_manager.get_simulation_time()
            delta_time = current_time - self.battery_manager.last_update_time
            self.battery_manager.last_update_time = current_time

            self.battery_manager.update_battery(delta_time, self.all_corners)

            # 7a. Get the next control from the agent
            
            control = agent.run_step()
            self.vehicle.apply_control(control)
            in_dwcl = self.battery_manager.is_on_charging_lane()
            current_waypoint = world_map.get_waypoint(
            self.vehicle.get_location(),
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
            
           
            
            
            # 7b. Check if the agent has completed the route
            if not in_dwcl and current_waypoint.lane_id == end_waypoint.lane_id:
                # 1. Get the vehicle's current location
                
                vehicle_location = self.vehicle.get_location()
                
                # 2. Find the nearest drivable lane waypoint
                
                nearest_wp = world_map.get_waypoint(
                    vehicle_location,
                    project_to_road=True, 
                    lane_type=carla.LaneType.Driving
                )
                self.stop_car()
                # 3. If a valid waypoint is found, move the vehicle to that transform
                if nearest_wp is not None:
                    
                    #print("Vehicle teleported to the nearest waypoint on the road.")
                    for _ in range(26):
                        self.vehicle.get_world().tick()
                    self.stop_car()
                    self.vehicle.set_transform(nearest_wp.transform)
                else:
                    print("No valid waypoint found near the vehicle‚Äôs location.")
                
                # Break out of the driving loop, etc.
                break

            if self.vehicle.get_location().x > self.destination.x - 5:
                self.stop_car()
                break
                
            

            # 7c. Check for timeout
            if (time.time() - start_time) > timeout_seconds:
             #   #print("Timeout: Destination not reached within given time.")
                self.vehicle.set_transform(end_waypoint.transform)
                break
        
        self.vehicle.set_autopilot(True, self.tm.get_port())








    

    def exittt_dwcl(self):
        """üöó Uses Behavior Agent to move left and adjusts vehicle orientation by teleporting to the nearest waypoint."""
        
        #print("üîÑ Exiting DWCL by moving to the left lane...")
        self.vehicle.set_autopilot(False,self.tm.get_port())  # Disable autopilot for manual control

        # ‚úÖ Get CARLA map
        world_map = self.vehicle.get_world().get_map()

        # ‚úÖ Initialize Behavior Agent
        #agent = BehaviorAgent(self.vehicle, behavior="normal")

        # ‚úÖ Get Current Waypoint
        

        # ‚úÖ Get the Left Lane Waypoint
        

        #print(f"üîÑ Moving to left lane {left_wp.lane_id}...")
        agent = CustomBehaviorAgent(self.vehicle, behavior="normal",opt_dict={'ignore_traffic_lights': True})

        velocity = vehicle.get_velocity()

        # Speed in m/s
        speed_m_s = math.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)

        # Speed in km/h
        speed_kmh = speed_m_s * 3.6
        brake_safe_dist = (speed_kmh/7)**2

        start_waypoint = world_map.get_waypoint(
            self.vehicle.get_location(),
            project_to_road=True,
            lane_type=carla.LaneType.Driving)
        current_wp = world_map.get_waypoint(self.vehicle.get_location(), project_to_road=True,lane_type=carla.LaneType.Driving)
        left_wp = current_wp.get_left_lane()
        
        if not left_wp:
            #print("üö® No left lane available! Staying in current lane.")
            return
         
        left_wp.transform.location.x -= brake_safe_dist  
        

        # ‚úÖ Set destination to the left lane
        #agent.set_destination(left_wp.transform.location)
        route_trace = agent.trace_route(start_waypoint, left_wp)
        #print('route_trace:',route_trace)
        self.draw_waypoints(route_trace)
        if len(route_trace) >200:
            left_wp.transform.location.x /= 3
            self.vehicle.set_transform(left_wp.transform)

        # 6. Pass that route to the agent's global planner
        agent.set_global_plan(route_trace)

        # ‚úÖ Smoothly follow the planned route
        while agent.done() is False:
            current_time = self.battery_manager.get_simulation_time()
            delta_time = current_time - self.battery_manager.last_update_time
            self.battery_manager.last_update_time = current_time
            self.battery_manager.update_battery(delta_time, self.all_corners)
            start_time = time.time()
            timeout_seconds = 8

            control = agent.run_step()
            self.vehicle.apply_control(control)
            time.sleep(0.05)  # Allow smooth execution
            current_wp = world_map.get_waypoint(self.vehicle.get_location(), project_to_road=True,lane_type=carla.LaneType.Driving)
            if current_wp.lane_id == left_wp.lane_id:
                

                
                new_wp = world_map.get_waypoint(self.vehicle.get_location(), project_to_road=True)
                
                
                self.vehicle.set_transform(new_wp.transform)  # ‚úÖ Teleport to ensure perfect alignment

                #print("‚úÖ Vehicle is now perfectly aligned on the left lane!")
                self.stop_car()
                break


            if (time.time() - start_time) > timeout_seconds:
             #   #print("Timeout: Destination not reached within given time.")
                self.vehicle.set_transform(left_wp.transform)
                break

            if self.vehicle.get_location().x > self.destination.x - 5:
                self.stop_car()
                break
        self.tm.vehicle_percentage_speed_difference(self.vehicle, 0)
        self.vehicle.set_autopilot(True,self.tm.get_port())  # ‚úÖ Resume autopilot after exiting DWCL

    def speed_up(self, delta_v=2):
        
        self.set_speed(self.target_speed + delta_v)  # Set target speed
        self.target_speed += delta_v  # Update target speed

    def slow_down(self, delta_v=5):

        if self.target_speed - delta_v >= 10:  # Ensure speed is above 5 km/h
        
            self.set_speed(self.target_speed - delta_v)  # Set target speed
            self.target_speed -= delta_v  # Update target speed

    def maintain_speed(self):
        
        self.set_speed(self.target_speed)  # Set target speed

       



    def compute_reward(self, required_soc ,SoC, in_dwcl, travel_time, ETA, speed):
        global lane_switch_counter
        """üìà Reward Function"""
        reward = 0
        if in_dwcl and SoC < required_soc + 20:
            reward += 15
        if in_dwcl and SoC > 80:
            reward -= 10
        if in_dwcl and SoC > required_soc + 20:
            reward -= 3
        if travel_time < (self.battery_manager.initial_eta - ETA) * 1.2:  # ETA correction
            reward += 20
        if self.target_speed > 50 and in_dwcl:
            reward -= 5
        if self.target_speed < 15 and in_dwcl:
            reward -= 15
        if SoC < 20:
            reward -= 20
        if SoC >= 20:
            reward += 20
        if self.battery_manager.compute_remaining_distance() < 10 and SoC >= 20:
            reward += 30

        if self.battery_manager.compute_remaining_distance() < 10 and SoC < 20:
            reward -= 30

        if lane_switch_counter > 0:
            reward -= lane_switch_counter**2

        return reward

    def step(self, action):
        """üîÑ Execute an action, ensure completion, and return new state, reward, and done flag."""
        global lane_switch_counter
        global invalid_action_counter
        global has_arrived
        
        
        #eta = self.battery_manager.eta
       # if eta is None:
        #    eta = 100  # Assign a high default value if not computed

        elapsed_time = self.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds - self.start_time
        distance_to_destination = self.battery_manager.compute_remaining_distance()
        eta = distance_to_destination/avg_speed
        
        if self.battery_manager.in_dwcl != self.battery_manager.is_on_charging_lane():
        # üîπ Get lane information
            
            lane_switch_counter +=1

        self.battery_manager.in_dwcl = self.battery_manager.is_on_charging_lane()
        lane_type = int(self.battery_manager.in_dwcl)

        # üî• Terminal condition
        if self.vehicle.get_location().x > self.destination.x - 5:
            print('Simulation Ended;üî• Terminal condition;üî• Terminal condition;üî• Terminal condition;üî• Terminal condition;üî• Terminal condition')
            #print('Elapsed time: ', elapsed_time,' Distance to destination: ', distance_to_destination,' Initial eta: ', self.battery_manager.initial_eta)
            has_arrived = True
            self.stop_car()
            return np.array([0, 0, 0, 0, lane_type,0]), 0, True, {}
        
        if  elapsed_time > 2 * self.battery_manager.initial_eta:
            print('Simulation Ended;üî• Terminal condition;üî• Terminal condition;üî• Terminal condition;üî• Terminal condition;üî• Terminal condition')
            #print('Elapsed time: ', elapsed_time,' Distance to destination: ', distance_to_destination,' Initial eta: ', self.battery_manager.initial_eta)
            has_arrived = False
            self.stop_car()
            return np.array([0, 0, 0, 0, lane_type,0]), 0, True, {} # End simulation if it takes too long

        reward = 0
        action_completed = False  # üöÄ Track if the action has finished

        if action == 0 and lane_type == 0:  # Move to DWCL
            #print('Move to DWCL')
            self.move_to_dwcl()
            action_completed = True
            reward += 5
        elif action == 1 and lane_type == 1:  # Leave DWCL
            #print('Leave DWCL')
            self.exit_dwcl()
            action_completed = True
            reward += 5
        elif action == 2 and lane_type == 1:  # Accelerate inside DWCL
            #print(' Accelerate inside DWCL')
            self.speed_up()
            action_completed = True
            reward += 5
        elif action == 3 and lane_type == 1 and self.target_speed > 10:  # Decelerate inside DWCL #ensure a min of 5 km/h
            #print('Decelerate inside DWCL')
            self.slow_down()
            action_completed = True
            reward += 5
        elif action == 4 and lane_type == 1:  # Maintain speed inside DWCL
            #print('# Maintain speed inside DWCL')
            self.maintain_speed()
            action_completed = True
            reward += 5

        elif action == 5 and lane_type == 0:  # Maintain  outside DWCL
            #print('# Stay Out Of DWCL')
            self.vehicle.set_autopilot(True,self.tm.get_port())
            self.tm.vehicle_percentage_speed_difference(self.vehicle, 0)
            action_completed = True
            reward += 5

        else:
            #print("üö® Invalid action for lane type! Skipping action.")
            reward -= 20 # Penalize invalid actions
            invalid_action_counter += 1
            action_completed = True 

        velocity = vehicle.get_velocity()
        speed_kph = math.sqrt(velocity.x**2 + velocity.y**2) *3.6

        current_time = self.battery_manager.get_simulation_time()
        delta_time = current_time - self.battery_manager.last_update_time
        self.battery_manager.last_update_time = current_time

        self.battery_manager.update_battery(delta_time, self.all_corners)

        reward += self.compute_reward(self.battery_manager.soc_required,self.battery_manager.soc, lane_type, elapsed_time, eta, speed_kph)

        # üîπ Wait until action is complete before requesting a new action
        while not action_completed:
            print("‚è≥ Waiting for action completion...")
            #time.sleep(0.5)

        #print(f"‚úÖ Action {action} completed, requesting next action.")

        # Compute updated state
        state = np.array([self.battery_manager.soc, self.battery_manager.soc_required, eta, distance_to_destination, lane_type, self.target_speed], dtype=np.float32)
        return state, reward, False, {}


   
    

    

    def reset(self):
        """üîÑ Fully resets the CARLA simulation and reassigns objects, ensuring the vehicle drives towards its destination."""
        global lane_switch_counter
        global invalid_action_counter
        
        # ‚úÖ Step 1: Reset the entire CARLA environment
        self.vehicle.set_transform(new_start_point)  # Move vehicle to origin
        lane_switch_counter = 0
        invalid_action_counter = 0

        spawn_points = world.get_map().get_spawn_points()
        destination = carla.Location(
            x=start_point.location.x + length,
            y=start_point.location.y,
            z=start_point.location.z,
)

        # ‚úÖ Step 2: Reassign the new objects after reset
        self.vehicle = vehicle  # Reassign newly spawned vehicle
        self.stop_car()
        self.all_corners = all_corners  # Reassign new DWCL coil locations
        self.destination = destination  # Reassign destination location

        # ‚úÖ Step 3: Reinitialize `BatteryManager` with the new vehicle
        self.battery_manager = BatteryManager(self.vehicle, self.destination, world, power_model, C_bat, P_aux)

        # ‚úÖ Step 4: Reset simulation time
        self.start_time = self.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds  

        # ‚úÖ Step 5: Ensure Traffic Manager Leads the Vehicle to Destination
        self.set_up_traffic_manager()
        #self.initialize_behavior_agent()

        # ‚úÖ Step 6: Compute new battery and state information
        self.soc, soc_required, _, ETA, _, _, _, _, remaining_distance, _, _ = self.battery_manager.update_battery(0, self.all_corners)

        # ‚úÖ Step 7: Check if the vehicle is in DWCL after the reset
        self.battery_manager.in_dwcl= self.battery_manager.is_on_charging_lane()
        lane_type = int(self.battery_manager.in_dwcl)
        self.target_speed = 30

        print("‚úÖ Reset complete. Traffic Manager guiding vehicle to destination.")

        # ‚úÖ Step 8: Return initial state for RL training
        return np.array([self.soc, soc_required, ETA, remaining_distance, lane_type, self.target_speed], dtype=np.float32)





In [12]:
create_sim_env()
# Load power model
power_model = load_model("c:\\Users\\HOUALEF\\Desktop\\RL_Carla\\models\\energy_model_best.hdf5")



destination = carla.Location(
    x=new_start_point.location.x + length,
    y=new_start_point.location.y,
    z=new_start_point.location.z,
)

# Initialize battery manager
battery_manager = BatteryManager(vehicle, destination, world, power_model,C_bat,P_aux)
env = CarlaEnv(vehicle, battery_manager, all_corners, destination)




  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


In [9]:
env.battery_manager.initial_eta

58.23153818402972

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import collections

class DQN(nn.Module):
    """üöÄ Neural Network for Q-Learning (DQN)"""
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)


In [None]:
"""

import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import collections

class DQN(nn.Module):
    '''üöÄ Deeper Neural Network for Q-Learning (DQN)'''
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)  # Increased neurons
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 32)  # Additional layer
        self.fc5 = nn.Linear(32, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))
        return self.fc5(x)  # Output layer (no activation)
"""

In [None]:
class DQNAgent:
    """üéØ DQN Agent with Experience Replay"""
    def __init__(self, state_dim, action_dim, gamma=0.9, lr=0.001, epsilon=1.0, epsilon_decay=0.99995, epsilon_min=0.05):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = DQN(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.memory = collections.deque(maxlen=10000)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

    def select_action(self, state, evaluate = False):
        """ü§ñ Uses Œµ-greedy policy for action selection"""
        if random.random() < self.epsilon and not evaluate:
            return random.randint(0, 5)  # Random action
        else:
            state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
            with torch.no_grad():
                return torch.argmax(self.model(state)).item()

    def store_experience(self, state, action, reward, next_state, done):
        """üíæ Store experience in replay buffer"""
        self.memory.append((state, action, reward, next_state, done))

    def train(self, batch_size=64):
        if len(self.memory) < batch_size:
            return

        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # ‚úÖ Debugging: Print each next_state's shape & type
        #print("\nüîç DEBUG: Checking next_states:")
        #for i, state in enumerate(states):
            
        #    print(f"  states[{i}] value: {state}\n")

        # ‚úÖ Convert to numpy before PyTorch tensor
        try:
            next_states = [np.array(s, dtype=np.float32) for s in next_states]
            next_states = np.array(next_states, dtype=np.float32)
            next_states = torch.tensor(next_states, dtype=torch.float32).to(self.device)
        except Exception as e:
            print("üö® Error while converting next_states:", e)
            print("‚ùå Debug Info: next_states before conversion:", next_states)
            exit(1)  # Force exit to debug

        # ‚úÖ Continue training if no errors
        states = torch.tensor(np.array(states, dtype=np.float32), dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).to(self.device)

    # Compute Q-values, loss, and train the network...


        # ‚úÖ Compute Q-values
        q_values = self.model(states).gather(1, actions).squeeze(1)
        next_q_values = self.model(next_states).max(1)[0].detach()
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        # ‚úÖ Optimize
        loss = nn.functional.mse_loss(q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ‚úÖ Reduce epsilon for exploration-exploitation tradeoff
        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)
        return loss.item()




    


In [None]:
import os
import json
import torch
import pandas as pd
import matplotlib.pyplot as plt

# ‚úÖ Training Metrics Storage
episode_rewards = []
episode_steps = []
epsilon_values = []
switch_frequencies = []
soc_values = []
invalid_action_counts = []
arrival_flags = []
losses = []
times = []  # Initialize with 500 zeros

# ‚úÖ Checkpointing Function
def save_checkpoint(agent, episode, metrics, filename="dqn_checkpoint.pth"):
    """Save DQN model, optimizer, and training progress."""
    checkpoint = {
        'model_state_dict': agent.model.state_dict(),
        'optimizer_state_dict': agent.optimizer.state_dict(),
        'epsilon': agent.epsilon,
        'episode': episode,
        'metrics': metrics
    }
    torch.save(checkpoint, filename)

    # Save metrics as a CSV file
    df = pd.DataFrame(metrics)
    df.to_csv("training_metrics.csv", mode='w', index=False)  # Overwrite with latest data

    print(f"‚úÖ Checkpoint saved at Episode {episode}")

# ‚úÖ Load Training Progress
def load_checkpoint(agent, filename="dqn_checkpoint.pth"):
    """Load checkpoint if available and resume training."""
    if os.path.exists(filename):
        checkpoint = torch.load(filename)

        # Restore model and optimizer state
        agent.model.load_state_dict(checkpoint['model_state_dict'])
        agent.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        agent.epsilon = checkpoint['epsilon']
        start_episode = checkpoint['episode']
        metrics = checkpoint.get('metrics', {})

        print(f"‚úÖ Loaded checkpoint from Episode {start_episode}")

        # Load metrics from CSV
        if os.path.exists("training_metrics.csv"):
            metrics_df = pd.read_csv("training_metrics.csv")
            metrics = metrics_df.to_dict(orient="list")

        return start_episode, metrics
    else:
        print("üö® No checkpoint found. Starting fresh training.")
        return 0, {"episode_rewards": [], "episode_steps": [], "epsilon_values": [],
                   "switch_frequencies": [], "soc_values": [], "invalid_action_counts": [], 
                   "arrival_flags": [], "losses": []}

# ‚úÖ Train the DQN Agent with Automatic Checkpointing
def train_dqn(env, agent, num_episodes=500, max_steps=500, batch_size=100, checkpoint_interval=5):
    global lane_switch_counter, invalid_action_counter, has_arrived, episode_rewards, episode_steps, epsilon_values
    global switch_frequencies, soc_values, invalid_action_counts, arrival_flags, losses, times

    # Load previous training state if available
    start_episode, metrics = load_checkpoint(agent)
    
    # Initialize lists from saved metrics
    episode_rewards = metrics.get('episode_rewards', [])
    episode_steps = metrics.get('episode_steps', [])
    epsilon_values = metrics.get('epsilon_values', [])
    switch_frequencies = metrics.get('switch_frequencies', [])
    soc_values = metrics.get('soc_values', [])
    invalid_action_counts = metrics.get('invalid_action_counts', [])
    arrival_flags = metrics.get('arrival_flags', [])
    losses = metrics.get('losses', [])
    times = metrics.get('times', [])

    for episode in range(start_episode, num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        step_count = 0
        episode_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds

        while not done and step_count < max_steps:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)

            agent.store_experience(state, action, reward, next_state, done)
            loss = agent.train(batch_size)

            state = next_state
            episode_reward += reward
            step_count += 1

        # Store metrics
        end_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds
        episode_time = end_time - episode_time
        
        times.append(episode_time)
        episode_rewards.append(episode_reward)
        episode_steps.append(step_count)
        epsilon_values.append(agent.epsilon)
        switch_frequencies.append(lane_switch_counter)
        soc_values.append(env.battery_manager.soc)
        invalid_action_counts.append(invalid_action_counter)
        arrival_flags.append(has_arrived)
        losses.append(loss)

        print(f"Episode {episode+1}/{num_episodes} - Reward: {episode_reward}, Steps: {step_count}, "
              f"Epsilon: {agent.epsilon:.2f}, Switch_Freq: {lane_switch_counter}, SOC: {env.battery_manager.soc:.2f}, "
              f"Invalid Actions: {invalid_action_counter}, Arrived: {has_arrived}, Time: {episode_time:.2f}s")

        # ‚úÖ Save checkpoint every 50 episodes
        if (episode + 1) % checkpoint_interval == 0:
            metrics = {
                "episode_rewards": episode_rewards,
                "episode_steps": episode_steps,
                "epsilon_values": epsilon_values,
                "switch_frequencies": switch_frequencies,
                "soc_values": soc_values,
                "invalid_action_counts": invalid_action_counts,
                "arrival_flags": arrival_flags,
                "losses": losses,
                "times": times
            }
            save_checkpoint(agent, episode + 1, metrics)

    print("‚úÖ Training complete.")
    plot_training_metrics()

# ‚úÖ Plot Training Metrics


def smooth_curve(data, window_size=10):
    """Applies a moving average filter to smooth data curves."""
    if len(data) < window_size:
        return data  # Return raw data if not enough points for smoothing
    return pd.Series(data).rolling(window=window_size, min_periods=1, center=True).mean()

def plot_training_metrics():
    """Plot smooth curves for episode rewards, steps, epsilon, switch frequency, SOC, invalid actions, and arrival flags."""
    threshold = 1400
    fig, axs = plt.subplots(2, 3, figsize=(14, 10))

    axs[0, 0].plot(smooth_curve(episode_rewards[threshold:]), label="Episode Reward")
    axs[0, 0].set_title("Episode Rewards")
    axs[0, 0].set_xlabel("Episode")
    axs[0, 0].set_ylabel("Reward")

    axs[0, 1].plot(smooth_curve(losses[threshold:]), label="Loss per Episode", color="orange")
    axs[0, 1].set_title("Loss per Episode")
    axs[0, 1].set_xlabel("Episode")
    axs[0, 1].set_ylabel("Loss (MSE)")

    axs[0, 2].plot(smooth_curve(epsilon_values[threshold:]), label="Epsilon Decay", color="green")
    axs[0, 2].set_title("Epsilon Decay")
    axs[0, 2].set_xlabel("Episode")
    axs[0, 2].set_ylabel("Epsilon")

    axs[1, 0].plot(smooth_curve(switch_frequencies[threshold:]), label="Lane Switch Frequency", color="red")
    axs[1, 0].set_title("Lane Switch Frequency")
    axs[1, 0].set_xlabel("Episode")
    axs[1, 0].set_ylabel("Switches")

    axs[1, 1].plot(smooth_curve(soc_values[threshold:]), label="Battery SOC", color="purple")
    axs[1, 1].set_title("Battery SOC at End of Episode")
    axs[1, 1].set_xlabel("Episode")
    axs[1, 1].set_ylabel("SOC (%)")

    axs[1, 2].plot(smooth_curve(invalid_action_counts[threshold:]), label="Invalid Actions", color="brown")
    axs[1, 2].set_title("Invalid Actions per Episode")
    axs[1, 2].set_xlabel("Episode")
    axs[1, 2].set_ylabel("Count")

    plt.tight_layout()
    plt.show()


In [None]:
"""

import matplotlib.pyplot as plt

# Initialize lists to store episode metrics
episode_rewards = []
episode_steps = []
epsilon_values = []
switch_frequencies = []
soc_values = []
invalid_action_counts = []
arrival_flags = []
losses = []

def train_dqn(env, agent, num_episodes=500, max_steps=500, batch_size=100):
    global lane_switch_counter, invalid_action_counter, has_arrived

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        step_count = 0

        while not done and step_count < max_steps:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)

            agent.store_experience(state, action, reward, next_state, done)
            loss = agent.train(batch_size)

            state = next_state
            episode_reward += reward
            step_count += 1

        # Store metrics for plotting
        episode_rewards.append(episode_reward)
        episode_steps.append(step_count)
        epsilon_values.append(agent.epsilon)
        switch_frequencies.append(lane_switch_counter)
        soc_values.append(env.battery_manager.soc)
        invalid_action_counts.append(invalid_action_counter)
        arrival_flags.append(has_arrived)
        losses.append(loss)

        print(f"Episode {episode+1}/{num_episodes} - Reward: {episode_reward}, Steps: {step_count}, "
              f"Epsilon: {agent.epsilon:.2f}, Switch_Freq: {lane_switch_counter}, SOC: {env.battery_manager.soc:.2f}, "
              f"Invalid Actions: {invalid_action_counter}, Arrived: {has_arrived}")

        if (episode + 1) % 50 == 0:
            torch.save(agent.model.state_dict(), f"dqn_carla_episode_{episode+1}.pth")

    # Plot the metrics after training
    plot_training_metrics()

def plot_training_metrics():
    ''' Plot episode rewards, steps, epsilon, switch frequency, SOC, invalid actions, and arrival flags. '''
    
    fig, axs = plt.subplots(3, 2, figsize=(12, 10))
    
    axs[0, 0].plot(episode_rewards, label="Episode Reward")
    axs[0, 0].set_title("Episode Rewards")
    axs[0, 0].set_xlabel("Episode")
    axs[0, 0].set_ylabel("Reward")

    axs[0, 1].plot(losses, label="loss per Episode", color="orange")
    axs[0, 1].set_title("Loss per Episode")
    axs[0, 1].set_xlabel("Episode")
    axs[0, 1].set_ylabel("Loss (MSE)")

    axs[1, 0].plot(epsilon_values, label="Epsilon Decay", color="green")
    axs[1, 0].set_title("Epsilon Decay")
    axs[1, 0].set_xlabel("Episode")
    axs[1, 0].set_ylabel("Epsilon")

    axs[1, 1].plot(switch_frequencies, label="Lane Switch Frequency", color="red")
    axs[1, 1].set_title("Lane Switch Frequency")
    axs[1, 1].set_xlabel("Episode")
    axs[1, 1].set_ylabel("Switches")

    axs[2, 0].plot(soc_values, label="Battery SOC", color="purple")
    axs[2, 0].set_title("Battery SOC at End of Episode")
    axs[2, 0].set_xlabel("Episode")
    axs[2, 0].set_ylabel("SOC (%)")

    axs[2, 1].plot(invalid_action_counts, label="Invalid Actions", color="brown")
    axs[2, 1].set_title("Invalid Actions per Episode")
    axs[2, 1].set_xlabel("Episode")
    axs[2, 1].set_ylabel("Count")

    plt.tight_layout()
    plt.show()


"""

In [None]:


create_sim_env()
# Load power model
power_model = load_model("c:\\Users\\HOUALEF\\Desktop\\RL_Carla\\models\\energy_model_best.hdf5")

spawn_points = world.get_map().get_spawn_points()
destination = carla.Location(
    x=start_point.location.x + length,
    y=start_point.location.y,
    z=start_point.location.z,
)

# Initialize battery manager
battery_manager = BatteryManager(vehicle, destination, world, power_model,C_bat,P_aux)
env = CarlaEnv(vehicle, battery_manager, all_corners, destination)

# Create DQN agent
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQNAgent(state_dim, action_dim)

# Start training
train_dqn(env, agent, num_episodes=6500, max_steps=500)




In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

# Define range for plotting
start_episode = 1400
plot_episodes = np.arange(len(epsilon_values[start_episode:]))

sns.set_palette("Set2")  # Using Set2 for more distinguishable colors
colors_rl = ["#1f77b4", "#aec7e8"]  # RL: Blue shades
colors_rule = ["#ff7f0e", "#ffbb78"]  # Rule: Orange shades

# Function to smooth the epsilon values
def smooth_curve(data, window_size=20):
    return pd.Series(data).rolling(window=window_size, min_periods=1).mean()

# Apply smoothing
smoothing_window = 30
smoothed_epsilon = smooth_curve(epsilon_values[start_episode:], window_size=smoothing_window)

# Create figure
plt.figure(figsize=(10, 3))
sns.set_style("whitegrid")

# Color gradient for decay
colors = sns.color_palette("coolwarm", as_cmap=True)

# Plot smoothed epsilon decay
plt.plot(plot_episodes, smoothed_epsilon, label=" Epsilon Decay", color=colors_rule[0], linewidth=2)

# Optional: Add a confidence interval
std_dev = np.std(epsilon_values[start_episode:])  # Compute standard deviation
lower_bound = np.maximum(smoothed_epsilon - std_dev, 0)  # Ensure non-negative values
upper_bound = smoothed_epsilon + std_dev
#plt.fill_between(plot_episodes, lower_bound, upper_bound, alpha=0.2, color=colors(0.2), label="Confidence Interval")

# Labels and Title
plt.xlabel("Episode", fontsize=12, fontweight="bold" )
plt.ylabel("Epsilon Value", fontsize=12, fontweight="bold")
#plt.title("Epsilon Decay Over Episodes", fontsize=14, fontweight="bold")

# Grid and legend
plt.grid(True, linestyle="--", alpha=0.6)
plt.legend()

# Show the plot
plt.show()


In [None]:
"""

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats

# ‚úÖ Run 100 Test Episodes for RL-Based Agent
def evaluate_rl_agent(env, agent, num_episodes=100, max_steps=500):
    soc_results = []
    travel_times = []
    lane_switch_counts = []

    for episode in range(num_episodes):
        state = env.reset()
        done = False
        step_count = 0
        lane_switch_counter = 0
        start_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds

        while not done and step_count < max_steps:
            prev_in_dwcl = env.battery_manager.in_dwcl
            action = agent.select_action(state)
            next_state, _, done, _ = env.step(action)

            # ‚úÖ Detect lane switch
            if prev_in_dwcl != env.battery_manager.is_on_charging_lane():
                lane_switch_counter += 1
            env.battery_manager.in_dwcl = env.battery_manager.is_on_charging_lane()

            state = next_state
            step_count += 1

        # ‚úÖ Record final SOC, travel time, and lane switches
        final_soc = env.battery_manager.soc
        end_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds
        total_travel_time = end_time - start_time

        soc_results.append(final_soc)
        travel_times.append(total_travel_time)
        lane_switch_counts.append(lane_switch_counter)

        print(f"[RL-Agent] Episode {episode+1}/{num_episodes} - Final SOC: {final_soc:.2f}% - Travel Time: {total_travel_time:.2f}s - Lane Switches: {lane_switch_counter}")

    return soc_results, travel_times, lane_switch_counts


# ‚úÖ Run 100 Test Episodes for Rule-Based Baseline
def evaluate_rule_based(env, num_episodes=100, max_steps=500):
    soc_results = []
    travel_times = []
    lane_switch_counts = []

    for episode in range(num_episodes):
        state = env.reset()
        done = False
        step_count = 0
        lane_switch_counter = 0
        start_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds

        while not done and step_count < max_steps:
            prev_in_dwcl = env.battery_manager.in_dwcl
            soc = env.battery_manager.soc
            soc_required = env.battery_manager.soc_required

            # üéØ Rule-Based Decision Making
            if soc < soc_required + 20:
                action = 0  # Move to DWCL
            elif soc >= soc_required + 20:
                action = 1  # Leave DWCL
                env.tm.vehicle_percentage_speed_difference(vehicle, 0)
            else:
                action = 5  # Maintain speed outside DWCL
                env.tm.vehicle_percentage_speed_difference(vehicle, 0)

            next_state, _, done, _ = env.step(action)

            # ‚úÖ Detect lane switch
            if prev_in_dwcl != env.battery_manager.is_on_charging_lane():
                lane_switch_counter += 1
            env.battery_manager.in_dwcl = env.battery_manager.is_on_charging_lane()

            state = next_state
            step_count += 1

        # ‚úÖ Record final SOC, travel time, and lane switches
        final_soc = env.battery_manager.soc
        end_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds
        total_travel_time = end_time - start_time

        soc_results.append(final_soc)
        travel_times.append(total_travel_time)
        lane_switch_counts.append(lane_switch_counter)

        print(f"[Rule-Based] Episode {episode+1}/{num_episodes} - Final SOC: {final_soc:.2f}% - Travel Time: {total_travel_time:.2f}s - Lane Switches: {lane_switch_counter}")

    return soc_results, travel_times, lane_switch_counts


# ‚úÖ Plot Comparison Between RL and Rule-Based Model
def plot_comparison(rl_soc, rl_time, rl_switches, rule_soc, rule_time, rule_switches):
    sns.set(style="whitegrid", palette="muted", font_scale=1.2)
    fig, axs = plt.subplots(1, 3, figsize=(18, 5))

    # Final SOC Comparison
    sns.boxplot(data=[rl_soc, rule_soc], ax=axs[0], showmeans=True, palette=["blue", "orange"])
    axs[0].set_xticklabels(["RL-Based", "Rule-Based"])
    axs[0].set_title("Final SOC at Destination")
    axs[0].set_ylabel("SOC (%)")

    # Travel Time Comparison
    sns.boxplot(data=[rl_time, rule_time], ax=axs[1], showmeans=True, palette=["blue", "orange"])
    axs[1].set_xticklabels(["RL-Based", "Rule-Based"])
    axs[1].set_title("Total Travel Time")
    axs[1].set_ylabel("Time (seconds)")

    # Lane Switches Comparison
    sns.boxplot(data=[rl_switches, rule_switches], ax=axs[2], showmeans=True, palette=["blue", "orange"])
    axs[2].set_xticklabels(["RL-Based", "Rule-Based"])
    axs[2].set_title("Total Lane Switches")
    axs[2].set_ylabel("Count")

    plt.tight_layout()
    plt.show()


# ‚úÖ Perform Statistical Tests for Significance
def run_stat_tests(data1, data2, metric_name):
    ''' Performs Mann-Whitney U Test to compare RL vs Rule-Based results '''
    stat, p_value = stats.mannwhitneyu(data1, data2, alternative='two-sided')
    print(f"üìä Statistical Test for {metric_name}:")
    print(f"U-Statistic: {stat:.2f}, P-Value: {p_value:.6f}")
    if p_value < 0.05:
        print(f"‚úÖ Significant difference found for {metric_name} (p < 0.05)")
    else:
        print(f"‚ùå No significant difference found for {metric_name} (p >= 0.05)")
    print("-----------------------------------------------------")


# ‚úÖ Run Evaluations for Both Models
rl_soc, rl_time, rl_switches = evaluate_rl_agent(env, agent,num_episodes=1)
rule_soc, rule_time, rule_switches = evaluate_rule_based(env,num_episodes=10)

# ‚úÖ Plot Results
plot_comparison(rl_soc, rl_time, rl_switches, rule_soc, rule_time, rule_switches)

# ‚úÖ Run Statistical Tests
run_stat_tests(rl_soc, rule_soc, "Final SOC")
run_stat_tests(rl_time, rule_time, "Total Travel Time")
run_stat_tests(rl_switches, rule_switches, "Total Lane Switches")"

"""


In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt

def evaluate_policy(env, agent, num_episodes=100, use_rl=True, initial_soc=19.0):
    """
    Evaluates the RL agent or Rule-Based approach over multiple test episodes.
    
    Tracks:
    - Final SOC
    - Total Travel Time (Simulation Time)
    - Total Lane Switches
    - Time Spent Inside DWCL (Simulation Time)
    - Time Spent Outside DWCL (Simulation Time)
    """

    final_soc_list = []
    travel_time_list = []
    lane_switch_list = []
    time_inside_dwcl_list = []
    time_outside_dwcl_list = []

    for episode in range(num_episodes):
        state = env.reset()
        done = False
        
        # Initialize time tracking from simulation timestamp
        start_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds
        time_inside_dwcl = 0
        time_outside_dwcl = 0
        lane_switches = 0

        in_dwcl = env.battery_manager.is_on_charging_lane()  # Initial DWCL status

        while not done:
            current_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds  # Get current sim time

            if use_rl:
                action = agent.select_action(state, evaluate=True)
            else:
                # Rule-Based Policy: Enter DWCL if SOC < SOC_Required + 20, Exit if SOC > SOC_Required + 20
                soc = env.battery_manager.soc
                required_soc = env.battery_manager.soc_required + 20
                if in_dwcl and soc > required_soc:
                    action = 1  # Exit DWCL
                elif not in_dwcl and soc < required_soc:
                    action = 0  # Enter DWCL
                else:
                    action = 5  # Maintain current lane

            next_state, _, done, _ = env.step(action)

            # Track lane switches
            new_in_dwcl = env.battery_manager.is_on_charging_lane()
            if in_dwcl != new_in_dwcl:
                lane_switches += 1

            # Update time spent in/out of DWCL
            if new_in_dwcl:
                time_inside_dwcl += env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds - current_time
            else:
                time_outside_dwcl += env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds - current_time

            # Update state and lane status
            state = next_state
            in_dwcl = new_in_dwcl

        # Compute final simulation time
        total_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds - start_time

        # Store results
        final_soc_list.append(env.battery_manager.soc)
        travel_time_list.append(total_time)
        lane_switch_list.append(lane_switches)
        time_inside_dwcl_list.append(time_inside_dwcl)
        time_outside_dwcl_list.append(time_outside_dwcl)

        print(f"Episode {episode + 1}/{num_episodes} - Final SOC: {env.battery_manager.soc:.2f}, "
              f"Travel Time: {total_time:.2f}s, Lane Switches: {lane_switches}, "
              f"Time Inside DWCL: {time_inside_dwcl:.2f}s, Time Outside DWCL: {time_outside_dwcl:.2f}s")

    return {
        "final_soc": np.array(final_soc_list),
        "travel_time": np.array(travel_time_list),
        "lane_switches": np.array(lane_switch_list),
        "time_inside_dwcl": np.array(time_inside_dwcl_list),
        "time_outside_dwcl": np.array(time_outside_dwcl_list)
    }


# **Run evaluation for both RL and Rule-Based models**
num_test_episodes = 1

print("\nEvaluating RL-Based Model...")
rl_results = evaluate_policy(env, agent, num_test_episodes, use_rl=True)

print("\nEvaluating Rule-Based Model...")
rule_results = evaluate_policy(env, agent, num_test_episodes, use_rl=False)


# **Plot results in comparison charts**
def plot_comparisons(rl_data, rule_data, metric_name, ylabel):
    """Plots a comparison of RL vs Rule-Based performance."""
    plt.figure(figsize=(8, 5))
    plt.bar(["RL-Based", "Rule-Based"], [np.mean(rl_data), np.mean(rule_data)], color=["blue", "orange"])
    plt.ylabel(ylabel)
    plt.title(f"Comparison of {metric_name} (Averaged Over {num_test_episodes} Episodes)")
    plt.show()


# Plot Final SOC
plot_comparisons(rl_results["final_soc"], rule_results["final_soc"], "Final SOC", "Final SOC (%)")

# Plot Travel Time
plot_comparisons(rl_results["travel_time"], rule_results["travel_time"], "Total Travel Time", "Simulation Time (s)")

# Plot Lane Switches
plot_comparisons(rl_results["lane_switches"], rule_results["lane_switches"], "Total Lane Switches", "Number of Switches")

# Plot Time Inside DWCL
plot_comparisons(rl_results["time_inside_dwcl"], rule_results["time_inside_dwcl"], "Time Inside DWCL", "Simulation Time in DWCL (s)")

# Plot Time Outside DWCL
plot_comparisons(rl_results["time_outside_dwcl"], rule_results["time_outside_dwcl"], "Time Outside DWCL", "Simulation Time Outside DWCL (s)")


In [None]:
import numpy as np
import matplotlib.pyplot as plt
speed_inside_dwcl = []
speed_outsite_dwcl = []
def evaluate_policy(env, agent, num_episodes=100, use_rl=True, initial_soc=19.0):
    """
    Evaluates the RL agent or Rule-Based approach over multiple test episodes for a given initial SoC.
    
    Tracks:
    - Final SOC
    - Total Travel Time (Simulation Time)
    - Total Lane Switches
    - Time Spent Inside DWCL (Simulation Time)
    """
    global speed_inside_dwcl
    global speed_outsite_dwcl
    final_soc_list = []
    travel_time_list = []
    lane_switch_list = []
    time_inside_dwcl_list = []
    

    for episode in range(num_episodes):
        # Reset environment with specified initial SOC
        
        state = env.reset()

        env.battery_manager.soc = initial_soc  
        done = False
        
        # Initialize time tracking from simulation timestamp
        start_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds
        time_inside_dwcl = 0
        lane_switches = 0

        in_dwcl = env.battery_manager.is_on_charging_lane()  # Initial DWCL status

        while not done:
            current_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds  # Get current sim time

            if use_rl:
                
                action = agent.select_action(state, evaluate=True)
                velocity = env.vehicle.get_velocity()
                speed_kmh = math.sqrt(velocity.x**2 + velocity.y**2)*3.6
                #print(speed_kmh)
                speed_inside_dwcl.append(speed_kmh)
            else:
                # Rule-Based Policy: Enter DWCL if SOC < SOC_Required + 20, Exit if SOC > SOC_Required + 20
                soc = env.battery_manager.soc
                required_soc = env.battery_manager.soc_required + 20
                if in_dwcl and soc > required_soc:
                    action = 1  # Exit DWCL
                    
                elif not in_dwcl and soc < required_soc:
                    
                    action = 0  # Enter DWCL

               

                else:
                    
                    action = 5  # Maintain current lane
                    

                velocity = env.vehicle.get_velocity()
                speed_kmh = math.sqrt(velocity.x**2 + velocity.y**2)*3.6
                #print(speed_kmh)
                speed_outsite_dwcl.append(speed_kmh)

            next_state, _, done, _ = env.step(action)

            # Track lane switches
            new_in_dwcl = env.battery_manager.is_on_charging_lane()
            if in_dwcl != new_in_dwcl:
                lane_switches += 1

            # Update time spent in DWCL
            if new_in_dwcl:
                time_inside_dwcl += env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds - current_time

            # Update state and lane status
            state = next_state
            in_dwcl = new_in_dwcl

        # Compute final simulation time
        total_time = env.vehicle.get_world().get_snapshot().timestamp.elapsed_seconds - start_time

        # Store results
        final_soc_list.append(env.battery_manager.soc)
        travel_time_list.append(total_time)
        lane_switch_list.append(lane_switches)
        time_inside_dwcl_list.append(time_inside_dwcl)

        print(f"Episode {episode + 1}/{num_episodes} | SOC: {env.battery_manager.soc:.2f}, "
              f"Time: {total_time:.2f}s, Switches: {lane_switches}, Inside DWCL: {time_inside_dwcl:.2f}s")

    return {
        "final_soc": np.array(final_soc_list),
        "travel_time": np.array(travel_time_list),
        "lane_switches": np.array(lane_switch_list),
        "time_inside_dwcl": np.array(time_inside_dwcl_list),
    }


# **Run evaluation for different SoC initial conditions**
num_test_episodes = 10
initial_socs = [17.2, 20.7]  # Different initial conditions

results = {"RL": {}, "Rule": {}}

for soc in initial_socs:
    print(f"\nEvaluating RL-Based Model (SoC={soc})...")
    results["RL"][soc] = evaluate_policy(env, agent, num_test_episodes, use_rl=True, initial_soc=soc)

    print(f"\nEvaluating Rule-Based Model (SoC={soc})...")
    results["Rule"][soc] = evaluate_policy(env, agent, num_test_episodes, use_rl=False, initial_soc=soc)


# **Visualization: Horizontal Subplots with Vertical Bars**
def plot_comparison_bars(results, metrics, titles, ylabels):
    """Creates horizontally arranged subplots comparing RL vs Rule-Based performance across scenarios."""
    
    fig, axes = plt.subplots(1, len(metrics), figsize=(18, 5))

    soc_values = list(results["RL"].keys())

    for i, (metric, title, ylabel) in enumerate(zip(metrics, titles, ylabels)):
        rl_means = [np.mean(results["RL"][soc][metric]) for soc in soc_values]
        rule_means = [np.mean(results["Rule"][soc][metric]) for soc in soc_values]

        x = np.arange(len(soc_values))
        width = 0.35

        # Vertical bars
        axes[i].bar(x - width/2, rl_means, width, label="RL-Based", color="royalblue")
        axes[i].bar(x + width/2, rule_means, width, label="Rule-Based", color="darkorange")

        axes[i].set_ylabel(ylabel)
        axes[i].set_xticks(x)
        axes[i].set_xticklabels([f"SoC={s}" for s in soc_values])
        axes[i].set_title(title)
        axes[i].legend()

    plt.tight_layout()
    plt.show()


# **Plot results for all three metrics**
metrics = ["final_soc", "travel_time", "lane_switches"]
titles = ["Final SOC Across Scenarios", "Total Travel Time & DWCL Time", "Lane Switches Across Scenarios"]
ylabels = ["Final SOC (%)", "Time (s)", "Number of Switches"]

plot_comparison_bars(results, metrics, titles, ylabels)


# **Plot Combined Travel Time & DWCL Time**
def plot_travel_and_dwcl_time(results):
    """Plots total travel time with time inside DWCL as a stacked bar representation for different scenarios."""
    
    fig, ax = plt.subplots(figsize=(8, 5))

    soc_values = list(results["RL"].keys())
    rl_total_time = [np.mean(results["RL"][soc]["travel_time"]) for soc in soc_values]
    rl_inside_dwcl = [np.mean(results["RL"][soc]["time_inside_dwcl"]) for soc in soc_values]
    rule_total_time = [np.mean(results["Rule"][soc]["travel_time"]) for soc in soc_values]
    rule_inside_dwcl = [np.mean(results["Rule"][soc]["time_inside_dwcl"]) for soc in soc_values]

    x = np.arange(len(soc_values))
    width = 0.35

    # Stacked bars for RL and Rule-Based models
    ax.bar(x - width/2, rl_total_time, width, label="RL Total Travel Time", color="dodgerblue", alpha=0.8)
    ax.bar(x - width/2, rl_inside_dwcl, width, label="RL Inside DWCL", color="navy")

    ax.bar(x + width/2, rule_total_time, width, label="Rule Total Travel Time", color="orangered", alpha=0.8)
    ax.bar(x + width/2, rule_inside_dwcl, width, label="Rule Inside DWCL", color="darkred")

    ax.set_ylabel("Simulation Time (s)")
    ax.set_xticks(x)
    ax.set_xticklabels([f"SoC={s}" for s in soc_values])
    ax.set_title("Total Travel Time and Time Spent in DWCL")
    ax.legend()

    plt.show()

plot_travel_and_dwcl_time(results)


In [None]:
np.mean(speed_inside_dwcl)

In [None]:
plt.plot(speed_inside_dwcl)

In [None]:
np.mean(speed_outsite_dwcl)

In [None]:
plt.plot(speed_outsite_dwcl)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

# Function to apply rolling smoothing
def smooth_curve(data, window_size=20):
    return pd.Series(data).rolling(window=window_size, min_periods=1).mean()

# Function to compute rolling standard deviation
def compute_rolling_std(data, window_size=20):
    return pd.Series(data).rolling(window=window_size, min_periods=1).std()

# Convert speed data to NumPy array
speed_inside_dwcl = np.array(speed_inside_dwcl)

# Define smoothing window
smoothing_window = 5  # Change this to control smoothness

# Apply rolling smoothing
smoothed_speed = smooth_curve(speed_inside_dwcl, window_size=smoothing_window)

# Compute rolling standard deviation for confidence interval
rolling_std = compute_rolling_std(speed_inside_dwcl, window_size=smoothing_window)

# Define lower and upper bounds for fill_between
lower_bound = np.maximum(smoothed_speed - rolling_std, 0)  # Ensure no negative values
upper_bound = smoothed_speed + rolling_std

# Set up the figure
plt.figure(figsize=(10, 3))
sns.set_style("whitegrid")
sns.set_palette("deep")

# Plot smoothed speed curve
plt.plot(smoothed_speed, label="Smoothed Speed Inside DWCL", color="royalblue", linewidth=2,)

# Adaptive confidence interval: only show fill_between if smoothing window > 1
if smoothing_window > 1:
    plt.fill_between(range(len(smoothed_speed)), lower_bound, upper_bound, alpha=0.2, color="royalblue", label="Confidence Interval")

# Labels and Titles
plt.xlabel("Time Steps", fontsize=12,fontweight="bold")
plt.ylabel("Speed (km/h)", fontsize=12,fontweight="bold")
#plt.title("Vehicle Speed Inside DWCL Over Time", fontsize=14, fontweight="bold")

# Grid and Legend
plt.grid(True, linestyle="--", alpha=0.6)
plt.legend()

# Show the plot
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def plot_comparison_bars(results, metrics, titles, ylabels):
    """Creates horizontally arranged subplots comparing RL vs Rule-Based performance across scenarios."""
    
    sns.set_palette("Set2")  # Using Set2 for more distinguishable colors
    colors_rl = ["#1f77b4", "#aec7e8"]  # RL: Blue shades
    colors_rule = ["#ff7f0e", "#ffbb78"]  # Rule: Orange shades

    fig, axes = plt.subplots(1, len(metrics), figsize=(18, 5))

    soc_values = list(results["RL"].keys())

    for i, (metric, title, ylabel) in enumerate(zip(metrics, titles, ylabels)):
        if metric == "travel_time":
            # Replace travel time with stacked bar
            rl_total_time = [np.mean(results["RL"][soc]["travel_time"]) for soc in soc_values]
            rl_inside_dwcl = [np.mean(results["RL"][soc]["time_inside_dwcl"]) for soc in soc_values]
            rule_total_time = [np.mean(results["Rule"][soc]["travel_time"]) for soc in soc_values]
            rule_inside_dwcl = [np.mean(results["Rule"][soc]["time_inside_dwcl"]) for soc in soc_values]

            x = np.arange(len(soc_values))
            width = 0.35

            # Stacked bars for RL and Rule-Based models
            axes[i].bar(x - width/2, rl_total_time, width, label="RL Total Travel Time", color=colors_rl[0], alpha=0.9)
            axes[i].bar(x - width/2, rl_inside_dwcl, width, label="RL Inside DWCL", color=colors_rl[1])

            axes[i].bar(x + width/2, rule_total_time, width, label="Rule Total Travel Time", color=colors_rule[0], alpha=0.9)
            axes[i].bar(x + width/2, rule_inside_dwcl, width, label="Rule Inside DWCL", color=colors_rule[1])

            axes[i].set_ylabel("Simulation Time (s)", fontsize=12, fontweight='bold')
            axes[i].set_xticks(x)
            axes[i].set_xticklabels([f"SoC={s}" for s in soc_values])
            axes[i].set_title("Total Travel Time & DWCL Time", fontsize=12, fontweight='bold')
            axes[i].legend()

        else:
            # Standard bar plots for Final SOC and Lane Switches
            rl_means = [np.mean(results["RL"][soc][metric]) for soc in soc_values]
            rule_means = [np.mean(results["Rule"][soc][metric]) for soc in soc_values]

            x = np.arange(len(soc_values))
            width = 0.35

            axes[i].bar(x - width/2, rl_means, width, label="RL-Based", color=colors_rl[0])
            axes[i].bar(x + width/2, rule_means, width, label="Rule-Based", color=colors_rule[0])

            axes[i].set_ylabel(ylabel, fontsize=12, fontweight='bold')
            axes[i].set_xticks(x)
            axes[i].set_xticklabels([f"SoC={s}" for s in soc_values])
            axes[i].set_title(title, fontsize=12, fontweight='bold')
            axes[i].legend()

    plt.tight_layout()
    plt.show()


# **Plot results for all three metrics**
metrics = ["final_soc", "travel_time", "lane_switches"]
titles = ["Final SOC Across Scenarios", "Total Travel Time & DWCL Time", "Lane Switches Across Scenarios"]
ylabels = ["Final SOC (%)", "Time (s)", "Number of Switches"]

plot_comparison_bars(results, metrics, titles, ylabels)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def plot_comparison_bars(results, metrics, titles, ylabels):
    """Creates compact subplots comparing RL vs Rule-Based performance while ensuring readability."""
    
    sns.set_palette("Set2")  # Use readable color scheme
    colors_rl = ["#1f77b4", "#aec7e8"]  # RL: Blue shades
    colors_rule = ["#ff7f0e", "#ffbb78"]  # Rule: Orange shades

    fig, axes = plt.subplots(1, len(metrics), figsize=(3.2, 2.2), dpi=300)  # IEEE one-column optimized size

    scenario_labels = ["Scenario 1", "Scenario 2"]  # Regular scenario labels (no bold)
    width = 0.15  # Balanced bar width
    x = np.array([0, 0.35])  # Adjusted spacing for compactness without overlap

    for i, (metric, title, ylabel) in enumerate(zip(metrics, titles, ylabels)):
        if metric == "travel_time":
            # Stacked bars for RL and Rule-Based models
            rl_total_time = [np.mean(results["RL"][soc]["travel_time"]) for soc in results["RL"]]
            rl_inside_dwcl = [np.mean(results["RL"][soc]["time_inside_dwcl"]) for soc in results["RL"]]
            rule_total_time = [np.mean(results["Rule"][soc]["travel_time"]) for soc in results["Rule"]]
            rule_inside_dwcl = [np.mean(results["Rule"][soc]["time_inside_dwcl"]) for soc in results["Rule"]]

            axes[i].bar(x - width/2, rl_total_time, width, label="RL Total Time", color=colors_rl[0], alpha=0.9)
            axes[i].bar(x - width/2, rl_inside_dwcl, width, label="RL Inside DWCL", color=colors_rl[1])

            axes[i].bar(x + width/2, rule_total_time, width, label="Rule Total Time", color=colors_rule[0], alpha=0.9)
            axes[i].bar(x + width/2, rule_inside_dwcl, width, label="Rule Inside DWCL", color=colors_rule[1])

            axes[i].set_ylabel("Time (s)", fontsize=4, fontweight='bold')
            axes[i].set_xticks(x)
            axes[i].set_xticklabels(scenario_labels, fontsize=4)  # Reduced font size to 3, removed bold
            axes[i].set_title("Travel Time & DWCL Time", fontsize=4, fontweight='bold')
            axes[i].legend(fontsize=3.4, frameon=True)

        else:
            # Standard bar plots for Final SOC and Lane Switches
            rl_means = [np.mean(results["RL"][soc][metric]) for soc in results["RL"]]
            rule_means = [np.mean(results["Rule"][soc][metric]) for soc in results["Rule"]]

            axes[i].bar(x - width/2, rl_means, width, label="RL-Based", color=colors_rl[0])
            axes[i].bar(x + width/2, rule_means, width, label="Rule-Based", color=colors_rule[0])

            axes[i].set_ylabel(ylabel, fontsize=4, fontweight='bold')
            axes[i].set_xticks(x)
            axes[i].set_xticklabels(scenario_labels, fontsize=4)  # Reduced font size to 3, removed bold
            axes[i].set_title(title, fontsize=4, fontweight='bold')
            axes[i].legend(fontsize=3.4, loc="upper right", frameon=True)

        # **Ensure y-axis tick labels have fontsize 4**
        axes[i].tick_params(axis='y', labelsize=4)
        
        # **Increase Y-axis spacing**
        axes[i].margins(y=0.4)  # Adjusted for better spacing

    # **Increased spacing among subplots**
    plt.subplots_adjust(left=0.2, right=0.95, top=0.85, bottom=0.3, wspace=0.5)  # Increased `wspace` for more separation

    plt.show()


# **Plot results for all three metrics**
metrics = ["final_soc", "travel_time", "lane_switches"]
titles = ["Final SOC", "Travel Time & DWCL Time", "Lane Switches"]
ylabels = ["Final SOC (%)", "Time (s)", "No. of Switches"]

plot_comparison_bars(results, metrics, titles, ylabels)


In [None]:
# Re-import necessary libraries since the execution state was reset
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns



# Compute the average for each metric
metrics = ["Final SOC (%)", "Total Travel Time (s)", "Total Lane Switches"]
rl_means = [np.mean(rl_soc), np.mean(rl_time), np.mean(rl_switches)]
rule_means = [np.mean(rule_soc), np.mean(rule_time), np.mean(rule_switches)]

# Define bar width and positions
x = np.arange(len(metrics))
width = 0.35

# Create bar plot
sns.set(style="whitegrid", palette="muted", font_scale=1.2)
fig, ax = plt.subplots(figsize=(10, 6))

bars1 = ax.bar(x - width/2, rl_means, width, label="RL-Based", color="blue", alpha=0.7)
bars2 = ax.bar(x + width/2, rule_means, width, label="Rule-Based", color="orange", alpha=0.7)

# Labels and Title
ax.set_xlabel("Metrics")
ax.set_ylabel("Average Value")
ax.set_title("Comparison of RL-Based and Rule-Based Performance")
ax.set_xticks(x)
ax.set_xticklabels(metrics)
ax.legend()

# Show values on top of bars
for bars in [bars1, bars2]:
    for bar in bars:
        height = bar.get_height()
        ax.annotate(f"{height:.1f}",
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 5),
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=10, fontweight='bold')

# Show the plot
plt.show()


In [None]:
plot_training_metrics()#add simulation time for each episode and if it arrives or no

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

threshold = 1400
# Set improved color palette
sns.set_palette("colorblind")
reward_color = sns.color_palette("coolwarm", as_cmap=True)(0.2)  # Blueish
loss_color = sns.color_palette("coolwarm", as_cmap=True)(0.8)  # Reddish

# Ensure numerical stability by removing NaN values from losses
filtered_losses = np.array(losses[threshold:])
filtered_losses = filtered_losses[~np.isnan(filtered_losses)]  # Remove NaNs

# Smoothed Rewards and Confidence Interval
smoothed_rewards = smooth_curve(episode_rewards[threshold:], window_size=50)
std_rewards = np.std(episode_rewards[threshold:])  # Compute standard deviation

# Smoothed Losses and Confidence Interval (avoiding NaN issues)
smoothed_losses = smooth_curve(filtered_losses, window_size=50)
std_losses = np.std(filtered_losses) if len(filtered_losses) > 0 else 0  # Ensure stability

# Create figure and axes
fig, ax1 = plt.subplots(figsize=(10, 6))

# Adjust x-axis to start from episode threshold but display as episode 0
x_range = np.arange(len(smoothed_rewards))

# Plot Reward Curve
ax1.plot(x_range, smoothed_rewards, label=" Rewards", color=reward_color, linewidth=2)
ax1.fill_between(x_range, 
                 smoothed_rewards - std_rewards, 
                 smoothed_rewards + std_rewards, 
                 alpha=0.2, color=reward_color)

ax1.set_xlabel("Episode", fontsize=12, fontweight='bold')
ax1.set_ylabel("Reward", color=reward_color, fontsize=12, fontweight='bold')
ax1.tick_params(axis='y', labelcolor=reward_color)
ax1.grid(True, linestyle="--", alpha=0.6)

# Create second y-axis for Loss
ax2 = ax1.twinx()
ax2.plot(x_range[:len(smoothed_losses)], smoothed_losses, label=" Loss", color=loss_color, linestyle="dashed", linewidth=2)

# Ensure confidence interval does not go below zero
lower_loss_bound = np.maximum(smoothed_losses - std_losses, 0)
ax2.fill_between(x_range[:len(smoothed_losses)], 
                 lower_loss_bound, 
                 smoothed_losses + std_losses, 
                 alpha=0.2, color=loss_color)

ax2.set_ylabel("Loss (MSE)", color=loss_color, fontsize=12, fontweight='bold')
ax2.tick_params(axis='y', labelcolor=loss_color)

# Add legends
fig.legend(loc="upper left", bbox_to_anchor=(0.1, 0.97), fontsize=12)
ax1.set_xlim(0,870)
# Titles and Layout Improvements
#plt.title("Episode Rewards and Loss per Episode with Confidence Intervals", fontsize=14, fontweight='bold', pad=15)
fig.tight_layout()

# Show the plot
plt.show()


In [None]:
from matplotlib.colors import Normalize
#start_index=2500
def plot_soc_and_time_enhanced(soc_values, times, episodes, start_index=1, display_start=1100):
    """
    Generates an enhanced heatmap visualization for SoC and Travel Time over episodes.
    - Uses perceptually uniform colormaps.
    - Applies better contrast scaling.
    - Avoids x-axis overcrowding for clarity.

    Parameters:
    - soc_values (list): Battery State of Charge (SoC) per episode.
    - times (list): Travel time per episode.
    - episodes (list): Episode indices.
    - start_index (int): Index to start plotting from.
    - display_start (int): Displayed x-axis start value.
    """

    # Trim data based on start index
    soc_values = np.array(soc_values[start_index:])
    times = np.array(times[start_index:])
    episodes = np.arange(display_start, display_start + len(soc_values))  # Shift episode numbers

    # Downsample for better visualization (keep around 500 points)
    sampling_rate = max(1, len(episodes) // 500)
    sampled_episodes = episodes[::sampling_rate]
    sampled_soc = soc_values[::sampling_rate]
    sampled_times = times[::sampling_rate]

    # Convert to DataFrame
    df_sampled = pd.DataFrame({
        "Episodes": sampled_episodes,
        "SoC": sampled_soc,
        "Time": sampled_times
    })

    # Create figure with shared x-axis
    fig, ax = plt.subplots(2, 1, figsize=(12, 6), sharex=True, gridspec_kw={'hspace': 0.05})

    # Adjust x-ticks to prevent overcrowding
    num_xticks = 10  # Show only 10 evenly spaced ticks
    xtick_positions = np.linspace(0, len(sampled_episodes) - 1, num_xticks, dtype=int)
    xtick_labels = [sampled_episodes[i] for i in xtick_positions]

    # Normalize color scale for better contrast
    norm_soc = Normalize(vmin=max(0, np.min(sampled_soc)), vmax=np.max(sampled_soc))
    norm_time = Normalize(vmin=max(0, np.min(sampled_times)), vmax=np.max(sampled_times))

    # First heatmap: SoC (Using "coolwarm" for balanced color differentiation)
    sns.heatmap(np.expand_dims(df_sampled["SoC"], axis=0), cmap="inferno", cbar=True, ax=ax[0],
                norm=norm_soc, xticklabels=False, yticklabels=["SoC (%)"])
    #ax[0].set_title("Battery SoC and Travel Time Across Episodes")

    # Second heatmap: Travel Time (Using "cividis" for perceptual clarity)
    sns.heatmap(np.expand_dims(df_sampled["Time"], axis=0), cmap="twilight", cbar=True, ax=ax[1],
                norm=norm_time, xticklabels=xtick_labels, yticklabels=["Travel Time (s)"])
    ax[1].set_xlabel("Episodes")

    # Adjust x-ticks for clarity
    ax[1].set_xticks(xtick_positions)
    ax[1].set_xticklabels(xtick_labels, rotation=45)

    plt.show()

# Example usage with simulated data
episodes = np.arange(len(soc_values))  # Simulating episode numbers


# Call function with specified start index and display shift
plot_soc_and_time_enhanced(soc_values, times, episodes, start_index=0, display_start=1100)


In [None]:
# Re-import necessary libraries after execution state reset
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Reinitialize sample data since previous variables were lost
# Assuming `invalid_action_counts` and `switch_frequencies` were preloaded lists
episodes = np.arange(len(invalid_action_counts))  # Simulating episode numbers

# Define a rolling average function ensuring values remain positive
def rolling_average_positive(data, window_size=50):
    """Computes rolling average and ensures values remain non-negative."""
    avg = np.convolve(data, np.ones(window_size)/window_size, mode='valid')
    return np.maximum(avg, 0)  # Ensure values stay non-negative

# Start from episode 1400 (adjust x-axis accordingly)
start_idx = 1400
window_size = 50
adjusted_episodes = np.arange(0, len(episodes[start_idx:]) - (window_size - 1))  # Show as episode 0 onwards

# Compute rolling averages
rolling_invalid_actions = rolling_average_positive(invalid_action_counts[start_idx:], window_size)
rolling_lane_switches = rolling_average_positive(switch_frequencies[start_idx:], window_size)

# Compute standard deviation ensuring values remain above zero
std_invalid = np.std(invalid_action_counts[start_idx:])
std_switches = np.std(switch_frequencies[start_idx:])

# Ensure confidence intervals don't go below zero
lower_bound_invalid = np.maximum(rolling_invalid_actions - std_invalid, 0)
upper_bound_invalid = rolling_invalid_actions + std_invalid

lower_bound_switches = np.maximum(rolling_lane_switches - std_switches, 0)
upper_bound_switches = rolling_lane_switches + std_switches

# Set colorblind-friendly palette
sns.set_palette("colorblind")

# Define colors using 'coolwarm' colormap
coolwarm_cmap = sns.color_palette("coolwarm", as_cmap=True)
invalid_action_color = coolwarm_cmap(0.2)  # Blueish tone
lane_switch_color = coolwarm_cmap(0.8)  # Reddish tone

# Create figure
fig, ax = plt.subplots(figsize=(10, 6))

# Rolling Average Line Plot with Confidence Interval
ax.plot(adjusted_episodes, rolling_invalid_actions, label="Invalid Actions", color=invalid_action_color, linewidth=2)
ax.fill_between(adjusted_episodes, lower_bound_invalid, upper_bound_invalid, alpha=0.2, color=invalid_action_color)

ax.plot(adjusted_episodes, rolling_lane_switches, label="Lane Switches", color=lane_switch_color, linewidth=2, linestyle="dashed")
ax.fill_between(adjusted_episodes, lower_bound_switches, upper_bound_switches, alpha=0.2, color=lane_switch_color)

# Labels and Title
ax.set_xlabel("Episode", fontsize=12, fontweight='bold')
ax.set_ylabel("Count", fontsize=12, fontweight='bold')
#ax.set_title("Rolling Average of Invalid Actions & Lane Switches Over Time")
ax.legend()
ax.grid(True, linestyle="--", alpha=0.6)


plt.show()


In [None]:
#episode 4900 i added 20 to this --->  if in_dwcl and SoC < required_soc + 20:

In [None]:
#build a rule based agent and compare it with the DQN agent; over some test episodes

In [None]:
#when invalide action occu , penalize the agent by editing the q value of the action taken to be negative, so the agent will avoid taking it again

In [None]:
#add total travel time to csv

In [None]:
#gamma 0.95 --> 0.9 at episod 2240

In [None]:
#use DDQN

In [None]:
#use a scalled reward function 

In [None]:
#use elapsed time/ eta in state space

In [None]:
#use  abetter NN model

In [None]:
#fix the exit_dwcl function when there is no left lane, the vehicle should stay in the current lane,
#also add a condition to check if the vehicle is in the destination lane, if so, the vehicle should 
#stop and the simulation should end. otherwise a timeout should be triggered and the simulation should end.

In [None]:
start_point.location.x + length

In [None]:
env.destination.x

In [None]:
env.vehicle.get_location().x

In [None]:
env.battery_manager.soc

In [None]:
create_sim_env()
# Load power model
power_model = load_model("c:\\Users\\HOUALEF\\Desktop\\RL_Carla\\models\\energy_model_best.hdf5")


spawn_points = world.get_map().get_spawn_points()
destination = carla.Location(
    x=start_point.location.x + length,
    y=start_point.location.y,
    z=start_point.location.z,
)

# Initialize battery manager
battery_manager = BatteryManager(vehicle, destination, world, power_model,C_bat,P_aux)
env = CarlaEnv(vehicle, battery_manager, all_corners, destination)




In [None]:
#vehicle.set_autopilot(True)

In [None]:
env.stop_car()


In [None]:
vehicle.set_autopilot(True)

In [None]:


vehicle.set_autopilot(True)
control = carla.VehicleControl()
control.throttle = 0.0
control.brake = 1.0  # Full brake
control.steer = +0.00
# ‚úÖ Apply stopping control
vehicle.apply_control(control)


In [None]:
env.battery_manager.is_on_charging_lane()

In [None]:
env.vehicle.set_transform(start_point)

In [None]:
while True:
    env.move_to_dwcl()
    time.sleep(5)
    env.exit_dwcl()

In [None]:
env.move_to_dwcl()

In [None]:
env.tm.vehicle_percentage_speed_difference(vehicle, 0)

In [None]:
env.set_speed(10)

In [None]:
env.speed_up()

In [None]:
env.slow_down()

In [None]:
env.exit_dwcl()

In [None]:
print(vehicle.get_speed_limit())

In [None]:
while True:
    velocity = vehicle.get_velocity()
    speed_m_s = math.sqrt(velocity.x**2 + velocity.y**2) *3.6
    print('Speed:',speed_m_s,'   Max_speed: ' ,vehicle.get_speed_limit(),'\n')
    time.sleep(1)

In [None]:
"""

import gym
import numpy as np
import torch
import torch.nn as nnwww
import random
import collections
from gym import spaces

import carla
import numpy as np
import time
import gym
from gym import spaces

class CarlaEnv(gym.Env):
    def __init__(self, vehicle, battery_manager, all_corners, destination):
        super(CarlaEnv, self).__init__()
        self.vehicle = vehicle
        self.battery_manager = battery_manager
        self.all_corners = all_corners
        self.destination = destination
        self.start_time = time.time()

        # Define state space: [SoC, Required SoC, ETA, Distance to DWCL End, Lane Type]
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0, 0]),
            high=np.array([100, 100, 1000, 500, 1]),
            dtype=np.float32
        )

        # Define action space: [Go to DWCL, Leave DWCL, Accelerate, Decelerate, Maintain Speed]
        self.action_space = spaces.Discrete(5)

    def find_nearest_dwcl(self):
        
        vehicle_location = self.vehicle.get_location()
        min_distance = float('inf')
        nearest_dwcl = None

        for corners in self.all_corners:
            center_x = sum(c.x for c in corners) / 4
            center_y = sum(c.y for c in corners) / 4
            dwcl_center = carla.Location(center_x, center_y, corners[0].z)

            distance = vehicle_location.distance(dwcl_center)
            if distance < min_distance:
                min_distance = distance
                nearest_dwcl = dwcl_center

        return nearest_dwcl

    def change_lane(self, target_lane):
        
        self.vehicle.set_autopilot(False)
        agent = BehaviorAgent(self.vehicle, behavior="normal")
        agent.set_destination(target_lane)
        self.vehicle.set_autopilot(True)

    def move_to_nearest_dwcl(self):
        
        nearest_dwcl = self.find_nearest_dwcl()
        if nearest_dwcl:
            self.change_lane(nearest_dwcl)
        else:
            print("No DWCL found nearby!")

    def leave_dwcl(self):
        
        vehicle_location = self.vehicle.get_location()
        map = self.vehicle.get_world().get_map()
        waypoint = map.get_waypoint(vehicle_location, project_to_road=True, lane_type=carla.LaneType.Driving)
        
        while waypoint.lane_type == carla.LaneType.Driving and waypoint.lane_id % 2 == 0:
            waypoint = waypoint.get_left_lane()
            if waypoint is None:
                print("No non-DWCL lane found!")
                return
        
        self.change_lane(waypoint.transform.location)

    def step(self, action):
        
        eta = self.battery_manager.eta
        elapsed_time = time.time() - self.start_time
        distance_to_destination = self.battery_manager.compute_remaining_distance()
        lane_type = int(self.battery_manager.is_on_charging_lane(self.all_corners)[0])

        # End episode if destination is reached or time exceeds 3x ETA
        if distance_to_destination < 1 or elapsed_time > 3 * eta:
            return np.array([0, 0, 0, 0, lane_type]), 0, True, {}
        
        reward = 0
        if action == 0:  # Move to nearest DWCL
            if lane_type == 0:
                self.move_to_nearest_dwcl()
                reward += 10  # Reward entering DWCL when necessary
            else:
                reward -= 5  # Penalize unnecessary lane changes
        elif action == 1:  # Leave DWCL
            if lane_type == 1:
                self.leave_dwcl()
                reward += 5  # Reward leaving DWCL when unnecessary
            else:
                reward -= 5
        elif action == 2:  # Accelerate inside DWCL
            if lane_type == 1:
                self.vehicle.set_autopilot(False)
                self.vehicle.apply_control(carla.VehicleControl(throttle=0.8))
                self.vehicle.set_autopilot(True)
                reward += 3  # Reward acceleration when charging is optimal
        elif action == 3:  # Decelerate inside DWCL
            if lane_type == 1:
                self.vehicle.set_autopilot(False)
                self.vehicle.apply_control(carla.VehicleControl(throttle=0.3, brake=0.2))
                self.vehicle.set_autopilot(True)
                reward += 3  # Reward slowing down when necessary
        elif action == 4:  # Maintain speed inside DWCL
            if lane_type == 1:
                self.vehicle.set_autopilot(False)
                self.vehicle.apply_control(carla.VehicleControl(throttle=0.5))
                self.vehicle.set_autopilot(True)
                reward += 2  # Reward maintaining speed for efficient charging

        # Compute updated state
        state = np.array([self.battery_manager.soc, self.battery_manager.soc_required, eta, distance_to_destination, lane_type])
        return state, reward, False, {}

    def reset(self):
       
        create_sim_env()
        
       
        self.vehicle.set_autopilot(True)
        
        self.soc, soc_required, self.total_energy, ETA, P_ev, f_align, k, eta_transfer, remaining_distance, acceleration, slope = self.battery_manager.update_battery(0,self.all_corners)
        state = np.array([self.soc, soc_required, ETA, remaining_distance, int(self.battery_manager.is_on_charging_lane(self.all_corners)[0])])
        return state



class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class DQNAgent:
    def __init__(self, state_dim, action_dim):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = DQN(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.memory = collections.deque(maxlen=10000)
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.05

    def select_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, 4)  # Random action
        else:
            state = torch.tensor(state, dtype=torch.float32).to(self.device)
            with torch.no_grad():
                return torch.argmax(self.model(state)).item()

    def train(self, batch_size=64):
        if len(self.memory) < batch_size:
            return

        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.tensor(np.array(states), dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.int64).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).to(self.device)

        q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_values = self.model(next_states).max(1)[0].detach()
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = nn.functional.mse_loss(q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

"""


In [None]:
"""

# Constants
eta_ch = 0.92  # Battery charging efficiency
P_ev_max = 80  # Maximum Power Transfer (40 kW)
Q1 = 90  # Quality factor of primary coil
Q2 = 90 # Quality factor of secondary coil
k0 = 0.2  # Nominal coupling coefficient (perfect alignment)
d_max = 0.5  # Maximum lateral misalignment before no power transfer (meters)

# Vehicle constants
m = 1680  # kg, Mass
g = 9.81  # m/s^2, Gravity
c_rr = 0.01  # Rolling resistance coefficient
c_d = 0.28  # Drag coefficient
A = 1.93  # m^2, Frontal area
rho = 1.20  # kg/m^3, Air density
C_bat = 24  # kWh, Battery capacity
alpha = 0.90  # Transmission efficiency
beta = 0.75  # Regeneration efficiency
P_aux = 0  # kW, Auxiliary power consumption
max_speed = 70 / 3.6  # m/s
avg_speed = 35 / 3.6
avg_slop = 0.002
avg_acc = 0.28

create_sim_env()
# Load power model
power_model = load_model("c:\\Users\\HOUALEF\\Desktop\\RL_Carla\\models\\energy_model_best.hdf5")

spawn_points = world.get_map().get_spawn_points()
destination = carla.Location(
    x=start_point.location.x - length,
    y=start_point.location.y,
    z=start_point.location.z,
)

# Initialize battery manager
battery_manager = BatteryManager(vehicle, destination, world, power_model,C_bat,P_aux)


# Initialize Environment and Agent
env = CarlaEnv(vehicle, battery_manager, all_corners,destination)
agent = DQNAgent(state_dim=5, action_dim=5)

num_episodes = 500  # Number of episodes to train
max_steps = 1000  # Max steps per episode
batch_size = 64  # Experience replay batch size

# Training loop
for episode in range(num_episodes):
    state = env.reset()
    done = False
    total_reward = 0

    for step in range(max_steps):
        # Select action using Œµ-greedy policy
        action = agent.select_action(state)

        # Execute action in environment
        next_state, reward, done, _ = env.step(action)

        # Store experience in memory
        agent.memory.append((state, action, reward, next_state, done))

        # Train the agent
        agent.train(batch_size)

        # Move to next state
        state = next_state
        total_reward += reward

        # End episode if done
        if done:
            break

    # Print training progress
    print(f"Episode {episode + 1}/{num_episodes}, Total Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.3f}")

# Save the trained model
torch.save(agent.model.state_dict(), "dqn_carla.pth")
print("Training complete! Model saved.")
"""


In [13]:


# Constants
eta_ch = 0.92  # Battery charging efficiency
P_ev_max = 80  # Maximum Power Transfer (40 kW)
Q1 = 90  # Quality factor of primary coil
Q2 = 90 # Quality factor of secondary coil
k0 = 0.2  # Nominal coupling coefficient (perfect alignment)
d_max = 0.5  # Maximum lateral misalignment before no power transfer (meters)

# Vehicle constants
m = 1680  # kg, Mass
g = 9.81  # m/s^2, Gravity
c_rr = 0.01  # Rolling resistance coefficient
c_d = 0.28  # Drag coefficient
A = 1.93  # m^2, Frontal area
rho = 1.20  # kg/m^3, Air density
C_bat = 24  # kWh, Battery capacity
alpha = 0.90  # Transmission efficiency
beta = 0.75  # Regeneration efficiency
P_aux = 0  # kW, Auxiliary power consumption
max_speed = 70 / 3.6  # m/s
avg_speed = 35 / 3.6
avg_slop = 0.002
avg_acc = 0.28

# Pygame initialization
pygame.init()
WIDTH, HEIGHT = 1600, 900  # Window size
screen = pygame.display.set_mode((WIDTH, HEIGHT))
pygame.display.set_caption("CARLA Vehicle Info Display")
font = pygame.font.Font(None, 24)
clock = pygame.time.Clock()
create_sim_env()
# Connect to CARLA
settings = world.get_settings()
settings.synchronous_mode = True
world.apply_settings(settings)

spawn_points = world.get_map().get_spawn_points()
destination = carla.Location(
    x=start_point.location.x - length,
    y=start_point.location.y,
    z=start_point.location.z,
)

# Load power model
power_model = load_model("c:\\Users\\HOUALEF\\Desktop\\RL_Carla\\models\\energy_model_best.hdf5")

# Initialize battery manager
battery_manager = BatteryManager(vehicle, destination, world, power_model,C_bat,P_aux)

# Attach a camera to the vehicle
camera_bp = world.get_blueprint_library().find("sensor.camera.rgb")
camera_bp.set_attribute("image_size_x", str(WIDTH))
camera_bp.set_attribute("image_size_y", str(HEIGHT))
camera_bp.set_attribute("fov", "110")
camera_transform = carla.Transform(carla.Location(x=1.5, z=2.0))
camera = world.spawn_actor(camera_bp, camera_transform, attach_to=vehicle)
latest_image = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)

def process_img(image):
    global latest_image
    array = np.frombuffer(image.raw_data, dtype=np.uint8)
    array = np.reshape(array, (image.height, image.width, 4))
    latest_image = cv2.flip(array[:, :, :3], 1)

camera.listen(lambda image: process_img(image))

# Agent for automatic driving
agent = BehaviorAgent(vehicle, behavior="normal")
agent.set_destination(destination)

manual_mode = True
control = carla.VehicleControl()

# Main loop
running = True
while running:
    world.tick()
    #clock.tick(20)
    
    current_time = battery_manager.get_simulation_time()
    delta_time = current_time - battery_manager.last_update_time
    battery_manager.last_update_time = current_time
    
    soc, soc_required, total_energy, ETA, charging_power, f_align, k, eta_transfer, remaining_distance,acceleration,slope = battery_manager.update_battery(
        delta_time, all_corners
    )
    
    # Manual control
    keys = pygame.key.get_pressed()
    if keys[pygame.K_m]:
        manual_mode = not manual_mode
        time.sleep(0.1)
    
    if manual_mode:
        control.throttle = 1.0 if keys[pygame.K_UP] else 0.0
        control.brake = 1.0 if keys[pygame.K_DOWN] else 0.0
        control.steer = -0.5 if keys[pygame.K_LEFT] else (0.5 if keys[pygame.K_RIGHT] else 0.0)
        control.reverse = keys[pygame.K_SPACE]
    else:
        control = agent.run_step()
    
    vehicle.apply_control(control)
    
    # Pygame Display Update
    screen.fill((0, 0, 0))
    frame = cv2.cvtColor(latest_image, cv2.COLOR_BGR2RGB)
    frame = np.rot90(frame)
    frame = pygame.surfarray.make_surface(frame)
    screen.blit(frame, (0, 0))
    map = vehicle.get_world().get_map()
    waypoint = map.get_waypoint(vehicle.get_location())
    lane_id = waypoint.lane_id
    
    text_data = [
        f"Driving Mode: {'Manual' if manual_mode else 'Automatic'}",
        f"Speed: {vehicle.get_velocity().length() * 3.6:.2f} km/h",
        f"SoC: {soc:.2f}%",
        f"Energy Consumed: {total_energy:.4f} kWh",
        f"ETA: {ETA:.2f}s",
        f"Required SoC: {soc_required:.2f}%",
        f"Remaining Distance: {remaining_distance:.2f}m",
        f"Charging Power: {charging_power:.2f} kW",
        f"Coupling k: {k:.2f}",
        f"Alignment: {f_align:.2f}",
        f"Eta Transfer: {eta_transfer:.2f}",
        f"Delta Time: {delta_time:.2f} s" ,
        f"Acceleration: {acceleration:.2f} m/s¬≤",
        f"Slope: {slope:.2f} ¬∞",
        f"lane_id: {lane_id}"
    ]
    
    y_offset = 30
    for text in text_data:
        screen.blit(font.render(text, True, (255, 255, 255)), (20, y_offset))
        y_offset += 30
    
    pygame.display.flip()
    
    for event in pygame.event.get():
        if event.type == pygame.QUIT or soc <= 0:
            running = False

pygame.quit()
camera.destroy()
print("Simulation ended.")





Simulation ended.
