### 1. 问题描述

<img src="./problem.png" width="40%">

### 2. 建立并求解MRP

其中状态集$S$有7个状态，状态转换矩阵为7\*7，奖励函数用7个标量表示，分别表示离开某一个状态得到的即时奖励值。

In [1]:
import numpy as np
import pprint
pp = pprint.PrettyPrinter(indent=4)

In [2]:
# 建立MRP
num_states = 7
states = ["C1","C2","C3","Ps","Bar","Pho","Res"]
Pss = np.array([
    # C1  C2   C3   Ps   Bar  Pho  Res
    [0.0, 0.5, 0.0, 0.0, 0.0, 0.5, 0.0],
    [0.0, 0.0, 0.8, 0.0, 0.0, 0.0, 0.2],
    [0.0, 0.0, 0.0, 0.6, 0.4, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
    [0.2, 0.4, 0.4, 0.0, 0.0, 0.0, 0.0],
    [0.1, 0.0, 0.0, 0.0, 0.0, 0.9, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
])
rewards = [-2, -2, -2, 10, 1, -1, 0]
gamma = 0.5

In [3]:
# 计算任一状态序列中某一状态的收获
def compute_return(start_index = 0,
                   chain = None,
                   gamma = 0.5):
    value,t = 0.0,0
    for i in range(start_index, len(chain)):
        value += np.power(gamma, t) * rewards[states.index(chain[i])]
        t += 1
    return value

In [4]:
# 解析解
def compute_value(Pss, rewards, gamma=0.05):
    rewards_col = np.array(rewards).reshape((-1,1))
    values = np.dot(np.linalg.inv(np.eye(7,7) - gamma*Pss), rewards)
    return values
values = compute_value(Pss, rewards, 0.99999)
pp.pprint(values.reshape(-1,1))
# print(values.reshape(-1,1))

array([[-12.54073351],
       [  1.45690179],
       [  4.32117045],
       [ 10.        ],
       [  0.80308417],
       [-22.53857963],
       [  0.        ]])


In [5]:
# 验证
chains = [
    ["C1", "C2", "C3", "Ps", "Res"],
    ["C1", "Pho", "Pho", "C1", "C2", "Res"],
    ["C1", "C2", "C3", "Bar", "C2", "C3", "Ps", "Res"],
    ["C1", "Pho", "Pho", "C1", "C2", "C3", "Bar", "C1",\
     "Pho", "Pho", "Pho", "C1", "C2", "C3", "Bar", "C2", "Res"]
]
print(compute_return(0, chains[3], 0.5))

-3.196044921875


### 3. 建立MDP

<img src="./MDP.png" width="40%">

状态集$S$中有五个元素，行为集中也有5个元素，但是具体到某一个状态则只有2个可能的行为。

In [6]:
S = ["Pho", "C1", "C2", "C3", "Res"]
A = ["GoPho", "Study", "LeaPho", "GoBar", "QuitStudy"]
gamma = 1.0

In [7]:
# 设置奖励矩阵
R = np.zeros((len(S),len(A)))
# for i in range(len(S)):
#     for j in range(len(A)):
#         R[i,j] = float("-inf")
R[0,0] = -1
R[0,2] = 0
R[1,0] = -1
R[1,1] = -2
R[2,1] = -2
R[2,4] = 0
R[3,1] = 10
R[3,3] = 1

def print_R(i,j):
    if i<len(S) and j<len(A):
        print(S[i] + "-" + A[j] + " -> %f"%R[i,j])
    else:
        print("Error!")
    
print_R(3,8)

Error!


In [8]:
# 设置状态转移矩阵
P = np.zeros((len(S),len(A),len(S)))
P[0,0,0] = 1.0
P[0,2,1] = 1.0
P[1,0,0] = 1.0
P[1,1,2] = 1.0
P[2,1,3] = 1.0
P[2,4,4] = 1.0
P[3,1,4] = 1.0
P[3,3,1] = 0.2
P[3,3,2] = 0.4
P[3,3,3] = 0.4

def print_P(i,j,k):
    if (i<len(S) and j<len(A)) and k<len(S):
        print(S[i] + "-" + A[j] + "-" + S[k] + " -> %f"%P[i,j,k])
    else:
        print("Error!")
    
print_P(3,2,3)

C3-LeaPho-C3 -> 0.000000


In [9]:
# 设置策略矩阵
pi = np.zeros((len(S),len(A)))
pi[0,0] = 0.5
pi[0,2] = 0.5
pi[1,0] = 0.5
pi[1,1] = 0.5
pi[2,1] = 0.5
pi[2,4] = 0.5
pi[3,1] = 0.5
pi[3,3] = 0.5

def print_pi(i,j):
    if i<len(S) and j<len(A):
        print(S[i] + "-" + A[j] + " -> %f"%pi[i,j])
    else:
        print("Error!")
    
print_pi(3,1)

C3-Study -> 0.500000


In [10]:
MDP = (S, A, R, P, gamma)
# 价值函数
V = np.zeros((len(S),))

In [11]:
def compute_q(MDP, V, s, a):
    S, A, R, P, gamma = MDP
    s_index = S.index(s)
    a_index = A.index(a)
#     if R[s_index,a_index] == float("-inf"):
#         return float("-inf")
    q_sa = 0
    for s_prime in S:
        s_prime_index = S.index(s_prime)
        q_sa += P[s_index,a_index,s_prime_index]*V[s_prime_index]
    q_sa = R[s_index,a_index]+gamma*q_sa
    return q_sa

In [12]:
def compute_v(MDP, V, pi, s):
    S, A, R, P, gamma = MDP
    v_s = 0
    s_index = S.index(s)
    for a in A:
        a_index = A.index(a)
        if pi[s_index,a_index] != 0:
            v_s += pi[s_index,a_index]*compute_q(MDP, V, s, a)
    return v_s

In [13]:
# 迭代求解价值函数
def update_V(MDP, V, pi):
    S, _, _, _, _ = MDP
    # V_prime = V.copy()
    for s in S:
        s_index = S.index(s)
        # V_prime[s_index] = compute_v(MDP, V_prime, pi, s)
        V[s_index] = compute_v(MDP, V, pi, s)
    # return V_prime

In [14]:
# 策略评估
def policy_evaluate(MDP, V, pi, n):
    for i in range(n):
        update_V(MDP, V, pi)
    # return V
policy_evaluate(MDP, V, pi, 100)
print(V)

[-2.30769231 -1.30769231  2.69230769  7.38461538  0.        ]


In [15]:
# 验证
v = compute_v(MDP, V, pi, "C3")
print(v)

7.3846153846146745


In [16]:
# 价值迭代
V_star = np.zeros((len(S),))
def compute_v_from_max_q(MDP, V, s):
    S, A, R, P, gamma = MDP
    v_s = float("-inf")
    for a in A:
        # a_index = A.index(a)
        qsa = compute_q(MDP, V, s, a)
        # print(qsa)
        if qsa > v_s:
            v_s = qsa
    return v_s

def update_V_without_pi(MDP, V):
    S, _, _, _, _ = MDP
    for s in S:
        s_index = S.index(s)
        V[s_index] = compute_v_from_max_q(MDP, V, s)
    # return V
        
def value_iteration(MDP, V, n):
    for i in range(n):
        update_V_without_pi(MDP, V)

value_iteration(MDP, V_star, 4)
print(V_star)

[ 6.  6.  8. 10.  0.]
