---
Before you start exploring this notebook, make sure that GPU support is enabled.
To enable the GPU backend for your notebook, go to **Edit** → **Notebook Settings** and set **Hardware accelerator** to **GPU**. 

---


# Imports

Install Gymnasium (the maintained successor to OpenAI Gym) and the dependencies needed to render the environments

In [None]:
# Refresh the apt package index so the system packages below can be resolved.
!apt update
# System-level packages: a virtual X server (xvfb) with X11 utilities,
# OpenGL bindings, ffmpeg and swig.
# NOTE(review): swig is presumably required to build the Box2D bindings - confirm.
!apt install -y xvfb x11-utils python-opengl ffmpeg swig
# Python packages: Gymnasium is pinned to 0.27.1 and moviepy to 1.0.3; the
# box2d extra, pyvirtualdisplay and imageio-ffmpeg are installed unpinned.
!pip install gymnasium==0.27.1 gymnasium[box2d] pyvirtualdisplay imageio-ffmpeg moviepy==1.0.3

In [None]:
%matplotlib inline

import os
import time
import shutil

# PyTorch imports
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.distributions import Categorical
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
from torchvision.transforms import Compose, ToTensor, Grayscale, ToPILImage

# Auxiliary Python imports
import math
import glob
import io
import base64
import random
import numpy as np
from tqdm.notebook import tqdm as tqdm
from time import sleep, time, strftime

# Environment import and set logger level to display error only
import gymnasium as gym
from gymnasium import logger as gymlogger
from gymnasium.wrappers import RecordVideo
gymlogger.set_level(gym.logger.ERROR)

# start virtual display
from IPython.display import HTML, clear_output
from IPython import display
from pyvirtualdisplay import Display
# Create a non-visible (visible=0) 640x480 virtual display and start it right
# away; the handle is kept in `pydisplay` at module level.
# NOTE(review): presumably needed so the environment can render frames on a
# machine without a physical screen - confirm.
pydisplay = Display(visible=0, size=(640, 480))
pydisplay.start()

In [None]:
"""
Utility functions to show video in a notebook cell
"""
def show_video():
    """Embed a recorded episode from the ``video/`` folder in the cell output.

    Searches ``video/*.mp4`` relative to the current working directory.
    When at least one file matches, the most recently modified one is
    base64-encoded and displayed inline as a looping, auto-playing HTML5
    ``<video>`` element. When nothing matches, the message
    "Could not find video" is printed instead.

    Returns:
        None. The function only produces notebook/console output.
    """
    mp4list = glob.glob('video/*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # glob returns matches in arbitrary order, so pick the newest recording
    # explicitly instead of whatever happens to come first.
    mp4 = max(mp4list, key=os.path.getmtime)

    # Read-only binary mode is sufficient, and the context manager guarantees
    # the file handle is released even if reading fails.
    with open(mp4, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    display.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

## Setup Google Drive mount to store your results

In [None]:
# Mount Google Drive at /content/drive so results can be stored there.
# The google.colab import means this cell can only run inside Google Colab.
import os
from google.colab import drive
drive.mount('/content/drive')
# List the top level of the mounted drive; as the last expression of the cell
# its value is what the notebook displays.
os.listdir('/content/drive/My Drive')

# Action space

In [None]:
# Discrete action space: 25 (steering, throttle, brake) triples that stand in
# for the continuous controls.
#
# Index layout:
#    0-4   no steering, one entry per pedal setting
#    5-14  steering left  (negative steering value)
#   15-24  steering right (positive steering value)
# Within each steering group the pedal setting changes every two entries and
# the steering amount alternates half (0.5) / full (1).
_pedal_settings = (
    (0, 0),    # no throttle, no brake
    (0.5, 0),  # half throttle
    (1, 0),    # full throttle
    (0, 0.5),  # half brake
    (0, 1),    # full brake
)

action_mapping = [(0, gas, brake) for gas, brake in _pedal_settings]
action_mapping += [
    (direction * amount, gas, brake)
    for direction in (-1, 1)          # left first, then right
    for gas, brake in _pedal_settings
    for amount in (0.5, 1)            # half steer, then full steer
]


# Environment

In [None]:
class Env():
    """
    Wrapper around the Gymnasium ``CarRacing-v2`` environment.

    On top of the raw environment it
      * optionally records every episode to ``./video``,
      * keeps the list of per-step rewards received since the last ``reset()``,
      * reports a ``die`` flag once the mean of the most recent rewards
        (at most the last 100) is -1 or lower.
    """

    def __init__(self, record_video=True):
        """Create the environment; ``record_video`` turns video recording on or off."""
        self.record_video = record_video
        self.gym_env = gym.make('CarRacing-v2', render_mode="rgb_array")
        self.env = self.wrap_env(self.gym_env)
        self.action_space = self.env.action_space

    def reset(self):
        """
        Start a new episode and return the result of the wrapped ``reset()``.

        The base environment is wrapped afresh and the reward history is
        cleared before resetting.
        NOTE(review): under the Gymnasium API ``reset()`` yields an
        ``(observation, info)`` pair rather than a bare image - confirm that
        callers expect this.
        """
        self.env = self.wrap_env(self.gym_env)
        self.rewards = []
        return self.env.reset()

    def step(self, action):
        """
        Apply ``action`` and return ``(img_rgb, reward, done, die)``.

        ``done`` is true when the environment reports the episode as
        terminated or truncated. ``die`` is true when the mean of the most
        recent rewards (up to 100 of them) is <= -1. In either case the
        environment is closed before returning.
        """
        img_rgb, reward, terminated, truncated, _info = self.env.step(action)
        self.rewards.append(reward)

        done = terminated or truncated

        # A negative slice start clamps at the beginning of the list, so this
        # takes the whole history while fewer than 100 rewards are stored.
        recent_mean = np.mean(self.rewards[-100:])
        die = bool(recent_mean <= -1)

        if done or die:
            self.close()

        return img_rgb, np.sum(reward), done, die

    def render(self, *arg):
        """Forward to the wrapped environment's ``render`` and return its result."""
        return self.env.render(*arg)

    def close(self):
        """Close the currently wrapped environment."""
        self.env.close()

    def wrap_env(self, env):
        """
        Return ``env`` wrapped in ``RecordVideo`` when recording is enabled,
        otherwise return ``env`` unchanged.

        The episode trigger always returns True, i.e. no episode is excluded
        from recording.
        """
        if not self.record_video:
            return env
        return RecordVideo(
            env,
            './video',
            name_prefix="carracing-v2",
            episode_trigger=lambda ep_id: True,
            disable_logger=True,
        )



### Run episode with random agent

In [None]:
def run_episode(show_progress=True, record_video=True, max_steps=500):
    """Play one episode with uniformly random actions and return its score.

    Args:
        show_progress: when true, show a tqdm progress bar whose description
            is updated with the running score after every step.
        record_video: passed on to ``Env``; when true the recorded video is
            also displayed via ``show_video()`` after the episode.
        max_steps: early-stop threshold. The episode is cut short after the
            step whose zero-based index exceeds this value (the default of
            500 reproduces the previously hard-coded limit).

    Returns:
        The sum of the per-step rewards returned by ``Env.step``.
    """
    env = Env(record_video=record_video)
    progress = tqdm(desc="Score: 0") if show_progress else None
    score = 0
    try:
        env.reset()
        step_idx = 0
        finished = False
        while not finished:
            # Sample an index (not the tuple itself) so the draw stays a
            # plain uniform choice over the discrete action table.
            action_idx = np.random.choice(len(action_mapping))
            action = action_mapping[action_idx]

            _, reward, done, die = env.step(action)
            score += reward

            if progress is not None:
                progress.update()
                progress.set_description("Score: {:.2f}".format(score))

            # Stop on a finished/died episode or once the step budget is used up.
            finished = done or die or step_idx > max_steps
            step_idx += 1
    finally:
        # Release the environment and the progress bar even if a step raised.
        env.close()
        if progress is not None:
            progress.close()

    if record_video:
        show_video()
    return score

Let's see how the random agent performs in the real environment

In [None]:
run_episode(show_progress=True, record_video=True);