<a href="https://colab.research.google.com/github/bob8dod/ML-studying/blob/main/RL/Cliff_Walking_Example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [77]:
import gym
import matplotlib
import numpy as np
import random
import itertools
import sys
from collections import defaultdict
from gym.envs.toy_text.cliffwalking import CliffWalkingEnv # Cliff Walking 환경

QtoPolicy Class는 학습된 Q-value를 입력하면 해당하는 Q-value의 greedy 정책이 출력되도록 하는 함수 `printPolicy`를 구성하는 Class입니다. Q-learning 및 Sarsa를 이용하여 학습된 정책을 출력하기 위해 필요합니다.

In [78]:
class QtoPolicy:
    def __init__(self):
        self.action = ['↑','→','↓','←']
    
    def printPolicy(self, Q):
        policy = np.array([np.argmax(Q[key]) if key in Q else -1 for key in np.arange(48)])
        v = ([np.max(Q[key]) if key in Q else 0 for key in np.arange(48)])
        actions = np.stack([self.action for _ in range(len(policy))], axis=0)
        
        print(np.take(actions, np.reshape(policy, (4, 12))))
        print('')

아래는 Q-learning 알고리즘을 수행하는 Class의 정의입니다.
*   `update()` 메쏘드의 경우 state, action, reward, next_state, next_action이 주어졌을 때 Q-value를 업데이트하는 함수입니다.
*   `act()` 메쏘드의 경우 $\epsilon$-greedy 정책에 따라 action을 선택하는 함수입니다.



In [79]:
class QLearning:
    def __init__(self):
        self.action_no = 4 # state에서 취할 수 있는 action의 개수
        self.alpha = 0.01 # 반영률
        self.gamma = 0.9 # 미래의 값에 두는 가중치. 0.9로 미래의 값에 중요도를 높임
        self.epsilon = 0.2 # 입실론, 임의의 action을 취하는 확률
        self.q_values = defaultdict(lambda: [0.0] * self.action_no) # Q-Table
        
    # 학습과정
    def update(self, state, action, reward, next_state, next_action): 
        q_value = self.q_values[state][action] # 현 상태에서 주어진 action을 취했을 때의 q_value
        next_q_value = max(self.q_values[next_state]) # 다음 상태에서의 q_value
        # Q-learning 같은 경우에서의 Target정책은 무조건 탐욕적으로 행동을 결정하기에 
        # max값을 통해 가장 큰 q_value를 얻는 action을 취했을 때의 q_value값을 next_q_value로 저장
        
        td_error = reward + self.gamma * next_q_value - q_value 
        # 내가 따르고자 하는 Target정책 과 실제 행동이 이루어지는 정책간의  q_value 차이
        self.q_values[state][action] = q_value + self.alpha * td_error 
        # td_error를 반영하여 Q(S,A)를 업데이트
    
    # 입실론 그리디 정책 (행동을 결정)
    def act(self, state):
        if np.random.rand() < self.epsilon: # 입실론보다 작은 확률로
            action = np.random.choice(self.action_no) # 행동을 임의로 선택
        else: # '1-입실론' 확률로 
            q_values = self.q_values[state] # 현재 state가 가질 수 있는 모든 Q값
            action = np.argmax(q_values) 
            # 탐욕적인 행동 선택 진행 (해당 Q값 들 중 가장 큰 값의 Q값을 가지는 action 선택)
        return action # action 반환

아래는 Sarsa 알고리즘을 수행하는 Class의 정의입니다.
*   `update()` 메쏘드의 경우 state, action, reward, next_state, next_action이 주어졌을 때 Q-value를 업데이트하는 함수입니다.
*   `act()` 메쏘드의 경우 $\epsilon$-greedy 정책에 따라 action을 선택하는 함수입니다.



In [80]:
class Sarsa:
    def __init__(self):
        self.action_no = 4 # state에서 취할 수 있는 action의 개수
        self.alpha = 0.01 # 반영률
        self.gamma = 0.9 # 미래의 값에 두는 가중치. 0.9로 미래의 값에 중요도를 높임
        self.epsilon = 0.2  # 입실론, 임의의 action을 취하는 확률
        self.q_values = defaultdict(lambda: [0.0] * self.action_no) # Q-Table

    # 학습과정
    def update(self, state, action, reward, next_state, next_action):
        q_value = self.q_values[state][action] # 현 상태에서 주어진 action을 취했을 때의 q_value
        next_q_value =self.q_values[next_state][next_action] # 다음 상태에서의 q_value
        # Q-learning과의 차이점.Target 정책을 사용하지 않고 behavior 정책을 그대로 사용하여 학습 진행
        # 그렇기에 behavior 정책을 통해 다음 state의 다음 action을 받아와 next_q_value 측정

        td_error = reward + self.gamma * next_q_value - q_value # 경험을 통한 q_value 값의 차이 측정
        self.q_values[state][action] = q_value + self.alpha * td_error 
        # td_error를 반영하여 Q(S,A)를 업데이트 (실제적인 경험을 통해 가치함수를 업데이트하여 개선)
    
    # 입실론 그리디 정책 (행동을 결정)
    def act(self, state):
        if np.random.rand() < self.epsilon: # 입실론보다 작은 확률로
            action = np.random.choice(self.action_no) # 행동을 임의로 선택
        else: # '1-입실론' 확률로 
            q_values = self.q_values[state] # 현재 state가 가질 수 있는 모든 Q값
            action = np.argmax(q_values) 
            # 탐욕적인 행동 선택 진행 (해당 Q값 들 중 가장 큰 값의 Q값을 가지는 action 선택)
        return action # action 반환

OpenAI Gym에서의 Cliff Walking 환경을 로드하고 해당하는 환경을 살펴보기 위해 `render()` 메쏘드를 사용해봅니다.
그리고 `env.nS` 및 `env.nA` 변수를 통해 해당 환경의 state 및 action 개수를 확인합니다.

Cliff Walking 환경에서 각 state는 grid에서의 위치, action은 'up', 'right', 'down', 'left' 방향을 의미합니다.

In [81]:
env = CliffWalkingEnv()
env.render()
print ('Number of states: ', env.nS)
print ('Number of actions :', env.nA)

o  o  o  o  o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o  o  o  o  o
x  C  C  C  C  C  C  C  C  C  C  T

Number of states:  48
Number of actions : 4


주어진 Q-value에서 greedy policy를 출력하는 QtoPolicy Class를 정의합니다.

In [82]:
policy = QtoPolicy()

Q-learning Class를 정의하고 5000 episode 동안 학습을 수행합니다.

Gym 라이브러리의 환경에서는 `step(action)` 메쏘드를 통해 해당하는 time-step에서 action을 수행한 효과를 얻을 수 있습니다. 해당 메쏘드에서는 action을 수행하여 얻어지는 보상 (reward), 다음 상태 (next_state), done (episode 종료여부) 등이 출력으로 주어집니다.

In [83]:
agent_QL = QLearning() # Q-learning instance 생성
for ep in range(1,5001): # 5000 episode 동안 학습 수행
    done = False
    state = env.reset() # 초기 state
    action = agent_QL.act(state) # 초기 state에서의 action
    
    ep_rewards = 0 # 누적 보상
    while not done: # 종료될때까지 반복
        next_state, reward, done, info = env.step(action) # action을 통한 다음 state, reward 종료여부 반환

        next_action = agent_QL.act(next_state) # 다음 state에서의 다음 action (A_t+1)

        agent_QL.update(state, action, reward, next_state, next_action) # 학습 진행
        
        ep_rewards += reward # 누적 보상에 현 보상 추가
        state = next_state # state 업데이트
        action = next_action # action 업데이트
    if ep % 500 == 0: #500 episode 마다 출력
        print("episode: {}, rewards: {}".format(ep, ep_rewards))

episode: 500, rewards: -175
episode: 1000, rewards: -31
episode: 1500, rewards: -17
episode: 2000, rewards: -13
episode: 2500, rewards: -120
episode: 3000, rewards: -17
episode: 3500, rewards: -445
episode: 4000, rewards: -13
episode: 4500, rewards: -124
episode: 5000, rewards: -21


Sarsa에 대해서도 같은 방식으로 학습을 수행합니다.

In [84]:
agent_Sa = Sarsa() # Q-learning instance 생성
for ep in range(1,5001): # 5000 episode 동안 학습 수행
    done = False
    state = env.reset() # 초기 state
    action = agent_Sa.act(state) # 초기 state에서의 action
    
    ep_rewards = 0 # 누적 보상
    while not done: # 종료될때까지 반복
        next_state, reward, done, info = env.step(action)  # action을 통한 다음 state, reward 종료여부 반환

        next_action = agent_Sa.act(next_state) # 다음 state에서의 다음 action (A_t+1)

        agent_Sa.update(state, action, reward, next_state, next_action) # 학습 진행
        
        ep_rewards += reward # 누적 보상에 현 보상 추가
        state = next_state # state 업데이트
        action = next_action # action 업데이트
    if ep % 500 == 0: #500 episode 마다 출력
        print("episode: {}, rewards: {}".format(ep, ep_rewards))

episode: 500, rewards: -42
episode: 1000, rewards: -28
episode: 1500, rewards: -21
episode: 2000, rewards: -30
episode: 2500, rewards: -17
episode: 3000, rewards: -17
episode: 3500, rewards: -22
episode: 4000, rewards: -20
episode: 4500, rewards: -19
episode: 5000, rewards: -19


학습된 Q-value를 이용하여 학습된 정책을 출력합니다.

In [85]:
print('Learned policy by Q-learning')
policy.printPolicy(agent_QL.q_values)
print('Learned policy by Sarsa')
policy.printPolicy(agent_Sa.q_values)

Learned policy by Q-learning
[['→' '→' '→' '→' '→' '→' '→' '↓' '→' '→' '↓' '↓']
 ['↑' '↑' '↑' '→' '↑' '↓' '↓' '→' '↓' '↓' '↓' '↓']
 ['→' '→' '→' '→' '→' '→' '→' '→' '→' '→' '→' '↓']
 ['↑' '←' '←' '←' '←' '←' '←' '←' '←' '←' '←' '↑']]

Learned policy by Sarsa
[['→' '→' '→' '→' '→' '→' '→' '→' '→' '→' '→' '↓']
 ['↑' '→' '→' '→' '→' '→' '→' '→' '→' '→' '→' '↓']
 ['↑' '←' '↑' '→' '→' '←' '↑' '→' '→' '←' '→' '↓']
 ['↑' '←' '←' '←' '←' '←' '←' '←' '←' '←' '←' '↑']]



In [86]:
env.close()