# 정책 이터레이션


해당 노트북은 정책 이터레이션을 그리드월드에서 구현하는 예제 소스코드 `policy_iteration.py`와 `environment.py`에 대한 정리 노트북이다.

1. `policy_iteration.py`

  Policyiteration 클래스를 포함하며, 해당 클래스에는 정책 이터레이션 함수 및 main 함수가 포함되어 있다.

2. `environment.py`

  그리드월드 예제 화면을 구성하고, 상태, 보상 등 환경에 대한 정보를 제공하기 위한 함수로 구성되어 있다. 해당 코드는 깃허브의 'rlcode/reinforcement-learning-kr-v2'로부터 가져왔다. 
  
  그리드월드의 경우 환경을 직접 만든 것이기 때문에 이러한 파일이 필요하나, 일반적으로는 이미 구축되어 있는 환경에 강화학습을 적용하는 경우가 많으므로 에이전트와 관련된 파일만 필요한 경우가 많다.

## 1. `policy_iteration.py`

main 함수를 제외한 `policy_iteration.py`는 다음과 같다.

In [2]:
import numpy as np
from environment import GraphicDisplay, Env

class Policyiteration:
  def __init__(self, env):
    self.env = env

    # 가치함수를 2차원 리스트로 초기화
    self.value_table = [[0.0] * env.width for _ in range(env.height)]

    # 모두 동일한 확률로 초기화
    self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
                         for _ in range(env.height)]

    # terminal state 설정
    self.policy_table[2][2] = []

    # 할인율 0.9
    self.discount_factor = 0.9
  
  # 벨만 기대 방정식으로 정책 평가
  def policy_evaluation(self):
    # 다음 가치함수 초기화
    next_value_table = [[0.00] * self.env.width
                        for _ in range(self.env.height)]

    # 모든 상태에 대해서 벨만 기대 방정식 계산
    for state in self.env.get_all_states():
      value = 0.0

      if state == [2,2]: 
        next_value_table[state[0]][state[1]] = value
        continue

      # 벨만 기대 방정식
      for action in self.env.possible_actions:
        next_state = self.env.state_after_action(state, action)
        reward = self.env.get_reward(state,action)
        next_value = self.get_value(next_state)

        value += (self.get_policy(state)[action] *
                  (reward + self.discount_factor * next_value))
        
      next_value_table[state[0]][state[1]] = value

    self.value_table = next_value_table

  # 현재 가치함수에 대해 탐욕 정책 발전
  def policy_improvement(self):
        next_policy = self.policy_table
        for state in self.env.get_all_states():
            if state == [2, 2]:
                continue
            
            value_list = []

            # 정책 초기화
            result = [0.0, 0.0, 0.0, 0.0]

            # 모든 행동에 대해 [보상 + (할인율 * 다음 상태 가치함수)] 계산
            for index, action in enumerate(self.env.possible_actions):
                next_state = self.env.state_after_action(state, action)
                reward = self.env.get_reward(state, action)
                next_value = self.get_value(next_state)
                value = reward + self.discount_factor * next_value
                value_list.append(value)

            # 받을 보상이 최대인 행동들에 대해 탐욕 정책 발전
            max_idx_list = np.argwhere(value_list == np.amax(value_list))
            max_idx_list = max_idx_list.flatten().tolist()
            prob = 1 / len(max_idx_list)

            for idx in max_idx_list:
                result[idx] = prob

            next_policy[state[0]][state[1]] = result

        self.policy_table = next_policy

  # 특정 상태에서 정책에 따라 무작위로 행동 반환
  def get_action(self, state):
      policy = self.get_policy(state)
      policy = np.array(policy)
      return np.random.choice(4, 1, p=policy)[0]

  # 상태에 따른 정책 반환
  def get_policy(self, state):
      return self.policy_table[state[0]][state[1]]

  # 가치함수 값을 반환
  def get_value(self, state):
      return self.value_table[state[0]][state[1]]

`policy_iteration.py`에서는 먼저 `environment.py`의 GraphicDisplay와 Env 클래스를 import 한다.

In [3]:
from environment import GraphicDisplay, Env

정책 이터레이션의 에이전트는 `policy_iteration.py` 파일 내에서 Policyiteration 클래스로 정의되어 있다. Env 클래스로 정의한 객체 env를 Policyiteration 클래스의 인수로 전달함으로써 에이전트가 환경의 Env 클래스에 접근할 수 있다.

다음 함수를 실행하면 그리드월드 화면이 나오게 된다.

In [4]:
# main 함수
if __name__ == '__main__':
  env = Env()
  policy_iteration = Policyiteration(env)
  grid_world = GraphicDisplay(policy_iteration)
  grid_world.mainloop()

: 

: 

출력되는 화면에서 Evaluation을 누르면 `policy_evaluation()` 함수가, Improvement 버튼을 누르면 `policy_improvement()` 함수가 실행된다. 즉, 두 버튼을 번갈아가면서 누르면 최적 정책에 수렴할 수 있다.

그렇게 정책이 최적 정책에 수렴이 되어 더 이상의 발전이 이루어지지 않을 때 (그리드월드는 간단한 환경이므로 수렴 이후 정책 변동이 거의 없다) Move 버튼을 누르면 `get_action(state)` 함수가 실행되며 에이전트가 정책에 따라 행동을 수행한다.

Reset 버튼을 누르면 모든 변수를 초기화시켜 다시 진행할 수 있다.

**이제 `policy_iteration.py`의 코드를 하나씩 분석해보도록 하자.**

### \_\_init\_\_


    def __init__(self, env):
        self.env = env

        # 가치함수를 2차원 리스트로 초기화
        self.value_table = [[0.0] * env.width for _ in range(env.height)]

        # 모두 동일한 확률로 초기화
        self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
                                for _ in range(env.height)]

        # terminal state 설정
        self.policy_table[2][2] = []

        # 할인율 0.9
        self.discount_factor = 0.9

먼저 Policyiteration 코드 내에서 가장 먼저 정의되는 \_\_init\_\_ 함수를 살펴보자.

해당 함수는 먼저 env를 self.env로 정의함으로써 환경에 대한 객체를 선언한다.

env 객체에 정의되어 있는 변수 및 함수는 다음과 같다.

| 코드 | 설명 | 반환값|
|-----|-----|-----|
|`env.width`, `env.height`|그리드월드의 너비와 높이|그리드월드의 가로, 세로를 정수로 반환|
|`env.state_after_action(state, action)`|state에서 action을 했을 때 에이전트가 가는 다음 상태|행동 후의 상태 좌표를 리스트로 반환|
|`env.get_all_states()`|존재하는 모든 상태|모든 상태를 반환|
|`env.get_reward(state, action)`|state의 보상|보상을 정수 형태로 반환|
|`env.possible_actions`|상, 하, 좌, 우|[0,1,2,3] 반환. 순서대로 상하좌우|

정책 이터레이션은 모든 상태에 대해 가치함수를 계산하기 때문에 `value_table`이라는 2차원 리스트 변수로 가치함수를 선언한다.

여기서 2차원 리스트의 크기는 `env.width`와 `env.height`로 정해지므로 그리드월드의 크기인 5*5이다.

모든 상태의 가치함수에 대한 초기화 값은 0이다.


```
# 가치함수를 2차원 리스트로 초기화
self.value_table = [[0.0] * env.width for _ in range(env.height)]
```

정책 `policy_table`은 모든 상태에 대해 상, 하, 좌, 우에 해당하는 각 행동의 확률을 담고 있는 리스트이다.

따라서 5 * 5 * 4의 크기를 가지는 3차원 리스트이며, 모든 행동에 대하여 같은 확률인 0.25로 초기화를 진행하였다.
 
```
# 모두 동일한 확률로 초기화
self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width
                        for _ in range(env.height)]
```

마지막 상태, 즉 에이전트가 도달하고자 하는 상태를 정의하기 위해 terminal state를 정의하였다.

```
# terminal state 설정
self.policy_table[2][2] = []
```

벨만 방정식에 사용되는 할인율을 0.9로 정의해주었다.

```
# 할인율 0.9
self.discount_factor = 0.9
```

### policy_evaluation

```
# 벨만 기대 방정식으로 정책 평가
def policy_evaluation(self):
  # 다음 가치함수 초기화
  next_value_table = [[0.00] * self.env.width
                      for _ in range(self.env.height)]

  # 모든 상태에 대해서 벨만 기대 방정식 계산
  for state in self.env.get_all_states():
    value = 0.0

    if state != [2,2]: 
      continue
    else: # terminal state
      next_value_table[state[0]][state[1]] = value

    # 벨만 기대 방정식
    for action in self.env.possible_actions:
      next_state = self.env.state_after_action(state, action)
      reward = self.env.get_reward(state,action)
      next_value = self.get_value(next_state)

      value += (self.get_policy(state)[action] *
                (reward + self.discount_factor * next_value))
      
    next_value_table[state[0]][state[1]] = value

  self.value_table = next_value_table
```

정책 평가에서 에이전트는 모든 상태의 가치함수를 업데이트 한다. 이를 위해 `next_value_table`을 선언한 후 계산 결과를 저장한다. 이는 위에서 `value_table`을 선언하는 방식과 동일하게 진행된다.

```
# 다음 가치함수 초기화
next_value_table = [[0.00] * self.env.width
                    for _ in range(self.env.height)]
```

모든 상태에 대해 벨만 기대 방정식을 계산한다.

```
# 모든 상태에 대해서 벨만 기대 방정식 계산
for state in self.env.get_all_states():
    value = 0.0

    if state == [2,2]: # terminal state
        next_value_table[state[0]][state[1]] = value
        continue

    # 벨만 기대 방정식
    for action in self.env.possible_actions:
        next_state = self.env.state_after_action(state, action)
        reward = self.env.get_reward(state,action)
        next_value = self.get_value(next_state)

        value += (self.get_policy(state)[action] *
                (reward + self.discount_factor * next_value))
        
    next_value_table[state[0]][state[1]] = value
```

위의 벨만 기대 방정식을 수식으로 나타내면 다음과 같다.

$v_{k+1}(s) = \Sigma_{a\in A}\pi(a|s)(r_{(s,a)} + \gamma v_k(s'))$

$P^a_{ss'}$이 생략된 이유는 상태 변환 확률을 1로 설정했기 때문이다.

action을 취했을 때 다음 상태가 어디인지 알려주는 역할은 `env.state_after_action(state, action)`이 수행한다.

모든 상태에 대해 벨만 기대 방정식의 계산이 끝나면 현재의 `value_table`에 `next_value_table`을 덮어씌우는 방식으로 정책 평가를 진행한다.

```
self.value_table = next_value_table
```

### policy_improvement

```
# 현재 가치함수에 대해 탐욕 정책 발전
def policy_improvement(self):
    next_policy = self.policy_table
    for state in self.env.get_all_states():
        if state == [2, 2]:
            continue
        
        value_list = []

        # 정책 초기화
        result = [0.0, 0.0, 0.0, 0.0]

        # 모든 행동에 대해 [보상 + (할인율 * 다음 상태 가치함수)] 계산
        for index, action in enumerate(self.env.possible_actions):
            next_state = self.env.state_after_action(state, action)
            reward = self.env.get_reward(state, action)
            next_value = self.get_value(next_state)
            value = reward + self.discount_factor * next_value
            value_list.append(value)

        # 받을 보상이 최대인 행동들에 대해 탐욕 정책 발전
        max_idx_list = np.argwhere(value_list == np.amax(value_list))
        max_idx_list = max_idx_list.flatten().tolist()
        prob = 1 / len(max_idx_list)

        for idx in max_idx_list:
            result[idx] = prob

        next_policy[state[0]][state[1]] = result

    self.policy_table = next_policy
```

정책 평가를 통해 정책을 평가하면 그에 따른 새로운 가치함수를 얻는다.

에이전트는 이렇게 얻어진 새로운 가치함수를 이용해 정책을 업데이트한다.

정책 평가에서와 비슷하게, 정책 `policy_table`을 복사한 `next_policy`에 업데이트 된 정책을 저장한다.

```
next_policy = self.policy_table
```

이때 정책의 업데이트에는 탐욕 정책 발전 방법을 사용한다.

```
# 받을 보상이 최대인 행동들에 대해 탐욕 정책 발전
max_idx_list = np.argwhere(value_list == np.amax(value_list))
max_idx_list = max_idx_list.flatten().tolist()
prob = 1 / len(max_idx_list)
```

탐욕 정책 발전은 가치가 가장 높은 하나의 행동을 선택하는 것인데, 이것은 여러 개일 수 있다. 이 경우 각 최적 행동을 다시 동일한 확률로 선택하는 정책으로 업데이트한다.

탐욕 정책을 구하는 순서는 다음과 같다.

1. 현재 상태에서 가능한 행동에 대해 $r_{(s,a)} + \gamma v_k(s')$을 계산한다. 이 결과는 `value_list`에 저장한다.
    ```
    for index, action in enumerate(self.env.possible_actions):
        next_state = self.env.state_after_action(state, action)
        reward = self.env.get_reward(state, action)
        next_value = self.get_value(next_state)
        value = reward + self.discount_factor * next_value
        value_list.append(value)
    ```

2. `value_list`에 담긴 값 중 가장 큰 값을 max 함수를 통해 알아낸다. 그 후 numpy의 argwhere 함수를 통해 가장 큰 값의 index를 알아낸다.
    ```
    max_idx_list = np.argwhere(value_list == np.amax(value_list))
    ```

3. `max_idx_list`에 담긴 값이 여러 개라면(최적 행동이 여러 개라면) 에이전트는 해당 리스트 내 index들을 동일한 확률로 선택한다. 이를 구현하기 위해 1을 `max_idx_list`의 길이로 나누어 행동의 확률을 계산한다. 이후 해당 행동에 계산한 확률값을 저장한다.

    ```
    max_idx_list = max_idx_list.flatten().tolist()
    prob = 1 / len(max_idx_list)

    for idx in max_idx_list:
        result[idx] = prob
    ```

### get_action

```  
# 특정 상태에서 정책에 따라 무작위로 행동 반환
def get_action(self, state):
    policy = self.get_policy(state)
    policy = np.array(policy)
    return np.random.choice(4, 1, p=policy)[0]
```

에이전트가 정책에 따라 움직이려면 특정 상태에서 어떤 행동을 해야할지 알아야 하는데, 그 역할을 하는 것이 `get_action` 함수이다.

정책은 각 행동을 할 확률이므로 확률에 기반하여 행동을 선택해야하고, 이를 수행하는 것이 `np.random.choice`이다.

첫 번째 인자는 행동의 개수이고 두 번째 인자는 몇 개의 행동을 샘플링할지 정하며, 세 번째 인자는 각 행동을 샘플링할 확률로, policy이다.

```
return np.random.choice(4, 1, p=policy)[0]
```

### get_policy, get_value

```
# 상태에 따른 정책 반환
def get_policy(self, state):
    return self.policy_table[state[0]][state[1]]

# 가치함수 값을 반환
def get_value(self, state):
    return self.value_table[state[0]][state[1]]
```

`get_policy`의 경우 `self.policy_table`로 저장되어 있는 정책에서 해당 상태에 대한 정책을 반환한다.

`get_value`의 경우 `self.value_table`로 저장되어 있는 가치함수에서 해당 상태에 해당하는 가치함수를 반환한다.