In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import time
import copy

### 그림그리는 함수

In [2]:
def show_v_table_small(v_table, env):
    for i in range(env.reward.shape[0]):        
        print("+----------"*env.reward.shape[1])
        print("|", end="")
        for j in range(env.reward.shape[1]):
            print("{0:8.2f}  |".format(v_table[i,j]),end="")
        print()
    print("+----------"*env.reward.shape[1])

# V table 그리기    
def show_v_table(v_table, env):    
    for i in range(env.reward.shape[0]):        
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    print("                 |",end="")
                if k==1:
                        print("   {0:8.2f}      |".format(v_table[i,j]),end="")
                if k==2:
                    print("                 |",end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")
    
# Q table 그리기
def show_q_table(q_table,env):
    for i in range(env.reward.shape[0]):
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    print("{0:10.2f}       |".format(q_table[i,j,0]),end="")
                if k==1:
                    print("{0:6.2f}    {1:6.2f} |".format(q_table[i,j,3],q_table[i,j,1]),end="")
                if k==2:
                    print("{0:10.2f}       |".format(q_table[i,j,2]),end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")
    

# 정책 policy 화살표로 그리기
def show_q_table_arrow(q_table,env):
    for i in range(env.reward.shape[0]):        
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    if np.max(q[i,j,:]) == q[i,j,0]:
                        print("        ↑       |",end="")
                    else:
                        print("                 |",end="")
                if k==1:                    
                    if np.max(q[i,j,:]) == q[i,j,1] and np.max(q[i,j,:]) == q[i,j,3]:
                        print("      ←  →     |",end="")
                    elif np.max(q[i,j,:]) == q[i,j,1]:
                        print("          →     |",end="")
                    elif np.max(q[i,j,:]) == q[i,j,3]:
                        print("      ←         |",end="")
                    else:
                        print("                 |",end="")
                if k==2:
                    if np.max(q[i,j,:]) == q[i,j,2]:
                        print("        ↓       |",end="")
                    else:
                        print("                 |",end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")    
    
# 정책 policy 화살표로 그리기
def show_policy_small(policy,env):
    for i in range(env.reward.shape[0]):        
        print("+----------"*env.reward.shape[1],end="")
        print("+")
        print("|", end="")
        for j in range(env.reward.shape[1]):
            if env.reward_list1[i][j] == "road":
                if policy[i,j] == 0:
                    print("   ↑     |",end="")
                elif policy[i,j] == 1:
                    print("   →     |",end="")
                elif policy[i,j] == 2:
                    print("   ↓     |",end="")
                elif policy[i,j] == 3:
                    print("   ←     |",end="")
            else:
                print("          |",end="")
        print()
    print("+----------"*env.reward.shape[1],end="")
    print("+")
    
# 정책 policy 화살표로 그리기
def show_policy(policy,env):
    for i in range(env.reward.shape[0]):        
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    print("                 |",end="")
                if k==1:
                    if policy[i,j] == 0:
                        print("      ↑         |",end="")
                    elif policy[i,j] == 1:
                        print("      →         |",end="")
                    elif policy[i,j] == 2:
                        print("      ↓         |",end="")
                    elif policy[i,j] == 3:
                        print("      ←         |",end="")
                if k==2:
                    print("                 |",end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")

### Agent 구현

In [3]:
class Agent():
    
    # 1. 행동에 따른 에이전트의 좌표 이동(위, 오른쪽, 아래, 왼쪽) 
    action = np.array([[-1,0],[0,1],[1,0],[0,-1]])
    
    # 2. 각 행동별 선택확률
    select_action_pr = np.array([0.25,0.25,0.25,0.25])
    
    # 3. 에이전트의 초기 위치 저장
    def __init__(self):
        self.pos = (0,0)
    
    # 4. 에이전트의 위치 저장
    def set_pos(self,position):
        self.pos = position
        return self.pos
    
    # 5. 에이전트의 위치 불러오기
    def get_pos(self):
        return self.pos

### Environment 구현

In [4]:
class Environment():
    
    # 1. 미로밖(절벽), 길, 목적지와 보상 설정
    cliff = -3
    road = -1
    goal = 1
    
    # 2. 목적지 좌표 설정
    goal_position = [2,2]
    
    # 3. 보상 리스트 숫자
    reward_list = [[road,road,road],
                   [road,road,road],
                   [road,road,goal]]
    
    # 4. 보상 리스트 문자
    reward_list1 = [["road","road","road"],
                    ["road","road","road"],
                    ["road","road","goal"]]
    
    # 5. 보상 리스트를 array로 설정
    def __init__(self):
        self.reward = np.asarray(self.reward_list)    

    # 6. 선택된 에이전트의 행동 결과 반환 (미로밖일 경우 이전 좌표로 다시 복귀)
    def move(self, agent, action):
        
        done = False
        
        # 6.1 행동에 따른 좌표 구하기
        new_pos = agent.pos + agent.action[action]
        
        # 6.2 현재좌표가 목적지 인지확인
        if self.reward_list1[agent.pos[0]][agent.pos[1]] == "goal":
            reward = self.goal
            observation = agent.set_pos(agent.pos)
            done = True
        # 6.3 이동 후 좌표가 미로 밖인 확인    
        elif new_pos[0] < 0 or new_pos[0] >= self.reward.shape[0] or new_pos[1] < 0 or new_pos[1] >= self.reward.shape[1]:
            reward = self.cliff
            observation = agent.set_pos(agent.pos)
            done = True
        # 6.4 이동 후 좌표가 길이라면
        else:
            observation = agent.set_pos(new_pos)
            reward = self.reward[observation[0],observation[1]]
            
        return observation, reward, done

## 정책 평가, 정책 개선

여기서 policy 변수는 각 상태에 대하여 어떠한 행동을 취할 지 저장하는 행렬로 정의하였다.

0, 1, 2, 3 = 위, 오른쪽, 아래, 왼쪽

예:

policy[0, 0] = 1 의 뜻 : 상태 (0, 0) 에서 오른쪽으로 이동하겠다

policy[1, 2] = 3 의 뜻 : 상태 (1, 2) 에서 왼쪽으로 이동하게 하겠다.

수식으로 표현하자면 policy[i, j] = a 라는 뜻은

$$\pi(a|s_{ij}) = 1, \pi(a'|s_{ij}) = 0\text{  }(a'\in A, a'\neq a)$$

### 정책평가

In [5]:
# V table 갱신 함수
def policy_evalution(env, agent, v_table, policy):
    gamma = 0.9
    while(True):
        # Δ←0
        delta = 0
        #  v←𝑉(𝑠)
        temp_v = copy.deepcopy(v_table)
        # 모든 𝑠∈𝑆에 대해 :
        for i in range(env.reward.shape[0]):
            for j in range(env.reward.shape[1]):
                # 에이전트를 지정된 좌표에 위치시킨후 가치함수를 계산
                agent.set_pos([i,j])
                # 현재 정책의 행동을 선택
                action = policy[i,j]
                observation, reward, done = env.move(agent, action)
                v_table[i,j] = reward + gamma * v_table[observation[0],observation[1]]
        # ∆←max⁡(∆,|v−𝑉(𝑠)|)
        # 계산전과 계산후의 가치의 차이를 계산
        delta = np.max([delta, np.max(np.abs(temp_v-v_table))])  
                
        # 7. ∆ <𝜃가 작은 양수 일 때까지 반복
        if delta < 0.000001:
            break
    return v_table, delta


### 정책개선

In [6]:
# policy 갱신 함수
def policy_improvement(env, agent, v_table, policy):

    gamma = 0.9  
    
    # policyStable ← true 
    policyStable = True

    # 모든 s∈S에 대해：
    for i in range(env.reward.shape[0]):
        for j in range(env.reward.shape[1]):            
            # 𝑜𝑙𝑑−𝑎𝑐𝑡𝑖𝑜𝑛←π(s) 
            old_action = policy[i,j]            
            # 가능한 행동중 최댓값을 가지는 행동을 선택
            temp_action = 0
            temp_value =  -1e+10           
            for action in range(len(agent.action)):
                agent.set_pos([i,j])
                observation, reward, done = env.move(agent,action)
                if temp_value < reward + gamma * v_table[observation[0],observation[1]]:
                    temp_action = action
                    temp_value = reward + gamma * v_table[observation[0],observation[1]]
            # 만약 𝑜𝑙𝑑−𝑎𝑐𝑡𝑖𝑜𝑛"≠π(s)"라면， "policyStable ← False" 
            # old-action과 새로운 action이 다른지 체크
            if old_action != temp_action :
                policyStable = False
            policy[i,j] = temp_action
    return policy, policyStable


### 정책 반복 (정책 평가 <=> 정책 개선)

In [7]:
# 정책 반복
# 환경과 에이전트에 대한 초기 설정
np.random.seed(0)
env = Environment()
agent = Agent()

# 1. 초기화
# 모든 𝑠∈𝑆에 대해 𝑉(𝑠)∈𝑅과 π(𝑠)∈𝐴(𝑠)를 임의로 설정

#shape : [h, w]
v_table =  np.random.rand(env.reward.shape[0], env.reward.shape[1])

#shape : [h, w]
#값 : 해당 상태에서 어떠한 행동을 취할 것인지 나타내는 정수
policy = np.random.randint(0, 4,(env.reward.shape[0], env.reward.shape[1]))

print("Initial random V(S)")
show_v_table(np.round(v_table,2),env)
print()
print("Initial random Policy π0(S)")
show_policy(policy,env)
print("start policy iteration")

# 시작 시간을 변수에 저장
start_time = time.time()

max_iter_number = 20000
for iter_number in range(max_iter_number):
    
    # 2.정책평가
    v_table, delta = policy_evalution(env, agent, v_table, policy)

    # 정책 평가 후 결과 표시                                            
    print("")
    print("Vπ{0:}(S) delta = {1:.10f}".format(iter_number,delta))
    show_v_table(np.round(v_table,2),env)
    print()    
    
    
    # 3.정책개선
    policy, policyStable = policy_improvement(env, agent, v_table, policy)

    # policy 변화 저장
    print("policy π{}(S)".format(iter_number+1))
    show_policy(policy,env)
    # 하나라도 old-action과 새로운 action이 다르다면 '2. 정책평가'를 반복
    if(policyStable == True):
        break

        
print("total_time = {}".format(time.time()-start_time))

Initial random V(S)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|       0.55      |       0.72      |       0.60      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|       0.54      |       0.42      |       0.65      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|       0.44      |       0.89      |       0.96      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+

Initial random Policy π0(S)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      ↓         |      →         |      ↓         |
|                 |                 |                 |
+-

### 에피소드 생성


In [8]:
def generate_episode(env, agent, *args, **kwargs):
    gamma = 0.9
    # 에피소드를 저장할 리스트
    episode = []
    # 이전에 방문여부 체크
    
    
    # 에이전트가 모든 상태에서 출발할 수 있게 출발지점을 무작위로 설정
    i = np.random.randint(0,env.reward.shape[0])
    j = np.random.randint(0,env.reward.shape[1])
    agent.set_pos([i,j])    
    
    #에피소드의 수익을 초기화
    G = 0
    
    #감쇄율의 지수
    step = 0
    max_step = 100
    
    # 에피소드 생성
    for k in range(max_step):
        pos = agent.get_pos()            
        action = np.random.randint(0,len(agent.action))            
        observaetion, reward, done = env.move(agent, action)
        #[position, action, reward, dummy_G_value = 0]
        episode.append([pos, action, reward, 0])

        # 에피소드가 종료했다면 루프에서 탈출
        if done == True:                
            break
            
    # episode 순간마다 G값 구하기
    for ep_i in range(len(episode)-1, -1, -1):
        G = G*gamma + episode[ep_i][2] # + reward
        episode[ep_i][3] = G
            
    return i, j, G, episode

In [9]:
#에피소드 생성 실험
np.random.seed(0)

for i in range(10):
  print("===episode %d===" % (i+1))
  _, _, G, episode = generate_episode(env, agent, True)
  total_reward = 0
  for where, action, reward, G_s in episode:
    print(where,"", "a:", ["up", "right", "down", "left"][action], "reward:", reward)
    total_reward += reward
  print("total reward:", total_reward)
  print("G(감가율 적용한 가치): ", G)

===episode 1===
[0, 1]  a: up reward: -3
total reward: -3
G(감가율 적용한 가치):  -3.0
===episode 2===
[1, 1]  a: down reward: -1
[2 1]  a: up reward: -1
[1 1]  a: left reward: -1
[1 0]  a: down reward: -1
[2 0]  a: up reward: -1
[1 0]  a: up reward: -1
[0 0]  a: up reward: -3
total reward: -9
G(감가율 적용한 가치):  -6.2799130000000005
===episode 3===
[2, 1]  a: down reward: -3
total reward: -3
G(감가율 적용한 가치):  -3.0
===episode 4===
[2, 0]  a: right reward: -1
[2 1]  a: right reward: 1
[2 2]  a: right reward: 1
total reward: 1
G(감가율 적용한 가치):  0.71
===episode 5===
[1, 0]  a: right reward: -1
[1 1]  a: up reward: -1
[0 1]  a: left reward: -1
[0 0]  a: up reward: -3
total reward: -6
G(감가율 적용한 가치):  -4.897
===episode 6===
[1, 2]  a: left reward: -1
[1 1]  a: left reward: -1
[1 0]  a: up reward: -1
[0 0]  a: down reward: -1
[1 0]  a: left reward: -3
total reward: -7
G(감가율 적용한 가치):  -5.4073
===episode 7===
[0, 1]  a: left reward: -1
[0 0]  a: right reward: -1
[0 1]  a: left reward: -1
[0 0]  a: left reward: 

### First-visit and Every-Visit MC Prediction

#### -- "프로그래머를 위한 강화학습" 에는 없는 내용 --

First-visit MC는 (에피소드 내) 상태 s의 첫 방문시의 반환값만 고려하는 방법

Every-visit MC는 (에피소드 내) 상태 s의 모든 방문의 반환값을 고려하는 방법

In [10]:
# first-visit MC and every-visit MC prediction
np.random.seed(0)
# 환경, 에이전트를 초기화
env = Environment()
agent = Agent()

# 임의의 상태 가치 함수𝑉
v_table = np.zeros((env.reward.shape[0], env.reward.shape[1]))

# 상태별로 에피소드 출발횟수를 저장하는 테이블
v_start = np.zeros((env.reward.shape[0], env.reward.shape[1]))

# 상태별로 도착지점 도착횟수를 저장하는 테이블
v_success = np.zeros((env.reward.shape[0], env.reward.shape[1]))

# 𝑅𝑒𝑡𝑢𝑟𝑛(𝑠)←빈 리스트 (모든 s∈𝑆에 대해)
Return_s = [[[] for j in range(env.reward.shape[1])] for i in range(env.reward.shape[0])]

# 최대 에피소드 수를 지정
max_episode = 100000

# first visit 를 사용할지 every visit를 사용할 지 결정
# first_visit = True : first visit
# first_visit = False : every visit

for first_visit in [True, False]:
    if first_visit:
        print("start first visit MC")
    else : 
        print("start every visit MC")
    print()



    for epi in tqdm(range(max_episode)):

        i,j,G,episode = generate_episode(env, agent)

        visit = np.zeros((env.reward.shape[0], env.reward.shape[1]))
        v_start[i,j] += 1
        for where, action, reward, G_s in episode:
            s_i, s_j = where
            if first_visit and visit[s_i][s_j] == 1:
                continue
            visit[s_i][s_j] = 1
            Return_s[s_i][s_j].append(G_s)


        ## 수익 𝐺를 𝑅𝑒𝑡𝑢𝑟𝑛(𝑠)에 추가(append)
        #Return_s[i][j].append(G)
        #
        ## 에피소드 발생 횟수 계산
        #episode_count = len(Return_s[i][j])
        ## 상태별 발생한 수익의 총합 계산
        #total_G = np.sum(Return_s[i][j])
        ## 상태별 발생한 수익의 평균 계산
        #v_table[i,j] = total_G / episode_count

        # 도착지점에 도착(reward = 1)했는지 체크    
        # episode[-1][2] : 에피소드 마지막 상태의 보상
        if episode[-1][2] == 1:
            v_success[i,j] += 1


    # 에피소드 출발 횟수 저장 
    for i in range(env.reward.shape[0]):
        for j in range(env.reward.shape[1]):
            visit_count = len(Return_s[i][j])
            total_G = np.sum(Return_s[i][j])
            v_table[i,j] = total_G / visit_count

    print("V(s)")
    show_v_table(np.round(v_table,2),env)
    print("V_start_count(s)")
    show_v_table(np.round(v_start,2),env)
    print("V_success_pr(s)")
    show_v_table(np.round(v_success/v_start,2),env)


start first visit MC



100%|███████████████████████████████████████████████████████████████████████| 100000/100000 [00:03<00:00, 25901.22it/s]


V(s)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.80      |      -3.99      |      -3.44      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -4.01      |      -3.89      |      -2.42      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.45      |      -2.42      |       1.00      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
V_start_count(s)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|   11302.00      |   11148.00      |   11021.00      |
|                 |                 |                 |
+-----------------+-------

100%|███████████████████████████████████████████████████████████████████████| 100000/100000 [00:04<00:00, 24727.88it/s]


V(s)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.81      |      -4.01      |      -3.45      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -4.02      |      -3.91      |      -2.44      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.45      |      -2.44      |       1.00      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
V_start_count(s)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|   22405.00      |   22257.00      |   22302.00      |
|                 |                 |                 |
+-----------------+-------

### Incremental mean (증분 평균) 을 이용하는 몬테카를로 Prediction 알고리즘

In [11]:
#Incremental mean 을 이용하는 몬테카를로 Prediction 알고리즘
np.random.seed(0)
# 환경, 에이전트를 초기화
env = Environment()
agent = Agent()

# 임의의 상태 가치 함수𝑉
v_table = np.zeros((env.reward.shape[0], env.reward.shape[1]))

# 추가
# 상태를 방문한 횟수를 저장하는 테이블
v_visit = np.zeros((env.reward.shape[0], env.reward.shape[1]))

# 삭제
# # 𝑅𝑒𝑡𝑢𝑟𝑛(𝑠)←빈 리스트 (모든 s∈𝑆에 대해) : 
# Return_s = [[[] for j in range(env.reward.shape[1])] for i in range(env.reward.shape[0])]

# 최대 에피소드 수와 에피소드 최대 길이지정
max_episode = 100000

# first visit을 사용할지 every visit을 사용할 지 결정
# first_visit = True : first visit
# first_visit = False : every visit
for first_visit in [True, False]:
    if first_visit:
        print("start first visit MC")
    else : 
        print("start every visit MC")
    print()

    for epi in tqdm(range(max_episode)):

        i,j,G,episode = generate_episode(env, agent, first_visit)

        visit = np.zeros((env.reward.shape[0], env.reward.shape[1]))
        v_start[i,j] += 1
        for where, action, reward, G_s in episode:
            s_i, s_j = where
            if first_visit and visit[s_i][s_j] == 1:
                continue
            visit[s_i][s_j] = 1

            v_visit[s_i,s_j] += 1
            v_table[s_i,s_j] += 1 / v_visit[s_i,s_j] * (G_s - v_table[s_i,s_j])




    print("V(s)")
    show_v_table(np.round(v_table,2),env)
    print()

start first visit MC



100%|███████████████████████████████████████████████████████████████████████| 100000/100000 [00:05<00:00, 17666.80it/s]


V(s)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.80      |      -3.99      |      -3.44      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -4.01      |      -3.89      |      -2.42      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.45      |      -2.42      |       1.00      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+

start every visit MC



100%|███████████████████████████████████████████████████████████████████████| 100000/100000 [00:06<00:00, 15752.53it/s]

V(s)
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.81      |      -4.01      |      -3.45      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -4.02      |      -3.91      |      -2.44      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      -3.45      |      -2.44      |       1.00      |
|                 |                 |                 |
+-----------------+-----------------+-----------------+






## 모든 코드를 실행해 보고 출력 결과를 얻어보자.