<a href="https://colab.research.google.com/github/aksh0001/DeepQLearning-Space-Invaders-/blob/master/RL_DQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import torch.nn as nn
import torch as T
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

Create DQN (conv2d->conv2d->conv2d->fc->fc)

In [0]:
class DQN(nn.Module):
  """Convolutional Q-network: conv2d -> conv2d -> conv2d -> fc -> fc.

  Maps single-channel 185x95 frames to one Q-value per action (6 outputs).
  The module owns its optimizer (RMSprop) and loss (MSE) and places itself on
  'cuda:0' when CUDA is available, otherwise on the CPU.

  Args:
    LR: learning rate handed to the RMSprop optimizer.
  """
  def __init__(self, LR):
    super(DQN, self).__init__()
    # use conv to reduce state space
    self.conv1 = nn.Conv2d(1, 32, 8, stride=4, padding=1)  # 1 input channel (color doesn't matter - save computation)
    self.conv2 = nn.Conv2d(32, 64, 4, stride=2)
    self.conv3 = nn.Conv2d(64, 128, 3)

    # Spatial size through the conv stack: 185x95 -> 45x23 -> 21x10 -> 19x8,
    # hence 128*19*8 flattened features going into the first dense layer.
    self.fc1 = nn.Linear(128*19*8, 512)
    self.fc2 = nn.Linear(512, 6)  # 6 actions (L,R,shoot static, shoot while moving left, shoot while moving right,skip)

    self.optimizer = optim.RMSprop(self.parameters(), lr=LR)
    self.loss = nn.MSELoss()
    self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
    self.to(self.device)

  def forward(self, observation):
    """Compute Q-values for one or more frames.

    Args:
      observation: a frame, a sequence of frames, an ndarray or a tensor whose
        total size is a multiple of 185*95; every 185x95 slice is treated as
        an independent single-channel image.

    Returns:
      Tensor of shape (k, 6), where k is the number of frames passed in.
    """
    if not T.is_tensor(observation):
      # One contiguous float32 array first: building a tensor straight from a
      # Python list of ndarrays is much slower than converting a single array.
      observation = np.ascontiguousarray(observation, dtype=np.float32)
    obs = T.as_tensor(observation, dtype=T.float32, device=self.device)
    obs = obs.reshape(-1, 1, 185, 95)  # reshape for conv layer
    obs = F.relu(self.conv1(obs))
    obs = F.relu(self.conv2(obs))
    obs = F.relu(self.conv3(obs))

    # flatten convolved images; then feed into fc
    obs = obs.view(obs.size(0), -1)
    obs = F.relu(self.fc1(obs))

    actions = self.fc2(obs)

    return actions  # this will be a matrix: k x 6 where k=num imgs passed in

Creating our Agent

In [0]:
class Agent:
  """Epsilon-greedy deep Q-learning agent with a ring-buffer replay memory.

  Two networks are held: Q_eval is trained online, Q_next values successor
  states when building the learning target.

  Args:
    gamma: discount factor applied to the bootstrapped successor value.
    eps: initial probability of taking a random action.
    LR: learning rate forwarded to both networks.
    max_mem_size: capacity of the replay memory.
    eps_end: floor that eps is annealed towards.
    replace: copy Q_eval's weights into Q_next every `replace` learn steps.
      None disables the copy, so Q_next keeps its initial weights.
    action_space: actions sampled from when exploring.
  """
  def __init__(self, gamma, eps, LR, max_mem_size, eps_end=0.5, replace=10000, action_space=[0,1,2,3,4,5]):
    self.gamma = gamma
    self.eps = eps
    self.eps_end = eps_end
    # Copy so that agents never share (or mutate) the default list object.
    self.action_space = list(action_space)
    self.max_mem_size = max_mem_size
    self.steps = 0
    self.learn_step_counter = 0  # for target network replacement
    self.memory = []  # use as a list
    self.mem_counter = 0
    self.replace_target_counter = replace
    self.Q_eval = DQN(LR)  # agent's estimate of the current set of states
    self.Q_next = DQN(LR)  # agent's estimate of the successor set of states

  def store_transition(self, state, action, reward, resulting_state):
    """Record one transition, overwriting the oldest slot once memory is full."""
    if self.mem_counter < self.max_mem_size:
      self.memory.append([state, action, reward, resulting_state])
    else:
      self.memory[self.mem_counter%self.max_mem_size] = [state, action, reward, resulting_state]

    self.mem_counter += 1

  def choose_action(self, observation):
    """Pick an action epsilon-greedily.

    Args:
      observation: sequence of one or more frames; the greedy choice uses the
        Q-values of the most recent (last) frame.

    Returns:
      The chosen action.
    """
    rand = np.random.random()
    if rand < 1 - self.eps:
      # Only run the network when exploiting, and without building a graph.
      with T.no_grad():
        actions = self.Q_eval.forward(observation)
      # Index the last frame rather than a fixed position so that any number
      # of frames (including a single one) is accepted.
      action = T.argmax(actions[-1]).item()
    else:
      action = np.random.choice(self.action_space)

    self.steps += 1
    return action


  def learn(self, batch_size):
    """Run one gradient step on a contiguous window of the replay memory.

    For every sampled transition the target equals Q_eval's current output,
    except for the action actually taken, which is set to
    reward + gamma * max_a Q_next(resulting_state, a).

    NOTE: transitions carry no terminal flag, so the target always bootstraps
    from the resulting state.

    Args:
      batch_size: number of consecutive transitions to train on; clipped to
        the number of stored transitions. Nothing happens if memory is empty.
    """
    if not self.memory or batch_size < 1:
      return

    device = self.Q_eval.device
    self.Q_eval.optimizer.zero_grad()  # batch learning, zero grad
    if self.replace_target_counter is not None and self.learn_step_counter%self.replace_target_counter == 0:
      self.Q_next.load_state_dict(self.Q_eval.state_dict())

    # Choose the window start so the slice always lies fully inside memory.
    window = min(batch_size, len(self.memory))
    mem_start = int(np.random.randint(0, len(self.memory) - window + 1))
    mini_batch = self.memory[mem_start:mem_start+window]

    # Build each column separately: rows mix arrays and scalars, which NumPy
    # cannot pack into a single regular array.
    states = np.asarray([tr[0] for tr in mini_batch], dtype=np.float32)
    next_states = np.asarray([tr[3] for tr in mini_batch], dtype=np.float32)
    taken = T.as_tensor(np.asarray([tr[1] for tr in mini_batch], dtype=np.int64), device=device)
    rewards = T.as_tensor(np.asarray([tr[2] for tr in mini_batch], dtype=np.float32), device=device)

    Qpred = self.Q_eval.forward(states).to(device)
    with T.no_grad():
      Qnext = self.Q_next.forward(next_states).to(device)
      rows = T.arange(len(mini_batch), device=device)
      # Independent copy: writing into Qpred itself would make the loss zero.
      Qtarget = Qpred.detach().clone()
      Qtarget[rows, taken] = rewards + self.gamma*Qnext.max(dim=1)[0]

    if self.steps > 500:
      if self.eps - 1e-4 > self.eps_end:
        self.eps -= 1e-4  # converge epsilon
      else:
        self.eps = self.eps_end

    loss = self.Q_eval.loss(Qpred, Qtarget)
    loss.backward()
    self.Q_eval.optimizer.step()
    self.learn_step_counter += 1
   
  
    

Driver loop

In [0]:
import gym

In [57]:
environment = gym.make('SpaceInvaders-v0')
agent = Agent(gamma=0.95, eps=1.0, LR=0.03, max_mem_size=5000, replace=None)

# Warm-up: play with uniformly sampled actions until the replay memory has
# received at least max_mem_size transitions. An episode that is in progress
# when the limit is reached is still played to its end.
while agent.mem_counter < agent.max_mem_size:
  obs, done = environment.reset(), False
  while not done:
    # 0=no action;1=fire;2=right;3=left;4=move right fire;5=move left fire
    action = environment.action_space.sample()
    obs_, reward, done, info = environment.step(action)
    # Replace the reward with a penalty when the episode ends with no lives left.
    reward = -100 if (done and info['ale.lives'] == 0) else reward

    # Frames are cropped to rows 15:200, cols 30:125 and averaged over channels.
    transition = (
        np.mean(obs[15:200, 30:125], axis=2),
        action,
        reward,
        np.mean(obs_[15:200, 30:125], axis=2),
    )
    agent.store_transition(*transition)
    obs = obs_

print('Done memory init')

Done memory init


In [0]:
scores = []
eps_history = []
num_games = 50
batch_size = 32

# Train for num_games episodes. A new action is chosen whenever three frames
# have accumulated; in between, the previous action is repeated.
for i in range(num_games):
  print('game start',i+1,'eps: %.4f'%agent.eps)
  eps_history.append(agent.eps)
  done = False
  obs = environment.reset()
  # Crop and average over channels. The same preprocessing is used for the
  # frames shown to the policy and for the frames stored in replay memory, so
  # the network acts on inputs of the same scale it is trained on.
  state = np.mean(obs[15:200, 30:125], axis=2)
  frames = [state]
  score = 0
  last_action = 0

  while not done:
    if len(frames) == 3:
      action = agent.choose_action(frames)
      frames = []
    else:
      action = last_action

    obs_, reward, done, info = environment.step(action)
    score += reward  # the reported score keeps the environment's own reward
    next_state = np.mean(obs_[15:200, 30:125], axis=2)
    frames.append(next_state)
    if done and info['ale.lives'] == 0:
      reward = -100  # penalty stored for learning when no lives are left

    agent.store_transition(state, action, reward, next_state)

    obs = obs_
    state = next_state
    agent.learn(batch_size)
    last_action = action
    # environment.render()  # see environment

  scores.append(score)
  print('score: ', score)

# Post-training bookkeeping only needs to run once, after the last game.
x = [i + 1 for i in range(num_games)]
fname = 'test' + str(num_games) + '.png'
# plot x, scores, eps_history and save into file