In [1]:
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Create the CarRacing environment; continuous=False selects the discrete
# action set (later cells read `env.action_space.n`).
env = gym.make("CarRacing-v2", continuous=False)
# The environment must be reset before the first `step`. `reset()` returns an
# (observation, info) tuple, which is echoed as this cell's output.
env.reset()

(array([[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        ...,
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         ...,
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]], dtype=uint8),
 {})

In [3]:
%%time

total_reward = 0
number_of_episodes = 30

for x in range(number_of_episodes):
    episode_reward = 0
    while(True):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        episode_reward += reward
        if terminated or truncated:
            env.reset()
            break
    total_reward += episode_reward

env.close()
print(f"Average reward with random actions after {number_of_episodes} episodes: {total_reward / number_of_episodes}")

Average reward with random actions after 30 episodes: -55.54395207731279
CPU times: total: 3.36 s
Wall time: 3min 31s


In [3]:
import cv2

def preprocess(img):
    """Crop an RGB frame and convert it to a normalised grayscale image.

    The crop keeps rows 0..83 and columns 6..89 (a CarRacing-v2-specific
    window; resizing the frame with `cv2.resize` would be an alternative).
    The cropped frame is converted from RGB to grayscale and divided by
    255.0 before being returned.
    """
    cropped = img[:84, 6:90]
    gray = cv2.cvtColor(cropped, cv2.COLOR_RGB2GRAY)
    return gray / 255.0

In [4]:
class ImageEnv(gym.Wrapper):
    """Environment wrapper that adds frame skipping and grayscale frame stacking.

    Observations returned by `reset` and `step` are arrays built from the last
    `stack_frames` frames, each passed through `preprocess`.

    Args:
        env: The environment to wrap.
        skip_frames: Number of consecutive environment steps each call to
            `step` repeats the chosen action for. Must be at least 1.
        stack_frames: Number of preprocessed frames kept in the observation.
        initial_no_op: Number of steps with action 0 taken right after every
            reset before the first observation is returned.
        **kwargs: Forwarded to `gym.Wrapper.__init__`.

    Raises:
        ValueError: If `skip_frames` is smaller than 1 (otherwise `step` would
            have no frame to return).
    """

    def __init__(
        self,
        env,
        skip_frames=4,
        stack_frames=4,
        initial_no_op=50,
        **kwargs
    ):
        super(ImageEnv, self).__init__(env, **kwargs)
        if skip_frames < 1:
            raise ValueError("skip_frames must be at least 1")
        self.initial_no_op = initial_no_op
        self.skip_frames = skip_frames
        self.stack_frames = stack_frames

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped environment and build the initial frame stack.

        Args:
            seed: Optional seed forwarded to the wrapped environment's `reset`.
            options: Optional reset options forwarded to the wrapped
                environment's `reset`.

        Returns:
            A `(stacked_state, info)` tuple, where `stacked_state` holds
            `stack_frames` copies of the first preprocessed frame.
        """
        # Reset the original environment, forwarding the standard keywords so
        # the wrapper can be seeded like any other gymnasium environment.
        s, info = self.env.reset(seed=seed, options=options)

        # Do nothing (action 0) for the next `self.initial_no_op` steps.
        for _ in range(self.initial_no_op):
            s, r, terminated, truncated, info = self.env.step(0)

        # Convert the frame to the cropped grayscale representation.
        s = preprocess(s)

        # The initial observation is simply the frame `s` repeated along a new
        # leading axis.
        self.stacked_state = np.tile(s, (self.stack_frames, 1, 1))
        return self.stacked_state, info

    def step(self, action):
        """Repeat `action` for up to `skip_frames` steps and update the stack.

        Rewards of the repeated steps are summed. Repetition stops early when
        the wrapped environment reports termination or truncation.

        Returns:
            `(stacked_state, reward, terminated, truncated, info)`, where the
            last three values come from the final underlying step.
        """
        reward = 0
        for _ in range(self.skip_frames):
            s, r, terminated, truncated, info = self.env.step(action)
            reward += r
            if terminated or truncated:
                break

        # Convert the most recent frame to the cropped grayscale representation.
        s = preprocess(s)

        # Drop the oldest frame and append the current frame `s` at the end.
        self.stacked_state = np.concatenate((self.stacked_state[1:], s[np.newaxis]), axis=0)

        return self.stacked_state, reward, terminated, truncated, info

In [5]:
class CNNActionValue(nn.Module):
    """Convolutional action-value network for stacked 84x84 frames.

    Args:
        state_dim: Number of input channels (the number of stacked frames).
        action_dim: Number of outputs, one value per discrete action.
        activation: Callable applied after each convolution. Defaults to
            `F.relu`.

    The attribute names (`conv1`, `conv2`, `fc1`, `fc2`) are the keys of the
    module's state dict and must stay unchanged for saved checkpoints to load.
    """

    def __init__(self, state_dim, action_dim, activation=F.relu):
        super(CNNActionValue, self).__init__()
        self.conv1 = nn.Conv2d(state_dim, 16, kernel_size=8, stride=4)  # [N, 4, 84, 84] -> [N, 16, 20, 20]
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)  # [N, 16, 20, 20] -> [N, 32, 9, 9]
        self.in_features = 32 * 9 * 9
        self.fc1 = nn.Linear(self.in_features, 256)
        self.fc2 = nn.Linear(256, action_dim)
        self.activation = activation

    def forward(self, x):
        """Return the action values for a batch of stacked frames.

        Args:
            x: Input tensor, expected as [N, state_dim, 84, 84].

        Returns:
            Tensor of shape [N, action_dim].
        """
        # Use the configured activation instead of a hard-coded ReLU so the
        # constructor argument actually takes effect.
        x = self.activation(self.conv1(x))
        x = self.activation(self.conv2(x))
        x = x.view((-1, self.in_features))
        # NOTE(review): there is no non-linearity between fc1 and fc2. This is
        # kept as-is because adding one would change the outputs of weights
        # trained with this forward pass -- confirm before changing.
        x = self.fc1(x)
        x = self.fc2(x)
        return x

In [6]:
# Pick the GPU when one is usable, otherwise fall back to the CPU.
# `is_available` has to be *called*: the bare function object is always
# truthy, which would select 'cuda' even on machines without a GPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device) # let's see what device we got

cuda


In [119]:
# Build the network for 4 stacked 84x84 frames and one output per discrete action.
state_dim = (4, 84, 84)
action_dim = env.action_space.n
model = CNNActionValue(state_dim[0], action_dim).to(device)
# `map_location` remaps the stored tensors onto the selected device, so a
# checkpoint saved from a GPU run can also be loaded on a CPU-only machine.
# NOTE: torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(torch.load("dqn_grayscale_890.8579.pt", map_location=device))

<All keys matched successfully>

In [120]:
%%time

eval_env = gym.make('CarRacing-v2', continuous=False, render_mode='rgb_array')
eval_env = ImageEnv(eval_env)

total_reward = 0
number_of_episodes = 30

for x in range(number_of_episodes):
    frames = []
    episode_reward = 0
    (s, _), done = eval_env.reset(), False
    while not done:
        frames.append(eval_env.render())
        x = torch.from_numpy(s).float().unsqueeze(0).to(device)
        q = model(x)
        a = torch.argmax(q).item()
        observation, reward, terminated, truncated, info = eval_env.step(a)
        s = observation
        episode_reward += reward
        done = terminated or truncated
    total_reward += episode_reward

env.close()
print(f"Average reward dqn_grayscale_890.8579.pt model after {number_of_episodes} episodes: {total_reward / number_of_episodes}")

Average reward dqn_grayscale_890.8579.pt model after 30 episodes: 845.4126234815643
CPU times: total: 18.5 s
Wall time: 5min


In [121]:
def animate(imgs, video_name, _return=True, fps=10):
    """Encode a sequence of RGB frames into a VP9 video file.

    Args:
        imgs: Non-empty sequence of frames; each frame is an array whose first
            two dimensions are (height, width) and which holds RGB channels.
            All frames are written at the size of the first one.
        video_name: Output file path. If None, a random 18-letter name with a
            '.webm' extension is generated.
        _return: When true, return an `IPython.display.Video` for the file so
            the notebook can show it inline.
        fps: Frame rate of the written video. Defaults to 10.

    Returns:
        An `IPython.display.Video` pointing at the file when `_return` is
        true, otherwise None.

    Raises:
        ValueError: If `imgs` is empty.
    """
    # Validate before touching any video machinery; `len` also works for a
    # numpy array of frames, where a plain truth test would be ambiguous.
    if len(imgs) == 0:
        raise ValueError("animate() needs at least one frame")

    import random
    import string

    import cv2

    if video_name is None:
        video_name = ''.join(random.choice(string.ascii_letters) for i in range(18)) + '.webm'
    height, width = imgs[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'VP90')
    video = cv2.VideoWriter(video_name, fourcc, fps, (width, height))

    try:
        for img in imgs:
            # OpenCV writers expect BGR channel order; the frames are RGB.
            video.write(cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
    finally:
        # Always finalise the file, even if a frame fails to convert.
        video.release()

    if _return:
        from IPython.display import Video
        return Video(video_name)

In [127]:
# Roll out one more greedy episode, keeping every rendered frame and the
# episode's total reward.
frames = []
episode_reward = 0
(s, _), done = eval_env.reset(), False
while not done:
    frames.append(eval_env.render())
    x = torch.from_numpy(s).float().unsqueeze(0).to(device)
    # Pure inference: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        q = model(x)
    # Greedy action: index of the largest action value.
    a = torch.argmax(q).item()
    observation, reward, terminated, truncated, info = eval_env.step(a)
    s = observation
    episode_reward += reward
    done = terminated or truncated

In [128]:
# Total reward accumulated over the single rollout episode.
print(episode_reward)

823.6440677965977


In [124]:
# Write the collected frames to test1.webm; the returned Video object is the
# cell's last expression, so the notebook displays it inline.
# NOTE(review): this cell's execution count (124) is lower than the rollout
# cell's (127), so the saved run presumably animated the frames left over from
# the evaluation loop -- re-run the cells in order to confirm.
animate(frames, 'test1.webm')