<a href="https://colab.research.google.com/github/HERIUN/RLstudy/blob/master/DQN_ipyb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [0]:
# Hyperparameters shared by the replay buffer and the training code below.
learning_rate = 5e-4  # step size handed to the Adam optimizer
gamma = 0.98  # discount factor applied to the bootstrapped next-state value
buffer_limit = 50000  # maximum number of experiences kept in replay memory
batch_size = 32  # number of experiences drawn per gradient update

In [0]:
class ReplayBuffer():
  """Fixed-capacity FIFO store of transitions used for experience replay.

  Every stored experience is a tuple
  ``(obs, a, reward, next_obs, done_mask)``. The capacity is the module-level
  ``buffer_limit``; once it is reached, adding an entry evicts the oldest one.
  """

  def __init__(self):
    # deque(maxlen=...) silently drops the oldest item when it is full.
    self.buffer = collections.deque(maxlen=buffer_limit)

  def put(self, experience):
    """Append one ``(obs, a, reward, next_obs, done_mask)`` tuple."""
    self.buffer.append(experience)

  def sample(self, n):
    """Draw ``n`` distinct experiences uniformly at random and batch them.

    Args:
      n: number of experiences to draw.

    Returns:
      A 5-tuple of tensors ``(obs, a, r, next_obs, done_mask)`` whose first
      dimension is ``n``. ``obs``/``next_obs`` are float tensors. ``a`` is an
      int64 column (usable as a ``gather`` index); ``r`` and ``done_mask`` are
      float columns. Scalar fields are wrapped in a one-element list, so they
      come out with shape ``(n, 1)``.

    Raises:
      ValueError: propagated from ``random.sample`` when ``n`` is larger than
        the number of stored experiences.
    """
    mini_batch = random.sample(self.buffer, n)
    obs_lst, a_lst, r_lst, next_obs_lst, done_mask_lst = [], [], [], [], []

    for obs, a, r, next_obs, done_mask in mini_batch:
      obs_lst.append(obs)
      a_lst.append([a])
      r_lst.append([r])
      next_obs_lst.append(next_obs)
      done_mask_lst.append([done_mask])

    # Build the tensors only after the whole mini-batch has been collected;
    # returning from inside the loop would yield a batch of a single sample.
    return (
        torch.tensor(obs_lst, dtype=torch.float),
        torch.tensor(a_lst, dtype=torch.int64),
        torch.tensor(r_lst, dtype=torch.float),
        torch.tensor(next_obs_lst, dtype=torch.float),
        torch.tensor(done_mask_lst, dtype=torch.float),
    )

  def size(self):
    """Return the number of experiences currently stored."""
    return len(self.buffer)

In [0]:
class Qnet(nn.Module):
  """Three-layer MLP that maps an observation to one Q-value per action.

  Args:
    obs_dim: length of the observation vector fed to the network.
    n_actions: number of discrete actions, i.e. the size of the output.

  The defaults (4 inputs, 2 outputs) are the sizes this network uses for the
  CartPole environment it is trained on in this file.
  """

  def __init__(self, obs_dim=4, n_actions=2):
    super(Qnet, self).__init__()
    self.n_actions = n_actions
    self.fc1 = nn.Linear(obs_dim, 128)   # input width = observation size
    self.fc2 = nn.Linear(128, 128)
    self.fc3 = nn.Linear(128, n_actions)  # one Q-value per action

  def forward(self, x):
    """Return the Q-values for ``x``; the last dimension is ``n_actions``."""
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)
    return x

  def sample_action(self, obs, epsilon):
    """Pick an action with an epsilon-greedy policy.

    Args:
      obs: observation tensor accepted by ``forward``.
      epsilon: probability in [0, 1] of taking a uniformly random action.

    Returns:
      The chosen action index as a Python int.
    """
    coin = random.random()  # uniform float in [0, 1)
    if coin < epsilon:
      # Explore: uniformly random action; no forward pass is needed here.
      return random.randint(0, self.n_actions - 1)
    # Exploit: action with the largest predicted Q-value. Only the index is
    # used, so gradient tracking is switched off for this forward pass.
    with torch.no_grad():
      return self.forward(obs).argmax().item()

In [0]:
def main(n_episodes=10000, update_interval=20):
  """Train a DQN agent on CartPole-v1 and print periodic progress.

  Args:
    n_episodes: number of episodes to run.
    update_interval: every this many episodes the target network is synced
      with the online network and the average score is printed.
  """
  env = gym.make('CartPole-v1')
  q = Qnet()         # online network: the one being optimised
  q_target = Qnet()  # target network: supplies the bootstrap values
  q_target.load_state_dict(q.state_dict())  # start both from the same weights
  exp_memory = ReplayBuffer()

  # Only the online network's parameters are updated by gradient descent.
  optimizer = optim.Adam(q.parameters(), lr=learning_rate)

  score = 0.0                # sum of rewards since the last report
  episodes_since_report = 0  # episodes that contributed to `score`

  try:
    for n_epi in range(n_episodes):
      # Linearly anneal exploration from 10% down to a 1% floor
      # (the floor is reached at episode 1800).
      epsilon = max(0.01, 0.1 - 0.01 * (n_epi / 200))

      # Newer Gym/Gymnasium returns (obs, info) from reset(); older Gym
      # returns only the observation.
      reset_out = env.reset()
      obs = reset_out[0] if isinstance(reset_out, tuple) else reset_out
      done = False

      while not done:
        a = q.sample_action(torch.from_numpy(obs).float(), epsilon)
        step_out = env.step(a)
        if len(step_out) == 5:
          # Newer API: (obs, reward, terminated, truncated, info).
          next_obs, reward, terminated, truncated, _ = step_out
          done = terminated or truncated
        else:
          # Older API: (obs, reward, done, info).
          next_obs, reward, done, _ = step_out
          terminated = done
        # A mask of 0 removes the bootstrapped value for terminal states.
        done_mask = 0.0 if terminated else 1.0
        # Rewards are scaled down by 100 before being stored.
        exp_memory.put((obs, a, reward / 100.0, next_obs, done_mask))
        obs = next_obs
        score += reward

      episodes_since_report += 1

      # Start learning only once the buffer holds more than 2000 samples.
      if exp_memory.size() > 2000:
        train(q, q_target, exp_memory, optimizer)

      if n_epi % update_interval == 0 and n_epi != 0:
        q_target.load_state_dict(q.state_dict())  # sync the target network
        # Average over the episodes actually accumulated (the first window
        # also contains episode 0).
        print("n_episode : {}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
               n_epi, score / episodes_since_report, exp_memory.size(), epsilon * 100))
        score = 0.0
        episodes_since_report = 0
  finally:
    # Release the environment even if training is interrupted or fails.
    env.close()

In [0]:
def train(q, q_target, exp_memory, optimizer, n_updates=10, n_samples=None,
          discount=None):
  """Run several DQN gradient updates on mini-batches from replay memory.

  Args:
    q: online Q-network being optimised.
    q_target: target Q-network used to compute the bootstrap value.
    exp_memory: object whose ``sample(n)`` returns the tensors
      ``(obs, a, r, next_obs, done_mask)``.
    optimizer: optimizer that updates ``q``'s parameters.
    n_updates: number of mini-batch updates performed per call.
    n_samples: mini-batch size; defaults to the module-level ``batch_size``.
    discount: discount factor; defaults to the module-level ``gamma``.
  """
  if n_samples is None:
    n_samples = batch_size
  if discount is None:
    discount = gamma

  for _ in range(n_updates):
    obs, a, r, next_obs, done_mask = exp_memory.sample(n_samples)

    # Q-value of the action that was actually taken in each sample.
    q_a = q(obs).gather(1, a)

    # The TD target is a constant with respect to the optimisation: build it
    # without autograd so backward() does not run through q_target and leave
    # unused gradients on its parameters.
    with torch.no_grad():
      # max(1)[0] takes the largest Q-value per row; unsqueeze(1) turns the
      # result back into a column so it lines up with r and done_mask.
      max_q_prime = q_target(next_obs).max(1)[0].unsqueeze(1)
      # done_mask is 0 for terminal transitions, removing the bootstrap term.
      target = r + discount * max_q_prime * done_mask

    # Prediction first, target second.
    loss = F.smooth_l1_loss(q_a, target)

    optimizer.zero_grad()  # clear gradients left over from the previous step
    loss.backward()        # back-propagate through the online network
    optimizer.step()       # apply the parameter update

In [0]:
# Entry point: start training only when this file is executed directly.
if __name__ == "__main__":
  main()