In [None]:
# import sys
# import gymnasium as gym
# sys.modules["gym"] = gym
import gym
gym.__version__

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import EvalCallback

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from scipy.spatial import distance

In [None]:
import gnwrapper

In [None]:
env_id = "CarRacing-v0"
NUM_CPU = 4  # Number0of processes to use

In [None]:
env = gnwrapper.Animation(gym.make(env_id))
env.reset()
try:
    track = np.array(env.track)
    print(track.shape)
    # plt.plot(track[:, 2], track[:, 3])
    plt.scatter(track[:, 2], track[:, 3], c=-track[:, 1], cmap='gray')
    car_position =  env.car.hull.position
    car_angle = -env.car.hull.angle
    
    rot = np.array([[np.cos(car_angle), -np.sin(car_angle)], [np.sin(car_angle), np.cos(car_angle)]])
    arg = np.argmin(distance.cdist([car_position], track[:, 2:4]))
    track_size = track.shape[0]
    path = rot @ (track[arg+1-track_size:arg+5-track_size:1, 2:4] - car_position).T
    
    print(path)
    path_angles = np.arctan2(*path)
    print(path_angles)
except Exception as e:
    print('ERROR', e)
env.close()

In [None]:
from scipy.spatial import distance
class CarRacingGroundTruthObsWrapper(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        self.low_state = np.array(
            [0, -np.pi, *[-float('inf'), -float('inf'), -np.pi]*4], dtype=np.float32
        )
        self.high_state = np.array(
            [100, np.pi, *[float('inf'), float('inf'), np.pi]*4], dtype=np.float32
        )
        self.observation_space = gym.spaces.Box(
            low=self.low_state, high=self.high_state, dtype=np.float32
        )
        
    def observation(self, obs):
        velocity = np.sqrt(
            np.square(self.car.hull.linearVelocity[0])
            + np.square(self.car.hull.linearVelocity[1])
        )
        angular_velocity = self.car.hull.angularVelocity
        # ABS sensors
        # abs_sensors = []
        # for i in range(4):
            # abs_sensors.append(0.01 * self.car.wheels[i].omega)
        car_position =  self.car.hull.position
        car_angle = -self.car.hull.angle
        track = np.array(self.track)
        
        rot = np.array([[np.cos(car_angle), -np.sin(car_angle)], [np.sin(car_angle), np.cos(car_angle)]])
        arg = np.argmin(distance.cdist([car_position], track[:, 2:4]))
        track_size = track.shape[0]
        path = rot @ (track[arg+1-track_size:arg+5-track_size:1, 2:4] - car_position).T
        path_angles = np.expand_dims(np.arctan2(*path), axis=0)
        
        # print(path.shape, path_angles.shape)
        # print(np.concatenate((path.T, path_angles.T), axis=1))
        # print()
        # print(path_angles)
        return np.array([velocity, angular_velocity, *np.concatenate((path, path_angles), axis=0).ravel(order='F')])


In [None]:
# env = DummyVecEnv([lambda: CarRacingGroundTruthObsWrapper(gym.make(env_id))])
from utils import CarRacingGroundTruthObsWrapper
def wrapper(env):
    env = CarRacingGroundTruthObsWrapper(env) 
    env = gnwrapper.Animation(env)
    return env

In [None]:
# def make_env(env_id):
#     def _init():
#         env = gym.make(env_id)
#         env = wrapper(env)
#         return env
#     return _init

# env = gym.make(env_id)
# env = CarRacingGroundTruthObsWrapper(env)
# env = SubprocVecEnv([make_env(env_id) for _ in range(1)])
# env = SubprocVecEnv([lambda: wrapper(gym.make(env_id))])

env = make_vec_env(env_id, n_envs=NUM_CPU, wrapper_class=wrapper)
env.observation_space

In [None]:
from typing import Callable
def linear_schedule(initial_value: float) -> Callable[[float], float]:
    """
    Linear learning rate schedule.

    :param initial_value: Initial learning rate.
    :return: schedule that computes
      current learning rate depending on remaining progress
    """
    def func(progress_remaining: float) -> float:
        """
        Progress will decrease from 1 (beginning) to 0.

        :param progress_remaining:
        :return: current learning rate
        """
        return progress_remaining * initial_value

    return func

In [None]:
model = PPO("MlpPolicy",
            env,
            verbose=1,
            seed=0,
            batch_size=512,
            learning_rate=linear_schedule(1e-03),
            n_epochs=10,
            n_steps=8*NUM_CPU,
           )
model.learn(total_timesteps=100_000, progress_bar=True)

env.close()

In [None]:
model = PPO.load("ppo_CarRacing_expert", print_system_info=True)

In [None]:
env = gym.make(env_id)
env = wrapper(env)
# env = CarRacingGroundTruthObsWrapper(env)

# env = gnwrapper.Monitor(env, directory="CarRacing/video")
# env = gym.wrappers.RecordVideo(env, 'CarRacing/video')

obs = env.reset()
while True:
    action, _states = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    env.render()
    if dones:
        break
    
env.close()

In [None]:
env = DummyVecEnv([lambda: CarRacingGroundTruthObsWrapper(gym.make(env_id))])
# env = gnwrapper.Animation(env)

# env = DummyVecEnv([lambda: env])
obs = env.reset()
for _ in range(10000):
    action, _states = model.predict(obs)
    # action = np.array([0.1, 0.01, 0])
    obs, rewards, dones, info = env.step(action)
    env.render()