In [164]:
import warnings ; warnings.filterwarnings('ignore')

import gym, gym_walk, gym_aima
import numpy as np
from pprint import pprint
from tqdm import tqdm_notebook as tqdm

from itertools import cycle

import random

# Show NumPy floats in fixed-point form instead of scientific notation.
np.set_printoptions(suppress=True)

# Seed the stdlib and NumPy global generators with the same constant.
random.seed(123)
np.random.seed(123)

In [165]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    """Print the action chosen by ``pi`` in every state as a text grid.

    Args:
        pi: callable mapping a state index to an action index.
        P: transition table; ``P[s]`` maps each action to an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        action_symbols: symbol printed for each action index.
        n_cols: number of cells after which the row is closed with ``|``.
        title: heading printed before the grid.

    A state is rendered as a blank cell when every transition listed for it
    (across all actions) has its ``done`` flag set.
    """
    print(title)
    symbol_for = dict(enumerate(action_symbols))
    for state in range(len(P)):
        # The policy is queried for every state, blank cells included.
        action = pi(state)
        print("| ", end="")
        only_done_transitions = all(
            done
            for transitions in P[state].values()
            for _, _, _, done in transitions
        )
        if only_done_transitions:
            cell = " " * 9
        else:
            cell = str(state).zfill(2) + " " + symbol_for[action].rjust(6)
        print(cell, end=" ")
        # Close the row once n_cols cells have been written.
        if (state + 1) % n_cols == 0:
            print("|")

In [166]:
def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
    """Estimate how often following ``pi`` ends an episode in ``goal_state``.

    Args:
        env: environment exposing ``seed(int)``, ``reset()`` returning the
            initial state, and ``step(action)`` returning a 4-tuple whose
            first and third items are the next state and the done flag.
        pi: callable mapping a state to an action.
        goal_state: state that counts as success when an episode ends there.
        n_episodes: number of episodes to simulate.
        max_steps: cap on the number of steps taken in a single episode.

    Returns:
        The fraction of episodes whose last observed state equals
        ``goal_state``.

    Note:
        The stdlib, NumPy and environment generators are all reseeded with
        123 on every call, so repeated calls replay the same episodes.
    """
    random.seed(123)
    np.random.seed(123)
    env.seed(123)

    def last_state_of_episode():
        # Roll out one episode; stop on termination or at the step cap.
        current, finished, taken = env.reset(), False, 0
        while not finished and taken < max_steps:
            current, _, finished, _ = env.step(pi(current))
            taken += 1
        return current

    reached_goal = [last_state_of_episode() == goal_state for _ in range(n_episodes)]

    # Successes divided by the number of episodes.
    return np.sum(reached_goal) / len(reached_goal)

In [167]:
def mean_return(env, pi, n_episodes=100, max_steps=200):
    """Average the undiscounted return obtained by following ``pi``.

    Args:
        env: environment exposing ``seed(int)``, ``reset()`` returning the
            initial state, and ``step(action)`` returning a 4-tuple of
            ``(state, reward, done, info)``.
        pi: callable mapping a state to an action.
        n_episodes: number of episodes to simulate.
        max_steps: cap on the number of steps taken in a single episode.

    Returns:
        The mean over episodes of the summed per-step rewards.

    Note:
        The stdlib, NumPy and environment generators are all reseeded with
        123 on every call, so repeated calls replay the same episodes.
    """
    random.seed(123)
    np.random.seed(123)
    env.seed(123)

    episode_returns = []
    for _ in range(n_episodes):
        state = env.reset()
        finished = False
        taken = 0
        total = 0.0
        # Accumulate rewards until the episode terminates or hits the cap.
        while not finished and taken < max_steps:
            state, reward, finished, _ = env.step(pi(state))
            total += reward
            taken += 1
        episode_returns.append(total)

    return np.mean(episode_returns)

In [168]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    """Print the state values in ``V`` as a text grid, ``n_cols`` cells per row.

    Args:
        V: indexable of state values, one entry per state in ``P``.
        P: transition table; ``P[s]`` maps each action to an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        n_cols: number of cells after which the row is closed with ``|``.
        prec: number of decimals each value is rounded to before printing.
        title: heading printed before the grid.

    A state is rendered as a blank cell when every transition listed for it
    (across all actions) has its ``done`` flag set.

    The cells of a row are written on a single line. A stray debugging
    ``print("\\n")`` that used to run before every cell, and broke the grid
    into one cell per paragraph, has been removed so the layout matches
    ``n_cols``.
    """
    print(title)

    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            # Blank cell: same width as a populated one.
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        # Close the row once n_cols cells have been written.
        if (s + 1) % n_cols == 0: print("|")

# Slippery Walk Five MDP and sample policy

In [169]:
# Create the environment and keep a handle on its transition table.
env = gym.make('SlipperyWalkFive-v0')
P = env.env.P
init_state = env.reset()
goal_state = 6

# Actions are encoded as consecutive integers.
LEFT, RIGHT = 0, 1

# Sample policy: pick LEFT in each of the seven states (0 through 6).
pi = lambda s: dict.fromkeys(range(7), LEFT)[s]

print_policy(pi, P, action_symbols=('<', '>'), n_cols=7)
print(
    'Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
        100 * probability_success(env, pi, goal_state=goal_state),
        mean_return(env, pi),
    )
)

Policy:
|           | 01      < | 02      < | 03      < | 04      < | 05      < |           |
Reaches goal 7.00%. Obtains an average undiscounted return of 0.0700.


# Policy Evaluation

$$\huge v_{k+1}(s) = \sum_{a} \pi(a|s) \sum_{s', r} p(s',r|s, a)[r + \gamma v_k(s')]$$

In [170]:
def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    """Iteratively estimate the state-value function of policy ``pi``.

    Each sweep recomputes every state's value from the previous sweep's
    estimates, using only the transitions of the action ``pi`` selects.

    Args:
        pi: callable mapping a state index to an action.
        P: transition table; ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: discount factor applied to the next state's value.
        theta: convergence threshold; sweeping stops once the largest
            absolute change between two consecutive sweeps falls below it.

    Returns:
        A float64 array of length ``len(P)`` holding the value estimates.
    """
    n_states = len(P)
    previous = np.zeros(n_states, dtype=np.float64)  # estimates before the first sweep

    converged = False
    while not converged:
        V = np.zeros(n_states, dtype=np.float64)

        for s in range(n_states):
            expected = 0.0
            for prob, next_state, reward, done in P[s][pi(s)]:
                # (not done) zeroes the bootstrap term on transitions flagged
                # done, so nothing is carried over past the end of an episode.
                expected += prob * (reward + gamma * previous[next_state] * (not done))
            V[s] = expected

        converged = np.max(np.abs(previous - V)) < theta
        previous = V.copy()

    return V

In [171]:
# Evaluate the sample policy and print its values, seven cells per row.
V = policy_evaluation(pi=pi, P=P)
print_state_value_function(V=V, P=P, n_cols=7, prec=5)

State-value function:


|           

| 01 0.00275 

| 02 0.01099 

| 03 0.03571 

| 04 0.10989 

| 05 0.33242 

|           |


# Policy Improvement

$$\huge \pi'(s) = \underset{a}{\mathrm{argmax}} \sum_{s', r} p(s',r|s, a)[r + \gamma v_\pi(s')]$$

In [172]:
def policy_improvement(V, P, gamma=1.0):
    """Build the policy that is greedy with respect to ``V``.

    For every state/action pair the expected one-step return is accumulated
    into an action-value table ``Q``; the new policy then picks, per state,
    the action with the highest entry (the first one on ties, as
    ``np.argmax`` does).

    Args:
        V: indexable of state values, one entry per state in ``P``.
        P: transition table; ``P[s][a]`` is an iterable of
            ``(prob, next_state, reward, done)`` tuples. The number of
            columns of ``Q`` is taken from ``len(P[0])``.
        gamma: discount factor applied to the next state's value.

    Returns:
        A callable mapping a state index to its greedy action. Looking up a
        state that is not in ``P`` raises ``KeyError``.

    Side effects:
        Prints the new policy as a list of actions, followed by blank lines.
    """
    n_states = len(P)
    Q = np.zeros((n_states, len(P[0])), dtype=np.float64)

    for s in range(n_states):
        for a in range(len(P[s])):
            q_sa = 0.0
            for prob, next_state, reward, done in P[s][a]:
                # (not done) drops the bootstrap term on transitions flagged done.
                q_sa += prob * (reward + gamma * V[next_state] * (not done))
            Q[s, a] = q_sa

    # Example from the original author's run on Slippery Walk Five, starting
    # from V = [0. 0.00274725 0.01098901 0.03571429 0.10989011 0.33241758 0.]:
    #
    #           Left        Right
    #  Q = [[0.         0.        ]
    #       [0.00274725 0.00641026]
    #       [0.01098901 0.02197802]
    #       [0.03571429 0.06868132]
    #       [0.10989011 0.20879121]
    #       [0.33241758 0.62912088]
    #       [0.         0.        ]]

    # Row-wise argmax gives the column (action) with the highest value per state.
    greedy_action = dict(enumerate(np.argmax(Q, axis=1)))
    new_pi = lambda s: greedy_action[s]

    print("new policy", [new_pi(i) for i in range(n_states)])
    print("\n")

    return new_pi

In [173]:
# Act greedily with respect to V, then measure the resulting policy.
improved_pi = policy_improvement(V=V, P=P)
print_policy(improved_pi, P, action_symbols=('<', '>'), n_cols=7)
print(
    'Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
        100 * probability_success(env, improved_pi, goal_state=goal_state),
        mean_return(env, improved_pi),
    )
)

new policy [0, 1, 1, 1, 1, 1, 0]


Policy:
|           | 01      > | 02      > | 03      > | 04      > | 05      > |           |
Reaches goal 93.00%. Obtains an average undiscounted return of 0.9300.


In [174]:
# How about evaluating the improved policy?
improved_V = policy_evaluation(pi=improved_pi, P=P)
print_state_value_function(V=improved_V, P=P, n_cols=7, prec=5)

State-value function:


|           

| 01 0.66758 

| 02 0.89011 

| 03 0.96429 

| 04 0.98901 

| 05 0.99725 

|           |


In [175]:
# Can we improve the improved policy?
improved_improved_pi = policy_improvement(V=improved_V, P=P)
print_policy(improved_improved_pi, P, action_symbols=('<', '>'), n_cols=7)
print(
    'Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
        100 * probability_success(env, improved_improved_pi, goal_state=goal_state),
        mean_return(env, improved_improved_pi),
    )
)

new policy [0, 1, 1, 1, 1, 1, 0]


Policy:
|           | 01      > | 02      > | 03      > | 04      > | 05      > |           |
Reaches goal 93.00%. Obtains an average undiscounted return of 0.9300.


In [176]:
# The second improvement step returned the same policy.
# Evaluating it again shows there is nothing left to improve,
# which also means we have reached the optimal policy.
improved_improved_V = policy_evaluation(pi=improved_improved_pi, P=P)
print_state_value_function(V=improved_improved_V, P=P, n_cols=7, prec=5)

State-value function:


|           

| 01 0.66758 

| 02 0.89011 

| 03 0.96429 

| 04 0.98901 

| 05 0.99725 

|           |


In [177]:
# The state-value function did not change after the second improvement step,
# so the policy can no longer be improved: we have reached the optimal policy.
# The comparison is element-wise and exact.
assert np.all(improved_V == improved_improved_V)