In [1]:
import numpy as np
import pandas as pd
import scipy
from typing import TypeVar,Mapping, Set, Generic, Sequence

### DP algorithms
- Write code for Policy Evaluation (tabular) algorithm
- Write code for Policy Iteration (tabular) algorithm
- Write code for Value Iteration (tabular) algorithm

<font color='blue'> **The code below is replicated in src folder with .py format** </font>
#### Import old code

In [2]:
# Helper functions
T = TypeVar("T",str,int,float)

# Identity helper function for str, int and float
def ind(x: T, y: T):
    if x == y or np.abs(x-y)<1e-5:
        return True
    else:
        return False
    
# Get state helper function
def get_states_helper(in_graph: dict) -> dict:
    state_list = list(in_graph.keys())
    ind = range(len(state_list))
    state = dict(zip(state_list,ind))
    return state

# Get transition matrix helper function
def get_transition_helper(in_graph: dict) -> np.ndarray:
    state = get_states_helper(in_graph)
    tran_mat = np.zeros((len(state),len(state)))
    for i, row in in_graph.items():
        for j, prob in row.items():
            ind_row = state[i]
            ind_col = state[j]
            if ind(tran_mat[ind_row,ind_col],0):
                tran_mat[ind_row,ind_col] = prob
    return tran_mat 
# Define MP by Graph
"""
    E.g.,
    Input = {'Sunny': {'Sunny': 0.1, 'Cloudy': 0.2, 'Rainy': 0.3, 'Cloudy': 0.4},
             'Cloudy': {'Sunny': 0.25, 'Cloudy': 0.25, 'Rainy': 0.3, 'Cloudy': 0.2},
             'Rainy': {'Sunny': 0.1, 'Cloudy': 0.2, 'Rainy': 0.3, 'Cloudy': 0.4},
             'Windy': {'Sunny': 0.25, 'Cloudy': 0.25, 'Rainy': 0.25, 'Cloudy': 0.25}}
    Meaning: Today's weather => tmr's weather
"""

class MP:
    # Initiate state dict & transition matrix
    def __init__(self, in_graph: dict) -> None:
        self.graph = in_graph
        state = get_states_helper(in_graph)
        tran_mat = get_transition_helper(in_graph)
        # Check transition matrix and match state set with transition probs
        if np.linalg.norm(np.sum(tran_mat, axis = 1)- np.ones(tran_mat.shape[0]))>1e-5:
            raise ValueError
        elif len(state) != tran_mat.shape[0]:
            raise ValueError
        else:
            self.state: dict = state
            self.tran_mat: np.ndarray = tran_mat
            
    # Get all states
    def get_states(self) -> set:
        return self.state
    
    # Get the transition matirx
    def get_tran_mat(self) -> np.ndarray:
        return self.tran_mat
    
    # Compute stationary distribution using eigenvalue decomposition
    def stationary_dist(self) -> np.array:
        e_value, e_vec = np.linalg.eig(self.tran_mat.T)
        out = np.array(e_vec[:, np.where(np.abs(e_value- 1.) < 1e-5)[0][0]])
        out = out/np.sum(out)
        return out
# Define MRP by Graph
"""
    E.g.,
    Input = {'Sunny': {'Sunny': 0.1, 'Cloudy': 0.2, 'Rainy': 0.3, 'Cloudy': 0.4},
             'Cloudy': {'Sunny': 0.25, 'Cloudy': 0.25, 'Rainy': 0.3, 'Cloudy': 0.2},
             'Rainy': {'Sunny': 0.1, 'Cloudy': 0.2, 'Rainy': 0.3, 'Cloudy': 0.4},
             'Windy': {'Sunny': 0.25, 'Cloudy': 0.25, 'Rainy': 0.25, 'Cloudy': 0.25}}
    state_reward = {'Rain': 1, 'Sunny': 2, 'Cloudy': 3, 'Windy': 4}
    gamma = 0.5
    Meaning: Today's weather => tmr's weather
"""
class MRP(MP):
    
    # Initiate state with reward and discount
    def __init__(self, in_graph: dict, state_reward: dict, gamma: float) -> None:
        super().__init__(in_graph)
        self.state = self.get_states()
        self.tran_mat = self.get_tran_mat()
        if gamma <0 or gamma >1:
            raise ValueError
        else:
            reward_vec = np.zeros(len(self.state))
            for key, ind in self.state.items():
                reward_vec[ind] = state_reward[key]
            
            self.reward: np.ndarray = reward_vec
            self.gamma: float = gamma
    
    # Get all states
    def get_states(self) -> set:
        return self.state
    
    # Get the transition matirx
    def get_tran_mat(self) -> np.ndarray:
        return self.tran_mat
    
    # Compute value function R(s)
    def value_func(self) -> float:
        return np.linalg.inv(np.identity(len(self.state))-self.gamma*self.tran_mat).dot(self.reward)

    # Compute value function r(s,s')
    def value_func_2nd(self,_2nd_def_reward) -> float:
        reward_dict = convert_reward(_2nd_def_reward)
        reward_vec = np.zeros(len(self.state))
        for key, ind in self.state.items():
            reward_vec[ind] = reward_dict[key]
        self.reward = reward_vec
        return self.value_func()

# Get actions helper function
def get_actions_helper(in_graph: dict) -> dict:
    state = get_states_helper(in_graph)
    actions_set = set()
    for s in state:
        temp_set = set(in_graph[s].keys())
        actions_set.update(temp_set)
    actions_list = list(actions_set)
    ind = range(len(actions_list))
    actions = dict(zip(actions_list,ind))
    return actions

# Get transition matrix helper function
def get_transition_helper_mdp(in_graph: dict) -> np.ndarray:
    states = get_states_helper(in_graph)
    actions = get_actions_helper(in_graph)
    tran_mat = np.zeros((len(states),len(actions),len(states))) # States * actions * states
    for i, row in in_graph.items():
        for c, action in row.items():
            for j, prob in action.items():
                ind_row = states[i]
                ind_height = states[j]
                ind_col = actions[c]
                if ind(tran_mat[ind_row,ind_col,ind_height],0):
                    tran_mat[ind_row,ind_col,ind_height] = prob
    return tran_mat 

# Get reward matrix helper function
def get_reward_helper(in_graph: dict, state_action_reward: dict) -> np.ndarray:
    states = get_states_helper(in_graph)
    actions = get_actions_helper(in_graph)
    reward_mat = np.zeros((len(states),len(actions))) # States * actions
    for i, row in state_action_reward.items():
        for j, reward in row.items():
            ind_row = states[i]
            ind_col = actions[j]
            if ind(reward_mat[ind_row,ind_col],0):
                reward_mat[ind_row,ind_col] = reward
    return reward_mat

# Get policy matrix helper function
def get_policy_helper(in_graph: dict, policy: dict) -> np.ndarray:
    states = get_states_helper(in_graph)
    actions = get_actions_helper(in_graph)
    policy_mat = np.zeros((len(states),len(actions)))
    for i, row in policy.items():
        for j, prob in row.items():
            ind_row = states[i]
            ind_col = actions[j]
            if ind(policy_mat[ind_row,ind_col],0):
                policy_mat[ind_row,ind_col] = prob
    return policy_mat

class MDP():

    def __init__(self, in_graph: dict, state_action_reward: dict, policy: dict, gamma: float) -> None:
        self.state: dict = get_states_helper(in_graph)
        self.action: dict = get_actions_helper(in_graph)
        self.tran_mat: np.ndarray = get_transition_helper_mdp(in_graph)
        self.reward: np.ndarray = get_reward_helper(in_graph,state_action_reward)
        self.policy: np.ndarray = get_policy_helper(in_graph,policy)
        self.gamma: float = gamma

    # Get all states
    def get_states(self) -> dict:
        return self.state

    # Get the transition matrix
    def get_tran_mat(self) -> np.ndarray:
        return self.tran_mat

    # Get all actions
    def get_actions(self) -> dict:
        return self.action

    # Get reward mat
    def get_reward(self) -> np.ndarray:
        return self.reward

    # Get policy mat
    def get_policy(self) -> np.ndarray:
        return self.policy
    
    # Get gamma
    def get_gamma(self) -> float:
        return self.gamma
    
    # Generate MRP
    def generate_MRP(self, policy: dict):
        print(self.reward)
        print(self.policy)
        R_S = dict(zip(list(self.state.keys()),[0]*len(self.state)))
        P_S = np.zeros((len(self.state),len(self.state)))
        in_graph = dict()
        for s_cur, i in self.state.items():
            tmp = 0
            value = dict()
            in_graph.update({s_cur:value})
            for a, j in self.action.items():
                R_S[s_cur] += self.reward[i,j]*self.policy[i,j]
                for s_next, c in self.state.items():
                    if s_next in value:
                        value[s_next] += self.tran_mat[i,j,c]*self.policy[i,j]
                    else:
                        value.update({s_next: self.tran_mat[i,j,c]*self.policy[i,j]})
        return MRP(in_graph,R_S,self.gamma)

    # Compute state value function v_{\pi}(s)
    def state_value_func(self, policy) -> float:
        MRP_tmp = self.generate_MRP(policy)
        return np.linalg.inv(np.identity(len(MRP_tmp.state))-MRP_tmp.gamma*MRP_tmp.tran_mat).dot(MRP_tmp.reward)

    # Compute action value function q_{\pi}(s,a)
    def action_value_func(self, policy) -> float:
        out = np.zeros((len(self.state),len(self.action)))
        for s_cur,i in self.state.items():
            for a,j in self.action.items():
                out[i,j] += self.reward[i,j]
                for s_next,c in self.state.items():
                    out[i,j] += self.gamma*self.tran_mat[i,j,c]*self.state_value_func(policy)[c]
        return out

#### New DP code

In [3]:
## Helper function
def policy_mat_transform(mdp: MDP, policy:dict)-> np.ndarray:
    states = mdp.get_states()
    actions = mdp.get_actions()
    policy_mat = np.zeros((len(states),len(actions)))
    for i, row in policy.items():
        for j, prob in row.items():
            ind_row = states[i]
            ind_col = actions[j]
            if ind(policy_mat[ind_row,ind_col],0):
                policy_mat[ind_row,ind_col] = prob
    return policy_mat

In [97]:
## Policy evaluation
# if input policy is in dict format
# policy_mat = policy_mat_transform(mdp, policy) 
def policy_eval(mdp: MDP, policy_mat: np.ndarray, eps = 1e-10) -> np.ndarray:
    reward = mdp.get_reward()
    states = mdp.get_states()
    actions = mdp.get_actions()
    tran_mat = mdp.get_tran_mat()
    gamma = mdp.get_gamma()
    reward_pi = np.zeros(len(states))
    tran_mat_pi = np.zeros((len(states),len(states)))
    for a,i in actions.items():
        reward_pi += policy_mat[:,i]*reward[:,i]
        tran_mat_pi += policy_mat[:,i]*tran_mat[:,i,:]
    reward_pi.reshape(-1,1)
    tran_mat_pi.reshape(-1,1)
    val = np.zeros_like(reward_pi)
    i = 0
    while True: 
        i += 1
        memo = val.copy()
        val = reward_pi + gamma * tran_mat_pi.dot(val)
        if np.linalg.norm(val - memo) < eps:
            break
    return val

In [108]:
## Policy iteration
def policy_iter(mdp: MDP, policy_eval_func = policy_eval, epa = 1e-10) -> np.ndarray:
    states = mdp.get_states()
    actions = mdp.get_actions()
    tran_mat = mdp.get_tran_mat()
    policy = [1/len(actions)]*(len(states)*len(actions)).reshape((len(states),len(actions)))
    while True:
        val = policy_eval_fn(mdp, policy)
        q_func = mdp.action_value_func(policy)
        for s,i in states.items():
            a_ind = np.argmax(policy[i,:])
            for a, j in actions.items():
                if j == a_ind:
                    policy[i,j] = 1
                else:
                    policy[i,j] = 0
        new_val = policy_eval_fn(mdp, policy)
        if np.linalg.norm(val-new_val)<eps:
            break
        val = new_val
    return policy, val

In [7]:
## Value iteration
def value_iter(mdp: MDP, eps = 1e-8):
    states = mdp.get_states()
    actions = mdp.get_actions()
    tran_mat = mdp.get_tran_mat()
    cur_val = np.zeros(len(states))
    det = 0
    while det == 0 or np.abs(det)>eps:
        for s in range(len(states)):
            action_vals = np.zeros(len(actions))
            for a, i in actions.items():
                for s_next, j in states.items():
                    action_vals[i] = tran_mat[states[s],i,j]*(reward[states[s],i] + mdp.gamma*val[j])
            action_val_max = np.max(action_vals)
            delta = max(delta, np.abs(action_val_max - cur_val[s]))
            cur_val[s] = action_val_max       
    
    policy = np.zeros((len(states),len(actions)))
    for s in range(len(states)):
        action_vals = np.zeros(len(actions))
        for a, i in actions.items():
            for s_next, j in states.items():
                action_vals[i] = tran_mat[states[s],i,j]*(reward[states[s],i] + mdp.gamma*val[j])
        a_ast = np.argmax(action_vals)
        policy[s, action_vals] = 1
    
    return policy, cur_val

**Test code**

In [105]:
def transition_rule(state_cub,move,s):
    ind = np.where(state_cub == s)
    ind_1 = ind[0] + move[0]
    ind_2 = ind[1] + move[1]
    if ind_1 <0 or ind_1 >= 4 or ind_2 <0 or ind_2 >= 4:
        ind_1 = ind[0]
        ind_2 = ind[1]        
    return state_cub[ind_1,ind_2]
state = dict(zip(range(16),range(16)))
state_cub = np.asarray(range(16)).reshape((4,4))
action = dict(zip([(1,0),(-1,0),(0,-1),(0,1)],range(4)))
state_reward = dict(zip(range(16),list([0]+[-1]*14+[0])))
Input = {}
for s_cur, i in state.items():
    act = {}
    Input.update({s_cur:act})
    for a, j in action.items():
        _next = {}
        act.update({a: _next})
        for s_next, c in state.items():
            if s_next == transition_rule(state_cub,a,s_cur):
                _next.update({s_next:1})
            else:
                _next.update({s_next:0})
state_action_reward = {}
for s_cur, i in state.items():
    act = {}
    state_action_reward.update({s_cur:act})
    for a, j in action.items():
        s_next = transition_rule(state_cub,a,s_cur)
        reward = state_reward[s_next[0]]
        act.update({a:reward})
policy = {}
for s_cur, i in state.items():
    act = {}
    policy.update({s_cur:act})
    for a, j in action.items():
        act.update({a:0.25})

In [106]:
mdp = MDP(Input,state_action_reward,policy,gamma = 1)
policy_mat = policy_mat_transform(mdp, policy)
policy_mat[0,0:4] = [0,0,0,0]
policy_mat[15,0:4] = [0,0,0,0]

In [107]:
print(policy_eval(mdp, policy_mat, eps = 1e-10))

[ -6.5 -13.  -19.  -21.  -13.  -17.  -19.  -19.  -19.  -19.  -17.  -13.
 -21.  -19.  -13.   -6.5]
