### Training RL Policies using L5Kit Closed-Loop Environment

This notebook describes how to train RL policies for self-driving using our gym-compatible closed-loop environment.

We will use the [Proximal Policy Optimization (PPO)](https://arxiv.org/abs/1707.06347) algorithm as our reinforcement learning algorithm, as it not only performs well in practice but is also empirically easy to tune.

The PPO implementation in this notebook is based on [Stable Baselines3](https://github.com/DLR-RM/stable-baselines3), a popular framework for training RL policies. Note that our environment is also compatible with [RLlib](https://docs.ray.io/en/latest/rllib.html), another popular framework for the same purpose.

In [1]:
#@title Download L5 Sample Dataset and install L5Kit
import os
RunningInCOLAB = 'google.colab' in str(get_ipython())
if RunningInCOLAB:
    !wget https://raw.githubusercontent.com/lyft/l5kit/master/examples/setup_notebook_colab.sh -q
    !sh ./setup_notebook_colab.sh
    os.environ["L5KIT_DATA_FOLDER"] = open("./dataset_dir.txt", "r").read().strip()
else:
    os.environ["L5KIT_DATA_FOLDER"] = "/home/pronton/rl/l5kit_dataset/"
    print("Not running in Google Colab.")

Not running in Google Colab.


In [2]:
import gym

from stable_baselines3 import PPO, SAC
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.utils import get_linear_fn
from stable_baselines3.common.vec_env import SubprocVecEnv

from l5kit.configs import load_config_data
from l5kit.environment.feature_extractor import CustomFeatureExtractor
from l5kit.environment.callbacks import L5KitEvalCallback
from l5kit.environment.envs.l5_env import SimulationConfigGym, L5Env, GymStepOutput

from l5kit.visualization.visualizer.zarr_utils import episode_out_to_visualizer_scene_gym_cle
from l5kit.visualization.visualizer.visualizer import visualize
from bokeh.io import output_notebook, show



In [3]:
# Dataset is assumed to be on the folder specified
# in the L5KIT_DATA_FOLDER environment variable

# get environment config
env_config_path = '../gym_config.yaml'
cfg = load_config_data(env_config_path)

In [4]:
import gym
import ray
from ray.rllib.agents.ppo import PPOTrainer
from ray.tune.logger import pretty_print
import ray.rllib.algorithms.ppo as ppo
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
import torch.nn as nn
import numpy as np

In [29]:


'''
model = nn.Sequential(
        nn.Conv2d(num_input_channels, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False),
        nn.GroupNorm(4, 64),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(64, 32, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False),
        nn.GroupNorm(2, 32),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Flatten(),
        nn.Linear(in_features=1568, out_features=features_dim),
    )

'''
from customModel import GNCNN
ModelCatalog.register_custom_model(
        "GN_CNN_torch_model", GNCNN
    )


### Define Training and Evaluation Environments

**Training**: We train the PPO policy on episodes of 32 time-steps, using 4 sub-processes (training environments) to parallelize and speed up episode rollouts. The *SimConfig* dataclass (`SimulationConfigGym` in the code) defines the parameters of the episode rollout, such as the rollout length and whether to use log-replayed or simulated agents.

**Evaluation**: We will evaluate the performance of the PPO policy on the *entire* scene (~248 time-steps).

In [None]:
import torch as th
from torch import nn
from l5kit.environment import models
from ray.rllib.utils.typing import ModelConfigDict, TensorType
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFCNet
from ray.rllib.models.torch.complex_input_net import ComplexInputNetwork


class CustomFeatureExtractor(TorchModelV2, nn.Module):
    """Custom feature extractor from raster images for the RL Policy.

    :param observation_space: the input observation space
    :param features_dim: the number of features to extract from the input
    :param model_arch: the model architecture used to extract the features
    """

    # def __init__(self, obs_space, action_space, num_outputs, model_config, name):
    def __init__(self, obs_space: gym.spaces.Space,
               action_space: gym.spaces.Space, num_outputs: int,
               model_config: ModelConfigDict, name: str):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                          model_config, name)
        nn.Module.__init__(self)
        self.model = ComplexInputNetwork(
            obs_space, action_space, num_outputs, model_config, name
        )
        print(model_config)
        # features_dim = model_config['features_dim']
        features_dim = 8
        model_arch = "simple_gn"
    # def __init__(self, observation_space: gym.spaces.Dict, features_dim: int = 256,
    #              model_arch: str = "simple_gn"):
        # super(CustomFeatureExtractor, self).__init__(observation_space, features_dim)

        # We assume CxHxW images (channels first)
        # Re-ordering will be done by pre-preprocessing or wrapper
        
        num_input_channels = obs_space["image"].shape[0]

        if model_arch == 'simple_gn':
            # A simplified feature extractor with GroupNorm.
            model = models.SimpleCNN_GN(num_input_channels, features_dim)
        else:
            raise NotImplementedError

        extractors = {"image": model}
        self.extractors = nn.ModuleDict(extractors)
        self._features_dim = features_dim

    def forward(self, input_dict: gym.spaces.Dict, states, seq_lens) -> th.Tensor:
        encoded_tensor_list = []

        # self.extractors contain nn.Modules that do all the processing.
        for key, extractor in self.extractors.items():
            encoded_tensor_list.append(extractor(input_dict['obs'][key]))
        # Return a (B, self._features_dim) PyTorch tensor, where B is batch dimension.
        # raise ValueError(th.cat(encoded_tensor_list, dim=1))
        return th.cat(encoded_tensor_list, dim=1), []
ModelCatalog.register_custom_model(
        "my_torch_model", CustomFeatureExtractor
    )

In [6]:
from typing import Dict
import numpy as np

class L5EnvWrapper(gym.Wrapper):
    """Expose only the raster image of L5Env as a channels-last Box observation."""

    def __init__(self, env):
        super().__init__(env)
        self.env = env
        self.n_channels = 7
        self.raster_size = 112
        obs_shape = (self.raster_size, self.raster_size, self.n_channels)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.float32)

    def _to_channels_last(self, image: np.ndarray) -> np.ndarray:
        # The raster is assumed to be CxHxW (channels first). Transpose rather than
        # reshape so that every pixel keeps its spatial position in the HxWxC output.
        return np.transpose(image, (1, 2, 0))

    def step(self, action: np.ndarray) -> GymStepOutput:
        output = self.env.step(action)
        only_image_state = self._to_channels_last(output.obs['image'])
        return GymStepOutput(only_image_state, output.reward, output.done, output.info)

    def reset(self) -> np.ndarray:
        return self._to_channels_last(self.env.reset()['image'])

In [10]:
callback_list = []

env_kwargs = {'env_config_path': env_config_path, 'use_kinematic': True, 'sim_cfg': train_sim_cfg}
env = L5EnvWrapper(L5Env(**env_kwargs))
# Save Model Periodically
save_freq = 100000
save_path = './logs/'
output = 'PPO'
train_envs = 4
eval_envs = 1
checkpoint_callback = CheckpointCallback(save_freq=(save_freq // train_envs), save_path=save_path, \
                                         name_prefix=output)
callback_list.append(checkpoint_callback)

# Eval Model Periodically
eval_freq = 100000
n_eval_episodes = 1
val_eval_callback = L5KitEvalCallback(env, eval_freq=(eval_freq // train_envs), \
                                      n_eval_episodes=n_eval_episodes, n_eval_envs=eval_envs)
callback_list.append(val_eval_callback)



In [7]:
from wrapper import L5EnvWrapper
train_sim_cfg = SimulationConfigGym()
from ray import tune

# Register how the env should be constructed. The kwargs are fixed here, but values
# could also be taken from the `config` EnvContext object passed to the lambda.
env_kwargs = {'env_config_path': env_config_path, 'use_kinematic': True, 'sim_cfg': train_sim_cfg}

tune.register_env("L5-CLE-V0", lambda config: L5Env(**env_kwargs))
tune.register_env("L5-CLE-V1", lambda config: L5EnvWrapper(L5Env(**env_kwargs)))

In [8]:
from customModel import GNCNN
ModelCatalog.register_custom_model(
        "GN_CNN_torch_model", GNCNN
    )

In [None]:
# Create the Trainer.
algo = ppo.PPO(
        env="L5-CLE-V1",
        config={
            "framework": "torch",
            "model": {
                "custom_model": "GN_CNN_torch_model",
                # Extra kwargs to be passed to your model's c'tor.
                "custom_model_config": {'feature_dim':128},
            },
            '_disable_preprocessor_api': True,
        },
    )

In [None]:
# ray.init()
train_envs = 4
config = ppo.DEFAULT_CONFIG.copy()
config["num_gpus"] = 0
config["framework"] = 'tf2'
config["num_workers"] = 8
config["num_envs_per_worker"] = train_envs
config['_disable_preprocessor_api'] = True  # no trailing comma: a tuple here would not be a bool
config["model"]["dim"] = 112
config["model"]["conv_filters"] = [[64, 7, 3], [32, 11, 3], [256, 11, 3]]
# config["log_level"] = 1
# config["evaluation_interval"] = 1 # change to 10000
# config["evaluation_duration"] = "auto"
# config["evaluation_parallel_to_training"] = True,
# config["evaluation_duration_unit"] = "timesteps"
# config["evaluation_num_workers"] = 3
# config["enable_async_evaluation"] = True,

# config["model"]["conv_activation"] = 'relu'
# config["model"]["post_fcnet_hiddens"] =  [256]
# config["model"]["post_fcnet_activation"] = "relu",
# config["train_batch_size"] = 200
algo = ppo.PPO(config=config, env="L5-CLE-V1")

In [26]:
for i in range(400):
    algo.train()

In [None]:
ray.init(num_cpus=3, ignore_reinit_error=True, log_to_driver=False)

config = ppo.DEFAULT_CONFIG.copy()
config['num_workers'] = 1
config['num_sgd_iter'] = 30
config['sgd_minibatch_size'] = 128
config['num_cpus_per_worker'] = 0  # This avoids running out of resources in the notebook environment when this cell is re-executed
config['framework'] = 'torch' 
# config['custom_model'] = CustomFeatureExtractor
# config['custom_model_config'] = {"features_dim": 7, "model_arch": 'simple_gn'},

# config['env_config'] = env_kwargs 
algo = ppo.PPO(config=config, env= 'L5-CLE-V1')

In [None]:
# Create the Trainer.
algo = ppo.PPO(
        env="L5-CLE-V0",
        config={
            
            "framework": "torch",
            "model": {
                # Auto-wrap the custom(!) model with an LSTM.
                # "use_lstm": True,
                # To further customize the LSTM auto-wrapper.
                # "lstm_cell_size": 64,
                # Specify our custom model from above.
                "custom_model": "my_torch_model",
                # Extra kwargs to be passed to your model's c'tor.
                "custom_model_config": {'feature_dim': 128},
            },
            '_disable_preprocessor_api': True,
        },
    )

In [None]:
# Train on episodes of length 32 time steps
train_eps_length = 32
train_envs = 4

# Evaluate on entire scene (~248 time steps)
eval_eps_length = None
eval_envs = 1

# make train env
train_sim_cfg = SimulationConfigGym()
train_sim_cfg.num_simulation_steps = train_eps_length + 1
env_kwargs = {'env_config_path': env_config_path, 'use_kinematic': True, 'sim_cfg': train_sim_cfg}
env = make_vec_env("L5-CLE-v0", env_kwargs=env_kwargs, n_envs=train_envs,
                   vec_env_cls=SubprocVecEnv, vec_env_kwargs={"start_method": "fork"})

# make eval env
validation_sim_cfg = SimulationConfigGym()
validation_sim_cfg.num_simulation_steps = None
eval_env_kwargs = {'env_config_path': env_config_path, 'use_kinematic': True, \
                   'return_info': True, 'train': False, 'sim_cfg': validation_sim_cfg}
eval_env = make_vec_env("L5-CLE-v0", env_kwargs=eval_env_kwargs, n_envs=eval_envs,
                        vec_env_cls=SubprocVecEnv, vec_env_kwargs={"start_method": "fork"})

### Define backbone feature extractor

The backbone feature extractor is shared between the policy and the value networks. The feature extractor *simple_gn* is composed of two convolutional layers followed by a fully connected layer, with ReLU activation. The feature extractor output is passed to both the policy and value networks, each composed of two fully connected layers with tanh activation (SB3 default).
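
As a sanity check on the architecture, the sketch below works through the spatial sizes of the layers listed in the commented-out `nn.Sequential` earlier in this notebook. It assumes a 112x112 raster (the `raster_size` used in `L5EnvWrapper`) and the kernel, stride and padding values from that listing; the helper names are hypothetical, and the actual `models.SimpleCNN_GN` may differ.

```python
def conv_out(size: int, kernel: int, stride: int, padding: int) -> int:
    """Spatial output size of a convolution or pooling layer (floor mode)."""
    return (size + 2 * padding - kernel) // stride + 1


def simple_gn_flat_features(raster_size: int = 112) -> int:
    """Number of features entering the final Linear layer (assumed layer layout)."""
    size = conv_out(raster_size, kernel=7, stride=2, padding=3)  # Conv2d, 64 channels
    size = conv_out(size, kernel=2, stride=2, padding=0)         # MaxPool2d
    size = conv_out(size, kernel=7, stride=2, padding=3)         # Conv2d, 32 channels
    size = conv_out(size, kernel=2, stride=2, padding=0)         # MaxPool2d
    return 32 * size * size  # channels * height * width after Flatten


print(simple_gn_flat_features(112))  # 1568
```

Under these assumptions the spatial size goes 112, 56, 28, 14, 7, which gives 32 * 7 * 7 = 1568 and matches `in_features=1568` in that listing. A different raster size would require a different `in_features`.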

We perform **group normalization** after every convolutional layer. Empirically, we found that group normalization performs far better than batch normalization. This can be attributed to the fact that activation statistics change quickly in on-policy algorithms (PPO is on-policy), while batch-norm learnable parameters can be slow to update, causing training issues.
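
To make the difference from batch normalization concrete, here is a minimal pure-Python sketch of group normalization for a single sample. It is an illustration only: the function name is hypothetical, each channel is flattened to a list of values, and the learnable scale and shift that `nn.GroupNorm` applies are omitted.

```python
import math
from typing import List


def group_norm(channels: List[List[float]], num_groups: int, eps: float = 1e-5) -> List[List[float]]:
    """Normalize one sample's channels group by group (no learnable affine)."""
    if len(channels) % num_groups != 0:
        raise ValueError("number of channels must be divisible by num_groups")
    per_group = len(channels) // num_groups
    out: List[List[float]] = []
    for g in range(num_groups):
        group = channels[g * per_group:(g + 1) * per_group]
        # Statistics come from this sample's group only, never from other samples.
        values = [v for ch in group for v in ch]
        mean = sum(values) / len(values)
        var = sum((v - mean) ** 2 for v in values) / len(values)
        scale = 1.0 / math.sqrt(var + eps)
        out.extend([[(v - mean) * scale for v in ch] for ch in group])
    return out


# Four channels with two values each, split into two groups of two channels.
sample = [[1.0, 2.0], [3.0, 4.0], [10.0, 20.0], [30.0, 40.0]]
normalized = group_norm(sample, num_groups=2)
```

Because the mean and variance are computed within one sample, the output does not depend on which other observations happen to share the batch, which is one plausible reason it behaves well when the data distribution shifts with the policy.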

In [79]:
# A simple 2 Layer CNN architecture with group normalization
model_arch = 'simple_gn'
features_dim = 128

# Custom Feature Extractor backbone
policy_kwargs = {
    "features_extractor_class": CustomFeatureExtractor,
    "features_extractor_kwargs": {"features_dim": features_dim, "model_arch": model_arch},
    "normalize_images": False
}

### Clipping Schedule

We linearly decrease the value of the clipping parameter $\epsilon$ as PPO training progresses, since this improves training stability.
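
The shape of such a schedule can be sketched in a few lines. The function below is a re-implementation for illustration (the name `linear_schedule` is hypothetical), assuming, as in SB3, that the schedule receives the fraction of training remaining, which goes from 1 at the start to 0 at the end.

```python
from typing import Callable


def linear_schedule(start: float, end: float, end_fraction: float) -> Callable[[float], float]:
    """Interpolate linearly from start to end over the first end_fraction of training."""

    def schedule(progress_remaining: float) -> float:
        elapsed = 1.0 - progress_remaining  # 0 at the start of training, 1 at the end
        if elapsed > end_fraction:
            return end  # hold the final value once the decay period is over
        return start + elapsed * (end - start) / end_fraction

    return schedule


clip = linear_schedule(0.1, 0.01, 1.0)
for progress_remaining in (1.0, 0.5, 0.0):
    print(progress_remaining, round(clip(progress_remaining), 4))
```

With a start value of 0.1, an end value of 0.01 and an end fraction of 1.0, the clipping range is 0.1 at the start of training, 0.055 halfway through and 0.01 at the end.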

In [80]:
# Clipping schedule of PPO epsilon parameter
start_val = 0.1
end_val = 0.01
training_progress_ratio = 1.0
clip_schedule = get_linear_fn(start_val, end_val, training_progress_ratio)

### Hyperparameters for PPO

For a detailed description, refer to https://stable-baselines3.readthedocs.io/en/master/_modules/stable_baselines3/ppo/ppo.html#PPO
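
It can help to see how these hyperparameters interact. The sketch below is plain arithmetic on the values used in this notebook (256 rollout steps per environment, 4 training environments, batch size 64, 10 epochs, 300000 total steps, gamma 0.8). The function name is hypothetical, and `1 / (1 - gamma)` is only a common rule of thumb for the effective reward horizon.

```python
import math
from typing import Dict


def ppo_update_budget(n_steps: int, n_envs: int, batch_size: int, n_epochs: int,
                      total_timesteps: int, gamma: float) -> Dict[str, float]:
    """Derive per-update sample and gradient-step counts from PPO hyperparameters."""
    samples_per_update = n_steps * n_envs                  # rollout buffer size
    minibatches_per_epoch = samples_per_update // batch_size
    gradient_steps_per_update = minibatches_per_epoch * n_epochs
    num_updates = math.ceil(total_timesteps / samples_per_update)
    return {
        "samples_per_update": samples_per_update,
        "minibatches_per_epoch": minibatches_per_epoch,
        "gradient_steps_per_update": gradient_steps_per_update,
        "num_updates": num_updates,
        "effective_horizon": 1.0 / (1.0 - gamma),          # rule-of-thumb horizon in steps
    }


budget = ppo_update_budget(n_steps=256, n_envs=4, batch_size=64, n_epochs=10,
                           total_timesteps=300_000, gamma=0.8)
for name, value in budget.items():
    print(name, value)
```

With these values each policy update uses 1024 samples split into 16 minibatches, and the relatively low discount factor weights rewards over roughly the next 5 steps, which is short compared with the 32-step training episodes.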

In [81]:
lr = 3e-4
num_rollout_steps = 256
gamma = 0.8
gae_lambda = 0.9
n_epochs = 10
seed = 42
batch_size = 64
tensorboard_log = 'tb_log'

### Define the PPO Policy.

SB3 provides an easy interface to define the PPO policy. Note that we still need to tune the appropriate hyperparameters; the custom policy backbone has been defined above.


In [None]:
# define model
model = PPO("MultiInputPolicy", env, policy_kwargs=policy_kwargs, verbose=1, n_steps=num_rollout_steps,
            learning_rate=lr, gamma=gamma, tensorboard_log=tensorboard_log, n_epochs=n_epochs,
            clip_range=clip_schedule, batch_size=batch_size, seed=seed, gae_lambda=gae_lambda)
# model = SAC("CnnPolicy", env, batch_size=batch_size, learning_rate= lr,
#                     gamma=gamma,
#                     buffer_size=10000,
#                     tensorboard_log=tensorboard_log, 
#                     train_freq= (1, "step"),
#                     gradient_steps=1, # gradient 1 time per 1 episode
#                     target_update_interval=200, # each ...(200) steps, we update target_net (load weight from policy_net)
#                     learning_starts=batch_size,
#                     policy_kwargs=policy_kwargs,
#                     verbose=1,
#                     seed=42,
#                     use_sde=True,
#                     sde_sample_freq=-1,
#                     use_sde_at_warmup=True,
#                     tau=.95, 
#                     create_eval_env=True,
#                     )

### Defining Callbacks

We can additionally define callbacks to save model checkpoints and evaluate models during training.
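
The callback frequencies in this notebook are divided by the number of training environments. A short sketch of why, assuming (as in SB3 vectorized environments) that a callback is invoked once per vectorized step, so each call advances the total timestep count by the number of environments; the helper name is hypothetical.

```python
def per_call_frequency(total_step_frequency: int, n_envs: int) -> int:
    """Convert a frequency in total timesteps into a frequency in callback calls."""
    # Guard against zero when there are more environments than requested steps.
    return max(total_step_frequency // n_envs, 1)


save_freq = 100_000  # desired checkpoint interval in total environment steps
train_envs = 4

calls_between_saves = per_call_frequency(save_freq, train_envs)
print(calls_between_saves)               # 25000 callback calls
print(calls_between_saves * train_envs)  # 100000 total environment steps
```

Without the division, a checkpoint would be written only every 400000 total steps when training with 4 environments.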

In [None]:
callback_list = []

# Save Model Periodically
save_freq = 100000
save_path = './logs/'
output = 'PPO'  # checkpoint name prefix; matches the PPO model trained above
checkpoint_callback = CheckpointCallback(save_freq=(save_freq // train_envs), save_path=save_path, \
                                         name_prefix=output)
callback_list.append(checkpoint_callback)

# Eval Model Periodically
eval_freq = 100000
n_eval_episodes = 1
val_eval_callback = L5KitEvalCallback(eval_env, eval_freq=(eval_freq // train_envs), \
                                      n_eval_episodes=n_eval_episodes, n_eval_envs=eval_envs)
callback_list.append(val_eval_callback)


### Train

In [None]:
n_steps = 300000
model.learn(n_steps, callback=callback_list)


**Voila!** We have a trained PPO policy! Train for a larger number of steps for better accuracy. Typical RL algorithms require training for at least 1M steps for good convergence. You can visualize the quantitative evaluation using TensorBoard.

In [None]:
# Visualize Tensorboard logs (!! run on local terminal !!)
!tensorboard --logdir tb_log

In [None]:
model = PPO.load("../logs/PPO_20000_steps.zip")

### Visualize the episode from the environment

We can easily visualize the outputs obtained by rolling out episodes in L5Kit using the Bokeh visualizer.

In [None]:
rollout_sim_cfg = SimulationConfigGym()
rollout_sim_cfg.num_simulation_steps = None
rollout_env = gym.make("L5-CLE-v0", env_config_path=env_config_path, sim_cfg=rollout_sim_cfg, \
                       use_kinematic=True, train=False, return_info=True)

def rollout_episode(model, env, idx = 0):
    """Rollout a particular scene index and return the simulation output.

    :param model: the RL policy
    :param env: the gym environment
    :param idx: the scene index to be rolled out
    :return: the episode output of the rolled out scene
    """

    # Set the reset_scene_id to 'idx'
    env.reset_scene_id = idx
    
    # Rollout step-by-step
    obs = env.reset()
    done = False
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, _, done, info = env.step(action)

    # The episode outputs are present in the key "sim_outs"
    sim_out = info["sim_outs"][0]
    return sim_out

# Rollout one episode
sim_out = rollout_episode(model, rollout_env)

In [None]:
# might change with different rasterizer
map_API = rollout_env.dataset.rasterizer.sem_rast.mapAPI

def visualize_outputs(sim_outs, map_API):
    for sim_out in sim_outs: # for each scene
        vis_in = episode_out_to_visualizer_scene_gym_cle(sim_out, map_API)
        show(visualize(sim_out.scene_id, vis_in))

output_notebook()
visualize_outputs([sim_out], map_API)