# **DQN을 활용한 CartPole**
Q_Learning을 딥러닝에 조합한 방법으로 "상태"를 입력으로 받아 "행동"을 출력하며 이때 출력은 특정한 상태에서 특정한 행동을 선택할 확률을 의미

In [None]:
# Tensorflow를 Pytorch로 변환
import gym # 강화학습을 위한 라이브러리 (gym 0.26.2)
import numpy as np

import torch
import torch.nn as nn
import torchvision.transforms as T

from collections import deque
from torch.nn import HuberLoss
from torch.optim import Adam

In [5]:
# 해당 부분은 argparser로 수정 가능하도록 코드 수정 필요
# Parameter
NUM_EPSIODES = 500
MAX_STEPS = 200
GAMMA = 0.99
WARMUP = 10 # 초기화 시 조작하지 않을 스텝 수

# Parameter for search
E_INITIAL = 1.0
E_STOP = 0.01
E_DECAY_RATE = 0.001

# Parameter for memory
MEMORY_SIZE = 10000
BATCH_SIZE = 32

In [None]:
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        model_blk = []
        in_channel = state_size
        for _ in range(3):
            model_blk.append(
                nn.Linear(in_channel, 16)
            )
            model_blk.append(
                nn.ReLU()
            )
            in_channel = 16

        model_blk.append(
            nn.Linear(in_channel, action_size)
        )

        self.model = nn.Sequential(*model_blk)

    def forward(self, x):
        x = self.model(x)
        return x

In [4]:
# 과거의 경험을 저장하며 오래된 순서부터 제거
class Memory():
    def __init__(self, memory_size):
        self.buffer = deque(maxlen=memory_size) # maxlen 이상의 입력이 들어올 경우 맨 처음 element부터 삭제

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False) # 과거의 경험 중 batch_size 만큼의 경험을 랜덤하게 가져오는 함수

    def __len__(self):
        return len(self.buffer)

In [2]:
def train():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    printr(f'\nDevice : {device}')

    print('\nSet & Initialize Envieronment')
    env = gym.make('CartPole-v0')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    
    state = env.reset()
    state = np.reshape(state, [1, state_size])

    print('\nSet Model')
    main_net = QNetwork(state_size, action_size) # 갱신 대상 네트워크
    target_net = QNetwork(state_size, action_size) # 갱신량 계산을 위한 네트워크 (과거의 메인 네트워크로 일정 에피소드마다 main에 덮어씌움)
    memory = Memory(MEMORY_SIZE) # 경험을 저장하기 위한 memory

    print('\nTrain the model')
    total_step = 0 # 총 스텝 수
    success_count = 0 # 성공 수

    for episode in range(1, NUM_EPSIODES+1): # 진행할 episode 수
        step = 0
        target_net.model.set_weights(main_net.model.get_weights()) # 매 episode마다 target_net 갱신

        for _ in range(1, MAX_STEPS+1): # 1 episode 당 업데이트하는 step 수
            step += 1
            total_step += 1

            # ε 감소
            epsilon = E_STOP + (E_INITIAL - E_STOP) * np.exp(-E_DECAY_RATE*total_step)

            # 행동 선택
            if epsilon > np.random.rand():
                action = env.action_space.sample() # 랜덤하게 선택
            else:
                action = np.argmax(main_net.model.predict(state)[0]) # 행동 가치에 따른 선택

            # state와 action 저장
            next_state, _, done, _ = env.step(action) # done : 에피소드가 완료되었는가
            next_state = np.reshape(next_state, [1, state_size])

            # 1 episode 완료 시
            if done:
                if step >= 190:
                    success_count += 1
                    reward = 1
                else:
                    success_count = 0
                    reward = 0
                
                next_state = np.zeros(state.shape) # next_state 초기화

                if step > WARMUP: # WARMUP에 해당하는 step마다 memory에 상태 저장
                    memory.add((state, action, reward, next_state))

            else:
                reward = 0

                if step > WARMUP:
                    memory.add((state, action, reward, next_state))

                state = next_state

            # main_net 갱신


