## Q-learning

클래스를 정의해서 CartPole을 구현한다.

* Agent
    CartPole의 수레에 해당
    update_Q_function과 get_action의 메서드를 가지고 있음
    Brain 클래스 객체를 멤버 변수로 가짐

* Brain
    Agent 클래스의 두뇌 역할
    Q 테이블을 이용해 Q-learning이 구현됨
    bins, digitize_state, update_Q_table, decide_action의 메서드를 가지고 있음
    bins, digitize_state: Agent가 관측한 생태 observation을 이산 변수로 변환

* Environment
    OpenAI Gym이 실행되는 실행 환경
    CartPole을 실행하며 실행을 맡을 run 메서드를 가지고 있음

Agent와 Brain을 별도의 클래스로 분리한 것은 딥러닝에서 Brain 클래스로 국한되기 때문이다.

## 구현 과정

* #### Action 결정 (Agent -> Brain -> Agent)
    * 행동을 결정하기 위해 Agent는 현재 상태 observation_t를 Brain 클래스에 전달한다.

    * Brain 클래스는 전달받은 상태변수를 이산변수로 변환한 다음 Q_table을 참조해서 행동을 결정한다

    * 결정된 현재 행동 action_t를 Agent에 전달한다.
    
* #### Action 실행 (Agent -> Environment -> Agent)
    * Agent는 Environment에 행동 action_t를 전달하며 Action을 취한 다음 Environment를 한 단계 진행시킨다.

    * Environment는 다시 행동 action_t를 실행한 결과가 되는 상태 observation_t+1과 이때 얻은 즉각보상 reward_t+1을 Agent에 반환한다.

* #### Q_table 수정 (Agent -> Brain)
    * Agent는 현재 상태 observation_t, 조금 전 취했던 행동 action_t, 행동의 결과로 얻은 새로운 상태 observation_t+1과 받게된 즉각보상 reward_t+1 네 개의 변수를 Brain에 전달한다.
    * Brain은 전달받은 변수를 통해 기존의 Q_table을 수정한다.
    * observation_t, action_t, observation_t+1, reward_t+1 네 개의 변수를 합쳐놓은 것을 'Transition'이라고 부른다.
    

Action 결정, Action 실행, Q_table 수정과 같이 세부과정을 포함한 총 세 개의 절차를 반복하며 Q_learning이 수행된다.

#  

## 클래스 구조 및 정보 흐름 구현

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import gym

%matplotlib inline

In [2]:
'''
애니메이션 함수 정의
'''

from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython.display import display

def display_frames_as_gif(frames):
    
    plt.figure(figsize = (frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi = 72)
    patch = plt.imshow(frames[0])
    plt.axis('off')
    
    def animate(i):
        patch.set_data(frames[i])
        
    anim = animation.FuncAnimation(
        plt.gcf(), 
        animate, 
        frames = len(frames), 
        interval = 50
    )
    
    anim.save('CartPole_Q-learning.gif')
    display(display_animation(anim, default_mode = 'loop'))

In [3]:
'''
상수 정의
'''

# 태스크 네임
ENV = 'CartPole-v0'

# 각 상태를 이산변수로 변환할 구간 수
num_digitized = 6

# 시간할인율
gamma = 0.99 

# 학습률
eta = 0.5

# Epoch당 최대 단계 수
# CartPole-v0은 200단계를 버티면 클리어로 간주
max_steps = 200

# 최대 Epoch
nb_epochs = 1000

In [4]:
'''
Agent 클래스 구현

생성자 메서드인 __init__()에서 상태변수의 수와 행동의 가짓수를 전달받고, 두뇌 역할을 할 Brain 클래스의 인스턴스를 만든다.
'''

# 실제 Agent의 역할을 하는 클래스
class Agent:
    
    def __init__(self, num_states, num_actions):
        
        # Agent의 행동을 결정하는 두뇌 역할 (Brain class 참조)
        self.brain = Brain(num_states, num_actions)
        
    def update_Q_function(self, observation, action, reward, observation_next):
        
        # Brain에 전달 및 Q_table 수정 (Brain class 참조)
        self.brain.update_Q_table(observation, action, reward, observation_next)
        
    def get_action(self, observation, step):
        
        # 행동 결정 (Brain class 참조)
        action = self.brain.decide_action(observation, step)
        
        return action

In [5]:
'''
Brain 클래스 구현

Q_table을 수정하는 방법과 다음 행동을 결정하는 방법은 epsilon-greedy 알고리즘을 이용하여 구현한다.
'''

# Agent의 두뇌 역할을 하는 클래스
class Brain:
    
    def __init__(self, num_states, num_actions):
        
        # 행동의 가짓수(왼쪽, 오른쪽)를 구함
        self.num_actions = num_actions
    
        # Q_table 생성
        self.Q_table = np.random.uniform(
        
                low = 0, 
                high = 1,
                size = (
            
                    # 행 수는 상태를 구간수**4(변수의 수 = 4)가지 값 중 하나로 변환한 값
                    num_digitized ** num_states,
            
                    # 열 수는 행동의 가지수
                    num_actions
                )
            )
    
    # 관측된 상태(연속값)을 이산변수로 변환하는 구간 계산
    def bins(self, clip_min, clip_max, num):
        
        return np.linspace(clip_min, clip_max, num + 1)[1: -1]
    
    # 관측된 상태 observation을 이산변수로 변환
    def digitize_state(self, observation):
        
        cart_pos, cart_v, pole_angle, pole_v = observation
        digitized = [
            np.digitize(cart_pos, bins = self.bins(-2.4, 2.4, num_digitized)),
            np.digitize(cart_v, bins = self.bins(-3.0, 3.0, num_digitized)),
            np.digitize(pole_angle, bins = self.bins(-0.5, 0.5, num_digitized)),
            np.digitize(pole_v, bins = self.bins(-2.0, 2.0, num_digitized))
        ]
        
        return sum([x * (num_digitized ** i) for i, x in enumerate(digitized)])
    
    # Q-learning으로 Q_table 수정
    def update_Q_table(self, observation, action, reward, observation_next):
        
        # 상태를 이산변수로 변환
        state = self.digitize_state(observation)
        
        # 다음 상태를 이산변수로 변환
        state_next = self.digitize_state(observation_next)
        
        Max_Q_next = max(self.Q_table[state_next][:])
        self.Q_table[state, action] = self.Q_table[state, action] + eta * (reward + gamma * Max_Q_next - self.Q_table[state, action])
    
    # epsilon-greedy 알고리즘을 적용하여 서서히 최적행동의 비중을 증가시킴
    def decide_action(self, observation, epoch):
        
        state = self.digitize_state(observation)
        epsilon = 0.5 * (1 / (epoch + 1))
        
        # 두 가지 행동 중 하나를 무작위 선택
        if epsilon <= np.random.uniform(0, 1):
            action = np.argmax(self.Q_table[state][:])
        
        else:
            action = np.random.choice(self.num_actions)
        
        return action

In [6]:
'''
Environment 클래스 구현
'''

# OpenAI Gym이 실행되는 실행 환경 CartPole을 실행
class Environment:
    
    def __init__(self):
        
        # 실행할 태스크 설정
        self.env = gym.make(ENV)
        
        # 태스크의 상태 변수 수를 구함
        num_states = self.env.observation_space.shape[0]
        
        # 가능한 행동 수를 구함
        num_actions = self.env.action_space.n
        
        # Agent 객체 생성
        self.agent = Agent(num_states, num_actions)
    
    # 실행
    def run(self):
        
        # 195단계 이상 버틴 epoch 수
        complete_epochs = 0
        
        # 마지막 epoch 여부
        is_epoch_final = False
        
        # 애니메이션을 만드는 데 사용할 이미지를 저장하는 변수
        frames = []
        
        # nb_epoch 만큼 반복
        for epoch in range(nb_epochs):
            
            # 환경 초기화
            observation = self.env.reset()
            
            # 각 epoch에 해당하는 반복
            for step in range(max_steps):
                
                # 마지막 에피소드면 frames에 각 단계의 이미지를 저장
                if is_epoch_final is True:
                    frames.append(self.env.render(mode = 'rgb_array'))
                    
                # 행동 선택
                action = self.agent.get_action(observation, epoch)
                
                # 행동 action_t를 실행해 state_t+1, reward_t+1을 계산
                # reward, info는 사용하지 않으므로 under-bar로 처리
                observation_next, _, done, _ = self.env.step(action)
                
                # 보상 부여
                if done:
                    
                    # 200단계를 넘어서거나 일정 각도 이상 기울면 done의 값이 True가 됨
                    if step < 195:
                        
                        # 봉이 쓰러지면 패널티 보상 -1 부여
                        reward = -1
                        
                        # 195단계 이상 버티면 해당 epoch 성공 처리
                        complete_epochs = 0
                        
                    else:
                        
                        # 쓰러지지 않고 epoch를 끝내면 보상 1 부여
                        reward = 1
                        
                        # epoch 연속 성공 기록을 업데이트
                        complete_epochs += 1
                        
                # epoch 중에는 보상 0 부여
                else:
                    reward = 0
                    
                # 다음 단계 상태 observation_next로 Q함수 수정
                self.agent.update_Q_function(observation, action, reward, observation_next)
                
                # 다음 단계 상태 업데이트
                observation = observation_next
                
                # epoch 마무리
                if done:
                    print("{} Epoch was finished after {} time steps".format(epoch, step + 1))
                    
                    break
                    
            # 마지막 epoch에서 애니메이션을 만들고 저장
            if is_epoch_final is True:
                display_frames_as_gif(frames)
                
                break
                
            # 10연속 이상 성공한 경우(195단계 이상 지속) 조기 종료
            if complete_epochs >= 10:
                print('10연속 이상 성공했습니다.')
                is_epoch_final = True

In [7]:
# main
cartpole_env = Environment()
cartpole_env.run()

0 Epoch was finished after 17 time steps
1 Epoch was finished after 13 time steps
2 Epoch was finished after 33 time steps
3 Epoch was finished after 23 time steps
4 Epoch was finished after 32 time steps
5 Epoch was finished after 29 time steps
6 Epoch was finished after 181 time steps
7 Epoch was finished after 31 time steps
8 Epoch was finished after 45 time steps
9 Epoch was finished after 160 time steps
10 Epoch was finished after 23 time steps
11 Epoch was finished after 25 time steps
12 Epoch was finished after 25 time steps
13 Epoch was finished after 16 time steps
14 Epoch was finished after 12 time steps
15 Epoch was finished after 26 time steps
16 Epoch was finished after 21 time steps
17 Epoch was finished after 52 time steps
18 Epoch was finished after 32 time steps
19 Epoch was finished after 27 time steps
20 Epoch was finished after 183 time steps
21 Epoch was finished after 59 time steps
22 Epoch was finished after 83 time steps
23 Epoch was finished after 24 time steps

MovieWriter ffmpeg unavailable; using Pillow instead.


144 Epoch was finished after 10 time steps


The 'clear_temp' parameter of setup() was deprecated in Matplotlib 3.3 and will be removed two minor releases later. If any parameter follows 'clear_temp', they should be passed as keyword, not positionally.
  super(HTMLWriter, self).setup(fig, outfile, dpi,


AttributeError: 'HTMLWriter' object has no attribute '_temp_names'