# Reinforcement Learning: Deep Q-Networks, Double DQN, & Dueling DQN
<br>
James Chapman<br>
CIS 730 Artificial Intelligence – Term Project<br>
Kansas State University


[Hyperparameters & Control](#Hyperparameters&Control)<br>
[Gymnasium Wrappers](#GymnasiumWrappers)<br>
[Convolutional Neural Network](#ConvolutionalNeuralNetwork)<br>
[Agent](#Agent)<br>
[Initialization](#Initialization)<br>
[Testing](#Testing)<br>
[Save](#Save)<br>


In [2]:
import sys
IN_COLAB = "google.colab" in sys.modules

if IN_COLAB:
    !pip install "gymnasium[atari, accept-rom-license]"
    !pip install stable_baselines3
    !pip install ale-py

In [3]:
if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/gdrive')

    %cd gdrive/My Drive/Atari
    
    from psutil import virtual_memory
    ram_gb = virtual_memory().total / 1e9
    print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))
    if ram_gb < 20:
        print('Not using a high-RAM runtime')
    else:
        print('You are using a high-RAM runtime!')

    gpu_info = !nvidia-smi
    gpu_info = '\n'.join(gpu_info)
    if gpu_info.find('failed') >= 0:
        print('Not connected to a GPU')

In [4]:
import matplotlib.pyplot as plt
import numpy as np
import datetime
import random
import time

import gymnasium as gym
#from gymnasium.wrappers import FrameStack,GrayScaleObservation,ResizeObservation
from stable_baselines3.common.atari_wrappers import FireResetEnv,MaxAndSkipEnv,NoopResetEnv#EpisodicLifeEnv,ClipRewardEnv

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import cv2
import collections 
from collections import deque
from typing import Dict, List, Tuple
from IPython import display
plt.ion()

import warnings
warnings.filterwarnings('ignore')

# Control some randomness, for reproducibility
torch.manual_seed(0) 
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
torch.cuda.manual_seed_all(0)   
np.random.seed(0)
random.seed(0)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


<a id='Hyperparameters&Control'></a>
## Hyperparameters & Control

In [5]:
ENV_NAME = "PongNoFrameskip-v4" 
# ENV_NAME = "BreakoutNoFrameskip-v4"
# ENV_NAME = "SpaceInvadersNoFrameskip-v4" 
# ENV_NAME = "MsPacmanNoFrameskip-v4"
# ENV_NAME = "QbertNoFrameskip-v4"
# ENV_NAME = "LunarLander-v2" 

NETWORK_SAVE_PATH = 'models/Pong-DOUBLE'
DATA_SAVE_PATH = 'data/Pong-DOUBLE'
                                            
is_dueling = False
noop_max = 30

num_frames = 3e6
warmup_size = 10000

test_interval = 250000
test_num_frames = 50000

test_epsilon = 0.005

<a id='GymnasiumWrappers'></a>
## Gymnasium Wrappers 
#### Same as Training

In [6]:
class ProcessFrame84(gym.ObservationWrapper):
    def __init__(self, env=None):
        super(ProcessFrame84, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)

    def observation(self, obs):
        return ProcessFrame84.process(obs)

    @staticmethod
    def process(frame):
        if frame.size == 210 * 160 * 3:
            img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
        elif frame.size == 250 * 160 * 3:
            img = np.reshape(frame, [250, 160, 3]).astype(np.float32)
        else:
            assert False, "Unknown resolution."
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA)
        x_t = resized_screen[18:102, :]
        x_t = np.reshape(x_t, [84, 84, 1])
        return x_t.astype(np.uint8)


class BufferWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_steps, dtype=np.float32):
        super(BufferWrapper, self).__init__(env)
        self.dtype = dtype
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
                                                old_space.high.repeat(n_steps, axis=0), dtype=dtype)
    ############################
    # CONVERTED to Gymnasium
    # def reset(self):
    #    self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
    #    return self.observation(self.env.reset())
    def reset(self, **kwargs):# CONVERTED to Gymnasium
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        obs, info = self.env.reset(**kwargs)
        return self.observation(obs), info

    def observation(self, observation):
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer


class ImageToPyTorch(gym.ObservationWrapper):
    def __init__(self, env):
        super(ImageToPyTorch, self).__init__(env)
        old_shape = self.observation_space.shape
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], 
                                old_shape[0], old_shape[1]), dtype=np.float32)

    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)


class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, obs):
        return np.array(obs).astype(np.float32) / 255.0

In [7]:
def PongWrappers(env):
    print('Creating environment------------')
    env = MaxAndSkipEnv(env, 4) # From stable_baselines3
    env = FireResetEnv(env)  # From stable_baselines3
    env = NoopResetEnv(env, noop_max=noop_max)  # From stable_baselines3
    
    print(env.observation_space)
    env = ProcessFrame84(env)
    print(env.observation_space)
    env = ImageToPyTorch(env)
    print(env.observation_space)
    env = BufferWrapper(env, 4)
    print(env.observation_space)
    env = ScaledFloatFrame(env)
    print(env.observation_space)     
    print('--------------------------------')
    return env

<a id='ConvolutionalNeuralNetwork'></a>
## Convolutional Neural Network
#### Same as Training

In [8]:
class ConvNet(nn.Module):
    def __init__(self, input_shape, n_actions, is_dueling):
        super(ConvNet, self).__init__()
        self.input_shape = input_shape
        self.n_actions = n_actions
        self.is_dueling = is_dueling

        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.conv_out_size = self._get_conv_out(input_shape)
        self.fc = nn.Sequential(
            nn.Linear(self.conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )
        self.dueling_value = nn.Sequential(
            nn.Linear(self.conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )

    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        if is_dueling:
            ############## Dueling Networks ################
            conv_out = self.conv(x).view(x.size()[0], -1)
            advantage_out = self.fc(conv_out)
            value_out = self.dueling_value(conv_out)
            return value_out + advantage_out - advantage_out.mean()
            ################################################
        else:
            conv_out = self.conv(x).view(x.size()[0], -1)
            return self.fc(conv_out)

<a id='Agent'></a>
## Agent
#### Without Experience Replay Buffer

In [9]:
class Agent:
    def __init__(self, env):
        self.env = env
        self._reset()

    def _reset(self):
        self.state, info = env.reset() #New
        self.total_reward = 0.0

    def play_step(self, net, epsilon, device):
        done_reward = None
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            state_a = np.array([self.state], copy=False)
            state_v = torch.tensor(state_a).to(device)
            q_vals_v = net(state_v)
            _, act_v = torch.max(q_vals_v, dim=1)
            action = int(act_v.item())
            
        new_state, reward, terminated, truncated, info = self.env.step(action) #New # clipped_reward = np.clip(reward, -1, 1)
        is_done = terminated or truncated  #New
        self.total_reward += reward #np.clip(reward, -1, 1)?np.sign(float(reward))
            
        self.state = new_state
        if is_done:
            done_reward = self.total_reward
            self._reset()
        return done_reward
    

<a id='Initialization'></a>
## Initialization 

In [10]:
base_env = gym.make(ENV_NAME)
env = PongWrappers(base_env)
net = ConvNet(env.observation_space.shape, env.action_space.n,is_dueling).to(device)
agent = Agent(env)

torch.set_grad_enabled(False)

test_results_dict = {}
cur_test_interval = 1250000
test_results_summary = []

Creating environment------------
Box(0, 255, (210, 160, 3), uint8)
Box(0, 255, (84, 84, 1), uint8)
Box(0.0, 1.0, (1, 84, 84), float32)
Box(0.0, 1.0, (4, 84, 84), float32)
Box(0.0, 1.0, (4, 84, 84), float32)
--------------------------------


<a id='Testing'></a>
## Testing

In [11]:
def test_model_interval(net, interval_SAVE_NAME):
    net.load_state_dict(torch.load(interval_SAVE_NAME))
    net.eval()
    
    interval_frame_idxs = []
    interval_rewards = []
    with torch.inference_mode():
        for frame_idx in range(test_num_frames):
            reward = agent.play_step(net, test_epsilon, device=device)
            if reward is not None:
                interval_frame_idxs.append(frame_idx)
                interval_rewards.append(reward)
                                         
    return [interval_frame_idxs, interval_rewards]

In [12]:

while True:
    
    interval_SAVE_NAME = NETWORK_SAVE_PATH + '-{}.dat'.format(cur_test_interval)
    interval_test_results = test_model_interval(net, interval_SAVE_NAME)
    
    test_results_dict[cur_test_interval] = interval_test_results

    test_results_summary.append([cur_test_interval,
                                 round(np.mean(interval_test_results[1]),2),
                                 round(np.std(interval_test_results[1]),2)])
    print(cur_test_interval,
          round(np.mean(interval_test_results[1]),2),
          round(np.std(interval_test_results[1]),2))
    
    cur_test_interval+= test_interval
    if cur_test_interval> num_frames:
        break   
        
# frame_num, score, standard_deviation

1250000 15.5 4.74
1500000 20.2 1.66
1750000 20.38 0.85
2000000 20.71 0.68
2250000 18.47 8.67
2500000 20.47 0.85
2750000 20.83 0.45
3000000 19.83 0.46


In [13]:
print(test_results_summary) 

[[1250000, 15.5, 4.74], [1500000, 20.2, 1.66], [1750000, 20.38, 0.85], [2000000, 20.71, 0.68], [2250000, 18.47, 8.67], [2500000, 20.47, 0.85], [2750000, 20.83, 0.45], [3000000, 19.83, 0.46]]


<a id='Save'></a>
## Save

In [14]:

test_results_dict_file = DATA_SAVE_PATH + '_test_results_dict.npy'
print(test_results_dict_file)
with open(test_results_dict_file, 'wb') as f:
    np.save(f, test_results_dict)
    
test_results_summary_file = DATA_SAVE_PATH + '_test_results_summary.npy'
print(test_results_summary_file)
with open(test_results_summary_file, 'wb') as f:
    np.save(f, np.array(test_results_summary))


data/Pong-DOUBLE_test_results_dict.npy
data/Pong-DOUBLE_test_results_summary.npy


In [15]:
: )

SyntaxError: unmatched ')' (2155285666.py, line 1)