Install Box2D: `pip install "gymnasium[box2d]"`
To play with the car racing game: `python env/lib/python3.9/site-packages/gymnasium/envs/box2d/car_racing.py`

In continuous space, there are 3 actions:
* 0: steering, -1 is full left, +1 is full right
* 1: gas
* 2: braking

In discrete space there are 5 actions:
* 0: do nothing
* 1: steer left
* 2: steer right
* 3: gas
* 4: brake

Observation space: a top-down 96x96 RGB image of the car and race track.

The reward is -0.1 every frame and +1000/N for every track tile visited, where N is the total number of tiles in the track. For example, if you finish the track in 732 frames, your reward is 1000 - 0.1*732 = 926.8 points.

The episode finishes when all the tiles are visited. The car can also go outside the playfield - that is, far off the track, in which case it will receive -100 reward and die.

In [187]:
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T

import numpy as np

def _crop_to_84(img):
    """Keep the top 84 rows and columns 6..89 of an image tensor (84x84 result)."""
    return T.functional.crop(img, top=0, left=6, height=84, width=84)


# Per-frame preprocessing applied to every raw observation.
transforms = T.Compose(
    [
        # Convert to a float tensor scaled to [0, 1]. No batch dim is added
        # here; callers add it with `unsqueeze(0)` after stacking frames.
        T.ToTensor(),
        # The crop drops the black footer and trims the sides down to 84x84.
        T.Lambda(_crop_to_84),
        T.Grayscale(),
        # T.Resize(size=(16,16), antialias=False),
    ]
)

class DQN(nn.Module):
    """Convolutional Q-network over a fixed, discretised set of car actions.

    The network maps a stack of frames, shaped (batch, frame_stack_len, H, W),
    to one Q-value per entry of `action_space`. The first linear layer is
    sized for 84x84 inputs: conv(7, stride 3) -> 26, pool -> 13,
    conv(4) -> 10, pool -> 5, i.e. 12 * 5 * 5 = 300 features.

    Args:
        frame_stack_len: number of contiguous frames ingested, used as the
            number of input channels of the first convolution.
    """

    def __init__(self, frame_stack_len=3):
        super().__init__()

        # Each row is one discrete action expressed as (steering, gas, brake).
        # limiting the action space seems to greatly improve training.
        # policy doesn't even learn to turn otherwise.
        self.action_space = np.array([
            [-1, 1, 0], [0, 1, 0], [1, 1, 0],
            [-1, 0.5, 0], [0, 0.5, 0], [1, 0.5, 0],
            [-1, 0, 0.2], [0, 0, 0.2], [1, 0, 0.2],
            [-1, 0, 0], [0, 0, 0], [1, 0, 0]
        ])

        self.n_actions = len(self.action_space)

        # Alternative (unused) per-dimension discretisation:
        # self.steering_bins = np.linspace(-1, 1, 21)
        # self.gaz_bins = np.linspace(0, 1, 11)
        # self.brake_bins = np.linspace(0, 1, 11)
        # n_actions = len(self.steering_bins) + len(self.gaz_bins) + len(self.brake_bins)

        self.conv1 = nn.Conv2d(frame_stack_len, 6, (7, 7), stride=3)
        self.conv2 = nn.Conv2d(6, 12, (4, 4), stride=1)
        self.lin1 = nn.Linear(300, 128)  # 300 = 12 channels * 5 * 5 for 84x84 inputs
        self.lin2 = nn.Linear(128, self.n_actions)

    def forward(self, x):
        """Return Q-values of shape (batch_size, n_actions) for a batch of frame stacks."""
        out = F.relu(self.conv1(x))
        out = F.max_pool2d(out, kernel_size=(2, 2))
        out = F.relu(self.conv2(out))
        out = F.max_pool2d(out, kernel_size=(2, 2))
        out = out.flatten(start_dim=1)  # keep the batch dim, flatten the rest
        out = F.relu(self.lin1(out))
        return self.lin2(out)

    def act(self, epsilon, state):
        """Pick an action index epsilon-greedily.

        Args:
            epsilon: probability of returning a uniformly random action index.
            state: batched frame stack; only the first batch element is used.

        Returns:
            An `int` index into `action_space`.
        """
        if random.random() < epsilon:
            return random.randrange(self.n_actions)

        with torch.no_grad():
            q = self(state)
        return int(q.argmax(dim=-1)[0])  # first element eliminates the batch dim

    # Alternative (unused) decoding for the per-dimension bins above:
    # def get_action(self, idx):
    #     if idx < len(self.steering_bins):
    #         return np.array([self.steering_bins[idx], 0, 0])
    #     elif idx < len(self.steering_bins) + len(self.gaz_bins):
    #         return np.array([0, self.gaz_bins[idx - len(self.steering_bins)], 0])
    #     else:
    #         return np.array([0, self.brake_bins[idx - len(self.steering_bins) - len(self.gaz_bins)], 0])

    def compute_loss(self, s, y, a):
        """Mean squared error between Q(s, a) and the targets `y`.

        Args:
            s: batch of states accepted by `forward`.
            y: target values, one per batch element.
            a: action indices, one per batch element (tensor, array or list
                of integers).

        Returns:
            Scalar loss tensor.
        """
        out = self(s)
        # gather needs int64 indices on the same device as the Q-values; casting
        # here makes the lookup independent of the caller's index dtype.
        idx = torch.as_tensor(a, dtype=torch.long, device=out.device)
        q = out.gather(1, idx.unsqueeze(1)).squeeze(1)
        return F.mse_loss(q, y, reduction="mean")

In [214]:
import random
import pickle
from collections import deque

from tqdm.notebook import tqdm
from torch.utils.tensorboard import SummaryWriter

import gymnasium as gym
# Continuous-control CarRacing. With domain_randomize enabled the background
# and track colours would differ on every reset; it is switched off here.
env = gym.make("CarRacing-v2", continuous=True, domain_randomize=False)

# Replay buffer: it has to be large enough to break correlations between
# consecutive states. A deque with maxlen drops the oldest item automatically.
# NOTE(review): the original sizing note assumed 16x16 images and 4 frames per
# state; re-derive the memory footprint for the current preprocessing before
# raising buffer_size.
buffer_size = 5000
buffer = deque(maxlen=buffer_size)
buffer_pbar = tqdm(total=buffer_size)  # tracks the buffer filling up

frame_stack_len, skip_frames = 3, 2

# Exploration schedule: epsilon is multiplied by epsilon_decay after each
# update, so that epsilon * epsilon_decay ** decay_steps == epsilon_min.
epsilon, epsilon_min = 1, 0.1
decay_steps = 60000
epsilon_decay = np.exp(np.log(epsilon_min / epsilon) / decay_steps)

batch_size, gamma = 64, 0.95

# Online network and the target network used for bootstrapped targets.
model, target_model = DQN(), DQN()

def sync_models(model, target_model, path="./checkpoints/model_sync.pt"):
    """Copy the weights of `model` into `target_model` and checkpoint them.

    Args:
        model: module whose parameters are the source of truth.
        target_model: module with the same architecture; overwritten in place.
        path: where the state dict is also saved. When it is a filesystem
            path, missing parent directories are created first.
    """
    import os

    if isinstance(path, (str, os.PathLike)):
        checkpoint_dir = os.path.dirname(os.fspath(path))
        if checkpoint_dir:
            os.makedirs(checkpoint_dir, exist_ok=True)

    state = model.state_dict()
    torch.save(state, path)
    # load_state_dict copies values into the target's own tensors, so the two
    # models do not end up sharing storage and no re-read from disk is needed.
    target_model.load_state_dict(state)

# DQN training driver: plays episodes with an epsilon-greedy policy, stores
# transitions in the replay buffer and, once the buffer is full, performs one
# gradient update per agent step until n_updates updates have been made.

# Start with identical online and target networks.
sync_models(model, target_model)

optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
writer = SummaryWriter()

# n_episodes = 100
n_updates = 200000
pbar = tqdm(total=n_updates)
updates_counter = 0
episode_counter = 0
best_total_reward = 0
sync_models_frequency = 5  # episodes
while updates_counter < n_updates:
    episode_counter += 1
    obs, info = env.reset()
    s = transforms(obs)
    frame_stack = deque(maxlen=frame_stack_len)
    frame_stack.extend([s for _ in range(frame_stack_len)])  # init with the same state.
    done = False
    t = 1
    total_loss = 0
    total_reward = 0
    off_track_counter = 0
    gas_counter = 0  # how many steps was gas > 0
    while not done:
        state = torch.concatenate(list(frame_stack), dim=0)
        state = state.unsqueeze(0)  # add batch dim
        a = model.act(epsilon, state)
        a_arr = model.action_space[a]
        reward = 0
        off_track = False
        off_map = False
        # repeat same action over skip_frames
        for _ in range(skip_frames):
            t += 1
            new_obs, r, terminated, truncated, info = env.step(a_arr)
            # hacky way to check if race car is off track...
            # NOTE(review): this compares the reward accumulated over the
            # previous skipped frames, not the per-frame `r`. It is left as is
            # because the early-termination rule below was tuned against this
            # behaviour; confirm the intent before changing it.
            off_track = off_track or np.isclose(reward, -0.1)
            off_track_counter += 1 if off_track else 0
            # Leaving the playfield yields a -100 reward on that single frame,
            # so the per-frame reward is what has to be tested here.
            off_map = np.isclose(r, -100)
            if off_track or off_map:
                r = -1
            gas = a_arr[1] > 0
            gas_counter += 1 if gas else 0
            if gas:
                r *= 1.5  # scale the reward whenever the chosen action applies gas
            reward += r
            done = terminated or truncated
            if done:
                break

        new_s = transforms(new_obs)
        frame_stack.append(new_s)

        new_state = torch.concatenate(list(frame_stack), dim=0)
        new_state = new_state.unsqueeze(0)

        # Count each step's reward exactly once, whether or not an update follows.
        total_reward += reward

        buffer.append((state, a, reward, new_state, done))
        if len(buffer) < buffer_size:
            # only start training once buffer is full
            buffer_pbar.update(1)
            continue

        # construct batch.
        batch = {"r": [], "done": [], "s": [], "new_s": [], "a": []}
        for (sj, aj, rj, new_sj, donej) in random.sample(buffer, batch_size):
            batch["done"].append(float(donej))
            batch["r"].append(rj)
            batch["s"].append(sj)
            batch["new_s"].append(new_sj)
            batch["a"].append(aj)

        batch["done"] = torch.tensor(batch["done"])
        batch["r"] = torch.tensor(batch["r"])
        batch["s"] = torch.concatenate(batch["s"], dim=0)
        batch["new_s"] = torch.concatenate(batch["new_s"], dim=0)
        batch["a"] = torch.tensor(batch["a"], dtype=torch.int)

        # Bootstrapped target from the frozen target network; terminal
        # transitions (done == 1) use the immediate reward only.
        with torch.no_grad():
            q = target_model.forward(batch["new_s"]).max(dim=-1).values
            batch["y"] = batch["r"] + (1 - batch["done"]) * gamma * q

        loss = model.compute_loss(batch["s"], batch["y"], batch["a"])
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        updates_counter += 1
        pbar.update(1)

        writer.add_scalar("Epsilon", epsilon, updates_counter)
        writer.add_scalar('Loss', loss.item(), updates_counter)
        epsilon = max(epsilon * epsilon_decay, epsilon_min)

        # early termination of negative episodes.
        # This really helped training for some reason.
        # Probably because the agent can play more episodes this way.
        if done or off_track_counter >= 10 or total_reward < 0:
            break

    if updates_counter == 0:
        # Still filling the replay buffer: nothing to log or checkpoint yet.
        continue

    writer.add_scalar('Total Reward per episode', total_reward, episode_counter)
    writer.add_scalar('Episode length', t, episode_counter)
    writer.add_scalar("Off track freq", off_track_counter/t, episode_counter)
    writer.add_scalar("Gas freq", gas_counter/t, episode_counter)

    if total_reward > best_total_reward:
        best_total_reward = total_reward
        torch.save(model.state_dict(), "./checkpoints/best_model.pt")

    if episode_counter % sync_models_frequency == 0:
        sync_models(model, target_model)

    if episode_counter % 100 == 0:
        torch.save(model.state_dict(), f"./checkpoints/{episode_counter}.pt")

writer.close()
env.close()



  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/200000 [00:00<?, ?it/s]

In [209]:
import imageio

# Load the weights stored in ./checkpoints/600.pt into the global `model`
# (the training cell writes f"./checkpoints/{episode_counter}.pt" every 100 episodes).
model.load_state_dict(torch.load("./checkpoints/600.pt"))

def sample_episode(max_frames=600, out_path='carracing.gif', fps=60):
    """Roll out the global `model` greedily for one episode and save a GIF.

    A fresh environment is created with `render_mode="rgb_array"`, the model
    acts with epsilon=0 on every frame, and each rendered frame is collected
    until the episode ends or `max_frames` frames have been recorded.

    Args:
        max_frames: upper bound on the number of recorded frames.
        out_path: file the GIF is written to.
        fps: frame rate used to derive the `duration` passed to imageio.
    """
    env = gym.make("CarRacing-v2", domain_randomize=False, continuous=True, render_mode="rgb_array")
    pbar = tqdm(total=max_frames)
    frames = []
    # The stack must hold as many frames as the network's first conv expects.
    frame_stack_len = model.conv1.in_channels
    try:
        obs, info = env.reset()
        first = transforms(obs)
        # Seed the stack with copies of the first frame.
        frame_stack = deque([first] * frame_stack_len, maxlen=frame_stack_len)
        done = False
        counter = 1
        while not done and counter <= max_frames:
            frames.append(env.render())
            state = torch.concatenate(list(frame_stack), dim=0).unsqueeze(0)
            a = model.act(epsilon=0, state=state)
            action = model.action_space[a]
            new_obs, reward, terminated, truncated, info = env.step(action)
            frame_stack.append(transforms(new_obs))
            done = terminated or truncated
            counter += 1
            pbar.update(1)
    finally:
        # Release the environment and the progress bar even if the rollout fails.
        env.close()
        pbar.close()

    print("💾 Saving to GIF...")
    # NOTE(review): this passes the total clip length (frames / fps) as
    # `duration`; imageio interprets `duration` per frame and its unit depends
    # on the imageio version — confirm against the installed version.
    imageio.mimsave(out_path, frames, duration=len(frames)/fps)
    print("🚀 Done!")
    

# Record one greedy episode with the currently loaded weights and write the GIF.
sample_episode()                

  0%|          | 0/600 [00:00<?, ?it/s]

💾 Saving to GIF...
🚀 Done!
