In [16]:
import os
import csv
import carla
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
import learn2learn as l2l
import gymnasium as gym
from gymnasium import spaces
from collections import deque
import copy
import time
import math
class Vector:
    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z
threshold = 2


In [17]:

# Configuration
CSV_FILE = "waypoint.csv"
EPISODES = 500
STEPS = 50000
BATCH_SIZE = 64
GAMMA = 0.99
LEARNING_RATE = 1e-3
MEMORY_SIZE = 10000


class PolicyNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(PolicyNetwork, self).__init__()
        self.action_size = action_size
        self.state_size = state_size
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return torch.softmax(x, dim=-1)

    
def waypoint_based_action(vehicle, next_waypoint):
    current_transform = vehicle.get_transform()
    current_yaw = current_transform.rotation.yaw

    next_location = carla.Location(x=next_waypoint[0], y=next_waypoint[1])
    desired_yaw = math.degrees(math.atan2(next_location.y - current_transform.location.y, 
                                          next_location.x - current_transform.location.x))
    yaw_diff = (desired_yaw - current_yaw) % 360

    if yaw_diff > 180:
        yaw_diff -= 360

    if yaw_diff > 5:
        action = 3  # Steer right
    elif yaw_diff < -5:
        action = 2  # Steer left
    else:
        action = 0  # Go straight
#     print(f'action decided: {action}, current yaw: {yaw_diff}')
    return torch.tensor([[action]], dtype=torch.long)

    

def read_waypoints(file_path):
    waypoints = []
    with open(file_path, 'r') as csvfile:
        csvreader = csv.reader(csvfile)
        for row in csvreader:
            x, y, yaw = map(float, row)
            waypoints.append((x, y, yaw))
            
    return waypoints



In [18]:

class CarlaEnv(gym.Env):
    def __init__(self, vehicle, waypoints):
        super(CarlaEnv, self).__init__()
        self.vehicle = vehicle
        self.waypoints = waypoints
        self.current_waypoint_index = 0

        self.action_space = spaces.Discrete(4)  # [throttle, brake, steer_left, steer_right]
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float32)

        self.lane_invasion = False
#         print('CarlaEnv made')
#         self.lane_invasion_sensor = self._spawn_lane_invasion_sensor()
        
#     def _spawn_lane_invasion_sensor(self):
#         sensor_bp = self.world.get_blueprint_library().find('sensor.other.lane_invasion')
#         sensor_transform = carla.Transform(carla.Location(x=2.5, z=0.7))
#         sensor = self.world.spawn_actor(sensor_bp, sensor_transform, attach_to=self.vehicle)
#         sensor.listen(lambda event: self._on_lane_invasion(event))
#         return sensor

    def _on_lane_invasion(self, event):
#         print("lane invasion detected")
        self.lane_invasion = True    

    def reset(self):
        vel = carla.Vector3D()
        vel.x = 0
        vel.y = 0
        vel.z = 0
        self.vehicle.set_target_velocity(vel)
        x, y, yaw = self.waypoints[0]
        transform = carla.Transform(carla.Location(x, y), carla.Rotation(yaw=yaw))
        self.vehicle.set_transform(transform)
#         print("reset done")
        return self._get_observation()

    def step(self, action):
        throttle = 0
        brake = 0
        steer = 0

        if action == 0:
            throttle = 1.0
        elif action == 1:
            brake = 1.0
        elif action == 2:
            steer = -1.0
        
        elif action == 3:
            steer = 1.0
            

        control = carla.VehicleControl(throttle=throttle, brake=brake, steer=steer)
        self.vehicle.apply_control(control)

        next_waypoint = self.waypoints[self.current_waypoint_index]
        next_waypoint_location = carla.Location(x=next_waypoint[0], y=next_waypoint[1])
        current_location = self.vehicle.get_location()
        distance = current_location.distance(next_waypoint_location)

        if distance < threshold:
            self.current_waypoint_index += 1
            if self.current_waypoint_index >= len(self.waypoints):
                self.current_waypoint_index = 0

        reward = self._get_reward(distance, control)

        done = False
        if self.current_waypoint_index == len(self.waypoints) - 1:
            done = True
#         print (f'step taken: Throttle: {throttle} and steer: {steer}')
        return self._get_observation(), reward, done, {}

    def _get_observation(self):
        location = self.vehicle.get_location()
        orientation = self.vehicle.get_transform().rotation.yaw
        speed = self.vehicle.get_velocity()
        speed = np.sqrt(speed.x**2 + speed.y**2 + speed.z**2)
        next_waypoint = self.waypoints[self.current_waypoint_index]
        next_waypoint_location = carla.Location(x=next_waypoint[0], y=next_waypoint[1])
        distance = location.distance(next_waypoint_location)
#         print('observation taken')
        return np.array([location.x, location.y, orientation, speed, distance])


    def _get_reward(self, distance, control):
        reward = 0
        if control.throttle > 0:
            reward += 0
        if distance < threshold:
            reward += 10    
        # Apply a large negative reward for lane invasion
        if self.lane_invasion:
            reward -= 1000  # Adjust the value as needed
            self.lane_invasion = True    
#         print('reward calculated')    
        return reward
    


In [19]:
class trainer(CarlaEnv):
    def __init__(self, episodes, steps, sample_size, learner, meta_optim):
        self.episodes = episodes
        self.steps = steps
        self.sample_size = sample_size
        self.learner = learner
        self.meta_optim = meta_optim
        

    def setup(self, weather):
        
        try:
            state_size = 5
            action_size = 4
            agent = PolicyNetwork(state_size, action_size)
            #target_agent = copy.deepcopy(agent)
            #optimizer = optim.Adam(agent.parameters(), lr=LEARNING_RATE)
            memory = deque(maxlen=MEMORY_SIZE)


            # Read waypoints from CSV
            waypoints = read_waypoints(CSV_FILE)

            # Connect to CARLA server
            client = carla.Client('localhost', 2000)
            client.set_timeout(10.0)

            # Load CARLA world
            world = client.get_world()
            
            current_weather = weather
            print(current_weather)
            if current_weather == "HardRainNoon":
                world.set_weather(carla.WeatherParameters.HardRainNoon)

            
            blueprint_library = world.get_blueprint_library()

            # Spawn vehicle
            vehicle_bp = random.choice(blueprint_library.filter('wrangler_rubicon'))
            spawn_point = carla.Transform(carla.Location(x=-23.6,y=137.5,z=1),carla.Rotation(yaw=0))
            self.vehicle = world.spawn_actor(vehicle_bp, spawn_point)

            sensor_bp = world.get_blueprint_library().find('sensor.other.lane_invasion')
            sensor_transform = carla.Transform(carla.Location(x=2.5, z=0.7))
            self.sensor = world.spawn_actor(sensor_bp, sensor_transform, attach_to=self.vehicle)
            self.sensor.listen(lambda event: self.env._on_lane_invasion(event))

            # Create CARLA environment
            self.env = CarlaEnv(self.vehicle, waypoints)

        except:
            pass
            

    def generate_traj(self):

        self.states_buffer = np.empty((self.sample_size, self.steps))
        action_buffer = np.empty((self.sample_size, self.steps))
        rewards_buffer = np.empty((self.sample_size, self.steps))
        next_states_buffer = np.empty((self.sample_size, self.steps))
    
        for episode in range(self.sample_size):
            state = self.env.reset()
            self.env.current_waypoint_index = 0
            self.env.lane_invasion = False
            episode_reward = 0

            states = []
            actions = []
            rewards = []
            next_states = []

            for step in range(self.steps):
                
                #states = []
                #actions = []
                #rewards = []
                #next_states = []

                # Observe
                state = self.env._get_observation()
                next_waypoint = self.env.waypoints[self.env.current_waypoint_index]
                action = waypoint_based_action(self.vehicle, next_waypoint)

                
                next_state, reward, done, _ = self.env.step(action.item())
                next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
                state = next_state
                episode_reward += reward

                states.append(state)
                actions.append(action)
                rewards.append(reward)
                next_states.append(next_state)

            self.states_buffer.append(states)
            action_buffer = np.stack((action_buffer, actions))
            rewards_buffer = np.vstack((rewards_buffer, rewards))
            next_states_buffer = np.vstack((next_states_buffer, next_state))

            
            #print(episode)
        print(actions)

    def destroy(self):
        # Destroy the vehicle in CARLA
        self.sensor.destroy()                      
        self.vehicle.destroy()
        print("All Cleared")


                

    def meta_train(self):

        # Compute loss
        loss = F.cross_entropy(self.learner(self.states_buffer), action)

        # Compute gradients and update model
        self.learner.adapt(loss.item())

        # Meta update
        self.meta_optim.zero_grad()
        mean_task_loss = torch.mean(torch.tensor(task_losses))
        mean_task_loss.backward()
        self.meta_optim.step()
            





In [20]:

state_size = 5
action_size = 4
meta_lr = 0.001
fast_lr = 0.01

# Defining MAML model
model = l2l.algorithms.MAML(PolicyNetwork(state_size, action_size), lr = fast_lr)

# Defining meta-optimizer
meta_optim = torch.optim.Adam(model.parameters(), lr = meta_lr)



#train_obj.setup("HardRainNoon")

learner = model.clone()
episodes = 100

train_obj = trainer(episodes, 100, 50, learner, meta_optim)
train_obj.setup("HardRainNoon")
train_obj.generate_traj()


train_obj.destroy()








HardRainNoon


AttributeError: 'numpy.ndarray' object has no attribute 'append'