<a href="https://colab.research.google.com/github/akiran703/DQN_demo_for_SI/blob/main/DQN_for_spaceinvaders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
!pip install gym[atari]
!pip install autorom[accept-rom-license]



In [8]:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from gym import spaces

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [9]:
# Bring in the emulator interface and the Space Invaders ROM handle.
from ale_py import ALEInterface
from ale_py.roms import SpaceInvaders

# Create an emulator instance and load the Space Invaders ROM into it.
ale = ALEInterface()
ale.loadROM(SpaceInvaders)

In [10]:
#To reduce instability while the deep Q-network is being trained, we use an experience replay system.
#A replay system stores the transitions that the agent observes so that they can be reused later during training.
#This helps reduce catastrophic forgetting, in which the agent forgets its previous experiences as it gains new ones, and it reduces the correlation between consecutive experiences.



#the class that implements the replay system
class ReplayMemory:
    """Fixed-capacity ring buffer of transitions for experience replay.

    Each stored item is a ``(state, action, reward, next_state, done)``
    tuple. Once ``capacity`` items have been stored, every new item
    overwrites the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions.

        Raises:
            ValueError: if ``capacity`` is smaller than 1 (the write index
                is advanced modulo ``capacity``, so it must be positive).
        """
        if capacity < 1:
            raise ValueError("capacity must be a positive integer")
        self.memory = []
        self.maxlen = capacity
        # Slot that the next push() writes to; wraps around at maxlen.
        self.nextindex = 0

    def push(self, state, action, reward, next_state, done):
        """Store one transition, overwriting the oldest once the buffer is full."""
        data = (state, action, reward, next_state, done)

        if self.nextindex >= len(self.memory):
            # Still filling up: grow the list.
            self.memory.append(data)
        else:
            # Full: recycle the oldest slot.
            self.memory[self.nextindex] = data
        self.nextindex = (self.nextindex + 1) % self.maxlen

    def encode_sample(self, indices):
        """Gather the transitions at ``indices`` into five stacked arrays.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of NumPy
            arrays whose first axis follows the order of ``indices``.
        """
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in indices:
            state, action, reward, next_state, done = self.memory[i]
            # np.asarray avoids a copy when the input already is an array,
            # and (unlike np.array(..., copy=False) on NumPy 2) still works
            # when a conversion copy is required.
            states.append(np.asarray(state))
            actions.append(action)
            rewards.append(reward)
            next_states.append(np.asarray(next_state))
            dones.append(done)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards),
            np.array(next_states),
            np.array(dones),
        )

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly, with replacement.

        Raises:
            ValueError: if the buffer is empty.
        """
        if not self.memory:
            raise ValueError("cannot sample from an empty replay memory")
        # The upper bound of randint is exclusive, so len(self.memory) makes
        # every stored transition (including the last slot) eligible.
        indx = np.random.randint(0, len(self.memory), size=batch_size)
        return self.encode_sample(indx)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)


In [11]:
class DQN(nn.Module):
    """Convolutional Q-network mapping an image observation to one value per action.

    Three Conv2d+ReLU layers are followed by two fully connected layers; the
    final layer has ``action_space.n`` outputs.
    """

    def __init__(self, observation_space: spaces.Box, action_space: spaces.Discrete):
        """Build the network for the given spaces.

        Args:
            observation_space: Box whose shape has three dimensions; the
                first one is used as the number of input channels.
            action_space: Discrete space; ``n`` sets the output width.

        Raises:
            AssertionError: if the spaces do not meet the requirements above.
        """
        super().__init__()
        # The observation must be a 3-D box (channels first) and the action
        # space must be discrete.
        assert isinstance(observation_space, spaces.Box), \
            "observation_space must be a gym.spaces.Box"
        assert len(observation_space.shape) == 3, \
            "observation_space must have three dimensions (channels, height, width)"
        assert isinstance(action_space, spaces.Discrete), \
            "action_space must be a gym.spaces.Discrete"

        # Convolution layers with activation functions
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=observation_space.shape[0], out_channels=32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(),
        )

        # Infer the flattened feature size from a dummy pass instead of
        # hard-coding 64*7*7 (which is the value this yields for 84x84 input).
        with torch.no_grad():
            dummy = torch.zeros(1, *observation_space.shape)
            flat_features = self.conv(dummy).view(1, -1).size(1)

        # Fully connected layers
        self.fc = nn.Sequential(
            nn.Linear(in_features=flat_features, out_features=512),
            nn.ReLU(),
            nn.Linear(in_features=512, out_features=action_space.n),
        )

    def forward(self, x):
        """Return the network output for a batch ``x``; the first axis is the batch."""
        # Flatten everything except the batch axis before the dense layers.
        conv_out = self.conv(x).view(x.size()[0], -1)
        return self.fc(conv_out)

In [12]:
#creating the agent class, which holds the policy network and the target network and can optionally use double DQN

class DQN_Agent:
    """Holds the policy/target networks, optimiser and replay buffer of a DQN agent."""

    def __init__(self, observation_space: spaces.Box, action_space: spaces.Discrete, replay_buffer: ReplayMemory, use_double_dqn, learning_rate, batch_size, gamma, device=torch.device('cpu')):
        """Create both networks on ``device`` and an RMSprop optimiser.

        Args:
            observation_space: passed to ``DQN`` for both networks.
            action_space: passed to ``DQN`` for both networks.
            replay_buffer: buffer stored as ``self.memory``.
            use_double_dqn: flag stored on the agent (double DQN is meant to
                keep the estimated Q-value from being overestimated); it is
                not read inside this constructor.
            learning_rate: learning rate given to the optimiser.
            batch_size: stored as ``self.batch_size``.
            gamma: stored as ``self.gamma``.
            device: torch device both networks are moved to.
        """
        self.memory = replay_buffer
        self.use_double_dqn = use_double_dqn
        self.batch_size = batch_size
        self.gamma = gamma
        self.device = device

        # The network used to decide the best action to take (highest Q-value)
        self.policy_network = DQN(observation_space, action_space).to(device)
        # The network used to calculate the Q-value of the action in the next state
        self.target_network = DQN(observation_space, action_space).to(device)
        # Start the target network as an exact copy of the policy network and
        # keep it in eval mode.
        self.update_target_network()
        self.target_network.eval()

        # torch.optim.RMSprop names its learning-rate keyword ``lr``.
        self.optimiser = torch.optim.RMSprop(self.policy_network.parameters(), lr=learning_rate)

    def update_target_network(self):
        """Copy the policy network's current weights into the target network."""
        self.target_network.load_state_dict(self.policy_network.state_dict())





In [13]:
#function to create an environment

def create_env():
    """Create the Space Invaders environment with preprocessing wrappers.

    Wrappers are applied in this order: grayscale conversion, resize to
    84x84, then stacking of 4 frames. Resizing has to happen before frame
    stacking so that the resize wrapper receives a single 2-D frame rather
    than a stack of frames.

    Returns:
        The wrapped environment.
    """
    base_env = gym.make("ALE/SpaceInvaders-v5")
    env = gym.wrappers.GrayScaleObservation(base_env)
    env = gym.wrappers.ResizeObservation(env, (84, 84))
    env = gym.wrappers.FrameStack(env, 4)
    # NOTE(review): observation_space is expected to be (4, 84, 84) here -
    # uncomment the prints below to confirm with the installed gym version.
    #print(base_env.action_space)
    #print(env.observation_space)

    return env




# Build the wrapped Space Invaders environment via create_env().
real_env = create_env()

  deprecation(
  deprecation(
