<a href="https://colab.research.google.com/github/BeeGassy/Deep-Q-Learning/blob/main/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms as T
import numpy as np
import matplotlib.pyplot as plt
import gym
from gym.wrappers import FrameStack
from pdb import set_trace
import random
from tqdm import trange
import atari_py

In [2]:
class Model(nn.Module):
  #takes the # of frames stacked and the possible outputs (move right, left, etc)
  def __init__(self, numberStacked, possibleOutputs):
    super(Model, self).__init__()
    hiddenKernels = 16

    sizePostConvolution = 525824 #figure this out, depends on how we modify the env
    self.conv1 = nn.Conv2d(numberStacked, hiddenKernels, 2)
    self.rl = nn.ReLU()
    self.conv2 = nn.Conv2d(hiddenKernels, hiddenKernels, 2)
    self.fc1 = nn.Linear(sizePostConvolution, possibleOutputs)

  def forward(self, stackedState):
    x = self.conv1(stackedState)
    x = self.rl(x)
    x = self.conv2(x)
    print(x.size())
    x = x.view(x.size(0), -1)
    #x = x.view(-1, x.size()[1] * x.size()[2] * x.size()[3])
    x = self.fc1(x)
    return x

In [3]:
class Agent():
  def __init__(self, size, numberStacked, possibleOutputs, gamma):
    self.replay_buffer_size = size
    self.replay_buffer_list = []
    self.m = Model(numberStacked, possibleOutputs)
    self.optimizer = optim.Adam(self.m.parameters(), lr=1e-3, weight_decay=1e-5)
    self.lossFn = torch.nn.MSELoss()
    self.gamma = gamma
    self.batch_size = 16

  def action_value(self, input_state):
    with torch.no_grad():
        q_val = self.m(input_state)
    action = torch.argmax(q_val)
    return action

  def sample_replay_buffer(self, batch_size):
    #print(len(self.replay_buffer_list))
    choices = np.random.choice(len(self.replay_buffer_list), batch_size)
    perm = torch.tensor(choices)
    idx = perm[:batch_size]
    samples = np.array(self.replay_buffer_list)[idx]
    return samples

  def SGD(self):
    mini_batch = self.sample_replay_buffer(self.batch_size)
    for batch in mini_batch:
        self.optimizer.zero_grad()
        state, action, reward, next_state, done = batch
        yj = reward
        print("the reward", yj)
        if not done:
          print("the type for next state:", type(next_state))
          q_val = self.m(next_state) 
          #best_predicted_action = torch.argmax(q_val)
          #best_predicted_next_state, best_predicted_reward, done, _ = d.test_env.step(best_predicted_action)
          print("q_val: ", q_val)
          yj += self.gamma * q_val
        
        q_val = self.m(state)
        predicted_action = torch.argmax(q_val)
        predicted_next_state, best_reward, done, _ = d.test_env.step(predicted_action)
        print("type of yj: ", type(yj))
        print("type of best_reward: ", type(best_reward))
        loss = self.lossFn(torch.tensor(best_reward, requires_grad=True), yj.detach())
        loss.backward()
        self.optimizer.step()

    return loss
  
  def addToReplay(self, newInput):
    if len(self.replay_buffer_list) >= self.replay_buffer_size:#random eviction
        toEvict = random.randint(0, self.rb_size)
        del replay_buffer_list[toEvict]
    self.replay_buffer_list.append(newInput)

In [4]:
class DQN():
  def __init__(self):
    self.episodes = 400
    self.time_in_episode = 1000000
    self.epsilon = 0.7
    self.possibleOutputs = 6
    self.gamma = 0.01
    self.rbSize = 100000
    self.numberStacked = 4
    self.height = 210
    self.width = 160
    self.agent = Agent(self.rbSize, self.numberStacked, self.possibleOutputs, self.gamma)
    DEFAULT_ENV_NAME = 'PongNoFrameskip-v4'
    self.test_env = gym.make(DEFAULT_ENV_NAME)
    self.test_env = self.stack_frames(self.test_env, self.numberStacked)
    
  def initTransition(self):
    state = self.test_env.reset()    
    action = self.test_env.action_space.sample()
    next_state, reward, _, _ = self.test_env.step(action)
    transition = (state, action, next_state, reward)

    return transition

  def execute_action(self, input_action, state):
    next_state, reward, done, _ = self.test_env.step(input_action)
    transition = (state, input_action, reward, next_state, done)
    self.agent.addToReplay((state, input_action, reward, next_state, done))

    return transition

  def preprocessing(self, input_next_state):
    print(f'preprocess: {input_next_state.shape}')
    
    np_next_state = np.transpose(input_next_state, (0, 3, 1, 2))#batch h w color to batch color h w
    print(f'preprocess: {np_next_state.shape}')

    copy_next_state = np_next_state.copy()
    torch_next_state = torch.tensor(copy_next_state, dtype=torch.float)
    transform = T.Grayscale()
    #transform = T.Compose([ T.Grayscale(), T.ToTensor(), T.ToPILImage()])
    grey_scaled_next_state = transform(torch_next_state)
    #print(f'grey_scaled_next_state: {grey_scaled_next_state.shape}')
    grey_scaled_next_state = grey_scaled_next_state.view(1, self.numberStacked, self.height, self.width)
    #print(f'grey_scaled_next_state: {grey_scaled_next_state.shape}')

    return grey_scaled_next_state


  #stack the frames of the states in group of 4. 4 Frames per stack
  def stack_frames(self, input_env, stack_count):
    enviroment = FrameStack(input_env, stack_count)

    return enviroment
  
  def train(self):
    for e in trange(self.episodes):
      rewardVal = 0

      #initialize episode and get first transition
      initial_transition = self.initTransition()
      state, action, next_state, reward = initial_transition

      #preprocess data
      grey_scaled_next_state = self.preprocessing(next_state)

      for time_step in range(self.time_in_episode):
        random_action_prob = random.uniform(0.0, 1.0)
        if random_action_prob < self.epsilon:
          action = self.test_env.action_space.sample()
        else: 
          action = self.agent.action_value(grey_scaled_next_state)

        #perform action for timestep
        initialState = grey_scaled_next_state
        print(action) 
        state, action, reward, next_state, done  = self.execute_action(action, grey_scaled_next_state)
        grey_scaled_next_state = self.preprocessing(next_state)

        #send all information into our replay buffer so we can test on it within SGD
        self.agent.addToReplay((initialState, action, reward, grey_scaled_next_state, done))
        self.agent.SGD()

        #perform epsilon decay
        epsilon_decay_rate = max((e - time_step) / e, 0)
        epsilon -= epsilon_decay_rate
        
    #only render every 100 episodes
    if e % 100 == 0 and e > 0:
      self.test_env.render()    

In [5]:
if __name__ == "__main__":
  torch.device("cuda" if torch.cuda.is_available() else "cpu")
  d = DQN()
  DQN.train(d)

  return F.mse_loss(input, target, reduction=self.reduction)
  0%|          | 0/400 [00:00<?, ?it/s]


preprocess: (4, 210, 160, 3)
preprocess: (4, 3, 210, 160)
5
preprocess: (4, 210, 160, 3)
preprocess: (4, 3, 210, 160)
the reward 0.0
the type for next state: <class 'torch.Tensor'>
torch.Size([1, 16, 208, 158])
q_val:  tensor([[ -4.8443,   2.6262, -12.3209,  -2.1878,  -7.9786,  -1.6866]],
       grad_fn=<AddmmBackward>)
torch.Size([1, 16, 208, 158])
type of yj:  <class 'torch.Tensor'>
type of best_reward:  <class 'float'>
the reward 0.0
the type for next state: <class 'torch.Tensor'>
torch.Size([1, 16, 208, 158])
q_val:  tensor([[ -4.8443,   2.6262, -12.3209,  -2.1878,  -7.9786,  -1.6866]],
       grad_fn=<AddmmBackward>)
torch.Size([1, 16, 208, 158])
type of yj:  <class 'torch.Tensor'>
type of best_reward:  <class 'float'>
the reward 0.0
the type for next state: <class 'torch.Tensor'>
torch.Size([1, 16, 208, 158])
q_val:  tensor([[ -4.8443,   2.6262, -12.3209,  -2.1878,  -7.9786,  -1.6866]],
       grad_fn=<AddmmBackward>)
torch.Size([1, 16, 208, 158])
type of yj:  <class 'torch.Tenso

TypeError: ignored