observation은 환경에 대한 정보를 담은 4차원 벡터이며, <br>
각 원소는 순서대로
```[Cart Position(카트의 위치), Cart Velocity(카트의 속도), Pole Angle(막대기의 각도), Pole Velocity At Tip(막대기 끝의 속도)]```을 의미한다.

In [2]:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import deque

import torch
import torch.nn as nn # layer 설정
import torch.optim as optim # optimizer 설정
import torch.nn.functional as F # loss function, activation function 설정
import torchvision.transforms as T

# Build the CartPole environment and keep the object returned by `.unwrapped`.
env = gym.make('CartPole-v0').unwrapped

# Use the GPU when one is available, otherwise fall back to the CPU.
# Author's caveat: opening Colab just to get a GPU means gym rendering will not work.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Show the shape of the observation space and the number of discrete actions.
print("state shape : ",env.observation_space.shape)
print("# of action : ",env.action_space.n )

state shape :  (4,)
# of action :  2


In [4]:
# Hyper Parameters
BATCH_SIZE = 32             # number of transitions per learning step
LR = 0.01                   # learning rate
EPSILON = 0.9               # probability of taking the greedy action
GAMMA = 0.9                 # reward discount
TARGET_REPLACE_ITER = 100   # target update frequency (in learning steps)
MEMORY_CAPACITY = 2000      # number of transitions kept in replay memory
env = gym.make('CartPole-v0')
env = env.unwrapped
N_ACTIONS = env.action_space.n
N_STATES = env.observation_space.shape[0]
# ENV_A_SHAPE is 0 when a sampled action is a scalar integer, otherwise it is
# the shape of the sampled action. NumPy integer scalars are accepted too:
# they are not instances of the builtin `int`, and treating them as
# non-scalar would make the agent call `.reshape` on a plain Python int.
# The action is sampled once so the type check and the shape agree.
_sample_action = env.action_space.sample()
ENV_A_SHAPE = 0 if isinstance(_sample_action, (int, np.integer)) else _sample_action.shape

### Neural Network for DQN

In [5]:
class Net(nn.Module):
    """Fully connected Q-network: state vector in, one value per action out.

    Args:
        n_states: Size of the last dimension of the input. Defaults to the
            module-level ``N_STATES``.
        n_actions: Number of output values. Defaults to the module-level
            ``N_ACTIONS``.
        hidden: Width of the two hidden layers.
    """

    def __init__(self, n_states=None, n_actions=None, hidden=24):
        super().__init__()
        # Resolve defaults at call time so a bare ``Net()`` still uses the
        # module-level constants.
        if n_states is None:
            n_states = N_STATES
        if n_actions is None:
            n_actions = N_ACTIONS

        # Each layer's weights are re-initialised from N(0, 0.1) right after
        # the layer is created; biases keep the nn.Linear default.
        self.l1 = nn.Linear(n_states, hidden)
        nn.init.normal_(self.l1.weight, mean=0.0, std=0.1)
        self.l2 = nn.Linear(hidden, hidden)
        nn.init.normal_(self.l2.weight, mean=0.0, std=0.1)
        self.out = nn.Linear(hidden, n_actions)
        nn.init.normal_(self.out.weight, mean=0.0, std=0.1)

    def forward(self, x):
        """Return the action values for the states in ``x``."""
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        return self.out(x)

In [6]:
class DQN:
    """DQN agent with a prediction network, a target network and replay memory.

    Relies on the module-level hyper parameters (``BATCH_SIZE``, ``LR``,
    ``EPSILON``, ``GAMMA``, ``TARGET_REPLACE_ITER``, ``MEMORY_CAPACITY``) and
    on ``N_STATES``, ``N_ACTIONS`` and ``ENV_A_SHAPE``.

    NOTE: stored transitions carry no terminal flag, so ``learn`` bootstraps
    from ``next_state`` even when that state ended the episode.
    """

    def __init__(self):
        # Keep the prediction network and the target network separate.
        self.pred_net, self.target_net = Net(), Net()

        self.learn_step_counter = 0  # for target updating
        self.memory_counter = 0  # total number of transitions stored so far
        # One row per transition:
        # [state (N_STATES), action, reward, next_state (N_STATES)].
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))
        self.optimizer = torch.optim.Adam(self.pred_net.parameters(), lr=LR)
        # Called as loss_func(prediction, target) in learn().
        self.loss_func = nn.MSELoss()

    def choose_action(self, x):
        """Pick an action for state ``x`` with an epsilon-greedy policy.

        With probability ``EPSILON`` the action with the highest predicted
        value is returned; otherwise an action is drawn uniformly at random.
        """
        x = torch.unsqueeze(torch.FloatTensor(x), dim=0)  # add a batch dimension
        if np.random.uniform() < EPSILON:
            # Greedy branch. No gradient is needed for action selection.
            with torch.no_grad():
                actions_value = self.pred_net(x)
            # torch.max(..., dim=1) returns (max, argmax); [1] keeps the argmax.
            action = torch.max(actions_value, dim=1)[1].numpy()
            # Scalar action spaces get a scalar; otherwise match the env's
            # action shape.
            action = action[0] if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE)
        else:
            # Random branch: pick one of the N_ACTIONS actions uniformly.
            action = np.random.randint(0, N_ACTIONS)
            if ENV_A_SHAPE != 0:
                # randint returns a plain int, which has no reshape().
                action = np.asarray(action).reshape(ENV_A_SHAPE)

        return action

    def store_transition(self, state, action, reward, next_state):
        """Write one transition into replay memory.

        Once the memory is full the oldest entry is overwritten.
        """
        # state and next_state are sequences, so action and reward are wrapped
        # in a list before everything is flattened into one row.
        transition = np.hstack([state, [action, reward], next_state])
        idx = self.memory_counter % MEMORY_CAPACITY
        self.memory[idx, :] = transition
        self.memory_counter += 1

    def learn(self):
        """Run one gradient step on a random minibatch from replay memory.

        Every ``TARGET_REPLACE_ITER`` learning steps the target network is
        first overwritten with the prediction network's parameters. Does
        nothing while the memory is empty.
        """
        # Only sample rows that have actually been written; the rest of the
        # buffer is still the all-zero initial content.
        filled = min(self.memory_counter, MEMORY_CAPACITY)
        if filled == 0:
            return

        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            # Replace the target network's parameters with pred_net's.
            self.target_net.load_state_dict(self.pred_net.state_dict())

        self.learn_step_counter += 1

        # Batch transition sampling (with replacement).
        sample_idx = np.random.choice(filled, BATCH_SIZE)
        b_memory = self.memory[sample_idx, :]
        b_state = torch.FloatTensor(b_memory[:, :N_STATES])
        # gather() needs integer (long) indices.
        b_action = torch.LongTensor(b_memory[:, N_STATES:N_STATES + 1].astype(int))
        b_reward = torch.FloatTensor(b_memory[:, N_STATES + 1:N_STATES + 2])
        b_next_state = torch.FloatTensor(b_memory[:, -N_STATES:])

        # gather(1, b_action) picks, for every row, the predicted value of the
        # action that was actually taken -> shape (batch, 1).
        q_pred = self.pred_net(b_state).gather(1, b_action)
        # Detach so no gradient flows into the target network.
        q_prime = self.target_net(b_next_state).detach()
        # keepdim=True keeps the (batch, 1) shape so it lines up with b_reward.
        q_target = b_reward + GAMMA * q_prime.max(dim=1, keepdim=True)[0]
        loss = self.loss_func(q_pred, q_target)

        self.optimizer.zero_grad()  # reset accumulated gradients
        loss.backward()
        self.optimizer.step()
print("Get Ready for Training!!")

Get Ready for Training!!


In [7]:
# Create the agent: builds both networks, the replay memory and the optimizer.
dqn = DQN()

In [None]:
print("\nCollecting Experiences....")
ep_reward_ls = []  # total shaped reward of every finished episode
try:
    for i_episode in range(500):
        state = env.reset()
        episode_reward = 0
        while True:
            env.render()
            action = dqn.choose_action(state)

            # Take the action; the environment's own reward is discarded
            # because a shaped reward is computed below.
            next_state, _, done, _ = env.step(action)

            # Shaped reward: r1 grows as the cart stays near the centre,
            # r2 grows as the pole stays close to upright.
            x, _, theta, _ = next_state
            r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            reward = r1 + r2

            dqn.store_transition(state, action, reward, next_state)
            episode_reward += reward

            # Learning (and progress reporting) starts only after more than
            # MEMORY_CAPACITY transitions have been stored.
            if dqn.memory_counter > MEMORY_CAPACITY:
                dqn.learn()
                if done:
                    print("Episode : {}, Episode reward : {}..".format(i_episode, round(episode_reward,2)) )

            if done:
                # Record the episode total once, not the running sum per step.
                ep_reward_ls.append(episode_reward)
                break
            state = next_state
finally:
    # Close the environment even if training is interrupted or fails.
    env.close()


Collecting Experiences....
Episode : 204, Episode reward : 2.26..
Episode : 205, Episode reward : 5.57..
Episode : 206, Episode reward : 1.66..
Episode : 207, Episode reward : 2.79..
Episode : 208, Episode reward : 1.49..
Episode : 209, Episode reward : 1.16..
Episode : 210, Episode reward : 2.1..
Episode : 211, Episode reward : 1.68..
Episode : 212, Episode reward : 3.14..
Episode : 213, Episode reward : 2.92..
Episode : 214, Episode reward : 11.97..
Episode : 215, Episode reward : 1.32..
Episode : 216, Episode reward : 4.51..
Episode : 217, Episode reward : 3.32..
Episode : 218, Episode reward : 14.95..
Episode : 219, Episode reward : 1.99..
Episode : 220, Episode reward : 9.16..
Episode : 221, Episode reward : 38.82..
Episode : 222, Episode reward : 39.85..
Episode : 223, Episode reward : 34.4..
Episode : 224, Episode reward : 37.34..
Episode : 225, Episode reward : 32.8..
Episode : 226, Episode reward : 108.8..
Episode : 227, Episode reward : 173.18..
Episode : 228, Episode reward

- 좀 더 안정적으로 학습할 수 없을까?
- 막대기가 넘어지지는 않는데, 카트가 그대로 화면 밖으로 나가 버린다.
- 화면 밖으로 나가면(done에 도달하면) penalty를 주는 방식으로 해야 할 것 같다.

In [19]:
# Scratch cell: see how torch.unsqueeze adds a dimension of size 1.
a = torch.FloatTensor([1,2,3])
print(a.size())
# Inserting the new dimension at position 1 turns the vector into a column.
print(torch.unsqueeze(a, 1))
print(torch.unsqueeze(a, 1).shape)
# Inserting it at position 0 instead; b is reused in a later cell.
b = torch.unsqueeze(a, 0)

torch.Size([3])
tensor([[1.],
        [2.],
        [3.]])
torch.Size([3, 1])


In [23]:
# torch.max along dim 1 returns (values, indices); [1] keeps the indices (argmax), converted to NumPy.
torch.max(b, 1)[1].data.numpy()

array([2], dtype=int64)

In [28]:
# Draw one random action from the environment's action space.
env.action_space.sample()

1

In [29]:
# Draw 10 random integers from range(100), as used for minibatch index sampling.
np.random.choice(100, 10)

array([69, 44, 49, 99, 86, 82, 73, 18, 28, 74])