<a href="https://colab.research.google.com/github/BDH-teacher/RL_from_basics/blob/main/RL_from_basic_ch_6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 6. MDP를 모를 때 최고의 정책 찾기

In [1]:
import random
import numpy as np

from tqdm import trange

- q(s,a)를 담기 위해 numpy라이브러리 import

## 6.1 몬테카를로 컨트롤

### GridWorld 클래스

In [2]:
class GridWorld():
    def __init__(self):
        self.x=0
        self.y=0

    def step(self, a):
        # 0번 액션: 왼쪽, 1번 액션: 위, 2번 액션: 오른쪽, 3번 액션: 아래쪽
        if a==0:
            self.move_left()
        elif a==1:
            self.move_up()
        elif a==2:
            self.move_right()
        elif a==3:
            self.move_down()

        reward = -1  # 보상은 항상 -1로 고정
        done = self.is_done()
        return (self.x, self.y), reward, done

    def move_left(self):
        if self.y==0:
            pass
        elif self.y==3 and self.x in [0,1,2]:
            pass
        elif self.y==5 and self.x in [2,3,4]:
            pass
        else:
            self.y -= 1

    def move_right(self):
        if self.y==1 and self.x in [0,1,2]:
            pass
        elif self.y==3 and self.x in [2,3,4]:
            pass
        elif self.y==6:
            pass
        else:
            self.y += 1

    def move_up(self):
        if self.x==0:
            pass
        elif self.x==3 and self.y==2:
            pass
        else:
            self.x -= 1

    def move_down(self):
        if self.x==4:
            pass
        elif self.x==1 and self.y==4:
            pass
        else:
            self.x+=1

    def is_done(self):
        if self.x==4 and self.y==6: # 목표 지점인 (4,6)에 도달하면 끝난다
            return True
        else:
            return False

    def reset(self):
        self.x = 0
        self.y = 0
        return (self.x, self.y)

- 환경에 해당하는 클래스 정의 (GridWorld)
   - step 함수 : 에이전트로 부터 액션을 받아서 다음 상태와 보상, 에피소드가 끝났는지 여부를 리턴해주는 함수
   - 나머지 기타 함수들은 이 step 함수를 잘 동작하게 하기 위해 존재함
      - 벽에 막혀 있을 때 벽의 방향으로 진행하는 액션 무시를 위한 move가 복잡해졌을 뿐 챕터 5와 거의 유사함

### Q_Agent 클래스

In [3]:
class QAgent():
    def __init__(self):
        self.q_table = np.zeros((5, 7, 4)) # q벨류를 저장하는 변수. 모두 0으로 초기화.
        self.eps = 0.9
        self.alpha = 0.01

    def select_action(self, s):
        # eps-greedy로 액션을 선택
        x, y = s
        coin = random.random()
        if coin < self.eps:
            action = random.randint(0, 3)
        else:
            action_val = self.q_table[x,y,:]
            action = np.argmax(action_val)
        return action

    def update_table(self, history):
        # 한 에피소드에 해당하는 history를 입력으로 받아 q 테이블의 값을 업데이트 한다
        cum_reward = 0
        for transition in history[::-1]:
            s, a, r, s_prime = transition
            x,y = s
            # 몬테 카를로 방식을 이용하여 업데이트.
            self.q_table[x,y,a] = self.q_table[x,y,a] + self.alpha * (cum_reward - self.q_table[x,y,a])
            cum_reward = cum_reward + r

    def anneal_eps(self):
        self.eps -= 0.03
        self.eps = max(self.eps, 0.1)

    def show_table(self):
        # 학습이 각 위치에서 어느 액션의 q 값이 가장 높았는지 보여주는 함수
        q_lst = self.q_table.tolist()
        data = np.zeros((5,7))
        for row_idx in range(len(q_lst)):
            row = q_lst[row_idx]
            for col_idx in range(len(row)):
                col = row[col_idx]
                action = np.argmax(col)
                data[row_idx, col_idx] = action
        print(data)

- 에이전트에 해당하는 객체를 정의함
  - 에이전트 객체 내부에 q(s,a)의 값을 저장하기 위한 테이블 가지고 있으며, 실제로 에이전트가 액션을 선택할 때 사용됨
  - select_action 함수 : 상태 s를 input으로 받아 s에서 알맞은 액션을 입실론 그리디 방식을 통해 선택함
    - 이를 위한 epsilon이라는 값 존재
  - update_table 함수 : epsilon의 값을 점차 조금씩 줄여주기 위해 필요한 함수
  - show_table 함수 : 학습이 끝난 후에 상태별로 q(s,a)의 값이 가장 큰 액션을 뽑아서 보여주는 함수

### 초기 환경

In [4]:
env = GridWorld()
agent = QAgent()

### 에피소드 진행(학습)

In [5]:
for k in trange(1000): # 총 1,000 에피소드 동안 학습
    done = False
    history = []

    # env 초기화
    s = env.reset()

    while not done: # 에피소드 1회 진행
        a = agent.select_action(s)
        s_prime, r, done = env.step(a)
        history.append((s, a, r, s_prime))
        s = s_prime

    agent.update_table(history) # 히스토리를 이용하여 에이전트를 업데이트
    agent.anneal_eps()

100%|██████████| 1000/1000 [00:00<00:00, 5096.11it/s]


- 하나의 에피소드가 끝날 때까지 history라는 변수에 상태 전이 과정을 모두 저장해 두었다가, 에피소드가 끝난 순간 해당 변수를 이용해 에이전트 내부의 q 테이블을 업데이트함
- 그리고 epsilon의 값을 조금 씩 줄여줌

### 결과 확인

In [6]:
agent.show_table()

[[2. 3. 0. 3. 2. 3. 1.]
 [3. 0. 0. 2. 2. 3. 3.]
 [2. 3. 0. 1. 0. 2. 3.]
 [2. 2. 3. 1. 0. 2. 3.]
 [2. 1. 2. 1. 0. 2. 0.]]


## 6.2 TD 컨트롤 1 - SARSA

### GridWorld 클래스

In [7]:
class GridWorld():
    def __init__(self):
        self.x=0
        self.y=0

    def step(self, a):
        # 0번 액션: 왼쪽, 1번 액션: 위, 2번 액션: 오른쪽, 3번 액션: 아래쪽
        if a==0:
            self.move_left()
        elif a==1:
            self.move_up()
        elif a==2:
            self.move_right()
        elif a==3:
            self.move_down()

        reward = -1  # 보상은 항상 -1로 고정
        done = self.is_done()
        return (self.x, self.y), reward, done

    def move_left(self):
        if self.y==0:
            pass
        elif self.y==3 and self.x in [0,1,2]:
            pass
        elif self.y==5 and self.x in [2,3,4]:
            pass
        else:
            self.y -= 1

    def move_right(self):
        if self.y==1 and self.x in [0,1,2]:
            pass
        elif self.y==3 and self.x in [2,3,4]:
            pass
        elif self.y==6:
            pass
        else:
            self.y += 1

    def move_up(self):
        if self.x==0:
            pass
        elif self.x==3 and self.y==2:
            pass
        else:
            self.x -= 1

    def move_down(self):
        if self.x==4:
            pass
        elif self.x==1 and self.y==4:
            pass
        else:
            self.x+=1

    def is_done(self):
        if self.x==4 and self.y==6: # 목표 지점인 (4,6)에 도달하면 끝난다
            return True
        else:
            return False

    def reset(self):
        self.x = 0
        self.y = 0
        return (self.x, self.y)

### Agent 클래스

In [8]:
class QAgent():
    def __init__(self):
        self.q_table = np.zeros((5, 7, 4)) # 마찬가지로 Q 테이블을 0으로 초기화
        self.eps = 0.9

    def select_action(self, s):
        x, y = s
        coin = random.random()
        if coin < self.eps:
            action = random.randint(0,3)
        else:
            action_val = self.q_table[x,y,:]
            action = np.argmax(action_val)
        return action

    def update_table(self, transition):
        s, a, r, s_prime = transition
        x,y = s
        next_x, next_y = s_prime
        a_prime = self.select_action(s_prime) # S'에서 선택할 액션 (실제로 취한 액션이 아님)
        # SARSA 업데이트 식을 이용
        self.q_table[x,y,a] = self.q_table[x,y,a] + 0.1 * (r + self.q_table[next_x,next_y,a_prime] - self.q_table[x,y,a])

    def anneal_eps(self):
        self.eps -= 0.03
        self.eps = max(self.eps, 0.1)

    def show_table(self):
        q_lst = self.q_table.tolist()
        data = np.zeros((5,7))
        for row_idx in range(len(q_lst)):
            row = q_lst[row_idx]
            for col_idx in range(len(row)):
                col = row[col_idx]
                action = np.argmax(col)
                data[row_idx, col_idx] = action
        print(data)

- update_table 함수 : MC에서는 에이전트가 경험한 history 전체를 인자로 받았지만, TD에서는 트랜지션(transition)을 인풋으로 받음

- 트랜지션(transition) : 상태 전이 1번을 뜻함
  - 상태 s에서 a를 해서 보상 r을 받고 상태 s'에 도달했다면 (s,a,r,s')이 하나의 트랜지션임

### 초기 환경

In [9]:
env = GridWorld()
agent = QAgent()

### 에피소드 진행 (학습)

In [10]:
for k in trange(1000):
    done = False

    s = env.reset()
    while not done:
        a = agent.select_action(s)
        s_prime, r, done = env.step(a)
        agent.update_table((s,a,r,s_prime))
        s = s_prime
    agent.anneal_eps()

100%|██████████| 1000/1000 [00:00<00:00, 4497.75it/s]


- QAgent에서 update_table 함수를 호출하는 주기가 다름
  - MC에서는 한 에피소드가 끝나고 update_table을 호출했지만, TD는 한 step마다 호출
  - 트랜지션 데이터를 통해 Q_table 업데이트

### 결과 확인

In [11]:
agent.show_table()

[[2. 3. 0. 1. 3. 3. 2.]
 [3. 3. 0. 2. 2. 3. 3.]
 [2. 3. 0. 1. 0. 3. 3.]
 [2. 2. 2. 1. 0. 3. 3.]
 [0. 1. 2. 1. 0. 2. 0.]]


## 6.3 TD 컨트롤 2 - Q러닝

### GridWorld 클래스

In [12]:
class GridWorld():
    def __init__(self):
        self.x=0
        self.y=0

    def step(self, a):
        # 0번 액션: 왼쪽, 1번 액션: 위, 2번 액션: 오른쪽, 3번 액션: 아래쪽
        if a==0:
            self.move_left()
        elif a==1:
            self.move_up()
        elif a==2:
            self.move_right()
        elif a==3:
            self.move_down()

        reward = -1 # 보상은 항상 -1로 고정
        done = self.is_done()
        return (self.x, self.y), reward, done

    def move_left(self):
        if self.y==0:
            pass
        elif self.y==3 and self.x in [0,1,2]:
            pass
        elif self.y==5 and self.x in [2,3,4]:
            pass
        else:
            self.y -= 1

    def move_right(self):
        if self.y==1 and self.x in [0,1,2]:
            pass
        elif self.y==3 and self.x in [2,3,4]:
            pass
        elif self.y==6:
            pass
        else:
            self.y += 1

    def move_up(self):
        if self.x==0:
            pass
        elif self.x==3 and self.y==2:
            pass
        else:
            self.x -= 1

    def move_down(self):
        if self.x==4:
            pass
        elif self.x==1 and self.y==4:
            pass
        else:
            self.x+=1

    def is_done(self):
        if self.x==4 and self.y==6:
            return True
        else:
            return False

    def reset(self):
        self.x = 0
        self.y = 0
        return (self.x, self.y)

### Agent 클래스

In [13]:
class QAgent():
    def __init__(self):
        self.q_table = np.zeros((5, 7, 4)) # 마찬가지로 Q 테이블을 0으로 초기화
        self.eps = 0.9

    def select_action(self, s):
        # eps-greedy로 액션을 선택해준다
        x, y = s
        coin = random.random()
        if coin < self.eps:
            action = random.randint(0,3)
        else:
            action_val = self.q_table[x,y,:]
            action = np.argmax(action_val)
        return action

    def update_table(self, transition):
        s, a, r, s_prime = transition
        x,y = s
        next_x, next_y = s_prime

        # Q러닝 업데이트 식을 이용
        self.q_table[x,y,a] = self.q_table[x,y,a] + 0.1 * (r + np.amax(self.q_table[next_x,next_y,:]) - self.q_table[x,y,a])

    def anneal_eps(self):
        self.eps -= 0.01  # Q러닝에선 epsilon 이 좀더 천천히 줄어 들도록 함
        self.eps = max(self.eps, 0.2)

    def show_table(self):
        q_lst = self.q_table.tolist()
        data = np.zeros((5,7))
        for row_idx in range(len(q_lst)):
            row = q_lst[row_idx]
            for col_idx in range(len(row)):
                col = row[col_idx]
                action = np.argmax(col)
                data[row_idx, col_idx] = action
        print(data)

- Q러닝은 기존에 SARSA와 비슷함
  - update_table 함수와 anneal_eps 함수의 내부만 약간 수정

### 초기 환경

In [14]:
env = GridWorld()
agent = QAgent()

### 에피소드 진행(학습)

In [15]:
for n_epi in trange(1000):
    done = False

    s = env.reset()
    while not done:
        a = agent.select_action(s)
        s_prime, r, done = env.step(a)
        agent.update_table((s,a,r,s_prime))
        s = s_prime
    agent.anneal_eps()

100%|██████████| 1000/1000 [00:00<00:00, 2410.17it/s]


### 결과 확인

In [16]:
agent.show_table()

[[3. 3. 0. 2. 3. 2. 3.]
 [3. 3. 0. 2. 2. 2. 3.]
 [2. 3. 0. 1. 0. 3. 3.]
 [2. 2. 2. 1. 0. 2. 3.]
 [3. 2. 1. 1. 0. 2. 0.]]
