In [1]:
from dataclasses import dataclass
from typing import Tuple, Dict,Mapping, Iterator
from rl.markov_decision_process import FiniteMarkovDecisionProcess
from rl.markov_decision_process import FinitePolicy, StateActionMapping
from rl.markov_process import FiniteMarkovProcess, FiniteMarkovRewardProcess
from rl.distribution import Categorical, Constant
from scipy.stats import poisson
from rl.dynamic_programming import value_iteration_result,value_iteration,almost_equal_vfs
from rl.iterate import converged, iterate,converge
import math

## Define the state class
@dataclass(frozen=True)
class MazeState:
    """Immutable, hashable position of the agent inside the maze grid."""
    x: int  # first grid coordinate; "left" lowers it and "right" raises it
    y: int  # second grid coordinate; "up" lowers it and "down" raises it

# Actions are encoded as ints: 0 is up, 1 is down, 2 is left, 3 is right.
# Alias for the state -> action -> (next state, reward) distribution mapping
# that SimpleMazeMDP.get_action_transition_reward_map returns.
MazeStateMapping = StateActionMapping[MazeState, int]


class SimpleMazeMDP(FiniteMarkovDecisionProcess[MazeState, int]):
    """Deterministic grid-maze MDP.

    ``maze`` maps ``(x, y)`` to one of the labels ``"SPACE"``, ``"BLOCK"``
    or ``"GOAL"``.  Every non-block cell is a state; goal cells are terminal
    (mapped to ``None``).  Actions are ints: 0 up, 1 down, 2 left, 3 right.

    ``reward_type`` selects the reward scheme:

    * ``0``  -- reward 1.0 for the move that enters a goal cell, 0.0
      otherwise (meant to be solved with a discount factor below 1);
    * other -- reward 100.0 for entering a goal cell and -1.0 for every other
      move (meant to be solved without discounting).
    """

    # (dx, dy) displacement applied by each action code.
    _MOVES: Dict[int, Tuple[int, int]] = {
        0: (0, -1),   # up
        1: (0, 1),    # down
        2: (-1, 0),   # left
        3: (1, 0),    # right
    }
    # Cell labels the agent is allowed to step onto.
    _OPEN_CELLS: Tuple[str, str] = ("SPACE", "GOAL")

    def __init__(
        self,
        maze: Mapping[Tuple[int, int], str],
        reward_type:int
    ):
        self.maze: Mapping[Tuple[int, int], str] = maze
        self.reward_type=reward_type
        super().__init__(self.get_action_transition_reward_map())

    def _target(self, state: MazeState, action: int) -> Tuple[int, int]:
        """Return the (x, y) cell that ``action`` points at from ``state``.

        Any action code other than 0, 1 or 2 is treated as "right", which is
        how the original if-chain behaved.
        """
        dx, dy = self._MOVES.get(action, self._MOVES[3])
        return (state.x + dx, state.y + dy)

    ##Check if an action is valid
    def checkAction(self,state:MazeState,action:int):
        """Return True when ``action`` leads to an in-grid SPACE or GOAL cell."""
        # Mapping.get yields None for cells outside the grid, which is not an
        # open label, so out-of-grid moves are rejected like blocked ones.
        return self.maze.get(self._target(state, action)) in self._OPEN_CELLS

    ## take action and return the new maze state
    def takeAction(self,state:MazeState,action:int):
        """Return the state reached by ``action``; validity is not checked."""
        return MazeState(*self._target(state, action))

    ## Check if the state is at the terminal state
    def checkGoal(self,state:MazeState):
        """Return True when ``state`` sits on a GOAL cell."""
        return self.maze[(state.x,state.y)]=="GOAL"

    def _step_reward(self, new_state: MazeState) -> float:
        """Reward for a move that lands on ``new_state`` under ``reward_type``."""
        reached_goal = self.checkGoal(new_state)
        if self.reward_type == 0:
            return 1.0 if reached_goal else 0.0
        return 100.0 if reached_goal else -1.0

    def get_action_transition_reward_map(self) -> MazeStateMapping:
        """Build the state -> action -> (next state, reward) distribution map.

        Only valid moves are registered as actions.  Registering an invalid
        move with an empty distribution would create a phantom action whose
        expected return is 0; under the negative-step reward scheme that can
        beat every real move in the arg-max.
        """
        d: Dict[MazeState, Dict[int, Categorical[Tuple[MazeState,float]]]] = {}
        for pos, cell in self.maze.items():
            # Blocks are not states at all.
            if cell == "BLOCK":
                continue
            state: MazeState = MazeState(pos[0], pos[1])
            # terminal state returns None for the mapping
            if self.checkGoal(state):
                d[state] = None
                continue
            d1: Dict[int, Categorical[Tuple[MazeState,float]]] = {}
            for action in range(4):
                if not self.checkAction(state, action):
                    continue
                new_state = self.takeAction(state, action)
                # Moves are deterministic: one outcome with probability 1.
                d1[action] = Categorical(
                    {(new_state, self._step_reward(new_state)): 1.0}
                )
            # A fully walled-in cell has no move at all; mark it terminal (as
            # goal cells are) rather than leaving a state with zero actions.
            d[state] = d1 if d1 else None
        return d

# Labels used for the cells of the maze layout.
SPACE = 'SPACE'
BLOCK = 'BLOCK'
GOAL = 'GOAL'

# 8x8 maze keyed by (x, y); every pair of lines below covers one value of x.
maze_grid = {
    (0, 0): SPACE, (0, 1): BLOCK, (0, 2): SPACE, (0, 3): SPACE,
    (0, 4): SPACE, (0, 5): SPACE, (0, 6): SPACE, (0, 7): SPACE,
    (1, 0): SPACE, (1, 1): BLOCK, (1, 2): BLOCK, (1, 3): SPACE,
    (1, 4): BLOCK, (1, 5): BLOCK, (1, 6): BLOCK, (1, 7): BLOCK,
    (2, 0): SPACE, (2, 1): BLOCK, (2, 2): SPACE, (2, 3): SPACE,
    (2, 4): SPACE, (2, 5): SPACE, (2, 6): BLOCK, (2, 7): SPACE,
    (3, 0): SPACE, (3, 1): SPACE, (3, 2): SPACE, (3, 3): BLOCK,
    (3, 4): BLOCK, (3, 5): SPACE, (3, 6): BLOCK, (3, 7): SPACE,
    (4, 0): SPACE, (4, 1): BLOCK, (4, 2): SPACE, (4, 3): BLOCK,
    (4, 4): SPACE, (4, 5): SPACE, (4, 6): SPACE, (4, 7): SPACE,
    (5, 0): BLOCK, (5, 1): BLOCK, (5, 2): SPACE, (5, 3): BLOCK,
    (5, 4): SPACE, (5, 5): BLOCK, (5, 6): SPACE, (5, 7): BLOCK,
    (6, 0): SPACE, (6, 1): BLOCK, (6, 2): BLOCK, (6, 3): BLOCK,
    (6, 4): SPACE, (6, 5): BLOCK, (6, 6): SPACE, (6, 7): SPACE,
    (7, 0): SPACE, (7, 1): SPACE, (7, 2): SPACE, (7, 3): SPACE,
    (7, 4): SPACE, (7, 5): BLOCK, (7, 6): BLOCK, (7, 7): GOAL,
}
from pprint import pprint

# First reward scheme: reward only on reaching the goal, discounted by gamma.
user_gamma = 0.9
si_mdp1: FiniteMarkovDecisionProcess[MazeState, int] = SimpleMazeMDP(
    maze=maze_grid,
    reward_type=0,
)
generator = value_iteration(si_mdp1, gamma=user_gamma)

# Count how many value functions converge() yields before it stops.
count = 0
for count, i in enumerate(converge(generator, almost_equal_vfs), start=1):
    pass
print("number of iteration to convergence for the first reward type is", count)

opt_vf_vi, opt_policy_vi = value_iteration_result(si_mdp1, gamma=user_gamma)
# With a reward of 1 only on the final move, V(start) = gamma ** (steps - 1),
# hence steps = log_gamma(V(start)) + 1.
print(
    "number of steps to finish maze is",
    int(round(math.log(opt_vf_vi[MazeState(0, 0)], user_gamma) + 1)),
)

number of iteration to convergence for the first reward type is 17
number of steps to finish maze is 16


In [2]:
# Second reward scheme: -1 per move and +100 on reaching the goal.  This
# formulation is undiscounted, so the convergence count and the final solve
# must both run with gamma = 1 (the count previously used user_gamma = 0.9,
# i.e. it measured a different problem from the one solved below).
undiscounted_gamma = 1.0
si_mdp2: FiniteMarkovDecisionProcess[MazeState, int] =SimpleMazeMDP(maze=maze_grid,reward_type=1)
generator2=value_iteration(si_mdp2,gamma=undiscounted_gamma)
count=0
for i in converge(generator2,almost_equal_vfs):
    count+=1
print("number of iteration to convergence for the second reward type is",count)
opt_vf_vi2, opt_policy_vi2 = value_iteration_result(si_mdp2, gamma=undiscounted_gamma)
# Undiscounted, V(start) = 100 - (steps - 1), hence steps = 100 - V(start) + 1.
print("number of steps to finish maze is",int(round(100-opt_vf_vi2[MazeState(0,0)]+1)))

number of iteration to convergence for the second reward type is 17
number of steps to finish maze is 16


In [3]:
def get_path(start,end,policy_map):
    """Follow a policy from ``start`` until ``end`` and return the visited cells.

    Args:
        start: ``(x, y)`` position to start from.
        end: ``(x, y)`` position at which to stop.
        policy_map: mapping from ``MazeState`` to an iterable of
            ``(action, probability)`` pairs; the first listed action is the
            one followed.  Actions: 0 up, 1 down, 2 left, 3 right.

    Returns:
        The list of ``(x, y)`` positions from ``start`` to ``end`` inclusive.

    Raises:
        ValueError: if the policy yields an action outside 0..3 or leads back
            to an already visited cell.  Either case used to loop forever.
    """
    # (dx, dy) displacement for each action code.
    moves = {0: (0, -1), 1: (0, 1), 2: (-1, 0), 3: (1, 0)}
    path=[start]
    visited = {(start[0], start[1])}
    while(path[-1]!=end):
        current=path[-1]
        action=list(policy_map[MazeState(current[0],current[1])])[0][0]
        if action not in moves:
            raise ValueError(
                f"unknown action {action!r} at position {current!r}"
            )
        dx, dy = moves[action]
        following = (current[0] + dx, current[1] + dy)
        # The followed action depends only on the position, so coming back to
        # a visited cell means the walk can never reach ``end``.
        if following in visited:
            raise ValueError(
                f"policy cycles back to {following!r} without reaching {end!r}"
            )
        visited.add(following)
        path.append(following)
    return path

In [4]:
# Print the route each optimal policy takes from the start cell to the goal.
maze_start, maze_goal = (0, 0), (7, 7)
print(get_path(maze_start, maze_goal, opt_policy_vi.policy_map))
print(get_path(maze_start, maze_goal, opt_policy_vi2.policy_map))

[(0, 0), (1, 0), (2, 0), (3, 0), (3, 1), (3, 2), (2, 2), (2, 3), (2, 4), (2, 5), (3, 5), (4, 5), (4, 6), (5, 6), (6, 6), (6, 7), (7, 7)]
[(0, 0), (1, 0), (2, 0), (3, 0), (3, 1), (3, 2), (2, 2), (2, 3), (2, 4), (2, 5), (3, 5), (4, 5), (4, 6), (5, 6), (6, 6), (6, 7), (7, 7)]


In [5]:
from scipy.stats import poisson
# Sanity check of the API: probability that a Poisson variable with mean 1
# takes the value 2 (notebook-style bare expression, displayed not printed).
poisson.pmf(2,1)

0.18393972058572114

In [6]:
## Define the state class
@dataclass(frozen=True)
class SalaryState:
    """Immutable, hashable state of the salary MDP."""
    x: int  # salary level; SalaryMDP builds states from integer salaries

# An action is a pair (l, s) of non-negative ints with l + s <= H; see
# SalaryMDP.get_action_transition_reward_map for how l and s enter the
# transition probabilities and the reward.
SalaryStateMapping = StateActionMapping[SalaryState,Tuple[int,int]]


class SalaryMDP(FiniteMarkovDecisionProcess[SalaryState, Tuple[int, int]]):
    """Finite MDP over salary levels ``0..W``.

    From a salary below ``W`` an action is a pair ``(l, s)`` of non-negative
    ints with ``l + s <= H``:

    * the step reward is ``salary * (H - l - s)``;
    * a raise is drawn from ``Poisson(alpha * l)`` and the new salary is
      capped at ``W``;
    * when the raise is 0, the salary moves to ``min(salary + 1, W)`` with
      probability ``beta * s / H`` and stays put otherwise.

    (Presumably ``l`` and ``s`` are hours spent learning and job-searching
    out of ``H`` available hours -- confirm against the problem statement.)

    The top salary ``W`` is absorbing: its only action is ``(0, 0)``, which
    pays ``W * H`` and stays at ``W``.
    """

    def __init__(
        self,
        H:int,
        W:int,
        alpha:float,
        beta:float
    ):
        self.H = H
        self.W=W
        self.alpha=alpha
        self.beta=beta
        super().__init__(self.get_action_transition_reward_map())

    def _next_salary_probs(
        self,
        salary: int,
        prob_offer: float,
        raise_pmf,
        raise_tail
    ) -> Dict[int, float]:
        """Return ``{next salary: probability}`` for one salary and action.

        ``raise_pmf[k]`` is P(raise == k) and ``raise_tail[k]`` is
        P(raise > k) for the action's Poisson rate.  Probabilities landing on
        the same salary are summed, never overwritten, so they total 1.
        """
        probs: Dict[int, float] = {}

        def add(level: int, prob: float) -> None:
            probs[level] = probs.get(level, 0.0) + float(prob)

        no_raise = raise_pmf[0]
        # No raise: either nothing changes or the outside offer is taken.
        add(salary, no_raise * (1.0 - prob_offer))
        add(min(salary + 1, self.W), no_raise * prob_offer)
        # Raises that stay strictly below the cap.
        for bonus in range(1, self.W - salary):
            add(salary + bonus, raise_pmf[bonus])
        # Every raise of at least W - salary is clipped to the cap.
        add(self.W, raise_tail[self.W - salary - 1])
        return probs

    def get_action_transition_reward_map(self) -> SalaryStateMapping:
        """Build the state -> action -> (next state, reward) distribution map."""
        d: Dict[SalaryState, Dict[Tuple[int,int], Categorical[Tuple[SalaryState,float]]]] = {}

        # The Poisson rate depends only on l, so tabulate pmf / tail once per
        # l instead of once per (salary, l, s, bonus).
        raises = list(range(self.W + 1))
        raise_pmf = [poisson.pmf(raises, self.alpha * l) for l in range(self.H + 1)]
        raise_tail = [poisson.sf(raises, self.alpha * l) for l in range(self.H + 1)]

        for salary in range(self.W):
            d1: Dict[Tuple[int,int], Categorical[Tuple[SalaryState,float]]] = {}
            for l in range(self.H + 1):
                for s in range(self.H + 1 - l):
                    reward = salary * (self.H - l - s)
                    prob_offer = self.beta * s / self.H
                    next_probs = self._next_salary_probs(
                        salary, prob_offer, raise_pmf[l], raise_tail[l]
                    )
                    d1[(l, s)] = Categorical({
                        (SalaryState(level), reward): prob
                        for level, prob in next_probs.items()
                    })
            d[SalaryState(salary)] = d1

        # Absorbing top salary.  The original loop stopped at W - 1, so this
        # state was a transition target that never appeared in the map.
        top = SalaryState(self.W)
        d[top] = {(0, 0): Categorical({(top, self.W * self.H): 1.0})}
        return d

In [13]:
# Build the salary MDP with H=10, W=30, alpha=0.08, beta=0.82 and print its
# transition table.  Actions are (l, s) tuples, hence the Tuple action type.
mdp1: FiniteMarkovDecisionProcess[SalaryState, Tuple[int, int]] =SalaryMDP(H=10,W=30,alpha=0.08,beta=0.82)
print(mdp1)

From State SalaryState(x=0):
  With Action (0, 0):
    To [State SalaryState(x=0) and Reward 0.000] with Probability 1.000
    To [State SalaryState(x=1) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=2) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=3) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=4) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=5) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=6) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=7) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=8) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=9) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=10) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=11) and Reward 0.000] with Probability 0.000
    To [State SalaryState(x=12) and Reward 0.000] with Probability 0.000
    To [St

In [14]:
# Solve the salary MDP with discount factor 0.95.
# NOTE: this rebinds opt_vf_vi / opt_policy_vi, which earlier held the
# results for the first maze MDP.
opt_vf_vi,opt_policy_vi = value_iteration_result(mdp1, gamma=0.95)

In [15]:
# Notebook-style display of the optimal salary policy (bare expression).
opt_policy_vi

For State SalaryState(x=0):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=1):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=2):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=3):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=4):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=5):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=6):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=7):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=8):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=9):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=10):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=11):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=12):
  Do Action (0, 10) with Probability 1.000
For State SalaryState(x=13):
  Do Action (0, 10) with Probability 1.000
Fo