In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv

In [None]:
# Set TRAIN to False to build the model without running the optimisation.
TRAIN = True

n_cpu = 6
batch_size = 64
# One copy of the environment per worker, vectorised with SubprocVecEnv.
env = make_vec_env("racetrack-v0", n_envs=n_cpu, vec_env_cls=SubprocVecEnv)
model = PPO(
    "MlpPolicy",
    env,
    # Separate 256x256 networks for the actor (pi) and the critic (vf).
    # Passed as a plain dict: the list-wrapped form [dict(...)] is deprecated
    # since Stable-Baselines3 v1.8 and only triggers a warning.
    policy_kwargs=dict(net_arch=dict(pi=[256, 256], vf=[256, 256])),
    # n_steps is per environment, so one rollout holds batch_size * 12 transitions.
    n_steps=batch_size * 12 // n_cpu,
    batch_size=batch_size,
    n_epochs=10,
    learning_rate=5e-4,
    gamma=0.9,
    verbose=2,
    tensorboard_log="racetrack_ppo/",
)
# Train the model
if TRAIN:
    model.learn(total_timesteps=int(1e5))
    model.save("racetrack_ppo/model")
    # The trained policy is reloaded from disk by the evaluation cell.
    del model

In [None]:
# Run the algorithm
# Only the saved policy is needed for inference, so no environment is attached.
model = PPO.load("racetrack_ppo/model")

# Shut down the training environment before `env` is rebound below; otherwise
# its worker processes would keep running with no handle left to close them.
env.close()

env = gym.make("racetrack-v0", render_mode="rgb_array")
env = RecordVideo(
    env, video_folder="racetrack_ppo/videos", episode_trigger=lambda e: True
)
env.unwrapped.set_record_video_wrapper(env)

try:
    for video in range(10):
        done = truncated = False
        obs, info = env.reset()
        while not (done or truncated):
            # Predict
            action, _states = model.predict(obs, deterministic=True)
            # Get reward
            obs, reward, done, truncated, info = env.step(action)
            # Render
            env.render()
finally:
    # Always close the recorder, even if the rollout is interrupted.
    env.close()

## Learning Steering and Acceleration

In [None]:
import numpy as np
import gymnasium as gym

# Observation: occupancy grid centred on the vehicle and aligned with its axes.
grid_extent = [-18, 18]
observation_config = {
    "type": "OccupancyGrid",
    "features": ["presence", "on_road"],
    "grid_size": [list(grid_extent), list(grid_extent)],
    "grid_step": [5, 5],
    "as_image": False,
    "align_to_vehicle_axes": True,
}

# Action: continuous control of both steering and acceleration.
max_steering = np.pi / 4  # [rad]
action_config = {
    "type": "ContinuousAction",
    "longitudinal": True,
    "lateral": True,
    "steering_range": [-max_steering, max_steering],  # [rad]
    "acceleration_range": [-2, 2],  # [m/s²]
    "speed_range": [0, 15],  # [m/s]
}

# Full environment configuration: timing, reward shaping, traffic and rendering.
config = {
    "observation": observation_config,
    "action": action_config,
    "simulation_frequency": 15,
    "policy_frequency": 5,
    "duration": 300,
    "collision_reward": -1,
    "lane_centering_cost": 4,
    "action_reward": -0.3,
    "controlled_vehicles": 1,
    "other_vehicles": 1,
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.5],
    "scaling": 7,
    "show_trajectories": False,
    "render_agent": True,
    "offscreen_rendering": False,
}

env = gym.make("racetrack-v0", render_mode="rgb_array")

# Apply the configuration to the underlying environment, then reset it.
env.unwrapped.configure(config)
env.reset()

In [None]:
## Learning with steering and acceleration
# Set TRAIN to False to build the model without running the optimisation.
TRAIN = True

# n_cpu is used here only to derive n_steps; no vectorised environment is
# created in this cell, `env` is whatever the notebook currently holds.
n_cpu = 6
batch_size = 64
model = PPO(
    "MlpPolicy",
    env,
    # Separate 256x256 networks for the actor (pi) and the critic (vf).
    # Passed as a plain dict: the list-wrapped form [dict(...)] is deprecated
    # since Stable-Baselines3 v1.8 and only triggers a warning.
    policy_kwargs=dict(net_arch=dict(pi=[256, 256], vf=[256, 256])),
    n_steps=batch_size * 12 // n_cpu,
    batch_size=batch_size,
    n_epochs=10,
    learning_rate=5e-4,
    gamma=0.9,
    verbose=2,
    tensorboard_log="racetrack_ppo/",
)
# Train the model
if TRAIN:
    model.learn(total_timesteps=int(1e5))
    model.save("racetrack_ppo/model_sterring_acceleration")
    # The trained policy is reloaded from disk before evaluation.
    del model

In [None]:
import numpy as np
import gymnasium as gym

env = gym.make("racetrack-v0", render_mode="rgb_array")

# Sensing and control interfaces of the controlled vehicle.
interface_settings = {
    "observation": {
        "type": "OccupancyGrid",
        "features": ["presence", "on_road"],
        "grid_size": [[-18, 18], [-18, 18]],
        "grid_step": [5, 5],
        "as_image": False,
        "align_to_vehicle_axes": True,
    },
    "action": {
        "type": "ContinuousAction",
        "longitudinal": True,
        "lateral": True,
        "steering_range": [-np.pi / 4, np.pi / 4],  # [rad]
        "acceleration_range": [-2, 2],  # [m/s²]
        "speed_range": [0, 15],  # [m/s]
    },
}

# Simulation timing and episode length.
timing_settings = {
    "simulation_frequency": 15,
    "policy_frequency": 5,
    "duration": 300,
}

# Reward shaping terms.
reward_settings = {
    "collision_reward": -1,
    "lane_centering_cost": 4,
    "action_reward": -0.3,
}

# Number of vehicles on the track.
traffic_settings = {
    "controlled_vehicles": 1,
    "other_vehicles": 1,
}

# Rendering options.
render_settings = {
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.5],
    "scaling": 7,
    "show_trajectories": False,
    "render_agent": True,
    "offscreen_rendering": False,
}

# Merge the groups into the single flat configuration passed to the environment.
config = {
    **interface_settings,
    **timing_settings,
    **reward_settings,
    **traffic_settings,
    **render_settings,
}

# Apply the configuration to the underlying environment, then reset it.
env.unwrapped.configure(config)
env.reset()

In [None]:
from matplotlib import pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML, display, clear_output
import gymnasium as gym
import numpy as np
from copy import deepcopy

def run_one_episode_and_save(env, agent, to_display=True, save_video=True, max_steps=1000,
                             video_filename='trained_agent_simulation_vCNN_4_bis_plus.mp4',
                             fps=50):
    """Roll out ``agent`` on a deep copy of ``env`` and optionally show/record the frames.

    Episodes are played back to back (the copy is reset between them) until at
    least ``max_steps`` environment steps have been taken. When an episode ends,
    or the step budget is reached, a few extra steps are still executed so the
    final frames make it into the recording.

    Args:
        env: Environment exposing ``reset``, ``render``, ``step`` and ``close``.
            It is reset once and then deep-copied; the rollout uses the copy.
        agent: Object exposing ``predict(state, deterministic=True)`` that
            returns ``(action, state)``.
        to_display: If True, clear the cell output and draw every frame inline.
        save_video: If True, write the collected frames to ``video_filename``
            using matplotlib's ffmpeg writer.
        max_steps: Step budget after which no new episode is started.
        video_filename: Output path of the video file.
        fps: Frame rate of the saved video.

    Returns:
        ``np.ndarray`` built from the observation returned by every step.
    """
    frames = []
    state_history = []
    total_steps = 0

    env.reset()
    display_env = deepcopy(env)

    try:
        while True:
            state, _ = display_env.reset()
            done = False
            additional_frames_when_done = 5
            while not done:
                frame = display_env.render()
                frames.append(frame)
                if to_display:
                    # Only wipe the cell output when a frame is actually drawn.
                    clear_output(wait=True)
                    plt.imshow(frame)
                    plt.axis('off')
                    plt.show()

                action, _state = agent.predict(state, deterministic=True)
                state, _reward, terminated, truncated, _ = display_env.step(action)
                state_history.append(state)
                total_steps += 1

                # An episode is over on termination OR on truncation (time limit).
                if terminated or truncated or total_steps >= max_steps:
                    # Keep stepping for a few more frames before stopping.
                    additional_frames_when_done -= 1
                    if additional_frames_when_done <= 0:
                        done = True

            if total_steps >= max_steps:
                break

        if save_video:
            fig, ax = plt.subplots()
            img_ax = ax.imshow(frames[0])
            ax.axis('off')

            def update(frame):
                img_ax.set_data(frame)

            ani = FuncAnimation(fig, update, frames=frames, repeat=False)
            ani.save(video_filename, writer='ffmpeg', fps=fps)

            plt.close(fig)
    finally:
        # Release the copied environment even if the rollout or encoding fails.
        display_env.close()

    return np.array(state_history)

In [None]:
# Load the policy from the path it was saved to by the training cell
# ("racetrack_ppo/model_sterring_acceleration").
model = PPO.load("racetrack_ppo/model_sterring_acceleration", env=env)

state_history = run_one_episode_and_save(env, model, to_display=True, save_video=True, max_steps=1000)