# 18. 강화 학습

강화 학습(Reinforcement Learning)은 요즘 머신러닝에서 가장 흥미진진한 분야이자 가장 오래된 분야이다. 강화 학습이 현대에 다시 재조명을 받은 시기는 2013년에 딥마인드에서 시도한 <a href="https://arxiv.org/pdf/1312.5602.pdf?source=post_page---------------------------">아타리게임들에 대해 수행된 연구</a>때문입니다. 이 연구에서 기계는 화면 픽셀에 대한 데이터만 입력으로 받고 <a href="https://www.datascienceassn.org/sites/default/files/Human-level%20Control%20Through%20Deep%20Reinforcement%20Learning.pdf">게임 규칙에 대한 어떤 사전 정보없이 대부분 사람을 능가하는 성과</a>를 냈다.

딥마인드가 이러한 성과를 낼수 있던 이유는? 강화 학습 분야에 강력한 딥러닝을 적용했기 때문이다.

18장에서는,
* 강화 학습의 정의 및 활용 분야
* 정책 그라디언트, 심층 Q-네트워크

## 18.1. 보상을 최적화하기 위한 학습

### 강화 학습의 구성요소

* 에이전트: 인공지능 플레이어
* 환경: 에이전트가 솔루션을 찾기 위한 무대
* 행동: 에이전트가 환경 안에서 시행하는 상호작용
* 보상: 에이전트의 행동에 따른 점수 혹은 결과

위의 4가지 요소를 가지고 강화학습은 **'에이전트는 관측을 하고 주어진 환경에서 행동을 하고, 이에 대한 결과로 보상을 받는다'**라는 문장으로 요약할 수 있다. 에이전트는 환경 아래에서 시행착오를 겪으며 보상을 최대로 하는 방향으로 학습한다. 강화학습은 자율주행 자동차, 추천 시스템, 웹페이지에 광고 배치, 이미지 분류 시스템의 제어 등에 사용될 수 있다.

## 18.2. 정책 탐색

에이전트가 행동을 결정하기 위해 사용하는 알고리즘을 정책(policy)라고 한다. 아래의 그림과 같이 Agent가 위치한 상태를 입력으로 받고 행동을 출력하는 신경망이 정책이 될 수 있다.

![RL_figure](../../img/RL_figure.jpg)

### 강화 학습의 예시; 청소기

* Agent: 30분 동안 수집한 먼지의 양을 보상으로 받는 로봇 진공청소기
* 정책: 매 초마다 p의 확률로 전진 or (1-p)의 확률로 왼쪽 또는 오른쪽으로 랜덤하게 회전; 회전의 각도는 -r과 +r 사이의 랜덤한 각도

**어떻게 훈련할 수 있을까?(정책탐색; Policy Search)**
1. 무작위 방식: 정책 파라미터들에 대해 무작위로 시행을 수행하고 성능이 좋은 조합을 선택
2. 유전 알고리즘(Genetic Algorithm): 1세대 정책 100개를 랜덤하여 생성하고, 하위 80개의 정책을 drop. 남은 20개를 활용하여 자식 정책 4개를 생성한다. 자식 정책 4개는 복사된 부모의 정책과 약간의 무작위 성을 설정한 것.
3. 정책 그라디언트(Policy Gradient): 정책 파라미터에 대한 보상의 그라디언트를 평가하여 높은 보상의 방향을 따르는 그라디언트로 파라미터를 수정하는 최적화 기법


## 18.3. OpenAI 짐

강화 학습 에이전트 훈련을 위한 최소한의 시뮬레이션 환경을 제공하는 패키지

간단하게 확인해 볼 환경은 CartPole이라는 아타리의 게임 중 기울어지는 막대를 세우는 게임이다. 

CartPole 환경에서 return되는 관측값은 아래와 같이 구성된다.

[수평 위치(0.0=중앙), 카트의 속도(양수=우측; 음수=좌측), 막대의 각도(0.0=수직), 막대의 각속도(양수=시계방향; 음수=반시계방향)]

In [1]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim

env = gym.make("CartPole-v1")
obs = env.reset()
obs

array([0.0159937 , 0.00347802, 0.01264561, 0.0356175 ])

In [2]:
env.action_space

Discrete(2)

In [3]:
action = 1
obs, reward, done, info = env.step(action)

In [4]:
obs

array([ 0.01606326,  0.19841637,  0.01335796, -0.25304894])

In [5]:
reward

1.0

In [6]:
done

False

In [7]:
info

{}

In [8]:
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1

totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            break
    totals.append(episode_rewards)

In [9]:
import numpy as np

totals= np.array(totals)
totals.shape

(500,)

In [10]:
rewards_mean = np.mean(totals)
rewards_std = np.std(totals)
rewards_min = np.min(totals)
rewards_max = np.max(totals)

print("mean:{:.2f}; std:{:.4f}; min:{}; max:{}".format(rewards_mean,rewards_std,rewards_min,rewards_max))

mean:42.46; std:8.8997; min:24.0; max:72.0


## 18.4 신경망 정책

신경망 정책에서는 관측값을 통해 특정 결과에 대한 확률을 추정한다. 그리고 추정환 확률을 기반으로 랜덤으로 행동하도록 선택한다. 그렇다면 여기서 왜 '랜덤'하게 행동하도록 선택할까? 그 이유는 에이전트가 새로운 행동을 탐험하고 잘 할 수 있는 행동을 활용하는 행동을 유도하기 위함이다. 

일반적으로 각 관측은 환경에 대한 완전한 상태를 갖고 있기 때문에 과거 관측값에 대한 고려가 필요없다(e.g) CartPole). 그러나 관측에 잡음이 있는 경우에는 가능성있는 현재 상태의 추정을 위해 지난 관측 몇 개를 사용하는 것이 좋다.

## 18.5. 행동 평가: 신용 할당 문제

강화 학습에서는 일반적인 지도학습과는 달리 학습에 대한 평가 시, 실제 값 또는 Label이 주어지지 않는다. 다시 말해, 학습을 평가하는데 있어 사용되는 지표는 행동으로 주어지는 보상(reward)밖에 없다는 것이다. 그렇다면 에이전트가 수행한 각 행동에 대해 어떤 것이 좋고 나쁨을 구별할 수 있을까? 이는 신용 할당 문제(credit assignment problem)이라고 불린다.

위의 문제를 해결하기 위해 행동이 일어난 후 각 단계마다 할인 계수(discount factor; $\gamma$)를 적용한 보상을 모두 합하여 행동을 평가하는 것입니다. 이렇게 보상이 모두 합쳐진 값을 대가(return)이라고 부릅니다.

<div align="center" style="margin-top:20px">$0\le\gamma(할인 계수; discount factor)\ge1$</div>
<div align="center" style="margin-top:10px"><b>할인 계수 범위</b></div>

할인 계수는 0과 1사이의 값으로 구성되며, 값이 클 수록 미래 시점에 주어지는 보상에 대해 우선순위롤 높게주는 것이고, 값이 낮을 수록 현재 시점에 주어지는 보상에 우선순위를 높게 주는 것이다. 일반적으로 $0.9~0.99$사이의 값을 준다.

e.g) 0.95: 13step 이후의 보상 50% 할인; 0.99: 69step 이후의 보상 50% 할인

위의 원리로 수행하게 된다면, 좋은 행동 후에 나쁜 행동이 이어져 낮은 대가를 받을 수 있다. 하지만, 평균적으로 다른 가능한 행동과 비교하여 각 행동이 얼마나 좋은지 혹은 나쁜지를 추정해야 한다. 이를 행동이익이라고 부르며, 많은 에피소드를 실행하여 모든 행동의 대가를 정규화해야 한다.

## 18.6. 정책 그라디언트

### REINFORCEMENT 알고리즘

참고

https://towardsdatascience.com/learning-reinforcement-learning-reinforce-with-pytorch-5e8ad7fc7da0

https://github.com/g6ling/Reinforcement-Learning-Pytorch-Cartpole/tree/master/PG/1-REINFORCE

https://wonseokjung.github.io/page5/

1. 먼저 신경망 정책이 여러 번에 걸쳐 게임을 플레이하고 매 스텝마다 선택된 행동이 더 높은 가능성을 가지도록 만드는 그라디언트를 계산합니다. 하지만 그라디언트를 적용하지는 않는다.
2. 에피소드를 몇 번 실행한 다음, 각 행동의 이익을 계산한다.
3. 한 행동의 이익이 양수이면, 이 행동이 좋은 것임을 의미하므로 미래에 선택될 가능성이 높도록 앞서 계산한 그라디언트를 적용합니다. 그러나 행동이익이 음수이면 이 행동이 나쁜 것임을 의미하므로 미래에 이 행동이 덜 선택되도록 반대의 그라디언트를 적용합니다. 이는 각 그라디언트 벡터와 그에 상응하는 행동의 이익을 곱하면 됩니다.
4. 마지막으로 모든 결과 그라디언트 벡터를 평균 내어 경사 하강법 스텝을 수행합니다.

![sudo_REINFORCMENT](../../img/sudo_REINFORCEMENT.png)

### 주요 파라미터
* step size
* distcount rate
* batch size
* max epsiodes

### 정책 손실 $L(\theta)$
 신경망을 통해 도출되는 값은 확률분포를 따른다. 그렇기 때문에 $\pi(a | s,\theta)$는 신경망에서 각 상태에 대해 확률의 평균값을 얻기 위함이다. 그리고 확률의 평균 값을 할인 계수로 곱하여 신경망의 기대값을 계산한다.
 
 

In [11]:
import torch

env_name = 'CartPole-v1'
gamma = 0.99
lr = 0.001
goal_score = 200
log_interval = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [12]:
import random
from collections import namedtuple, deque

Transition = namedtuple('Transition', ('state', 'next_state', 'action', 'reward', 'mask'))

class Memory(object):
    def __init__(self):
        self.memory = deque()

    def push(self, state, next_state, action, reward, mask):
        self.memory.append(Transition(state, next_state, action, reward, mask))

    def sample(self):
        memory = self.memory
        return Transition(*zip(*memory)) 

    def __len__(self):
        return len(self.memory)

In [54]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

# from config import gamma
class QNet(nn.Module):
    def __init__(self, num_inputs, num_outputs):
        super(QNet, self).__init__()
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs

        self.fc_1 = nn.Linear(num_inputs, 128)
        self.fc_2 = nn.Linear(128, num_outputs)

        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform(m.weight)

    def forward(self, input):
        x = F.relu(self.fc_1(input))
        policy = F.softmax(self.fc_2(x))
        return policy

    # 매 종료된 에피소드마다 각 행동을 평가하기 위한 함수
    @classmethod
    def train_model(cls, net, transitions, optimizer):
        states, actions, rewards, masks = transitions.state, transitions.action, transitions.reward, transitions.mask

        states = torch.stack(states)
        actions = torch.stack(actions)
        rewards = torch.Tensor(rewards)
        masks = torch.Tensor(masks)

        returns = torch.zeros_like(rewards)
        
        running_return = 0
        
#         tensor([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,
#          1.,  1.,  1.,  1.,  1., -1.])
#         print(rewards)
        
        # 3: 각 state 별 미래가치를 현재가치로 할인한 보상의 값
        # 각 state에서 추정한 action으로 얻은 reward를 역순으로하여 미래가치로 할인을 수행
        for t in reversed(range(len(rewards))):
            running_return = rewards[t] + gamma * running_return * masks[t]
            returns[t] = running_return
            
#         print(returns)
#         tensor([ 1.6557e+01,  1.5714e+01,  1.4863e+01,  1.4003e+01,  1.3134e+01,
#          1.2257e+01,  1.1370e+01,  1.0475e+01,  9.5708e+00,  8.6574e+00,
#          7.7348e+00,  6.8028e+00,  5.8614e+00,  4.9105e+00,  3.9500e+00,
#          2.9798e+00,  1.9998e+00,  1.0099e+00,  1.0000e-02, -1.0000e+00])

        # 정책 그라디언트를 학습하기 위한 신경망의 input으로 매 agent의 step별 state를 투입
        policies = net(states)
        policies = policies.view(-1, net.num_outputs)
        
#      https://subinium.github.io/pytorch-Tensor-Variable/
#      detach reason: pytorch에서 텐서를 복제하기 위한 방법. 복사하면서 이전 텐서의 gradient에는 영향을 미치지 않기 위해서 사용함.
        log_policies = (torch.log(policies) * actions.detach()).sum(dim=1)
        
#     정책들 중에서도 선택된 action에 대한 정책만을 선정하여 loss 값에 사용
        loss = (-log_policies * returns).sum()
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        return loss

    # 행동을 return하는 함수
    def get_action(self, input):
        policy = self.forward(input)
        # 왼쪽(0)과 오른쪽(1) 가속을 할 확률을 return 받음
        # 0: 왼쪽인 확률을 기준으로 랜덤으로 다음 action을 선정 => 이는 exploration 과 exploitng 사이의 균형을 맞추기 위함
        policy = policy[0].data.numpy()

        action = np.random.choice(self.num_outputs, 1, p=policy)[0]
        return action

In [55]:
import os
import sys
import gym
import random
import numpy as np

import torch
import torch.optim as optim
import torch.nn.functional as F

env = gym.make(env_name)
env.seed(500)
torch.manual_seed(500)

num_inputs = env.observation_space.shape[0]
num_actions = env.action_space.n
print('state size:', num_inputs)
print('action size:', num_actions)

net = QNet(num_inputs, num_actions)

optimizer = optim.Adam(net.parameters(), lr=lr)

net.to(device)
net.train()
running_score = 0
# steps = 0
loss = 0

# 1: 
for e in range(3000):
    done = False
    memory = Memory()

    steps = 0
    score = 0
    state = env.reset()
    state = torch.Tensor(state).to(device)
    state = state.unsqueeze(0)

    # 2: 신경망에서 각 상태에 대해 확률의 평균값을 얻기
    while not done:
        steps += 1

        #현재 State를 가지고 다음에 취하게 될 action을 확률로 추정한 후 그 확률을 가지고 random하게 다음 action을 선정
        # next_state(obs)는 아래와 같이 구성
        # * 수평위치
        # * 카트의 속도
        # * 막대의 각도
        # * 막대의 각속도
        action = net.get_action(state)
        
        # 추정한 action을 가지고 수행 
        next_state, reward, done, _ = env.step(action)
        
        next_state = torch.Tensor(next_state)
        # 1차원 -> 2차원으로 확장 (행 개념 ) (4,) => (1,4)
        next_state = next_state.unsqueeze(0)

        mask = 0 if done else 1
        reward = reward if not done or score == 499 else -1

        # 0과 1중 action이 취해진 항목에 대해 1로 값을 반영
        action_one_hot = torch.zeros(2)
        action_one_hot[action] = 1
        # 메모리에  현재 state, 다음 state, 현재 state에서 취한 행동, 보상, mask를 기록
        memory.push(state, next_state, action_one_hot, reward, mask)

        # 학습 상황을 보기 위해  점수(== 보상) 출력
        score += reward
        state = next_state

    break
    # 폴이 쓰러져서 한 eposiode가 끝난 경우, 그에 대한 학습을 수행
    loss = QNet.train_model(net, memory.sample(), optimizer)

    score = score if score == 500.0 else score + 1
    running_score = 0.99 * running_score + 0.01 * score
    if e % log_interval == 0:
        print('{} episode | steps: {} |score: {:.2f}'.format(
            e, steps ,running_score))
#         writer.add_scalar('log/score', float(running_score), e)
#         writer.add_scalar('log/loss', float(loss), e)

    if running_score > goal_score:
        break

state size: 4
action size: 2


