# Sensor only

In [None]:
import carla 
# client = carla.Client("localhost", 2000)
# world = client.load_world('Town01')
import gymnasium as gym
from carla_env import CarEnv
import numpy as np
import time
import matplotlib.pyplot as plt
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3 import SAC, PPO, DDPG
%matplotlib inline

class CARLA_G(gym.Env):
    """Gymnasium adapter around ``CarEnv`` that exposes only the state vector.

    The wrapped environment returns an ``(image, state)`` pair; the image is
    discarded and the state is handed out as a ``float32`` array matching the
    declared 16-element observation space.
    """

    def __init__(self):
        super().__init__()
        self.env = CarEnv()
        # Two continuous controls, each bounded to [-1, 1].
        # NOTE(review): presumably steering and throttle/brake -- confirm in CarEnv.
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(2,), dtype=np.float32
        )
        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(16,), dtype=np.float32
        )

    def step(self, action):
        """Advance the wrapped environment by one action.

        Returns the Gymnasium 5-tuple ``(obs, reward, terminated, truncated,
        info)``. ``truncated`` is always ``False``; the wrapped ``done`` flag
        is reported as ``terminated``.
        """
        [_image, new_state], reward, done, info = self.env.step(action)
        # Forward the wrapped env's info (e.g. an "is_success" flag) instead of
        # dropping it. Copy it because vectorised wrappers add keys to the
        # dict, and fall back to {} when CarEnv supplies a non-dict value.
        info = dict(info) if isinstance(info, dict) else {}
        return new_state.astype(np.float32), reward, done, False, info

    def reset(self, seed=None, options=None):
        """Reset the wrapped environment and return ``(obs, info)``.

        ``seed`` is passed to ``gym.Env.reset`` so ``self.np_random`` is
        seeded; ``options`` is accepted for API compatibility and unused.
        """
        super().reset(seed=seed)
        _image, state = self.env.reset()
        return state.astype(np.float32), {}

    def render(self):
        """No-op: this wrapper performs no rendering."""
        pass

# Build the sensor-only environment on the selected map and fetch the first
# observation that seeds the evaluation loop.
town_name = 'Town01'
env = CARLA_G()
env.env.town_name = town_name
obs, info = env.reset()


# Evaluate the agent: run the trained policy deterministically and report the
# accumulated reward each time an episode finishes.
model = SAC.load("./SAC_model_sesnor_run_96.zip", print_system_info=True)
episode_reward = 0
for _ in range(10_000_000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    episode_reward += reward
    if not (terminated or truncated or info.get("is_success", False)):
        continue
    # Episode finished: report, clear the accumulator, and start a new one.
    print("Reward:", episode_reward, "Success?", info.get("is_success", False))
    episode_reward = 0.0
    obs, info = env.reset()


# Camera only

In [None]:
# ENV CONF
import carla 
# client = carla.Client("localhost", 2000)
# world = client.load_world('Town01')
import gymnasium as gym
from carla_env import CarEnv
import numpy as np
import time
import matplotlib.pyplot as plt
from stable_baselines3.common.callbacks import BaseCallback

class CARLA_G_camera(gym.Env):
    """Gymnasium adapter around ``CarEnv`` that exposes only the camera image.

    The wrapped environment returns an ``(image, state)`` pair; the state is
    discarded and the image is converted to a channel-first ``float32`` array
    scaled by 1/255, matching the declared ``(3, 60, 160)`` observation space.
    """

    def __init__(self):
        super().__init__()
        self.env = CarEnv()
        # Two continuous controls, each bounded to [-1, 1].
        # NOTE(review): presumably steering and throttle/brake -- confirm in CarEnv.
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(2,), dtype=np.float32
        )
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(3, 60, 160), dtype=np.float32
        )

    @staticmethod
    def _to_channel_first(image):
        """Move the last axis to the front, divide by 255, cast to float32.

        NOTE(review): assumes CarEnv yields 60x160x3 images with values in
        0..255 -- confirm against CarEnv.
        """
        return (np.moveaxis(image, -1, 0) / 255).astype(np.float32)

    def step(self, action):
        """Advance the wrapped environment by one action.

        Returns the Gymnasium 5-tuple ``(obs, reward, terminated, truncated,
        info)``. ``truncated`` is always ``False``; the wrapped ``done`` flag
        is reported as ``terminated``.
        """
        [new_image, _state], reward, done, info = self.env.step(action)
        # Forward the wrapped env's info (e.g. an "is_success" flag) instead of
        # dropping it. Copy it because vectorised wrappers add keys to the
        # dict, and fall back to {} when CarEnv supplies a non-dict value.
        info = dict(info) if isinstance(info, dict) else {}
        return self._to_channel_first(new_image), reward, done, False, info

    def reset(self, seed=None, options=None):
        """Reset the wrapped environment and return ``(obs, info)``.

        ``seed`` is passed to ``gym.Env.reset`` so ``self.np_random`` is
        seeded; ``options`` is accepted for API compatibility and unused.
        """
        super().reset(seed=seed)
        image, _state = self.env.reset()
        return self._to_channel_first(image), {}

    def render(self):
        """No-op: this wrapper performs no rendering."""
        pass


class MyCallback(BaseCallback):
    """Log running means of episode reward and length through the SB3 logger.

    Each entry of ``model.ep_info_buffer`` is recorded exactly once. Without
    that guard the newest episode would be appended again on every
    environment step, skewing the averages and growing the lists unboundedly.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
        self.episode_lengths = []
        # Newest buffer entry already recorded; compared by identity so two
        # episodes with equal statistics are still counted separately.
        self._last_recorded_info = None

    def _on_step(self) -> bool:
        """Record episodes completed since the previous call; never stops training."""
        # Walk the buffer newest-first until reaching the entry handled last time.
        fresh = []
        for info in reversed(self.model.ep_info_buffer or ()):
            if info is self._last_recorded_info:
                break
            fresh.append(info)

        if fresh:
            self._last_recorded_info = fresh[0]
            # Append in chronological order.
            for info in reversed(fresh):
                self.episode_rewards.append(info['r'])
                self.episode_lengths.append(info['l'])

            # Here we log the episodic rewards and lengths to TensorBoard
            self.logger.record('episode_reward', np.mean(self.episode_rewards[-100:]))
            self.logger.record('episode_length', np.mean(self.episode_lengths[-100:]))
            self.logger.dump(step=self.num_timesteps)

        return True


# Build the camera-only environment on the selected map.
env = CARLA_G_camera()
town_name = 'Town01'
env.env.town_name = town_name
# Keep the initial observation: the loop below passes `obs` to the model on its
# first iteration, so it must come from this environment's reset.
obs, info = env.reset()

# Evaluate the agent: run the trained policy deterministically and report the
# accumulated reward each time an episode finishes.
episode_reward = 0
model = SAC.load("./SAC_model_camera_run_117.zip", print_system_info=True)
for _ in range(10000000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    episode_reward += reward
    if terminated or truncated or info.get("is_success", False):
        print("Reward:", episode_reward, "Success?", info.get("is_success", False))
        episode_reward = 0.0
        obs, info = env.reset()

# Sensors and camera data (fusion)

In [None]:
# ENV CONF
import carla 
# client = carla.Client("localhost", 2000)
# world = client.load_world('Town01')
import gymnasium as gym
from carla_env import CarEnv
import numpy as np
import time
import matplotlib.pyplot as plt
from stable_baselines3.common.callbacks import BaseCallback
class CARLA_G_fusion(gym.Env):
    """Gymnasium adapter around ``CarEnv`` exposing both image and state.

    Observations are dicts with two entries: ``"image"``, the camera frame
    converted to channel-first ``float32`` scaled by 1/255, and
    ``"tracking"``, the state vector cast to ``float32``.
    """

    def __init__(self):
        super().__init__()
        self.env = CarEnv()
        # Two continuous controls, each bounded to [-1, 1].
        # NOTE(review): presumably steering and throttle/brake -- confirm in CarEnv.
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(2,), dtype=np.float32
        )
        self.observation_space = gym.spaces.Dict({
            "image": gym.spaces.Box(
                low=0, high=1, shape=(3, 60, 160), dtype=np.float32
            ),
            "tracking": gym.spaces.Box(
                low=-np.inf, high=np.inf, shape=(16,), dtype=np.float32
            ),
        })

    @staticmethod
    def _make_observation(image, state):
        """Build the dict observation from a raw ``(image, state)`` pair.

        NOTE(review): assumes CarEnv yields 60x160x3 images with values in
        0..255 -- confirm against CarEnv.
        """
        return {
            "image": (np.moveaxis(image, -1, 0) / 255).astype(np.float32),
            "tracking": state.astype(np.float32),
        }

    def step(self, action):
        """Advance the wrapped environment by one action.

        Returns the Gymnasium 5-tuple ``(obs, reward, terminated, truncated,
        info)``. ``truncated`` is always ``False``; the wrapped ``done`` flag
        is reported as ``terminated``.
        """
        [new_image, new_state], reward, done, info = self.env.step(action)
        # Forward the wrapped env's info (e.g. an "is_success" flag) instead of
        # dropping it. Copy it because vectorised wrappers add keys to the
        # dict, and fall back to {} when CarEnv supplies a non-dict value.
        info = dict(info) if isinstance(info, dict) else {}
        return self._make_observation(new_image, new_state), reward, done, False, info

    def reset(self, seed=None, options=None):
        """Reset the wrapped environment and return ``(obs, info)``.

        ``seed`` is passed to ``gym.Env.reset`` so ``self.np_random`` is
        seeded; ``options`` is accepted for API compatibility and unused.
        """
        super().reset(seed=seed)
        image, state = self.env.reset()
        return self._make_observation(image, state), {}

    def render(self):
        """No-op: this wrapper performs no rendering."""
        pass


class MyCallback(BaseCallback):
    """Log running means of episode reward and length through the SB3 logger.

    Each entry of ``model.ep_info_buffer`` is recorded exactly once. Without
    that guard the newest episode would be appended again on every
    environment step, skewing the averages and growing the lists unboundedly.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
        self.episode_lengths = []
        # Newest buffer entry already recorded; compared by identity so two
        # episodes with equal statistics are still counted separately.
        self._last_recorded_info = None

    def _on_step(self) -> bool:
        """Record episodes completed since the previous call; never stops training."""
        # Walk the buffer newest-first until reaching the entry handled last time.
        fresh = []
        for info in reversed(self.model.ep_info_buffer or ()):
            if info is self._last_recorded_info:
                break
            fresh.append(info)

        if fresh:
            self._last_recorded_info = fresh[0]
            # Append in chronological order.
            for info in reversed(fresh):
                self.episode_rewards.append(info['r'])
                self.episode_lengths.append(info['l'])

            # Here we log the episodic rewards and lengths to TensorBoard
            self.logger.record('episode_reward', np.mean(self.episode_rewards[-100:]))
            self.logger.record('episode_length', np.mean(self.episode_lengths[-100:]))
            self.logger.dump(step=self.num_timesteps)

        return True


# Build the fusion (image + state) environment on the selected map.
env = CARLA_G_fusion()
town_name = 'Town01'
env.env.town_name = town_name
# Keep the initial observation: the loop below passes `obs` to the model on its
# first iteration, so it must come from this environment's reset.
obs, info = env.reset()

# Evaluate the agent: run the trained policy deterministically and report the
# accumulated reward each time an episode finishes.
episode_reward = 0
model = SAC.load("./SAC_model_fusion_run_106.zip", print_system_info=True)
for _ in range(10000000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    episode_reward += reward
    if terminated or truncated or info.get("is_success", False):
        print("Reward:", episode_reward, "Success?", info.get("is_success", False))
        episode_reward = 0.0
        obs, info = env.reset()