In [None]:
import numpy as np
def policy_evaluation(pi,P,gamma=0.1,theta=1e-10):
  """Iteratively estimate the state-value function of the policy ``pi``.

  Args:
    pi: callable mapping a state index to the action chosen in that state.
    P: transition model indexed as ``P[state][action]``; each entry is an
      iterable of ``(prob, next_state, reward, done)`` tuples.
    gamma: discount factor applied to the value of the successor state.
    theta: convergence threshold on the largest per-state change between
      two consecutive sweeps.

  Returns:
    NumPy array of length ``len(P)`` with the value estimate of every state.
  """
  n_states=len(P)
  old_values=np.zeros(n_states)
  while True:
    V=np.zeros(n_states)
    for state in range(n_states):
      outcomes=P[state][pi(state)]
      for prob,next_state,reward,done in outcomes:
        # A transition flagged as done contributes no bootstrapped value.
        future=gamma*old_values[next_state]*(not done)
        V[state]+=prob*(reward+future)
    largest_change=np.max(np.abs(old_values-V))
    if largest_change<theta:
      return V
    old_values=V.copy()

In [None]:
!pip install gym
!pip install git+https://github.com/mimoralea/gym-walk

Collecting git+https://github.com/mimoralea/gym-walk
  Cloning https://github.com/mimoralea/gym-walk to /tmp/pip-req-build-740stdz1
  Running command git clone --filter=blob:none --quiet https://github.com/mimoralea/gym-walk /tmp/pip-req-build-740stdz1
  Resolved https://github.com/mimoralea/gym-walk to commit 5999016267d6de2f5a63307fb00dfd63de319ac1
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: gym_walk
  Building wheel for gym_walk (setup.py) ... [?25l[?25hdone
  Created wheel for gym_walk: filename=gym_walk-0.0.2-py3-none-any.whl size=4058 sha256=0627e304998f6472ebb54a3ba2a9d9b6cf44ea2d7e30dcf011300f1a728e732d
  Stored in directory: /tmp/pip-ephem-wheel-cache-8_no2s87/wheels/24/fe/c4/0cbc7511d29265bad7e28a09311db3f87f0cafba74af54d530
Successfully built gym_walk
Installing collected packages: gym_walk
Successfully installed gym_walk-0.0.2


In [None]:
import warnings ; warnings.filterwarnings('ignore')

import gym, gym_walk
import numpy as np

import random
import warnings

# Explicitly silence deprecation warnings (an unconditional 'ignore' filter is
# also installed by the first line of this cell).
warnings.filterwarnings(action='ignore', category=DeprecationWarning)

# Ask NumPy to print floats in fixed-point rather than scientific notation.
np.set_printoptions(suppress=True)

# Seed Python's and NumPy's global random number generators.
random.seed(123)
np.random.seed(123)



def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    """Print the action chosen by ``pi`` in every state as a grid of cells.

    Args:
        pi: callable mapping a state index to an action index.
        P: transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples.
        action_symbols: symbol printed for each action index.
        n_cols: number of cells per printed row.
        title: heading printed before the grid.

    A state whose transitions are all flagged ``done`` (for every action) is
    printed as a blank cell.
    """
    print(title)
    symbol_of = dict(enumerate(action_symbols))
    for state in range(len(P)):
        action = pi(state)
        print("| ", end="")
        done_flags = [
            is_done
            for outcomes in P[state].values()
            for _prob, _next_state, _reward, is_done in outcomes
        ]
        if not np.all(done_flags):
            print(f"{state:02d}", symbol_of[action].rjust(6), end=" ")
        else:
            print(" " * 9, end=" ")
        # Close the row once n_cols cells have been written.
        if (state + 1) % n_cols == 0:
            print("|")


def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    """Print the value of every state as a grid of cells.

    Args:
        V: sequence of state values indexed by state.
        P: transition model indexed as ``P[state][action]``; each entry is an
            iterable of ``(prob, next_state, reward, done)`` tuples.
        n_cols: number of cells per printed row.
        prec: number of decimals the value is rounded to before printing.
        title: heading printed before the grid.

    A state whose transitions are all flagged ``done`` (for every action) is
    printed as a blank cell.
    """
    print(title)
    for state in range(len(P)):
        value = V[state]
        print("| ", end="")
        done_flags = [
            is_done
            for outcomes in P[state].values()
            for _prob, _next_state, _reward, is_done in outcomes
        ]
        if not np.all(done_flags):
            rounded = np.round(value, prec)
            print(f"{state:02d}", f"{rounded}".rjust(6), end=" ")
        else:
            print(" " * 9, end=" ")
        # Close the row once n_cols cells have been written.
        if (state + 1) % n_cols == 0:
            print("|")


def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
    """Estimate how often following ``pi`` ends an episode in ``goal_state``.

    Args:
        env: environment exposing ``seed``, ``reset`` (returning the initial
            state) and ``step`` (returning a 4-tuple whose first item is the
            next state and third item is the done flag).
        pi: callable mapping a state to the action to take.
        goal_state: state that counts as a success when an episode stops there.
        n_episodes: number of episodes to roll out.
        max_steps: maximum number of steps taken per episode.

    Returns:
        Fraction of episodes whose last observed state equals ``goal_state``.
    """
    # Re-seed every random source so repeated calls replay the same rollouts.
    random.seed(123)
    np.random.seed(123)
    env.seed(123)

    successes = []
    for _episode in range(n_episodes):
        state = env.reset()
        finished = False
        steps_taken = 0
        while steps_taken < max_steps and not finished:
            state, _reward, finished, _info = env.step(pi(state))
            steps_taken += 1
        successes.append(state == goal_state)
    return np.sum(successes)/len(successes)


def mean_return(env, pi, n_episodes=100, max_steps=200):
    """Estimate the average undiscounted return obtained by following ``pi``.

    Args:
        env: environment exposing ``seed``, ``reset`` (returning the initial
            state) and ``step`` (returning ``(state, reward, done, info)``).
        pi: callable mapping a state to the action to take.
        n_episodes: number of episodes to roll out.
        max_steps: maximum number of steps taken per episode.

    Returns:
        Mean over episodes of the summed rewards collected in each episode.
    """
    # Re-seed every random source so repeated calls replay the same rollouts.
    random.seed(123)
    np.random.seed(123)
    env.seed(123)

    episode_returns = []
    for _episode in range(n_episodes):
        state = env.reset()
        total = 0.0
        finished = False
        steps_taken = 0
        while steps_taken < max_steps and not finished:
            state, reward, finished, _info = env.step(pi(state))
            total += reward
            steps_taken += 1
        episode_returns.append(total)
    return np.mean(episode_returns)



# Create the environment and pull out the pieces used by the rest of the notebook.
env = gym.make('SlipperyWalkFive-v0')
# Transition model, read from the object wrapped by `env`; the helpers above
# index it as P[state][action] -> iterable of (prob, next_state, reward, done).
P = env.env.P
init_state = env.reset()
# State index that probability_success() counts as a success.
goal_state = 6
# Action indices: LEFT == 0, RIGHT == 1.
LEFT, RIGHT = range(2)


# Bare expression: it is not the last statement of the cell, and the recorded
# output shows nothing for it.
P


# Bare expression, same remark as above.
init_state


# Take a single RIGHT step and print the raw transition returned by the environment.
state, reward, done, info = env.step(RIGHT)
print("state:{0} - reward:{1} - done:{2} - info:{3}".format(state, reward, done, info))


# First policy: always choose LEFT.
# (The lookup dict is rebuilt on every call of the lambda.)
pi_1 = lambda s: {
    0:LEFT, 1:LEFT, 2:LEFT, 3:LEFT, 4:LEFT, 5:LEFT, 6:LEFT
}[s]
print_policy(pi_1, P, action_symbols=('<', '>'), n_cols=7)


# Find the probability of success and the mean return of the first policy.
# Both helpers re-seed the RNGs and the environment before rolling out episodes.
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, pi_1, goal_state=goal_state)*100,
    mean_return(env, pi_1)))

# Second policy: RIGHT everywhere except in states 2 and 5, where it chooses LEFT.
pi_2 = lambda s: {
    0:RIGHT, 1:RIGHT, 2:LEFT, 3:RIGHT, 4:RIGHT, 5:LEFT, 6:RIGHT
}[s]
print_policy(pi_2, P, action_symbols=('<', '>'), n_cols=7)

# Find the probability of success and the mean return of your policy
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, pi_2, goal_state=goal_state)*100,
    mean_return(env, pi_2)))



# Compare your policy with the first policy (via policy evaluation, defined next)


def policy_evaluation(pi,P,gamma=0.1,theta=1e-10):
  """Compute the state-value function of ``pi`` by repeated full sweeps.

  Note: this re-declares the function of the same name defined in the
  notebook's first cell.

  Args:
    pi: callable mapping a state index to the action chosen in that state.
    P: transition model indexed as ``P[state][action]``; each entry is an
      iterable of ``(prob, next_state, reward, done)`` tuples.
    gamma: discount factor applied to the value of the successor state.
    theta: sweeps stop once the largest per-state change drops below this.

  Returns:
    NumPy array of length ``len(P)`` with the value estimate of every state.
  """
  state_count=len(P)
  previous=np.zeros(state_count)
  while True:
    V=np.zeros(state_count)
    for state in range(state_count):
      transitions=P[state][pi(state)]
      for prob,next_state,reward,done in transitions:
        # Multiplying by (not done) zeroes the bootstrap term on terminal transitions.
        discounted_next=gamma*previous[next_state]*(not done)
        V[state]+=prob*(reward+discounted_next)
    sweep_delta=np.max(np.abs(previous-V))
    if sweep_delta<theta:
      return V
    previous=V.copy()



state:2 - reward:0.0 - done:False - info:{'prob': 0.16666666666666666}
Policy:
|           | 01      < | 02      < | 03      < | 04      < | 05      < |           |
Reaches goal 3.00%. Obtains an average undiscounted return of 0.0300.
Policy:
|           | 01      > | 02      < | 03      > | 04      > | 05      < |           |
Reaches goal 73.00%. Obtains an average undiscounted return of 0.7300.


In [None]:
# Evaluate both policies under the transition model P (default gamma and theta)
# and print each state-value function as one row of 7 cells.
V1 = policy_evaluation(pi=pi_1, P=P)
print_state_value_function(V=V1, P=P, n_cols=7, prec=5)

V2 = policy_evaluation(pi=pi_2, P=P)
print_state_value_function(V=V2, P=P, n_cols=7, prec=5)




State-value function:
|           | 01    0.0 | 02    0.0 | 03  5e-05 | 04 0.00298 | 05 0.17257 |           |
State-value function:
|           | 01    0.0 | 02  1e-05 | 03 0.00046 | 04 0.00895 | 05 0.17288 |           |


In [None]:
# A policy is at least as good as another when its value is >= in every state.
# np.all checks this directly, with no hard-coded number of states, so the
# comparison still works if the environment has a different size.
if np.array_equal(V1, V2):
  # Identical value functions: neither policy is better than the other.
  print("Both policies are equally good.")
elif np.all(V1>=V2):
  print("The first policy is the better policy")
elif np.all(V2>=V1):
  print("The second policy is the better policy")
else:
  # Each policy is strictly better in at least one state.
  print("Both policies have their merits.")

The second policy is the better policy
