In [40]:
import gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

learning_rate = 0.0005
gamma = 0.98
buffer_limit  = 50000
batch_size = 32

## Replay buffer

class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        self.buffer.append(transition)

    def sample(self, n):
        mini_batch = random.sample(self.buffer, n) #sample 메서드 : buffer 중에 n 개만 뽑음
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        # state 값, action 값, reward 값, 다음 state 값, done_mask 값
        # done_mask : 종료 상태의 Value 값을 마스킹해줍니다

        for transition in mini_batch: #우리가 뽑은 n 개의 미니배치들 하나씩에서
            s, a, r, s_prime, done_mask = transition #

            # 이로 보아 하나의 sample 에는 s,a,s_prime,done_mask 값이 담김을 확인할 수 있다
            s_lst.append(s)
            a_lst.append([a]) # 자료형이 list 임을 확인 가능, 여러개의 action 이 담겨있을 것이라 추측
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        return torch.tensor(s_lst, dtype=torch. float), torch.tensor(a_lst), torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch. float), torch.tensor(done_mask_lst)
        # s_lst, s_prime_lst 는 타입을 바꿔주었다. 데이터가 int 였기 때문에
        # 나머지는 tensor화

    def size(self):
        return len(self.buffer)

이 때 tensor 의 이해를 위해 기본 예제를 추가했다.

### Tensor 의 기본 데이터 구조

In [14]:
data = [[1,2],[3,4]]
x_data = torch.tensor(data)

In [15]:
type(x_data)
print(x_data.shape)
print(x_data.dtype)
print(x_data.device)

torch.Size([2, 2])
torch.int64
cpu


In [10]:
tensor = torch.rand(3,4)

print(f"Shape of tensor: {tensor.shape}")
print(f"Datatype of tensor: {tensor.dtype}")
print(f"Device tensor is stored on: {tensor.device}")

tensor

Shape of tensor: torch.Size([3, 4])
Datatype of tensor: torch.float32
Device tensor is stored on: cpu


tensor([[0.5911, 0.0660, 0.2966, 0.3930],
        [0.1805, 0.9510, 0.5649, 0.0447],
        [0.6955, 0.7548, 0.1732, 0.6378]])

---

다음은 Q net class

In [46]:
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4,128) # 4가지 input 들어감
        self.fc2 = nn.Linear(128,128)
        self.fc3 = nn.Linear(128,2) #좌 인지 우 인지의 action state 값을 반환

    def forward(self,x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x) # 2개의 값을 반환
        return x

    def sample_action(self, obs, epsilon):
        out = self.forward(obs) ### obs 에 무엇이 들어가는가? 카트의 위치,속도, 막대의 각도, 각속도 4개
        coin = random.random() # 0~1 사이의 값을 랜덤하게 호출

        if coin < epsilon:  # 동전던지기, 8% 확률로 랜덤하게 행동
            return random.randint(0,1)
        else:
            return out.argmax().item() #Action state 값이 더 높은 index 값 호출

## 학습 함수

def train(q, q_target, memory, optimizer): 
    for i in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size) #32개를 버퍼에서 뽑아 모아 놓은 s,a,r,s_prime,done_mask

        q_out = q(s) # s 값으로 다음 각 action 값들의 value 값 반환
        q_a = q_out.gather(1,a) #선택한 액션값들의 q(s,a) 반환
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1) # 다음 state의 각 q(s,a) 값 반환
        target = r + gamma * max_q_prime * done_mask # 배열 맞춰주기, 쓰러진 경우는 제거
        loss = F.smooth_l1_loss(q_a, target) # DQN 의 손실함수 계산 L1 유클리드

        optimizer.zero_grad() # optimizer 의 모든 parameter 를 0으로 변환
        loss.backward() # loss 에 대한 gradient 계산
        optimizer.step() # 손실값을 바탕으로 Qnet 의 파라미터 업데이트

        

---

### tensor.gather 에 대해서

In [16]:
# gather 메서드
t = torch.tensor([i for i in range(4*2*3)]).reshape(4,2,3)
print(t)

tensor([[[ 0,  1,  2],
         [ 3,  4,  5]],

        [[ 6,  7,  8],
         [ 9, 10, 11]],

        [[12, 13, 14],
         [15, 16, 17]],

        [[18, 19, 20],
         [21, 22, 23]]])


In [17]:
ind_A = torch.tensor([1,0,3])
ind_A = ind_A.unsqueeze(1).unsqueeze(2)
ind_A = ind_A.expand(ind_A.size(0), t.size(1), t.size(2))

In [18]:
ind_A

tensor([[[1, 1, 1],
         [1, 1, 1]],

        [[0, 0, 0],
         [0, 0, 0]],

        [[3, 3, 3],
         [3, 3, 3]]])

In [20]:
res =t.gather(0,ind_A)

In [21]:
res

tensor([[[ 6,  7,  8],
         [ 9, 10, 11]],

        [[ 0,  1,  2],
         [ 3,  4,  5]],

        [[18, 19, 20],
         [21, 22, 23]]])

## Main 함수

In [42]:
def main():
    env = gym.make('CartPole-v1')
    q = Qnet()
    q_target = Qnet()
    q_target.load_state_dict(q.state_dict()) # 현재 Qnet 의 파라미터를 q_target 에 load

    memory = ReplayBuffer()

    print_interval = 20 #20회마다 출력
    score = 0.0
    optimizer = optim.Adam(q.parameters(), lr = learning_rate) # loss 값을 바탕으로 업데이트할 비율 (q_target 말고 q 만 업데이트)


    for n_epi in range(10000):
        epsilon = max(0.01, 0.08 - 0.01 * (n_epi/200))
        # exploration 비율 - 200번마다 1%씩 8%에서 1%로 감소

        s = env.reset()
        
        done = False 
        while not done: # 게임이 끝날 때까지
            a = q.sample_action(torch.from_numpy(s).float(), epsilon) #환경의 값이 numpy로 되어 있는데, 이걸 가져와서 
            #신경망을 돌린 후 0,1 중 하나의 값을 선택

            #s = cart 의 속도, 방향, Pole 의 각, 각속도
            s_prime, r, done, info = env.step(a) # a를 선택했을때 s_prime, 종료여부, info
            
            done_mask = 0.0 if done else 1.0 # False 가 아직 안끝난 경우, true 가 끝난경우
            # 안끝났을 때 done_mask = 0 , 끝나면 1

            memory.put((s,a,r/100.0, s_prime, done_mask))

            s = s_prime # 다음 s 값으로 이어서 진행
            score += r
            if done:
                break

            if memory.size() > 2000:
                train(q, q_target, memory, optimizer) 
            # 일단 2000개의 epi 가 쌓이고 나서 학습 시작

            if n_epi%print_interval==0 and n_epi != 0: # 200번마다 한번씩
                q_target.load_state_dict(q.state_dict()) #q_target 지금걸로 업데이트
                print(f"n_episode :{n_epi}, score = {int(score/print_interval)}, n_buffer : {memory.size()}, eps : {epsilon*100}")
                score = 0.0

                # 이때 score 는 200번 시도 평균임

        env.close()






### GUM 라이브러리에 대한 간단 메서드

- make() : 만들고 싶은 환경 가져오기  
- reset() : defalut 환경 구축  
- step() : a 액션을 취했을 때에 다음 state로 넘어감  

In [50]:
 env = gym.make('CartPole-v1')

 s = env.reset()

 s 

array([-0.02374508,  0.04786856,  0.04516721,  0.01752456], dtype=float32)

In [49]:
a = env.step(0)

In [62]:
a = env.step(1)
a

  logger.warn(


(array([ 0.24502157,  2.3899536 , -0.33426705, -3.6713626 ], dtype=float32),
 0.0,
 True,
 {})

환경이 끝났을 시, warn 경고문 반환