In [1]:
# import common packages
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


In [2]:
# Environment description
actions={
    (2,0):['U','R'],
    (1,0):['U','D'],
    (0,0):['R','D'],
    (2,1):['R','L'],
    (0,1):['R','L'],
    (2,2):['U','R','L'],
    (1,2):['U','D','R'],
    (0,2):['R','L','D'],
    (2,3):['L'],
}

probs = {
    ((2, 0), 'U'): {(1, 0): 1.0},
    ((2, 0), 'D'): {(2, 0): 1.0},
    ((2, 0), 'L'): {(2, 0): 1.0},
    ((2, 0), 'R'): {(2, 1): 1.0},
    ((1, 0), 'U'): {(0, 0): 1.0},
    ((1, 0), 'D'): {(2, 0): 1.0},
    ((1, 0), 'L'): {(1, 0): 1.0},
    ((1, 0), 'R'): {(1, 0): 1.0},
    ((0, 0), 'U'): {(0, 0): 1.0},
    ((0, 0), 'D'): {(1, 0): 1.0},
    ((0, 0), 'L'): {(0, 0): 1.0},
    ((0, 0), 'R'): {(0, 1): 1.0},
    ((0, 1), 'U'): {(0, 1): 1.0},
    ((0, 1), 'D'): {(0, 1): 1.0},
    ((0, 1), 'L'): {(0, 0): 1.0},
    ((0, 1), 'R'): {(0, 2): 1.0},
    ((0, 2), 'U'): {(0, 2): 1.0},
    ((0, 2), 'D'): {(1, 2): 1.0},
    ((0, 2), 'L'): {(0, 1): 1.0},
    ((0, 2), 'R'): {(0, 3): 1.0},
    ((2, 1), 'U'): {(2, 1): 1.0},
    ((2, 1), 'D'): {(2, 1): 1.0},
    ((2, 1), 'L'): {(2, 0): 1.0},
    ((2, 1), 'R'): {(2, 2): 1.0},
    ((2, 2), 'U'): {(1, 2): 1.0},
    ((2, 2), 'D'): {(2, 2): 1.0},
    ((2, 2), 'L'): {(2, 1): 1.0},
    ((2, 2), 'R'): {(2, 3): 1.0},
    ((2, 3), 'U'): {(1, 3): 1.0},
    ((2, 3), 'D'): {(2, 3): 1.0},
    ((2, 3), 'L'): {(2, 2): 1.0},
    ((2, 3), 'R'): {(2, 3): 1.0},
    ((1, 2), 'U'): {(0, 2): 0.5, (1, 3): 0.5},
    ((1, 2), 'D'): {(2, 2): 1.0},
    ((1, 2), 'L'): {(1, 2): 1.0},
    ((1, 2), 'R'): {(1, 3): 1.0},
  }


In [3]:
# define the gridworld  class
class GridWorld():
    
    def __init__(self, rows, columns, start_position):
        self.rows = rows
        self.columns = columns
        #self.all_states = [(i,j) for i in range(rows) for j in range(columns)]
        self.i = start_position[0]
        self.j = start_position[1]
        
    def set_rewards_actions(self, rewards, actions, probs):
        self.rewards = rewards
        self.actions = actions
        self.probs = probs
        self.all_states = set(self.actions.keys()) | set(self.rewards.keys())
        #print (self.all_states)
    
    def set_state(self, s):
        self.i = s[0]
        self.j = s[1]
    
    def current_state(self):
        return i,j
    
    
    def undo_move(self, action):
        if action in self.actions[(self.i,self.j)]:
            if action == 'U':
                self.i += 1
            elif action == 'R':
                self.j -= 1
            elif action == 'L':
                self.j += 1
            else:
                self.i -= 1
        # should never happen
        assert (self.current_state() in self.all_states)
 
    def move(self, action):
        cur_state = (self.i, self.j)
        a = action
        next_action_prob = self.probs[(cur_state,a)]
        next_actions = list(next_action_prob.keys)
        next_probs = list(next_action_prob.values)
        next_state = np.random.choice(next_actions, p=next_probs)
        self.i = next_state[0]
        self.j = next_state[1]
        return self.rewards.get((self.i,self.j),0)

    def is_terminal (self, s):
        return s not in self.actions
    
    def game_over():
        return (self.i,self.j) in self.actions

In [38]:
SMALL_ENOUGH = 1e-3

def print_values(V,g):
    for i in range(g.rows):
        print("---------------------------")
        for j in range(g.columns):
            v = V.get((i,j),0)
            if v >= 0:
                print(" %.2f|" % v, end="")
            else:
                print("%.2f|" % v, end="")
        print ("")

def print_policy(P,g):
    for i in range(g.rows):
        print("---------------------------")
        for j in range(g.columns):
            a = P.get((i,j),' ')
            print(" %s |" % a, end="")
        print ("")

ACTION_SPACE = ('U', 'D', 'L', 'R')

def init_grid_world_penalized(step_cost):
    grid = GridWorld(3,4,(2,0))
    rewards = {
        (2,0):step_cost,
        (1,0):step_cost,
        (0,0):step_cost,
        (2,1):step_cost,
        (0,1):step_cost,
        (2,2):step_cost,
        (1,2):step_cost,
        (0,2):step_cost,
        (2,3):step_cost,
        (0,3):1,
        (1,3):-1
    }
    grid.set_rewards_actions(rewards, actions, probs)
    return grid

def init_transition_probs(grid, tr_probs, exp_rewards):
    for (s,a),v in grid.probs.items():
        for s2, p in v.items():
            tr_probs[(s,a,s2)] = p
            if s2 in grid.rewards:
                exp_rewards[(s,a,s2)] = grid.rewards[s2]
    return tr_probs, exp_rewards

gamma = 0.9

def evaluate_deterministic_policy(grid, cur_policy, tr_probs, exp_rewards):
    V = {}
    for s in grid.all_states:
        V[s]=0
          
    it = 0
    while True:
        biggest_change = 0
        for s in grid.all_states:
            if not grid.is_terminal(s):
                old_v = V[s] 
                new_v = 0
                for a in ACTION_SPACE:
                    for s2 in grid.all_states:
                        
                        action_prob = 1 if cur_policy.get(s) == a else 0
                        
                        r = exp_rewards.get((s,a,s2),0)
                        new_v += action_prob *tr_probs.get((s,a,s2),0)*(r + gamma *V[s2])
                
                V[s] = new_v
                biggest_change = max(biggest_change, np.abs(old_v - V[s]))
                
        #print ("iter: ", it, "biggest_change: ", biggest_change)
        #print_values(V,grid)
        it += 1
        
        if biggest_change < SMALL_ENOUGH:
            break   
    return V
    
def play_game(step_cost):
    
    transition_probs = {}
    expected_rewards = {}
        
    g = init_grid_world_penalized(step_cost)
   
    transition_probs, expected_rewards= init_transition_probs(g, transition_probs, expected_rewards) 
    #print (transition_probs)
    #print (expected_rewards)
    
    # initialize main variable   
    it_main = 0
    V = {}
    policy = {}
    for s in g.all_states:
        V[s]=0
    
    while True:
        biggest_change = 0
        #Evaluation the best value function
        for s in g.all_states:
            #print(f's:{s}')
            if not g.is_terminal(s):
                old_v = V[s]
                new_v = float('-inf') 
                                
                for a in ACTION_SPACE:    
                    #print(a)
                    v=0
                    for s2 in g.all_states:          
                        #print(s2)
                        r = expected_rewards.get((s,a,s2),0)
                        v += transition_probs.get((s,a,s2),0)*(r + gamma *V[s2])
                
                    if v > new_v:
                        new_v = v
                
                V[s] = new_v 
                #print(f"biggest_change : {biggest_change}, old_v : {old_v}, V[s] : {V[s]}")
                biggest_change = max(biggest_change, np.abs(old_v - V[s]))
                   
        if biggest_change < SMALL_ENOUGH:
            break
                                   
        it_main +=1
    
    print ("iter: ", it_main)
    print_values(V,g)  

    # loop once to find the associated best policy
    for s in g.actions.keys():
        if not g.is_terminal(s):
            old_v = V[s]
            new_v = float('-inf') 
                
            for a in ACTION_SPACE:
                v=0
                for s2 in g.all_states:          
                        
                    r = expected_rewards.get((s,a,s2),0)
                    v += transition_probs.get((s,a,s2),0)*(r + gamma *V[s2])
                
                if v > new_v:
                    new_v = v
                    policy[s] = a
    
    print_policy(policy,g)                 

In [40]:
play_game(-0.4)

iter:  3
---------------------------
 0.05| 0.50| 1.00| 0.00|
---------------------------
-0.36| 0.00|-0.25| 0.00|
---------------------------
-0.72|-0.96|-0.62|-0.96|
---------------------------
 R | R | R |   |
---------------------------
 U |   | U |   |
---------------------------
 U | R | U | L |
