In [None]:
"""
in dqn we are only giving the states of our agent and deep nn is responsible for giving the q values for all the actions
that can be taken from that state s to other state s'
"""

In [None]:
# this is to get the access to the googel drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# this is to goto the directory where i stored the mask rcnn as in this code i use it for detection
import os
os.chdir("/content/drive/My Drive/pong/")

In [None]:
# this is to save the files generated during the processing
!mkdir video

In [None]:
import gym
import gym.spaces

DEFAULT_ENV_NAME = "PongNoFrameskip-v4" # this is the atari environment
test_env = gym.make(DEFAULT_ENV_NAME)
print(test_env.action_space.n)
# in the action space there are three actions and three of them are equal to other three

In [None]:
print(test_env.unwrapped.get_action_meanings())

In [None]:
print(test_env.observation_space.shape) 
# here what is taken is the dimensionality of the game space and in atari given by deepmind all are (210, 160, 3)


Type of hardware accelerator provided by Colab

In [None]:
!nvidia-smi 

In [5]:
import warnings
warnings.filterwarnings('ignore')

## OpenAI Gym Wrappers

In [6]:
# Taken from 
# https://github.com/PacktPublishing/Deep-Reinforcement-Learning-Hands-On/blob/master/Chapter06/lib/wrappers.py

import cv2
import numpy as np
import collections

# some games as Pong require a user to press the FIRE button to start the game
class FireResetEnv(gym.Wrapper):
    def __init__(self, env=None):
        super(FireResetEnv, self).__init__(env) # calling to super class
        assert env.unwrapped.get_action_meanings()[1] == 'FIRE' # work if the second action is fire
        assert len(env.unwrapped.get_action_meanings()) >= 3 # work if no.of actions is greater that or equal to 3(although there are 6 actions three are same as others)

    def step(self, action):
        # this function is just to get an action
        return self.env.step(action)

    def reset(self):
        # for both action 1 and action 2 it is reset 
        # action 1 - nope
        # action 2 - fire
        self.env.reset()
        obs, _, done, _ = self.env.step(1) # go to initial step
        if done:
            self.env.reset()
        obs, _, done, _ = self.env.step(2) # go to second step
        if done:
            self.env.reset()
        return obs

# to skip the frames four by four    
class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env=None, skip=4):
        super(MaxAndSkipEnv, self).__init__(env)
        # most recent raw observations (for max pooling across time steps)
        self._obs_buffer = collections.deque(maxlen=2)
        self._skip = skip # this helps to skip the environment observations by 4
    # when skipping getting the related details
    def step(self, action):
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action) # this is how define taking an action
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, info

    def reset(self):
        self._obs_buffer.clear()
        obs = self.env.reset()
        self._obs_buffer.append(obs)
        return obs

# image preprocessing - rgb to gray --> dimensionality reduction --> reshaping 
class ProcessFrame84(gym.ObservationWrapper):
    def __init__(self, env=None):
        super(ProcessFrame84, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8) # here reseting the observational space of gym env

    def observation(self, obs):
        return ProcessFrame84.process(obs)

    @staticmethod
    def process(frame):
        # by getting the shape then convert the pixels into tensor shape given
        if frame.size == 210 * 160 * 3:
            img = np.reshape(frame, [210, 160, 3]).astype(np.float32)
        elif frame.size == 250 * 160 * 3:
            img = np.reshape(frame, [250, 160, 3]).astype(np.float32)
        else:
            assert False, "Unknown resolution."
        
        # convert to gray scale
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114 
        # resize the image means change the dimentionalities of the images
        resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA)
        x_t = resized_screen[18:102, :]
        # reshape means changing the pixels channel states
        x_t = np.reshape(x_t, [84, 84, 1])
        
        return x_t.astype(np.uint8)

# getting four frames as the input states for dqn
class BufferWrapper(gym.ObservationWrapper):
    def __init__(self, env, n_steps, dtype=np.float32):
        super(BufferWrapper, self).__init__(env)
        self.dtype = dtype
        old_space = env.observation_space
        # make four frames as observational space --- env.observation_space.low.repeat(n_steps, axis=0)
        self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0),
                                                old_space.high.repeat(n_steps, axis=0), dtype=dtype)

    def reset(self):
        self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype)
        return self.observation(self.env.reset())

    def observation(self, observation):
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer

# changes the shape of the observation from (height, width, channel) to (channel, height, width) format required by PyTorch
class ImageToPyTorch(gym.ObservationWrapper):
    def __init__(self, env):
        super(ImageToPyTorch, self).__init__(env)
        old_shape = self.observation_space.shape
        # swapping the dimensions
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, 
                                                shape=(old_shape[-1], old_shape[0], old_shape[1]), dtype=np.float32)
                                
    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)

# original pixel values 0 - 255 -----> rescale 0.0 - 1.0
class ScaledFloatFrame(gym.ObservationWrapper):
    def observation(self, obs):
        # divide by zero to rescale
        return np.array(obs).astype(np.float32) / 255.0

def make_env(env_name):
    env = gym.make(env_name)
    env = MaxAndSkipEnv(env)
    env = FireResetEnv(env)
    env = ProcessFrame84(env)
    env = ImageToPyTorch(env)
    env = BufferWrapper(env, 4)
    return ScaledFloatFrame(env)

## The DQN model


In [7]:
import torch
import torch.nn as nn        # Pytorch neural network package
import torch.optim as optim  # Pytorch optimization package

device = torch.device("cuda")# cuda is for optimizing the parallel processing of the gpu

In [8]:
# Taken from 
# https://github.com/PacktPublishing/Deep-Reinforcement-Learning-Hands-On/blob/master/Chapter06/lib/dqn_model.py

import numpy as np

class DQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DQN, self).__init__()
        # creating a sequential model for the network
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        
        conv_out_size = self._get_conv_out(input_shape)
        
        # fully connected layer with relu activation
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )
    # this method is to get the output size
    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        conv_out = self.conv(x).view(x.size()[0], -1) # flatten the layer
        return self.fc(conv_out)

In [9]:
test_env = make_env(DEFAULT_ENV_NAME)
test_net = DQN(test_env.observation_space.shape, test_env.action_space.n).to(device)
print(test_net)

DQN(
  (conv): Sequential(
    (0): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
  )
  (fc): Sequential(
    (0): Linear(in_features=3136, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=6, bias=True)
  )
)


## Training

Load Tensorboard extension

In [10]:
from torch.utils.tensorboard import SummaryWriter
%load_ext tensorboard

Import required modules and define the hyperparameters

In [11]:
import time
import numpy as np
import collections


MEAN_REWARD_BOUND = 19.0           

gamma = 0.99                   
batch_size = 32 # this is the no.of q values taken from at one time from the replay memory                
replay_size = 10000            
learning_rate = 1e-4           
sync_target_frames = 1000 # time gap for moderating the taget network weights and biases   
replay_start_size = 10000      

eps_start=1.0
eps_decay=.999985
eps_min=0.02

Experience replay buffer

In [12]:
# can access this tuple to store data using the name given
Experience = collections.namedtuple('Experience', field_names=['state', 'action', 'reward', 'done', 'new_state'])
# this is to remove the data correlation
class ExperienceReplay:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        # stored the new data in the replay memory and input is a tuple
        self.buffer.append(experience)

    def sample(self, batch_size):
        # in this method when batch size is given then sample the data related to that and get the indices of the replay memory regard to them
        # then zip them in where states are in one zip and return those zips
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])
        return np.array(states), np.array(actions), np.array(rewards, dtype=np.float32), np.array(dones, dtype=np.uint8), np.array(next_states)

Agent

In [13]:
class Agent:
    def __init__(self, env, exp_buffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self._reset()

    def _reset(self):
        self.state = env.reset()
        self.total_reward = 0.0 # initial amount of rewards

    def play_step(self, net, epsilon=0.0, device="cpu"):

        done_reward = None
        if np.random.random() < epsilon:
            action = env.action_space.sample()# exploration
        else:
            state_a = np.array([self.state], copy=False)
            state_v = torch.tensor(state_a).to(device)
            q_vals_v = net(state_v)# getting q values from the nn for exploitation
            _, act_v = torch.max(q_vals_v, dim=1)# getting the max q value q*(s,a,s')
            action = int(act_v.item())# getting the optimal action according to the max q value

        new_state, reward, is_done, _ = self.env.step(action)# getting the data according the optimal q value
        self.total_reward += reward
        # then add the data to replay memory using the defined tuple
        exp = Experience(self.state, action, reward, is_done, new_state)
        self.exp_buffer.append(exp)
        self.state = new_state # go to next state
        if is_done:
            # if reached to the end of the game then reset this is check by at the end done=true
            done_reward = self.total_reward
            self._reset()
        return done_reward


In [None]:
import datetime
print(">>>Training starts at ",datetime.datetime.now())

Main training loop

In [None]:
# initializing the need classes and methods
env = make_env(DEFAULT_ENV_NAME)# initialize the environment

net = DQN(env.observation_space.shape, env.action_space.n).to(device)# define the main nn
target_net = DQN(env.observation_space.shape, env.action_space.n).to(device)# this is the target network for enhancement of the loss function

writer = SummaryWriter(comment="-" + DEFAULT_ENV_NAME)
 
buffer = ExperienceReplay(replay_size)
agent = Agent(env, buffer)

epsilon = eps_start

optimizer = optim.Adam(net.parameters(), lr=learning_rate)
total_rewards = []
frame_idx = 0  

best_mean_reward = None

while True:
    frame_idx += 1 # this is to identify the time atwhich update the taget nn from the weights of the main nn
    epsilon = max(epsilon*eps_decay, eps_min)

    reward = agent.play_step(net, epsilon, device=device)# exploration or exploitation
    if reward is not None:
        total_rewards.append(reward)

        mean_reward = np.mean(total_rewards[-100:])

        print("%d:  %d games, mean reward %.3f, (epsilon %.2f)" % (
            frame_idx, len(total_rewards), mean_reward, epsilon))

        writer.add_scalar("epsilon", epsilon, frame_idx)
        writer.add_scalar("reward_100", mean_reward, frame_idx)
        writer.add_scalar("reward", reward, frame_idx)

        if best_mean_reward is None or best_mean_reward < mean_reward:
            torch.save(net.state_dict(), DEFAULT_ENV_NAME + "-best.dat")
            best_mean_reward = mean_reward
            if best_mean_reward is not None:
                print("Best mean reward updated %.3f" % (best_mean_reward))

        if mean_reward > MEAN_REWARD_BOUND:
            print("Solved in %d frames!" % frame_idx)
            break
    # if there is no enough samples in the replay memory
    if len(buffer) < replay_start_size:
        continue
    # if there is enough samples in the replay memory
    batch = buffer.sample(batch_size)
    states, actions, rewards, dones, next_states = batch

    states_v = torch.tensor(states).to(device)
    next_states_v = torch.tensor(next_states).to(device)
    actions_v = torch.tensor(actions).to(device)
    rewards_v = torch.tensor(rewards).to(device)
    done_mask = torch.ByteTensor(dones).to(device)

    # using main nn
    state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)

    # using target network 
    next_state_values = target_net(next_states_v).max(1)[0]
    next_state_values[done_mask] = 0.0 # for the last step there is no reward
    next_state_values = next_state_values.detach()# to avoid the values effected to the weights of the target nn
    expected_state_action_values = next_state_values * gamma + rewards_v

    # loss function to get the loss
    loss_t = nn.MSELoss()(state_action_values, expected_state_action_values)

    # getting the sochastic gradient descent and optimize to gobal minimum
    optimizer.zero_grad()
    loss_t.backward()
    optimizer.step()

    if frame_idx % sync_target_frames == 0:
        target_net.load_state_dict(net.state_dict())# updating the taget network at given frequency
          
writer.close()

In [None]:
print(">>>Training ends at ",datetime.datetime.now())

Performance

In [None]:
tensorboard  --logdir=runs

## Using the model

In [18]:
import gym
import time
import numpy as np

import torch

import collections

DEFAULT_ENV_NAME = "PongNoFrameskip-v4"
FPS = 25

Tunning the image rendering in colab


In [None]:
# Taken from 
# https://towardsdatascience.com/rendering-openai-gym-envs-on-binder-and-google-colab-536f99391cc7

!apt-get install -y xvfb x11-utils

!pip install pyvirtualdisplay==0.2.* \
             PyOpenGL==3.1.* \
             PyOpenGL-accelerate==3.1.*

!pip install gym[box2d]==0.17.*

import pyvirtualdisplay

_display = pyvirtualdisplay.Display(visible=False, size=(1400, 900))
_ = _display.start()

In [None]:
# Taken (partially) from 
# https://github.com/PacktPublishing/Deep-Reinforcement-Learning-Hands-On/blob/master/Chapter06/03_dqn_play.py


model='PongNoFrameskip-v4-best.dat'
record_folder="video"  
visualize=True

env = make_env(DEFAULT_ENV_NAME)
if record_folder:
        env = gym.wrappers.Monitor(env, record_folder, force=True)
net = DQN(env.observation_space.shape, env.action_space.n)
net.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage))

state = env.reset()
total_reward = 0.0

while True:
        start_ts = time.time()
        if visualize:
            env.render()
        state_v = torch.tensor(np.array([state], copy=False))
        q_vals = net(state_v).data.numpy()[0]
        action = np.argmax(q_vals)
        
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            break
        if visualize:
            delta = 1/FPS - (time.time() - start_ts)
            if delta > 0:
                time.sleep(delta)
print("Total reward: %.2f" % total_reward)

if record_folder:
        env.close()