In [10]:
%pip install imageio imageio-ffmpeg pygame casadi stable-baselines3 tensorboard "stable-baselines3[extra]"  pyvirtualdisplay ipywidgets --quiet
%pip install "gymnasium[other]" --quiet

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [11]:
import numpy as np
import gymnasium as gym
from VehicleConfigLoader import VehicleConfigLoader
from Simulation import ArticulatedVehicle
from Simulation import Map
from Simulation import MapEntity
import random
import Visualization
from casadi import cos, sin, tan
from typing import Any, SupportsFloat, Optional
from stable_baselines3 import PPO, SAC
import stable_baselines3.common.monitor
import torch
import os
import tensorboard
from IPython.display import HTML, display
from pygame import font
from MapConfigLoader import MapConfigLoader

In [12]:
log_dir = "logs"
monitor_dir = "monitor"

In [13]:
#parametros da simulação
dt = 0.2
max_steps = 400

#### API Gymnasium

Aqui é definido o ambiente de treinamento, extendendo a classe gym.Env.

In [14]:
class ParkingEnv(gym.Env):

    metadata = {"render_modes": ["rgb_array"], "render_fps": 24}

    VEHICLE_NAME = "BUG1" #nome do veículo
    MAP_NAME = "MAPA_1" #nome do mapa
    MAP_WIDTH = 60 # largura do mapa
    MAP_HEIGHT = 60 # altura do mapa
    SENSOR_RANGE_M = 150.0 # raio do sensor
    SPEED_LIMIT_MS = 5.0 # velocidade maxima
    STEERING_LIMIT_RAD = float(np.deg2rad(28.0)) # angulo maximo de esterçamento
    JACKKNIFE_LIMIT_RAD = float(np.deg2rad(65.0)) # angulo maximo de jackknife
    DT = 0.2 # tempo de simulação
    MAX_SECONDS = 30.0

    def __init__(self, seed = 0):

        self.render_mode = "rgb_array"

        vehicle_loader = VehicleConfigLoader("lista_veiculos.json")
        vehicle_geometry = vehicle_loader.get_geometry(self.VEHICLE_NAME)

        map_loader = MapConfigLoader("lista_mapas.json")
        self.map = map_loader.load_map(self.MAP_NAME, self.MAP_WIDTH, self.MAP_HEIGHT)

        self.vehicle = ArticulatedVehicle(geometry=vehicle_geometry)
        self.dt = dt
        self.max_steps = max_steps
        self.steps = 0
        
        # Gymnasium spaces based on README specifications


        # Observation: [x, y, theta, beta, alpha, r1..r14, goal_x, goal_y, goal_theta]
        obs_low = np.array(
            [
                0.0,                         # x
                0.0,                         # y
                -np.pi,                      # theta
                -self.JACKKNIFE_LIMIT_RAD,        # beta
                -self.STEERING_LIMIT_RAD,         # alpha
            ]
            + [0.0] * self.vehicle.get_raycast_count()                     # raycasts
            + [0.0, -np.pi],            # goal distance, goal direction
            dtype=np.float32,
        )

        obs_high = np.array(
            [
                self.MAP_WIDTH,                   # x
                self.MAP_HEIGHT,                  # y
                np.pi,                       # theta
                self.JACKKNIFE_LIMIT_RAD,         # beta
                self.STEERING_LIMIT_RAD,          # alpha
            ]
            + [self.SENSOR_RANGE_M] * self.vehicle.get_raycast_count()          # raycasts
            + [self.MAP_WIDTH + self.MAP_HEIGHT, np.pi], # goal distance, goal direction
            dtype=np.float32,
        )

        self.observation_space = gym.spaces.Box(low=obs_low, high=obs_high, dtype=np.float32)

        # Action: [v, alpha]
        act_low = np.array(
            [
                -self.SPEED_LIMIT_MS,             # v (allow reverse)
                -self.STEERING_LIMIT_RAD,         # alpha
            ],
            dtype=np.float32,
        )
        act_high = np.array(
            [
                self.SPEED_LIMIT_MS,             # v (allow reverse)
                self.STEERING_LIMIT_RAD,          # alpha
            ],
            dtype=np.float32,
        )
        self.action_space = gym.spaces.Box(low=act_low, high=act_high, dtype=np.float32)

    def reset(self, *, seed: int | None = None, options: dict | None = None):
        super().reset(seed=seed)
        self.steps = 0
        start_pos = self.map.get_random_start_position(self.vehicle, max_attempts=100)
        if start_pos is None:
            start_x = self.map.get_size()[0] / 2
            start_y = self.map.get_size()[1] / 2
        else:
            start_x, start_y = start_pos
        start_theta = 0.0
        start_beta = 0.0
        start_alpha = 0.0
        self.best_distance_to_goal = float('inf')
        self.vehicle.update_physical_properties(start_x, start_y, 0.0, start_theta, start_beta, start_alpha)
        self.vehicle.initialize_raycasts()
        self.vehicle.update_raycasts(self.map.get_entities())

        # Build observation
        x, y = self.vehicle.get_position()
        theta = self.vehicle.get_theta()
        beta = self.vehicle.get_beta()
        alpha_current = self.vehicle.get_alpha()
        raycast_lengths = self.vehicle.get_raycast_lengths()
        goal_x, goal_y = self.map.get_parking_goal_position()
        goal_theta = self.map.get_parking_goal_theta()
        observation = np.array([x, y, theta, beta, alpha_current] + raycast_lengths + [self._calculate_goal_distance(), self._calculate_goal_direction()], dtype=np.float32)
        info = {}
        return observation, info

    def step(self, action) -> tuple[np.ndarray, SupportsFloat, bool, bool, dict[str, Any]]:
        velocity, alpha = action
        self.steps += 1
        terminated = False
        truncated = False
        info = {}
        reward = -0.01 # recompensa base por passo de tempo (reduzida)



        ## Strongly punish zero/low speed
        if abs(velocity) < 0.1:
            reward -= 0.4  # Penalidade muito maior para ficar parado
        elif abs(velocity) < 0.5:
            reward -= 0.05  # Penalidade menor para velocidade muito baixa

        if self.steps >= self.max_steps:
            truncated = True
        self._move_vehicle(velocity, alpha, self.dt)
        if self._check_vehicle_collision():
            terminated = True
            reward = -20.0
        elif self._check_vehicle_parking():
            terminated = True
            reward = 100.0
        elif self._check_trailer_jackknife():
            terminated = True
            reward = -20.0
        self.vehicle.update_raycasts(self.map.get_entities())


        # Observação: [x, y, theta, beta, alpha, r1..r14]
        x, y = self.vehicle.get_position()
        theta = self.vehicle.get_theta()
        beta = self.vehicle.get_beta()
        alpha_current = self.vehicle.get_alpha()
        raycast_lengths = self.vehicle.get_raycast_lengths()
        goal_x, goal_y = self.map.get_parking_goal_position()
        goal_theta = self.map.get_parking_goal_theta()
        observation = np.array([x, y, theta, beta, alpha_current] + raycast_lengths + [self._calculate_goal_distance(), self._calculate_goal_direction()], dtype=np.float32)

        return observation, reward, terminated, truncated, info

    def render(self):
        rgb_array = Visualization.to_rgb_array(self.map, self.vehicle, (288, 288), self._calculate_goal_distance(), self._calculate_goal_direction())
        return rgb_array

    def close(self):
        pass


    def _move_vehicle(self, velocity: float, alpha: float, dt: float):
        # Current state
        x, y = self.vehicle.get_position()
        theta = self.vehicle.get_theta()
        beta = self.vehicle.get_beta()

        # Geometry
        D = self.vehicle.get_distancia_eixo_dianteiro_quinta_roda() - self.vehicle.get_distancia_eixo_traseiro_quinta_roda()
        L = self.vehicle.get_distancia_eixo_traseiro_trailer_quinta_roda()
        a = self.vehicle.get_distancia_eixo_traseiro_quinta_roda()

        angular_velocity_tractor = (velocity / D) * tan(alpha)
        beta_dot = angular_velocity_tractor * (1 - (alpha * cos(beta)) / L) - (velocity * sin(beta)) / L

        # Kinematics
        x_dot = velocity * cos(theta)
        y_dot = velocity * sin(theta)
        theta_dot = (velocity / D) * tan(alpha)
        beta_dot = beta_dot = angular_velocity_tractor * (1 - (alpha * cos(beta)) / L) - (velocity * sin(beta)) / L

        # Euler step
        new_x = x + x_dot * dt
        new_y = y + y_dot * dt
        new_theta = theta + theta_dot * dt
        new_beta = beta + beta_dot * dt


        self.vehicle.update_physical_properties(new_x, new_y, velocity, new_theta, new_beta, alpha)
        self.vehicle.update_raycasts(self.map.get_entities())

    def _check_vehicle_collision(self) -> bool:
        """Verifica se o veículo colidiu com alguma parede ou passou por cima de uma vaga de estacionamento."""
        for entity in self.map.get_entities():
            if entity.type == MapEntity.ENTITY_WALL:
                if self.vehicle.check_collision(entity):
                    return True
        return False

    def _calculate_goal_distance(self):
        x, y = self.vehicle.get_position()
        goal_x, goal_y = self.map.get_parking_goal_position()
        distance_to_goal = np.sqrt((x - goal_x)**2 + (y - goal_y)**2)
        return distance_to_goal

    def _calculate_goal_direction(self):
        x, y = self.vehicle.get_position()
        goal_x, goal_y = self.map.get_parking_goal_position()
        direction_to_goal = np.arctan2(goal_y - y, goal_x - x)
        return direction_to_goal

    def _check_vehicle_parking(self) -> bool:
        """Verifica se o trailer do veículo está dentro de uma vaga de estacionamento."""
        goal = self.map.get_parking_goal()
        if goal.get_bounding_box().contains_bounding_box(self.vehicle.get_bounding_box_trailer()):
            return True
        return False

    def _check_trailer_jackknife(self) -> bool:
        """Verifica se o trailer do veículo está em jackknife."""
        return self.vehicle.get_beta() > np.deg2rad(65.0) or self.vehicle.get_beta() < np.deg2rad(-65.0)



#### Funções de treinamento e avaliação

In [15]:
def evaluate_model(model: PPO, iterations: int = 10):
    total_reward = 0.0
    for _ in range(iterations):
        total_reward += run_episode(model)
    return total_reward / iterations
    
def run_episode_and_save_video(model):
    video_recorder = Visualization.VideoRecorder("simulation.mp4", fps=10)
    env = ParkingEnv()
    observation, info = env.reset()
    total_reward = 0.0

    for _ in range(max_steps):
        action, _ = model.predict(observation, deterministic=True)
        observation, reward, terminated, truncated, info = env.step(action)
        total_reward += float(reward)
        video_recorder.append(env.render())
        if terminated or truncated:
            break

    video_recorder.close()
    env.close()
    return total_reward

def run_episode(model):
    env = ParkingEnv()
    observation, info = env.reset()
    total_reward = 0.0

    for _ in range(max_steps):
        action, _ = model.predict(observation, deterministic=True)
        observation, reward, terminated, truncated, info = env.step(action)
        total_reward += float(reward)
        if terminated or truncated:
            break

    env.close()
    return total_reward

def run_random_episode():
    env = ParkingEnv()
    observation, info = env.reset()
    total_reward = 0.0
    frames = []
    for t in range(max_steps):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        total_reward += float(reward)
        frames.append(env.render())
        if terminated or truncated:
            break
    Visualization.save_frames_as_mp4(frames, "simulation.mp4")
    env.close()
    return total_reward

from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv
import platform

N_ENVS = 6

def make_env(seed: int = 0):
    def _init():
        env = ParkingEnv()
        env = stable_baselines3.common.monitor.Monitor(env, monitor_dir)
        env.reset(seed=seed)
        return env
    return _init

def make_vector_env(n_envs: int = N_ENVS, use_subproc: bool | None = None):
    """Create a vectorized environment.

    On Windows (especially inside notebooks), `SubprocVecEnv` can be unstable
    and cause BrokenPipeError/EOFError. There we default to `DummyVecEnv`.
    """
    # Auto-select backend if not specified
    if use_subproc is None:
        use_subproc = platform.system() != "Windows"

    env_fns = [make_env(seed=i) for i in range(n_envs)]

    if use_subproc:
        try:
            return SubprocVecEnv(env_fns)
        except Exception as e:
            print(f"SubprocVecEnv failed ({e}), falling back to DummyVecEnv.")

    return DummyVecEnv(env_fns)

def train_ppo(model: PPO | None = None, total_timesteps: int = 10000):
    # Create environment
    env = make_vector_env()

    # neural network hyperparameters
    # net_arch is a list of number of neurons per hidden layer, e.g. [16,20] means
    # two hidden layers with 16 and 20 neurons, respectively
    policy_kwargs = dict(activation_fn=torch.nn.ReLU,
                        net_arch=[16,8,4])

    # instantiates the model using the defined hyperparameters
    model = PPO(
        policy="MlpPolicy",           # neural network policy architecture (MLP for vector observations)
        env=env,                      # gymnasium-compatible environment to train on
        policy_kwargs=policy_kwargs,  # custom network architecture and activation
        verbose=0,                    # logging verbosity: 0(silent),1(info),2(debug)
        tensorboard_log=log_dir,      # directory for TensorBoard logs
        learning_rate=3e-4,           # optimizer learning rate
        n_steps=max_steps,                 # rollout steps per environment update
        batch_size=400,                # minibatch size for optimization
        gamma=0.9995,                   # discount factor
        gae_lambda=0.95,              # GAE lambda for bias-variance tradeoff
        ent_coef=0.01,                 # entropy coefficient (encourages exploration)
        clip_range=0.2,               # PPO clipping parameter
        n_epochs=10,                  # number of optimization epochs per update
        device="auto"                 # use GPU if available, else CPU
    )

    # You can also experiment with other RL algorithms like A2C, PPO, DDPG etc.
    # Refer to  https://stable-baselines3.readthedocs.io/en/master/guide/examples.html
    # for documentation. For example, if you would like to run DDPG, just replace "DQN" above with "DDPG".

    model.learn(total_timesteps=total_timesteps, progress_bar=True)

    return model

#### Funções auxiliares para visualização

In [16]:
# @title Play Video function
from IPython.display import HTML
from base64 import b64encode
import platform

# Only import and use pyvirtualdisplay on Linux
if platform.system() != 'Windows':
    from pyvirtualdisplay import Display
else:
    Display = None

# create the directory to store the video(s)
os.makedirs("./video", exist_ok=True)

# Only start virtual display on Linux (not needed on Windows)
display = None
if platform.system() != 'Windows' and Display is not None:
    display = Display(visible=False, size=(2000, 1500))
    _ = display.start()

"""
Utility functions to enable video recording of gym environment
and displaying it.
To enable video, just do "env = wrap_env(env)""
"""
def render_mp4(videopath: str) -> str:
  """
  Gets a string containing a b4-encoded version of the MP4 video
  at the specified path.
  """
  if not os.path.exists(videopath):
      return f'<p>Video file not found: {videopath}</p>'
  mp4 = open(videopath, 'rb').read()
  base64_encoded_mp4 = b64encode(mp4).decode()
  return f'<video width=400 controls><source src="data:video/mp4;' \
         f'base64,{base64_encoded_mp4}" type="video/mp4"></video>'

def record_and_display_video_manual(env, model, video_name, num_episodes=1):
    """
    Records a video manually using Visualization.VideoRecorder (more reliable).
    
    Args:
        env: The gymnasium environment.
        model: The trained model.
        video_name (str): The name to use for the video file.
        num_episodes (int): The number of episodes to record (default is 1).
    """
    os.makedirs("./video", exist_ok=True)
    
    video_path = f"video/{video_name}.mp4"
    video_recorder = Visualization.VideoRecorder(video_path, fps=10)
    
    total_reward = 0.0
    episode_count = 0
    
    for episode in range(num_episodes):
        observation, info = env.reset()
        episode_reward = 0.0
        
        for step in range(max_steps):
            action, _ = model.predict(observation, deterministic=True)
            observation, reward, terminated, truncated, info = env.step(action)
            episode_reward += float(reward)
            video_recorder.append(env.render())
            
            if terminated or truncated:
                break
        
        total_reward += episode_reward
        episode_count += 1
    
    video_recorder.close()
    print(f"\nTotal reward: {total_reward}")
    print(f"Video saved to: {video_path}")
    
    html = render_mp4(video_path)
    return HTML(html)

def record_and_display_video(env, model, video_name, num_episodes=1):
    """
    Records a video of the agent performing in the environment and displays it.

    Args:
        env: The gymnasium environment.
        model: The trained model.
        video_name (str): The name to use for the video file.
        num_episodes (int): The number of episodes to record (default is 1).
    """
    import glob
    
    # create the directory to store the video(s)
    os.makedirs("./video", exist_ok=True)

    # Use a virtual display for rendering (only on Linux)
    display = None
    if platform.system() != 'Windows' and Display is not None:
        display = Display(visible=False, size=(1400, 900))
        _ = display.start()

    env_name = "ParkingEnv"

    env = gym.wrappers.RecordVideo(
        env,
        video_folder="video",
        name_prefix=f"{env_name}_{video_name}",
        episode_trigger=lambda episode_id: episode_id < num_episodes
    )

    observation, _ = env.reset()
    total_reward = 0
    done = False
    episode_count = 0

    while not done:
        action, states = model.predict(observation, deterministic=True)
        observation, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        total_reward += reward
        if done:
            episode_count += 1
            if episode_count < num_episodes:
                observation, _ = env.reset()
                done = False

    env.close()
    # Stop the virtual display if it was started
    if display is not None:
        display.stop()

    print(f"\nTotal reward: {total_reward}")

    # Find the video file that was created
    video_pattern = f"video/{env_name}_{video_name}*.mp4"
    video_files = glob.glob(video_pattern)
    
    if not video_files:
        # Try alternative pattern
        video_pattern = f"video/*{video_name}*.mp4"
        video_files = glob.glob(video_pattern)
    
    if not video_files:
        # List all video files for debugging
        all_videos = glob.glob("video/*.mp4")
        print(f"Warning: Expected video file not found. Available video files: {all_videos}")
        return HTML("<p>Video file not found. Check the video directory.</p>")
    
    # Use the first matching video file
    video_path = video_files[0]
    print(f"Found video file: {video_path}")
    
    # show video
    html = render_mp4(video_path)
    return HTML(html)

In [17]:
import shutil
def clean_logs():
    if os.path.exists(log_dir):
        print("Cleaning logs")
        shutil.rmtree(log_dir)
    os.makedirs(log_dir, exist_ok=True)

clean_logs()


Cleaning logs


In [18]:


model = None
training_timesteps = 10000

model_save_path = "ppo_model_" + str(training_timesteps) + ".zip"

#train new model
print("Training new model")

model = train_ppo(model, total_timesteps=training_timesteps)
model.save(model_save_path)

print("Training complete, total timesteps: ", model._total_timesteps)

Training new model


Output()

Training complete, total timesteps:  10000


In [19]:
env = ParkingEnv()
record_and_display_video_manual(env, model, "ppo_model", num_episodes=1)

Saved 400 frames as MP4 to video\ppo_model.mp4

Total reward: -163.99999999999892
Video saved to: video/ppo_model.mp4


In [20]:
#load tensorboard
#%load_ext tensorboard
#%tensorboard --logdir logs --host localhost --port 8096



