In [1]:
from typing import Mapping, Set, Sequence
from utils.generic_typevars import S, A
import numpy as np

### Write code for Policy Evaluation (tabular) algorithm

Problem: evaluate a given policy $\pi$ 

Solution: iterative application of Bellman expectation backup.

$$v^{k+1} = R^{\pi} + \gamma P^{\pi} v^k$$


In [9]:
class MP:
    """A class representing a Markov Process.

    The process is described by a nested mapping ``transitions[s][s2]`` holding the
    probability of moving from state ``s`` to state ``s2``. The order of the keys of
    that mapping fixes the row/column order used by the matrix helpers below.
    """

    def __init__(self, transitions: Mapping[S, Mapping[S, float]]):
        """transitions: a dictionary of dictionaries that stores the transition matrix"""
        self.all_states_list = list(transitions.keys())  # a list that stores the names of all states
        self.transitions = transitions  # a dictionary of dictionaries that stores the transition matrix

    def get_all_states(self):
        """Return the list of state names in matrix order."""
        return self.all_states_list

    def get_transition(self, s: S):
        """Return the successor distribution ``{next_state: probability}`` of ``s``.

        Raises:
            KeyError: if ``s`` is not a state of this process.
        """
        try:
            return self.transitions[s]
        except KeyError:
            # A missing dict key raises KeyError (not ValueError); re-raise it with
            # a message that names the offending state.
            raise KeyError(f"Invalid state: {s!r}") from None

    def get_trans_mat(self):
        """Return the transition table as a dense square NumPy array.

        Entry ``[i, j]`` is the probability of moving from the i-th to the j-th state
        of ``all_states_list``; transitions absent from the mapping are 0.
        """
        states = self.all_states_list
        trans_mat = np.zeros((len(states), len(states)))
        for i, src in enumerate(states):
            row = self.transitions[src]
            for j, dst in enumerate(states):
                trans_mat[i, j] = row.get(dst, 0)
        return trans_mat

    def get_stationary_dist(self):
        """Return a stationary distribution as a real vector that sums to 1.

        It is taken from the eigenvector of the transposed transition matrix whose
        eigenvalue is within 1e-8 of 1 (the first such eigenvalue if several exist).

        Raises:
            IndexError: if no eigenvalue is within 1e-8 of 1.
        """
        trans_mat = self.get_trans_mat()
        eig_val, eig_vec = np.linalg.eig(trans_mat.T)
        unit_idx = np.where(np.abs(eig_val - 1.) < 1e-8)[0][0]
        # ``eig`` returns complex arrays as soon as any eigenvalue is complex
        # (e.g. periodic chains); the eigenvalue selected here is real, so keep
        # only the real part to return a plain probability vector.
        res = np.array(np.real(eig_vec[:, unit_idx]), dtype=float)
        res /= np.sum(res)
        return res


class MRP(MP):
    """A class representing a Markov Reward Process.

    Extends the Markov Process with a per-state reward mapping and a discount
    factor ``gamma``.
    """

    def __init__(self, transitions: Mapping[S, Mapping[S, float]], rewards: Mapping[S, float], gamma: float):
        """Store the transition table, the reward mapping and the discount factor.

        transitions: a dictionary of dictionaries that stores the transition matrix
        rewards: a dictionary mapping each state to its reward
        gamma: the discount factor
        """
        super().__init__(transitions)
        self.rewards = rewards
        self.reward_vec = self.get_rewards_vec()  # rewards in the order of all_states_list
        self.gamma = gamma

    def get_reward(self, s: S):
        """Return the reward of state ``s``.

        Raises:
            KeyError: if ``s`` has no entry in the reward mapping.
        """
        try:
            return self.rewards[s]
        except KeyError:
            # A missing dict key raises KeyError (not ValueError); re-raise it with
            # a message that names the offending state.
            raise KeyError(f"Invalid state: {s!r}") from None

    def get_rewards_vec(self):
        """Return the rewards as a 1-D float array ordered like ``all_states_list``."""
        return np.array([self.rewards[s] for s in self.all_states_list], dtype=float)

        
class MDP():
    """A class representing a Markov Decision Process.

    ``transitions[s][a][s2]`` is the probability of reaching ``s2`` after taking
    action ``a`` in state ``s``; ``rewards[s][a]`` is the reward of taking ``a`` in
    ``s``. The order of ``all_states_list`` / ``all_actions_list`` fixes the
    row / column order of policy matrices.
    """

    def __init__(self, transitions: Mapping[S, Mapping[A, Mapping[S, float]]], rewards: Mapping[S, Mapping[A, float]], gamma):
        """Store the model and derive the state and action collections from it."""
        self.all_states_list = list(transitions.keys())
        self.transitions = transitions
        self.rewards = rewards
        self.gamma = gamma
        # The derived attributes read self.transitions, so they must be built
        # only after it has been assigned.
        self.actions = self.get_actions()
        self.all_actions_list = self.get_action_list()

    def get_all_states(self):
        """Return the list of state names in matrix order."""
        return self.all_states_list

    def get_actions(self) -> Mapping[S, Set[A]]:
        """Return, for every state, the set of actions that have a transition entry."""
        return {s: set(self.transitions[s]) for s in self.all_states_list}

    def get_action_list(self):
        """Return every action of the MDP exactly once, in first-appearance order.

        A deterministic order matters because ``matrix_to_policy`` maps policy
        matrix columns to actions by position in this list.
        """
        ordered = {}
        for s in self.all_states_list:
            for a in self.transitions[s]:
                ordered.setdefault(a, None)
        return list(ordered)

    def get_transition(self, s: S, a: A):
        """Return the successor distribution of taking action ``a`` in state ``s``.

        Raises:
            KeyError: if ``s`` is unknown or ``a`` has no transition entry in ``s``.
        """
        try:
            return self.transitions[s][a]
        except KeyError:
            # A missing dict key raises KeyError (not ValueError); re-raise it with
            # a message that names the offending pair.
            raise KeyError(f"Invalid state/action pair: ({s!r}, {a!r})") from None

    def get_mrp(self, policy: Mapping[S, Mapping[A, float]]):
        """Return the Markov Reward Process induced by following ``policy``.

        policy: ``policy[s][a]`` is the probability of choosing ``a`` in ``s``.

        Actions with probability 0 are skipped, so a policy built by
        ``matrix_to_policy`` may mention actions a state does not offer as long
        as they carry no probability mass.
        """
        transitions = {}
        rewards = {}
        for s1, by_action in self.transitions.items():
            row = {}
            for a, p in policy[s1].items():
                if p == 0:
                    continue
                for s2, prob in by_action[a].items():
                    row[s2] = row.get(s2, 0) + p * prob
            transitions[s1] = row

        for s1, by_action in self.rewards.items():
            rewards[s1] = sum(p * by_action[a] for a, p in policy[s1].items() if p != 0)

        return MRP(transitions, rewards, self.gamma)

    def matrix_to_policy(self, policy_matrix) -> Mapping[S, Mapping[A, float]]:
        """Convert a (states x actions) matrix into a nested ``{state: {action: prob}}`` mapping.

        Row ``i`` corresponds to ``all_states_list[i]`` and column ``j`` to
        ``all_actions_list[j]``.
        """
        return {
            s: {a: policy_matrix[i][j] for j, a in enumerate(self.all_actions_list)}
            for i, s in enumerate(self.all_states_list)
        }
       

In [10]:
def policy_eval(mdp: MDP, policy: Mapping[S, Mapping[A, float]], thres = 1e-8):
    """Evaluate ``policy`` by iterating the Bellman expectation backup.

    The policy is first collapsed into its induced MRP, then the backup
    ``v <- R + gamma * P v`` is applied until the L1 change between two
    successive iterates drops below ``thres``.

    mdp: the decision process; only ``mdp.get_mrp(policy)`` is used.
    policy: ``policy[s][a]`` is the probability of choosing ``a`` in ``s``.
    thres: L1 convergence threshold.

    Returns a 1-D array of state values ordered like the induced MRP's
    ``all_states_list``. The loop has no iteration cap, so termination relies on
    the backup actually converging (e.g. ``gamma < 1``).
    """
    mrp = mdp.get_mrp(policy)
    # Work on positional vectors/matrices: state names are arbitrary hashables
    # and cannot be used to index NumPy arrays.
    reward_vec = np.asarray(mrp.get_rewards_vec(), dtype=float)
    trans_mat = np.asarray(mrp.get_trans_mat(), dtype=float)
    old_v = np.zeros(len(mrp.all_states_list))
    while True:
        # The reward is added once per state, not once per successor.
        new_v = reward_vec + mrp.gamma * (trans_mat @ old_v)
        if np.sum(np.abs(old_v - new_v)) < thres:
            break
        old_v = new_v
    return new_v

### Write code for Policy Iteration (tabular) algorithm

Solution: evaluate the policy $\pi$
    $$ v_{\pi}(s) = \mathbb{E}\left[R_{t+1} + \gamma R_{t+2} + \dots \mid S_t = s\right]$$
    
Improve the policy by acting greedily with respect to $v_{\pi}$.
$$ \pi' = \text{greedy}(v_{\pi})$$

In [8]:
def improve_policy(mdp: MDP, policy: Mapping[S, Mapping[A, float]], val_func):
    """Return the greedy policy with respect to ``val_func``.

    For every state the one-step look-ahead value
    ``q(s, a) = R(s, a) + gamma * sum_s' P(s' | s, a) * v(s')`` is computed from
    the MDP model and the action with the largest value is selected.

    mdp: the decision process (``all_states_list``, ``all_actions_list``,
        ``transitions``, ``rewards`` and ``gamma`` are read).
    policy: the policy that produced ``val_func``. The greedy choice depends only
        on ``val_func``; the argument is kept for interface compatibility.
    val_func: state values indexed by position in ``mdp.all_states_list``.

    Returns a 1-D integer array; entry ``i`` is the index into
    ``mdp.all_actions_list`` of the greedy action for the i-th state. Ties go to
    the lowest index.
    """
    states = mdp.all_states_list
    actions = mdp.all_actions_list
    # State names are arbitrary hashables; translate them to array positions.
    index_of = {s: i for i, s in enumerate(states)}
    new_policy = np.zeros(len(states), dtype=int)
    for i, s1 in enumerate(states):
        available = mdp.transitions[s1]
        # Actions a state does not offer keep -inf so they are never selected.
        qVals = np.full(len(actions), -np.inf)
        for j, a in enumerate(actions):
            if a not in available:
                continue
            # Successors that are not listed states contribute a value of 0.
            expected_next = sum(
                p * val_func[index_of[s2]]
                for s2, p in available[a].items()
                if s2 in index_of
            )
            qVals[j] = mdp.rewards[s1][a] + mdp.gamma * expected_next
        new_policy[i] = np.argmax(qVals)
    return new_policy
    


def policy_iter(mdp: MDP, thres = 1e-8):
    """Find a greedy-stable policy by alternating evaluation and improvement.

    Starting from the policy that is uniform over the actions each state offers,
    the loop evaluates the current policy with ``policy_eval``, makes it greedy
    with ``improve_policy`` and stops once the greedy policy no longer changes.

    mdp: the decision process.
    thres: convergence threshold forwarded to ``policy_eval``.

    Returns a (states x actions) one-hot matrix; row ``i`` / column ``j`` follow
    ``mdp.all_states_list`` / ``mdp.all_actions_list``.
    """
    states = mdp.all_states_list
    actions = mdp.all_actions_list
    lenS = len(states)
    lenA = len(actions)

    # An all-zero matrix is not a probability distribution, so start from a
    # uniform distribution over the actions that have a transition entry.
    policy_matrix = np.zeros((lenS, lenA))
    for i, s in enumerate(states):
        offered = [j for j, a in enumerate(actions) if a in mdp.transitions[s]]
        if offered:
            policy_matrix[i, offered] = 1.0 / len(offered)

    while True:
        policy = mdp.matrix_to_policy(policy_matrix)
        val_func = policy_eval(mdp, policy, thres)
        # improve_policy yields one action index per state; expand it to a
        # one-hot matrix so it can be compared with, and reused as, a policy matrix.
        greedy = np.asarray(improve_policy(mdp, policy, val_func)).astype(int)
        new_policy = np.zeros((lenS, lenA))
        new_policy[np.arange(lenS), greedy] = 1.0
        if np.array_equal(new_policy, policy_matrix):
            break
        policy_matrix = new_policy
    return new_policy

### Write code for Value Iteration (tabular) algorithm

Problem: find optimal policy $\pi$

Solution: iterative application of Bellman optimality backup

$$v^{k+1}(s) = \max_{a} \left( R_s^a + \gamma \sum_{s'} P_{ss'}^a \, v^k(s') \right)$$

In [11]:
def value_iteration(mdp: MDP, policy: Mapping[S, Mapping[A, float]] = None, thres = 1e-8):
    """Compute the optimal state values by iterating the Bellman optimality backup.

    Each sweep sets
    ``v(s) <- max_a [ R(s, a) + gamma * sum_s' P(s' | s, a) * v(s') ]``
    over the actions that state ``s`` offers, until the L1 change between two
    successive sweeps drops below ``thres``.

    mdp: the decision process (``all_states_list``, ``transitions``, ``rewards``
        and ``gamma`` are read).
    policy: unused. The optimality backup maximises over actions and does not
        depend on any policy; the parameter is kept for interface compatibility.
    thres: L1 convergence threshold.

    Returns a 1-D array of state values ordered like ``mdp.all_states_list``.
    States that offer no action keep the value 0. The loop has no iteration cap,
    so termination relies on the backup converging (e.g. ``gamma < 1``).
    """
    states = mdp.all_states_list
    # State names are arbitrary hashables; translate them to array positions.
    index_of = {s: i for i, s in enumerate(states)}
    lenS = len(states)

    old_value = np.zeros(lenS)
    while True:
        new_value = np.zeros(lenS)
        for i, s1 in enumerate(states):
            # One look-ahead value per offered action; successors that are not
            # listed states contribute a value of 0.
            qVals = [
                mdp.rewards[s1][a] + mdp.gamma * sum(
                    p * old_value[index_of[s2]]
                    for s2, p in successors.items()
                    if s2 in index_of
                )
                for a, successors in mdp.transitions[s1].items()
            ]
            if qVals:
                new_value[i] = max(qVals)

        if np.sum(np.abs(new_value - old_value)) < thres:
            break
        old_value = new_value
    return new_value