In [None]:
%%capture
!pip install gymnasium
!pip install stable_baselines3

In [None]:
# NumPy
import numpy as np

# Gym
import gymnasium as gym
from gymnasium import spaces

# Stable Baselines
import stable_baselines3
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize

from stable_baselines3.common.utils import set_random_seed

In [None]:
# Define Environment
class SupervisedCarRacing(gym.Env):
  """Replay environment that rewards actions for matching a recorded dataset.

  An episode walks through the recorded observations in order, starting at
  index 0. At every step the agent's action is compared, component by
  component (steering, acceleration, brake), with the action recorded for the
  current observation; see `_get_reward`. The episode terminates once the last
  recorded observation has been stepped.

  Args:
    render_mode: Must be None; rendering is not supported.
    obs_path: Path of the `.npy` file with the recorded observations.
    act_path: Path of the `.npy` file with the recorded actions.

  Raises:
    AssertionError: If `render_mode` is not None.
    ValueError: If the observation file is empty, or if it holds more
      observations than there are recorded actions.
  """

  # (lower, upper) absolute-error thresholds used by `_score`. They split each
  # action range into thirds: steering spans [-1, 1], the pedals span [0, 1].
  _TURN_THRESHOLDS = (2/3, 4/3)
  _PEDAL_THRESHOLDS = (1/3, 2/3)

  def __init__(self, render_mode=None, obs_path='states_data.npy', act_path='actions_data.npy'):
    assert render_mode is None
    self.render_mode = render_mode

    self.observation_space = spaces.Box(low=0, high=255, shape=(4, 96, 96), dtype=np.uint8)
    self.action_space = spaces.Box(
      low=np.array([-1, 0, 0], dtype=np.float32),
      high=np.array([+1, +1, +1], dtype=np.float32),
    )

    self.current_index = 0
    # NOTE(review): the arrays are assumed to match the declared spaces
    # (observations (N, 4, 96, 96) uint8, actions (N, 3)) - confirm against
    # the script that produced the files.
    self.training_obs = np.load(obs_path)
    self.training_act = np.load(act_path)

    # Fail here rather than with an IndexError in the middle of an episode.
    num_obs = len(self.training_obs)
    num_act = len(self.training_act)
    if num_obs == 0:
      raise ValueError(f"{obs_path!r} contains no observations")
    if num_act < num_obs:
      raise ValueError(
        f"{act_path!r} holds {num_act} actions but {obs_path!r} holds {num_obs} observations"
      )

  def reset(self, seed=None, options=None):
    """Rewind to the first recorded observation.

    Returns:
      A `(observation, info)` tuple; `info` is always an empty dict.
    """
    super().reset(seed=seed)

    self.current_index = 0

    return self.training_obs[self.current_index], {}

  def step(self, action):
    """Score `action` against the current recorded action and advance one sample.

    Args:
      action: Indexable with at least three components
        (steering, acceleration, brake).

    Returns:
      `(observation, reward, terminated, truncated, info)`. `truncated` is
      always False and `info` is always empty. On the terminating step the
      first recorded observation is returned as a placeholder.

    Raises:
      IndexError: If called after the episode terminated without `reset()`.
    """
    num_samples = self.training_obs.shape[0]
    if self.current_index >= num_samples:
      raise IndexError("step() called after the episode terminated; call reset() first")

    reward = self._get_reward(action)

    self.current_index += 1
    terminated = self.current_index >= num_samples
    observation = self.training_obs[0] if terminated else self.training_obs[self.current_index]

    return observation, reward, terminated, False, {}

  @staticmethod
  def _score(error, thresholds):
    """Map an absolute error to +1 (close), 0 (middling) or -1 (far).

    Any error that is not `<= upper`, including NaN, scores -1.
    """
    lower, upper = thresholds
    if error <= lower:
      return 1
    if error <= upper:
      return 0
    return -1

  def _get_reward(self, action):
    """Return the sum of the per-component scores, an integer in [-3, 3].

    Each of the three action components is compared with the recorded action
    at `current_index` and scored independently by `_score`.
    """
    expected = self.training_act[self.current_index]

    return (
      self._score(abs(action[0] - expected[0]), self._TURN_THRESHOLDS)
      + self._score(abs(action[1] - expected[1]), self._PEDAL_THRESHOLDS)
      + self._score(abs(action[2] - expected[2]), self._PEDAL_THRESHOLDS)
    )

In [None]:
# Define Environment Creation
def make_env(rank: int, seed: int = 50):
    """Build a factory that creates one seeded `SupervisedCarRacing` environment.

    Args:
        rank: Index of the sub-environment. It is added to `seed` so that
            every sub-environment is reset with a distinct, reproducible seed.
        seed: Base random seed.

    Returns:
        A zero-argument callable that constructs the environment, resets it
        with `seed + rank`, and returns it.

    Calling `make_env` also seeds the global random generators through
    `set_random_seed(seed)`, so repeated runs start from the same state.
    """
    def _init():
        env = SupervisedCarRacing()
        env.reset(seed=seed + rank)
        return env
    set_random_seed(seed)
    return _init

In [None]:
# Make Training Environment
# Number of environment copies stepped by the vectorized wrapper.
num_cpu = 1
# One factory per worker rank, run in-process by DummyVecEnv and wrapped in
# VecNormalize (default settings).
vec_env = VecNormalize(DummyVecEnv([make_env(rank) for rank in range(num_cpu)]))
# Reset once up front; the returned observation is discarded.
_ = vec_env.reset()

In [None]:
# Train New Agent (!!!OVERWRITES EXISTING WEIGHTS!!!)
model = PPO("MlpPolicy", vec_env, verbose=1)
model.learn(total_timesteps=50000, progress_bar=True)

model.save("ppo_carracing")
# The model archive does not include the VecNormalize running statistics.
# Save them next to the weights so the same normalization can be restored in a
# fresh kernel with VecNormalize.load.
vec_env.save("ppo_carracing_vecnormalize.pkl")

In [None]:
# Load Agent & Perform Additional Training
# Restore saved VecNormalize statistics, if any, so the loaded policy sees
# observations and rewards scaled as they were when it was trained.
# VecNormalize.load unpickles the file: only load statistics written by this
# notebook.
try:
    vec_env = VecNormalize.load("ppo_carracing_vecnormalize.pkl", vec_env.venv)
except FileNotFoundError:
    # No statistics saved yet: keep the current wrapper unchanged.
    pass

model = PPO.load("ppo_carracing", env=vec_env)
model.learn(total_timesteps=50000, progress_bar=True)

model.save("ppo_carracing")
# Keep the saved statistics in step with the saved weights.
vec_env.save("ppo_carracing_vecnormalize.pkl")