In [64]:
import numpy as np

In [65]:
# The four move labels (Up, Down, Left, Right) that the solvers below try in every state.
ACTION_SPACE = ('U', 'D', 'L', 'R')

In [66]:
class WindyGrid:
    """Grid world whose moves may be stochastic ("windy").

    States are ``(i, j)`` tuples. The environment is described by three
    tables installed through ``set_rewards_actions_probs``:

    * ``rewards``: ``{s: r}``, the reward collected on entering state ``s``
    * ``actions``: ``{s: (a, ...)}``, the actions listed for each state;
      a state with no entry is treated as terminal
    * ``probs``:   ``{(s, a): {s_next: prob}}``, the transition distribution
    """

    def __init__(self, rows, columns, start):
        """Create a ``rows`` x ``columns`` grid with the agent at ``start``."""
        self.rows = rows
        self.columns = columns
        self.i = start[0]
        self.j = start[1]
        # Filled in by set_rewards_actions_probs(); start empty so that the
        # query helpers below are usable on a freshly built grid.
        self.rewards = {}
        self.actions = {}
        self.probs = {}

    def set_rewards_actions_probs(self, rewards, actions, probs):
        """Install the reward, action and transition tables."""
        self.rewards = rewards # {s: r}
        self.actions = actions # {s: (a)}
        self.probs = probs # transition probabilities -> nested dict {(s,a): {(s'): prob}}

    def get_state(self):
        """Return the agent's current ``(i, j)`` position."""
        return (self.i, self.j)

    def set_state(self, s):
        """Place the agent at state ``s``."""
        self.i = s[0]
        self.j = s[1]

    # There is deliberately no get_next_state(s, a): the successor is
    # sampled from a distribution, so it cannot be known in advance.

    def move(self, a):
        """Take action ``a`` from the current state and return the reward.

        The successor is sampled from ``probs[(current_state, a)]`` and the
        agent is moved there. The reward is ``rewards[successor]``, or 0
        when the successor has no reward entry.

        Raises:
            KeyError: if ``probs`` has no entry for the current state and ``a``.
        """
        next_state_probs = self.probs[((self.i, self.j), a)]
        next_states = list(next_state_probs.keys())
        next_probs = list(next_state_probs.values())

        # np.random.choice needs a 1-D array-like (or an int). dict views are
        # rejected, and a list of (i, j) tuples would be read as a 2-D array,
        # so sample an index and look the state up afterwards.
        next_index = np.random.choice(len(next_states), p=next_probs)
        next_state = next_states[next_index]

        self.i, self.j = next_state

        return self.rewards.get(next_state, 0)

    def is_terminal(self, s):
        """Return True when state ``s`` has no entry in ``actions``."""
        return s not in self.actions

    def game_over(self):
        """Return True when the agent's current state is terminal."""
        return (self.i, self.j) not in self.actions

    def all_states(self):
        """Return the set of states that appear in ``actions`` or ``rewards``."""
        return set(self.actions.keys() | self.rewards.keys())

In [67]:
def grid_windy():
    """Build the 3x4 windy grid with the agent starting at (2, 0).

    Entering (0, 3) pays +1 and entering (1, 3) pays -1; neither state has
    actions. Every transition is certain except 'U' from (1, 2), which lands
    on (0, 2) or (1, 3) with probability 0.5 each.
    """
    grid = WindyGrid(3, 4, (2, 0))

    terminal_rewards = {(0, 3): 1, (1, 3): -1}

    allowed_actions = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('L', 'D', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('U', 'L', 'R'),
        (2, 3): ('U', 'L'),
    }

    # Destination reached from each state, listed in the order U, D, L, R.
    # A move that is blocked leaves the agent where it is.
    destinations = {
        (2, 0): ((1, 0), (2, 0), (2, 0), (2, 1)),
        (1, 0): ((0, 0), (2, 0), (1, 0), (1, 0)),
        (0, 0): ((0, 0), (1, 0), (0, 0), (0, 1)),
        (0, 1): ((0, 1), (0, 1), (0, 0), (0, 2)),
        (0, 2): ((0, 2), (1, 2), (0, 1), (0, 3)),
        (2, 1): ((2, 1), (2, 1), (2, 0), (2, 2)),
        (2, 2): ((1, 2), (2, 2), (2, 1), (2, 3)),
        (2, 3): ((1, 3), (2, 3), (2, 2), (2, 3)),
        (1, 2): ((0, 2), (2, 2), (1, 2), (1, 3)),
    }

    transition_table = {}
    for state, targets in destinations.items():
        for action, target in zip(('U', 'D', 'L', 'R'), targets):
            transition_table[(state, action)] = {target: 1.0}

    # The wind: going up from (1, 2) is a coin flip between (0, 2) and (1, 3).
    transition_table[((1, 2), 'U')] = {(0, 2): 0.5, (1, 3): 0.5}

    grid.set_rewards_actions_probs(terminal_rewards, allowed_actions, transition_table)
    return grid

In [68]:
def grid_windy_penalized(step_cost = -0.1):
    """Build the windy grid with a reward of ``step_cost`` on every non-terminal state.

    The layout, allowed actions and transition probabilities are exactly
    those of ``grid_windy()``; only the reward table differs. Reusing that
    builder keeps the two grids from drifting apart.

    Args:
        step_cost: reward collected on entering any state that has actions.

    Returns:
        A grid whose ``rewards`` maps every non-terminal state to
        ``step_cost`` and keeps the terminal rewards set by ``grid_windy()``.
    """
    g = grid_windy()

    # Step cost for every state that has actions, followed by the terminal
    # rewards inherited from the plain grid.
    rewards = {s: step_cost for s in g.actions}
    rewards.update(g.rewards)

    g.set_rewards_actions_probs(rewards, g.actions, g.probs)

    return g

In [69]:
def print_values(V, g):
    """Print the value table ``V`` as a ``g.rows`` x ``g.columns`` grid.

    Each cell shows the value to two decimals; states missing from ``V``
    print as 0. Values that are not negative get a leading space so that
    they line up with negative ones.
    """
    for row in range(g.rows):
        print("-------------------------")
        cells = []
        for col in range(g.columns):
            value = V.get((row, col), 0)
            pad = " " if value >= 0 else ""
            cells.append(pad + "%.2f|" % value)
        print("".join(cells))

def print_policy(P, g):
    """Print the policy ``P`` as a ``g.rows`` x ``g.columns`` grid.

    Each cell shows the action stored for that state; states missing from
    ``P`` are shown as a blank.
    """
    for row in range(g.rows):
        print("-------------------------")
        row_text = "".join(
            " %s |" % P.get((row, col), ' ') for col in range(g.columns)
        )
        print(row_text)

In [70]:
def t_probs_rewards(g):
    """Flatten the grid's tables into dictionaries keyed by ``(s, a, s_next)``.

    For every non-terminal cell of the grid and every action in the
    module-level ``ACTION_SPACE``, each possible successor contributes:

    * its probability, taken from ``g.probs[(s, a)]``
    * its reward, ``g.rewards[s_next]`` (0 when the successor has no entry)

    Returns:
        ``(transition_probs, rewards)``.
    """
    transition_probs, rewards = {}, {}

    cells = ((i, j) for i in range(g.rows) for j in range(g.columns))
    for s in cells:
        if g.is_terminal(s):
            continue
        for a in ACTION_SPACE:
            for s_next, prob in g.probs[(s, a)].items():
                key = (s, a, s_next)
                transition_probs[key] = prob  # chance that (s, a) ends in s_next
                rewards[key] = g.rewards.get(s_next, 0)

    return transition_probs, rewards

## Policy Iteration using policy evaluation and optimization

In [71]:
def evaluate_deterministic_policy(g, policy):
    """Iteratively compute the state values of a deterministic ``policy``.

    Reads the module-level ``transition_probs``, ``rewards``, ``gamma``,
    ``delta`` and ``ACTION_SPACE``. Values are updated in place during each
    sweep, and sweeping stops once the largest change in a sweep is below
    ``delta``.

    Args:
        g: grid providing ``all_states()``.
        policy: ``{state: action}``; states without an entry end up with value 0.

    Returns:
        ``{state: value}`` for every state of ``g``.
    """
    V = dict.fromkeys(g.all_states(), 0)

    while True:
        biggest_change = 0
        for s in g.all_states():
            old_v = V[s]
            chosen = policy.get(s)
            new_v = 0
            for a in ACTION_SPACE:
                # A deterministic policy puts all of its weight on one action.
                action_prob = 1 if chosen == a else 0
                for s_next in g.all_states():
                    key = (s, a, s_next)
                    r = rewards.get(key, 0)
                    new_v += action_prob * transition_probs.get(key, 0) * (r + gamma * V[s_next])

            V[s] = new_v
            biggest_change = max(biggest_change, np.abs(old_v - V[s]))

        if biggest_change < delta:
            return V

In [72]:
def get_optimal_policy(g, policy):
    """Run policy iteration starting from ``policy``.

    Alternates between evaluating the current policy and making it greedy
    with respect to the resulting values, until a full pass changes no
    action. Reads the module-level ``transition_probs``, ``rewards``,
    ``gamma`` and ``ACTION_SPACE``.

    Args:
        g: grid providing ``actions`` and ``all_states()``.
        policy: ``{state: action}``; it is updated in place.

    Returns:
        ``(V, policy)``: the values of the final policy and that policy
        (the same dict object that was passed in).
    """
    while True:
        V = evaluate_deterministic_policy(g, policy)

        stable = True
        for s in g.actions.keys():
            previous_a = policy[s]
            greedy_a, greedy_value = None, float('-inf')

            for a in ACTION_SPACE:
                # Expected return of taking `a` in `s`, then following V.
                v = 0
                for s_next in g.all_states():
                    key = (s, a, s_next)
                    v += transition_probs.get(key, 0) * (rewards.get(key, 0) + gamma * V[s_next])

                # Strict comparison: on ties the earliest action wins.
                if v > greedy_value:
                    greedy_value, greedy_a = v, a

            policy[s] = greedy_a
            if greedy_a != previous_a:
                stable = False

        if stable:
            return V, policy

## Alternative Function: Value Iteration

In [73]:
def value_iteration(g):
    """Compute state values by value iteration.

    Each sweep sets the value of every non-terminal state to the best
    expected return over the actions in ``ACTION_SPACE``; terminal states
    stay at 0. Sweeping stops once the largest change in a sweep is below
    ``delta``. Reads the module-level ``transition_probs``, ``rewards``,
    ``gamma``, ``delta`` and ``ACTION_SPACE``.

    Args:
        g: grid providing ``all_states()`` and ``is_terminal(s)``.

    Returns:
        ``{state: value}`` for every state of ``g``.
    """
    V = {s: 0 for s in g.all_states()}

    while True:
        biggest_change = 0
        for s in g.all_states():
            if g.is_terminal(s):
                continue

            old_v = V[s]
            best_v = float('-inf')
            for a in ACTION_SPACE:
                # Expected return of taking `a` in `s` under the current V.
                v = 0
                for s_next in g.all_states():
                    key = (s, a, s_next)
                    r = rewards.get(key, 0)
                    v += transition_probs.get(key, 0) * (r + gamma * V[s_next])

                if v > best_v:
                    best_v = v

            V[s] = best_v
            biggest_change = max(biggest_change, np.abs(old_v - V[s]))

        if biggest_change < delta:
            return V

def get_optimal_policy_to_optimal_value(g, V):
    """Derive the policy that is greedy with respect to the values ``V``.

    For every state in ``g.actions`` the action from ``ACTION_SPACE`` with
    the highest expected return is chosen; on ties the earliest action
    wins. Reads the module-level ``transition_probs``, ``rewards``,
    ``gamma`` and ``ACTION_SPACE``.

    Returns:
        ``{state: action}`` for every state that has actions.
    """
    greedy = {}
    for s in g.actions.keys():
        top_a, top_value = None, float('-inf')

        for a in ACTION_SPACE:
            # Expected return of taking `a` in `s`, then following V.
            v = 0
            for s_next in g.all_states():
                key = (s, a, s_next)
                v += transition_probs.get(key, 0) * (rewards.get(key, 0) + gamma * V[s_next])

            if v > top_value:
                top_value, top_a = v, a

        greedy[s] = top_a
    return greedy

## Policy Iteration

In [74]:
# Policy iteration on the windy grid without a per-step cost.
g = grid_windy()
delta = 1e-3  # a solver stops once its largest value change in a sweep is below this
gamma = 0.9   # discount applied to the value of the successor state

# Start from a randomly drawn action for every state that has actions.
policy = {state: np.random.choice(ACTION_SPACE) for state in g.actions}

print("initial policy")
print_policy(policy, g)
print("")

# The solver functions read these two tables as module-level names.
transition_probs, rewards = t_probs_rewards(g)
V, policy = get_optimal_policy(g, policy)

print("optimal policy")
print_policy(policy, g)
print("V")
print_values(V, g)

initial policy
-------------------------
 U | U | D |   |
-------------------------
 D |   | R |   |
-------------------------
 D | D | D | D |

optimal policy
-------------------------
 R | R | R |   |
-------------------------
 U |   | D |   |
-------------------------
 U | L | L | L |
V
-------------------------
 0.81| 0.90| 1.00| 0.00|
-------------------------
 0.73| 0.00| 0.48| 0.00|
-------------------------
 0.66| 0.59| 0.53| 0.48|


In [75]:
# Policy iteration on the windy grid with a step cost of -0.15 per non-terminal state.
g = grid_windy_penalized(-0.15)
delta = 1e-3  # a solver stops once its largest value change in a sweep is below this
gamma = 0.9   # discount applied to the value of the successor state

# Start from a randomly drawn action for every state that has actions.
policy = {state: np.random.choice(ACTION_SPACE) for state in g.actions}

print("initial policy")
print_policy(policy, g)
print("")

# The solver functions read these two tables as module-level names.
transition_probs, rewards = t_probs_rewards(g)
V, policy = get_optimal_policy(g, policy)

print("optimal policy")
print_policy(policy, g)
print("V")
print_values(V, g)

initial policy
-------------------------
 R | D | R |   |
-------------------------
 U |   | R |   |
-------------------------
 D | R | L | L |

optimal policy
-------------------------
 R | R | R |   |
-------------------------
 U |   | U |   |
-------------------------
 U | L | L | L |
V
-------------------------
 0.53| 0.75| 1.00| 0.00|
-------------------------
 0.32| 0.00|-0.12| 0.00|
-------------------------
 0.14|-0.02|-0.17|-0.30|


## Value Iteration

In [76]:
# Value iteration on the windy grid without a per-step cost.
g = grid_windy()
delta = 1e-3  # a solver stops once its largest value change in a sweep is below this
gamma = 0.9   # discount applied to the value of the successor state

# This random policy is only displayed: value_iteration() takes no policy,
# and `policy` is replaced by the greedy policy further down.
policy = {state: np.random.choice(ACTION_SPACE) for state in g.actions}

print("initial policy")
print_policy(policy, g)
print("")

# The solver functions read these two tables as module-level names.
transition_probs, rewards = t_probs_rewards(g)
V = value_iteration(g)

print("optimal V")
print_values(V, g)

# Read the policy off the converged values.
policy = get_optimal_policy_to_optimal_value(g, V)
print("optimal policy")
print_policy(policy, g)

initial policy
-------------------------
 R | D | U |   |
-------------------------
 R |   | L |   |
-------------------------
 R | R | U | L |

optimal V
-------------------------
 0.81| 0.90| 1.00| 0.00|
-------------------------
 0.73| 0.00| 0.48| 0.00|
-------------------------
 0.66| 0.59| 0.53| 0.48|
optimal policy
-------------------------
 R | R | R |   |
-------------------------
 U |   | D |   |
-------------------------
 U | L | L | L |


In [80]:
# Value iteration on the windy grid with a step cost of -0.1 per non-terminal state.
g = grid_windy_penalized(-0.1)
delta = 1e-3  # a solver stops once its largest value change in a sweep is below this
gamma = 0.9   # discount applied to the value of the successor state

# This random policy is only displayed: value_iteration() takes no policy,
# and `policy` is replaced by the greedy policy further down.
policy = {state: np.random.choice(ACTION_SPACE) for state in g.actions}

print("initial policy")
print_policy(policy, g)
print("")

# The solver functions read these two tables as module-level names.
transition_probs, rewards = t_probs_rewards(g)
V = value_iteration(g)

print("optimal V")
print_values(V, g)

# Read the policy off the converged values.
policy = get_optimal_policy_to_optimal_value(g, V)
print("optimal policy")
print_policy(policy, g)

initial policy
-------------------------
 L | L | R |   |
-------------------------
 L |   | U |   |
-------------------------
 U | R | U | R |

optimal V
-------------------------
 0.62| 0.80| 1.00| 0.00|
-------------------------
 0.46| 0.00|-0.04| 0.00|
-------------------------
 0.31| 0.18| 0.06|-0.04|
optimal policy
-------------------------
 R | R | R |   |
-------------------------
 U |   | D |   |
-------------------------
 U | L | L | L |
