# DQN on “FreewayDeterministic-v4"

### Imports

In [1]:
import sys
from pathlib import Path

# Add the src folder to sys.path
sys.path.append(str(Path().resolve().parent / "src"))

In [2]:
import gym
import torch
import numpy as np
import matplotlib.pyplot as plt
from collections import deque, namedtuple
from gym.wrappers.monitoring.video_recorder import VideoRecorder
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display

In [3]:
env = gym.make("FreewayDeterministic-v4")
env.seed(0)
print("State shape: ", env.observation_space.shape)
print("Number of actions: ", env.action_space.n)

A.L.E: Arcade Learning Environment (version 0.8.1+53f58b7)
[Powered by Stella]


State shape:  (210, 160, 3)
Number of actions:  3


In [4]:
BUFFER_SIZE = int(1e5)  # replay buffer size
BATCH_SIZE = 128
GAMMA = 0.99  # discount factor
TAU = 1e-3  # for soft update of target parameters
LR = 5e-4  # learning rate
UPDATE_EVERY = 4  # how often to update the network

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [5]:
from agent import Agent

agent = Agent(state_size=None, action_size=3, seed=0, device=device, lr=LR, buffer_size=BUFFER_SIZE, batch_size=BATCH_SIZE, gamma=GAMMA, tau=TAU, update_every=UPDATE_EVERY, input_type="image")

In [6]:
from torchvision import transforms as T


transform = T.Compose(
    [
        T.ToPILImage(),  # Convert numpy array to PIL Image
        T.Grayscale(num_output_channels=1),  # Convert to grayscale
        T.Resize((80, 80)),  # Resize to 80x80
        T.ToTensor(),  # Convert PIL Image to PyTorch tensor
        T.Normalize(0.0, 1.0),  # Normalize to [0, 1] range
    ]
)


def preprocess_frame(frame):
    """Preprocess a single frame using torchvision.transforms."""
    frame_tensor = transform(frame)  # Apply the transformation pipeline
    return frame_tensor


def stack_frames(stacked_frames, frame, is_new_episode, num_stack=4):
    """Stack frames to create temporal context."""
    frame_tensor = preprocess_frame(frame)

    if is_new_episode:
        # Initialize the stack with the same frame for a new episode
        stacked_frames = torch.cat([frame_tensor] * num_stack, dim=0)
    else:
        # Remove the oldest frame and append the new one
        stacked_frames = torch.cat((stacked_frames[1:], frame_tensor.unsqueeze(0)), dim=0)

    return stacked_frames

In [7]:
def dqn(n_episodes=2000, max_t=1000, eps_start=1.0, eps_end=0.01, eps_decay=0.995):
    """
    Deep Q-Learning.

    Args:
        n_episodes (int): maximum number of training episodes
        max_t (int): maximum number of timesteps per episode
        eps_start (float): starting value of epsilon, for epsilon-greedy action selection
        eps_end (float): minimum value of epsilon
        eps_decay (float): multiplicative factor (per episode) for decreasing epsilon
    """
    print(f"Starting training on {device}")
    scores = []  # list containing scores from each episode
    scores_window = deque(maxlen=100)  # last 100 scores
    eps = eps_start  # initialize epsilon
    state_frames = []
    next_state_frames = []
    
    for i_episode in range(1, n_episodes + 1):
        state = env.reset()
        state = state[0] if isinstance(state, tuple) else state
        state_frames = stack_frames(None, state, True)
        score = 0
        for t in range(max_t):
            action = agent.act(state, eps)
            next_state, reward, done, _ = env.step(action)     
            if isinstance(next_state, tuple):
                next_state = next_state[0]
            next_state_frames = stack_frames(state_frames, next_state, False)  
            agent.step(
                state_frames.numpy(), action, reward, next_state_frames.numpy(), done
            )
            state_frames = next_state_frames
            score += reward
            if done:
                break
        scores_window.append(score)  # save most recent score
        scores.append(score)  # save most recent score
        eps = max(eps_end, eps_decay * eps)  # decrease epsilon
        print(
            "\rEpisode {}\tAverage Score: {:.2f}".format(
                i_episode, np.mean(scores_window)
            ),
            end="",
        )
        if i_episode % 100 == 0:
            print(
                "\rEpisode {}\tAverage Score: {:.2f}".format(
                    i_episode, np.mean(scores_window)
                )
            )
        if np.mean(scores_window) >= 200.0:
            print(
                "\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}".format(
                    i_episode - 100, np.mean(scores_window)
                )
            )
            torch.save(agent.qnetwork_local.state_dict(), "dqn-checkpoint.pth")
            break
    return scores


scores = dqn()

# plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(len(scores)), scores)
plt.ylabel("Score")
plt.xlabel("Episode #")
plt.show()


Starting training on cuda:0


RuntimeError: Given groups=1, weight of size [32, 4, 8, 8], expected input[1, 210, 160, 3] to have 4 channels, but got 210 channels instead

### Inference time!

In [None]:
# load the weights from file
agent.qnetwork_local.load_state_dict(torch.load("dqn-checkpoint.pth"))

In [None]:
# used code from https://www.anyscale.com/blog/an-introduction-to-reinforcement-learning-with-openai-gym-rllib-and-google
# for the video saving and display

before_training = "trained.mp4"

video = VideoRecorder(env, before_training)
# returns an initial observation
state = env.reset()
score = 0
for i in range(1000):
    env.render()
    video.capture_frame()
    # env.action_space.sample() produces either 0 (left) or 1 (right).
    action = agent.act(state)
    state, reward, done, _ = env.step(action)
    score += reward
    if done:
        break

video.close()
env.close()

print("Total score was", score)


In [None]:
from base64 import b64encode

def render_mp4(videopath: str) -> str:
    """
    Gets a string containing a b4-encoded version of the MP4 video
    at the specified path.
    """
    mp4 = open(videopath, "rb").read()
    base64_encoded_mp4 = b64encode(mp4).decode()
    return (
        f'<video width=400 controls><source src="data:video/mp4;'
        f'base64,{base64_encoded_mp4}" type="video/mp4"></video>'
    )


In [None]:
from IPython.display import HTML

html = render_mp4(before_training)
HTML(html)
