In [52]:
import numpy as np

In [53]:
class GridWorld:
    '''
    Environment, Grid 4x4
    '''
    def __init__(self):
        # State Transition Probability가 무조건 1이다.
        self.state = np.zeros([4, 4]) # 4x4 Grid
        self.action_space = [0, 1, 2, 3] # U, D, L, R
        
        self.goal_pos = {
            'y': 3,
            'x': 3
        }
        
        self.gamma = 0.9
        
        self.y_min, self.x_min, self.y_max, self.x_max = 0, 0, 3, 3
    
    def reset(self):
        self.state = np.zeros([4, 4])
        return self.state
    
    def s_next(self, s_t, a_t):
        '''
        Args:
            s_t: (tuple) Environment에서 위치를 의미한다. (row, col)형태로 들어온다.
            a_t: [0, 3] 범위의 int 값으로 어떤 Action을 취하는지 의미한다.
        Return:
            pos: (list) Action에 따라 state를 이동한 후, state의 위치를 list에 담아 리턴.
        '''
        pos = list(s_t) # Tuple to List for edit
        
        # Action에 따라 state를 이동하면서 Boundary를 넘지 않도록 한다.
        if a_t == 0:
            pos[0] = max(s_t[0]-1, self.y_min)
        elif a_t == 1:
            pos[0] = min(s_t[0]+1, self.y_max)
        elif a_t == 2:
            pos[1] = max(s_t[1]-1, self.x_min)
        elif a_t == 3:
            pos[1] = min(s_t[1]+1, self.x_max)
        else:
            raise AssertionError("Invalid Action")
        
        return pos
    
    def reward(self, s_t, a_t, s_next):
        '''
        state(s_t)가 Goal에 들어가면 Reward 0 리턴
        그게 아니라면 모든 Reward는 -1
        '''
        if (s_t[0] == self.goal_pos['y'] and s_t[1] == self.goal_pos['x']):
            return 0
        else:
            return -1

# Policy Evaluation
* 어느 Policy를 평가하려면, 해당 Policy를 통해 각 State의 Value function을 구하면
Policy의 가치가 어느 정도인지 판단할 수 있다.

* 그러면, Value function은 어떻게 구할 것인가? Bellman equation을 통해 다음 state의
Value function과의 점화식을 도출했다면 recursive하게 구할 수 있다.

* 하지만, recursive하게 구하는 것은 너무 비효율적이고 계산하기 어렵다.
그래서, 제안된 방식이 iterative한 방식인데 이는 더 직관적이며 효율적이다.

* 또한, iterative한 방식이 작동할 수 있는 이유는 (각 V(s)가 특정 값에 근사할 수 있는 이유는)
Bellman equation의 Contraction mapping이라는 성질 때문이다.
이는 어떤 값이 갱신 될 때, 값들이 점점 더 가까워지도록 만드는 함수이다.

In [54]:
def policy_evaluation(env, policy):
    '''
    위 Markdown 참고
    Contraction Mapping에 의해 특정 값에 근사되기 때문에 
    이 변화 값이 theta보다 작을 때, 멈추도록 한다.
    '''
    delta = 1 # 이건 Initial Value, 1인 이유나 이런 거 상괍 없음.
    theta = 0.01
    
    loop_count = 0
    while delta >= theta:
        delta = 0 # 여기서 초기화 되기 때문에
        y, x = env.state.shape
        new_state = np.zeros((y, x)) # 여기에 각 State에 대한 V(s) 넣을 거임
        
        for i in range(y*x-1):
            # V(s)를 구할 State 가져오기
            v_s = 0
            s_t = np.divmod(i, y)
            
            for a_t in env.action_space:
                # 탐색 중인 State의 Policy 가져오기 + 특정 Action에 대한 Prob
                pi_a = policy[i][a_t]
                p_ss = 1.0 # state transition probability has only 1
                
                # Action에 따른 다음 State와 현재 state에 대한 reward
                s_t1 = env.s_next(s_t, a_t)
                reward = env.reward(s_t, a_t, s_t1)
                
                v_s = v_s + pi_a * p_ss * (reward + env.gamma * env.state[s_t1[0], s_t1[1]])
                
            new_state[s_t[0], s_t[1]] = v_s
        
        value_delta = np.sum(np.abs(new_state - env.state))
        env.state = new_state # V(s) 갱신
        
        delta = max(delta, value_delta)
        
        loop_count += 1
        print(f"(Policy Evaluation)[{loop_count:03d}] Delta: {delta}")
        
    return env.state

In [55]:
env = GridWorld()

policy = [np.array([0.25]*4) for _ in range(16)]

res = policy_evaluation(env, policy)

(Policy Evaluation)[001] Delta: 15.0
(Policy Evaluation)[002] Delta: 13.049999999999999
(Policy Evaluation)[003] Delta: 11.44125
(Policy Evaluation)[004] Delta: 10.046531250000005
(Policy Evaluation)[005] Delta: 8.836846875000003
(Policy Evaluation)[006] Delta: 7.777860468750003
(Policy Evaluation)[007] Delta: 6.850606640625003
(Policy Evaluation)[008] Delta: 6.0358126133056675
(Policy Evaluation)[009] Delta: 5.319780200925291
(Policy Evaluation)[010] Delta: 4.6894928129728095
(Policy Evaluation)[011] Delta: 4.134661461056415
(Policy Evaluation)[012] Delta: 3.6458383469337097
(Policy Evaluation)[013] Delta: 3.215154091310108
(Policy Evaluation)[014] Delta: 2.8355211386587618
(Policy Evaluation)[015] Delta: 2.500874475079919
(Policy Evaluation)[016] Delta: 2.2058087148637213
(Policy Evaluation)[017] Delta: 1.9456324929596231
(Policy Evaluation)[018] Delta: 1.7161873178144953
(Policy Evaluation)[019] Delta: 1.5138369561042664
(Policy Evaluation)[020] Delta: 1.3353668589267293
(Policy Eva

In [56]:
print(res.reshape(4, 4))

[[-9.35537351 -9.21356672 -8.96998585 -8.74548904]
 [-9.21356672 -8.96858758 -8.49571346 -7.96588579]
 [-8.96998585 -8.49571346 -7.41217763 -5.75452192]
 [-8.74548904 -7.96588579 -5.75452192  0.        ]]


# Policy Improvement (Greedy)
* $\pi'(s)=argmax_aq_{pi}(s,a)$, 이 식이 전부다.
$$ \pi'(s)=\begin{cases}1 & \text{if}\:a=argmax_aq_{\pi_k}(s,a)\\0 & \text{else}\end{cases} $$

In [57]:
def policy_improvement(env, policy):
    gamma = 1.0
    y, x = env.state.shape
    for i in range(y*x):
        s_t = np.divmod(i, y)
        action_values = np.zeros(len(env.action_space))
        
        # 어느 State에서 각 Action에 대한 Action-Value 구하기
        for a in env.action_space:
            action_value = 0
            p_s_next = 1.0 # state transition probability
            s_next = env.s_next(s_t, a)
            reward = env.reward(s_t, a, s_next)
            
            # 여기서 Policy Evaluation을 통해 각 State의 Value function을 
            # 구해두었기 때문에 Action Value를 구할 수 있다.
            action_value = p_s_next * (reward + env.gamma * env.state[s_next[0], s_next[1]])
            action_values[a] = action_value
        
        # 여기가 핵심(위 Markdown의 수식)
        a_max = action_values.argmax()
        policy[i][:] = 0
        policy[i][a_max] = 1
    return policy

# Policy Iteration
* Evaluation을 통해 현재 Policy에 대한 평가를 하고,
* Improvement를 통해 현재 Policy를 더 나은 방향으로 개선시킨다.
* 이 과정을 반복하면서 Policy Evaluation을 통해 나온 Value function 값이 더 이상 변화가 되지 않는다면,
* 제일 Optimal한 Policy가 되었다는 의미이기 때문에 Iteration을 멈춘다.

In [58]:
env = GridWorld()

policy = [np.array([0.25]*4) for _ in range(16)]

# Policy Iteration
value_vector = np.zeros(16)
delta, Delta, cnt = 5, 0, 0

while True:
    env.reset() # Value function 초기화의 목적
    new_value_vector = policy_evaluation(env, policy)
    Delta = 0
    Delta = max(Delta, np.sum(np.abs(new_value_vector[0]-value_vector[0])))
    
    if Delta >= delta:
        # Update 더 해야 함.
        value_vector = new_value_vector # 갱신
        policy = policy_improvement(env, policy)
        cnt += 1
        print(f"->(Policy Improvement) [{cnt:03d}] Delta: {Delta}")
    else:
        # Update 그만
        cnt += 1
        print(f"->(Policy Improvement) [{cnt:03d}] Delta: {Delta}")
        break

(Policy Evaluation)[001] Delta: 15.0
(Policy Evaluation)[002] Delta: 13.049999999999999
(Policy Evaluation)[003] Delta: 11.44125
(Policy Evaluation)[004] Delta: 10.046531250000005
(Policy Evaluation)[005] Delta: 8.836846875000003
(Policy Evaluation)[006] Delta: 7.777860468750003
(Policy Evaluation)[007] Delta: 6.850606640625003
(Policy Evaluation)[008] Delta: 6.0358126133056675
(Policy Evaluation)[009] Delta: 5.319780200925291
(Policy Evaluation)[010] Delta: 4.6894928129728095
(Policy Evaluation)[011] Delta: 4.134661461056415
(Policy Evaluation)[012] Delta: 3.6458383469337097
(Policy Evaluation)[013] Delta: 3.215154091310108
(Policy Evaluation)[014] Delta: 2.8355211386587618
(Policy Evaluation)[015] Delta: 2.500874475079919
(Policy Evaluation)[016] Delta: 2.2058087148637213
(Policy Evaluation)[017] Delta: 1.9456324929596231
(Policy Evaluation)[018] Delta: 1.7161873178144953
(Policy Evaluation)[019] Delta: 1.5138369561042664
(Policy Evaluation)[020] Delta: 1.3353668589267293
(Policy Eva