# 练习三 动态规划进行策略评估、策略迭代和价值迭代

## 1 小型方格世界MDP建模

In [2]:
# 4*4 方格状态命名
# 状态0和15为终止状态
#  0  1  2  3
#  4  5  6  7
#  8  9 10  11
# 12 13 14  15

In [3]:
S = [i for i in range(16)] # 状态空间
A = ["n", "e", "s", "w"] # 行为空间
# P,R,将由dynamics动态生成

ds_actions = {"n": -4, "e": 1, "s": 4, "w": -1} # 行为对状态的改变, 环境动力学

def dynamics(s, a): # 环境动力学
    '''模拟小型方格世界的环境动力学特征
    Args:
        s 当前状态 int 0 - 15
        a 行为 str in ['n','e','s','w'] 分别表示北、东、南、西
    Returns: tuple (s_prime, reward, is_end)
        s_prime 后续状态
        reward 奖励值
        is_end 是否进入终止状态
    '''
    s_prime = s
    if (s%4 == 0 and a == "w") or (s<4 and a == "n") \
        or ((s+1)%4 == 0 and a == "e") or (s > 11 and a == "s")\
        or s in [0, 15]:
        # 原地不动
        # 在最左边，往西走
        # 在最上边，往北走
        # 在最右边，往东走
        # 在最下边，往南走
        pass
    else:
        ds = ds_actions[a]
        s_prime = s + ds
    reward = 0 if s in [0, 15] else -1 # 状态0和15的reward=0，其余位置的reward=-1
    is_end = True if s in [0, 15] else False # 状态0和15为终止状态
    return s_prime, reward, is_end

def P(s, a, s1): # 状态转移概率函数
    # deterministic action
    s_prime, _, _ = dynamics(s, a)
    return s1 == s_prime

def R(s, a): # 奖励函数
    _, r, _ = dynamics(s, a)
    return r

gamma = 1.00
MDP = S, A, R, P, gamma

## 2 策略

In [4]:
def uniform_random_pi(MDP = None, V = None, s = None, a = None):
    # uniform pi,等概率
    _, A, _, _, _ = MDP
    n = len(A)
    return 0 if n == 0 else 1.0/n

def greedy_pi(MDP, V, s, a):
    # 贪婪policy, depends on v(s) value
    S, A, P, R, gamma = MDP
    max_v, a_max_v = -float('inf'), []
    for a_opt in A:# 统计后续状态的最大价值以及到达到达该状态的行为（可能不止一个）
        s_prime, reward, _ = dynamics(s, a_opt) # immediate reward和后续状态
        v_s_prime = get_value(V, s_prime) # 后续状态的v(s')
        if v_s_prime > max_v:
            max_v = v_s_prime
            a_max_v = [a_opt]
        elif(v_s_prime == max_v):
            a_max_v.append(a_opt)
    n = len(a_max_v)
    if n == 0: return 0.0
    return 1.0/n if a in a_max_v else 0.0 # input action a为argmax_a返回的action

def get_pi(Pi, s, a, MDP = None, V = None):
    # return pi(a|s)，为在state s下采取action a的概率
    return Pi(MDP, V, s, a)

In [5]:
# 辅助函数
def get_prob(P, s, a, s1): # 获取状态转移概率
    # P_ss1^a
    return P(s, a, s1)

def get_reward(R, s, a): # 获取奖励值
    return R(s, a)

def set_value(V, s, v): # 设置价值字典
    V[s] = v
    
def get_value(V, s): # 获取状态价值
    return V[s]

def display_V(V): # 显示状态价值
    for i in range(16):
        print('{0:>6.2f}'.format(V[i]),end = " ")
        if (i+1) % 4 == 0:
            print("")
    print()

## 3. 策略评估

In [6]:
# policy evaluation
def compute_q(MDP, V, s, a):
    '''
    evaluate q value
    根据给定的MDP，价值函数V，计算状态行为对s,a的价值qsa
    q_pi(s,a) = R + \sum_{s'}{P_ss'^a * V(s')}
    '''
    S, A, R, P, gamma = MDP
    q_sa = 0
    for s_prime in S:
        q_sa += get_prob(P, s, a, s_prime) * get_value(V, s_prime)
    q_sa = get_reward(R, s,a) + gamma * q_sa
    return q_sa

def compute_v(MDP, V, Pi, s):
    '''
    evaluate v value
    给定MDP下依据某一策略Pi和当前状态价值函数V计算某状态s的价值
    v_pi(s) = \sum_{a}{pi(a|s) * q(s, a)}
    '''
    S, A, R, P, gamma = MDP
    v_s = 0
    for a in A:
        v_s += get_pi(Pi, s, a, MDP, V) * compute_q(MDP, V, s, a)
    return v_s        

def update_V(MDP, V, Pi):
    '''
    计算所有状态的v(s)一次
    给定一个MDP和一个策略，更新该策略下的价值函数V
    '''
    S, _, _, _, _ = MDP
    V_prime = V.copy()
    for s in S:
        set_value(V_prime, s, compute_v(MDP, V_prime, Pi, s))
    return V_prime


def policy_evaluate(MDP, V, Pi, n):
    '''
    使用n次迭代计算来评估一个MDP在给定策略Pi下的状态价值，初始时价值为V
    '''
    for i in range(n):
        #print("====第{}次迭代====".format(i+1))
        V = update_V(MDP, V, Pi)
        #display_V(V)
    return V


In [11]:
V = [0  for _ in range(16)] # 状态价值
V_pi = policy_evaluate(MDP, V, uniform_random_pi, 1)
display_V(V_pi)

V = [0  for _ in range(16)] # 状态价值
V_pi = policy_evaluate(MDP, V, greedy_pi, 1)
display_V(V_pi)

  0.00  -1.00  -1.25  -1.31 
 -1.00  -1.50  -1.69  -1.75 
 -1.25  -1.69  -1.84  -1.90 
 -1.31  -1.75  -1.90   0.00 

  0.00  -1.00  -1.00  -1.00 
 -1.00  -1.00  -1.00  -1.00 
 -1.00  -1.00  -1.00  -1.00 
 -1.00  -1.00  -1.00   0.00 



## 4 策略迭代

In [12]:
def policy_iterate(MDP, V, Pi, n, m):
    """
    policy evaluation <-> policy improvement交替执行
    利用在当前策略评估下，通过跟随greedy pi来进行policy improvement
    m表示策略迭代的次数
    n表示在进行策略评估时，update V的次数（n越大，越接近于跟随当前策略下所有状态的价值v）
    """
    for i in range(m):
        V = policy_evaluate(MDP, V, Pi, n)
        Pi = greedy_pi # 第一次迭代产生新的价值函数后随机使用贪婪策略
    return V

In [16]:
V = [0  for _ in range(16)] # 重置状态价值
# policy evaluation 1次， policy iteration 100次
V_pi = policy_iterate(MDP, V, greedy_pi, 1, 100)
display_V(V_pi)

# policy evaluation 100次， policy iteration 1次
V_pi = policy_iterate(MDP, V, greedy_pi, 100, 1)
display_V(V_pi)

  0.00  -1.00  -2.00  -3.00 
 -1.00  -2.00  -3.00  -2.00 
 -2.00  -3.00  -2.00  -1.00 
 -3.00  -2.00  -1.00   0.00 

  0.00  -1.00  -2.00  -3.00 
 -1.00  -2.00  -3.00  -2.00 
 -2.00  -3.00  -2.00  -1.00 
 -3.00  -2.00  -1.00   0.00 



## 价值迭代

In [17]:
# 价值迭代得到最优状态价值过程
def compute_v_from_max_q(MDP, V, s):
    '''
    因为agent可以选择当前状态下，采取的最优action，通过q(s,a)来判断，因此可以取max q(s,a)。
    根据一个状态的下所有可能的行为价值中最大一个来确定当前状态价值
    '''
    S, A, R, P, gamma = MDP
    v_s = -float('inf')
    for a in A:
        qsa = compute_q(MDP, V, s, a)
        if qsa >= v_s:
            v_s = qsa
    return v_s

def update_V_without_pi(MDP, V):
    '''
    更新所有状态的价值v一次
    因为直接取最大的q(s,a)，所以不需要Pi来选择action
    在不依赖策略的情况下直接通过后续状态的价值来更新状态价值
    '''
    S, _, _, _, _ = MDP
    V_prime = V.copy()
    for s in S:
        set_value(V_prime, s, compute_v_from_max_q(MDP, V_prime, s))
    return V_prime

def value_iterate(MDP, V, n):
    '''
    多次更新状态的价值v，从而得到v_star
    价值迭代
    '''
    for i in range(n):
        V = update_V_without_pi(MDP, V)
        display_V(V)
    return V

In [18]:
V = [0  for _ in range(16)] # 重置状态价值
display_V(V)

V_star = value_iterate(MDP, V, 4)
display_V(V_star)

  0.00   0.00   0.00   0.00 
  0.00   0.00   0.00   0.00 
  0.00   0.00   0.00   0.00 
  0.00   0.00   0.00   0.00 

  0.00  -1.00  -1.00  -1.00 
 -1.00  -1.00  -1.00  -1.00 
 -1.00  -1.00  -1.00  -1.00 
 -1.00  -1.00  -1.00   0.00 

  0.00  -1.00  -2.00  -2.00 
 -1.00  -2.00  -2.00  -2.00 
 -2.00  -2.00  -2.00  -1.00 
 -2.00  -2.00  -1.00   0.00 

  0.00  -1.00  -2.00  -3.00 
 -1.00  -2.00  -3.00  -2.00 
 -2.00  -3.00  -2.00  -1.00 
 -3.00  -2.00  -1.00   0.00 

  0.00  -1.00  -2.00  -3.00 
 -1.00  -2.00  -3.00  -2.00 
 -2.00  -3.00  -2.00  -1.00 
 -3.00  -2.00  -1.00   0.00 

  0.00  -1.00  -2.00  -3.00 
 -1.00  -2.00  -3.00  -2.00 
 -2.00  -3.00  -2.00  -1.00 
 -3.00  -2.00  -1.00   0.00 



In [25]:
def greedy_policy(MDP, V, s):
    """
    直接输出v值最大的a。由于take action是deterministic的去到对应的后续状态，因此可以直接看后续状态的价值v来选择action a
    """
    S, A, P, R, gamma = MDP
    max_v, a_max_v = -float('inf'), ''
    for a_opt in A:# 统计后续状态的最大价值以及到达到达该状态的行为（可能不止一个）
        s_prime, reward, _ = dynamics(s, a_opt)
        v_s_prime = get_value(V, s_prime)
        if v_s_prime > max_v:
            max_v = v_s_prime
            a_max_v = a_opt
        elif(v_s_prime == max_v):
            a_max_v += a_opt
    return str(a_max_v)

def uniform_policy(MDP, V, s):
    S, A, P, R, gamma = MDP
    a_max_v = ''
    for a_opt in A:
        a_max_v += a_opt
    return str(a_max_v)

def display_policy(policy, MDP, V):
    S, A, P, R, gamma = MDP
    for i in range(16):
        print('{0:^6}'.format(policy(MDP, V, S[i])),end = " ")
        if (i+1) % 4 == 0:
            print("")
    print()

In [27]:
display_policy(greedy_policy, MDP, V_star)

 nesw    w      w      sw   
  n      nw    nesw    s    
  n     nesw    es     s    
  ne     e      e     nesw  

