In [56]:
pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk

Collecting gym-walk
  Cloning https://github.com/mimoralea/gym-walk to /tmp/pip-install-e7njz66g/gym-walk_2062edd18da34431bea6828d473c068c
  Running command git clone --filter=blob:none --quiet https://github.com/mimoralea/gym-walk /tmp/pip-install-e7njz66g/gym-walk_2062edd18da34431bea6828d473c068c
  Resolved https://github.com/mimoralea/gym-walk to commit 5999016267d6de2f5a63307fb00dfd63de319ac1
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [57]:
import warnings ; warnings.filterwarnings('ignore')
import gym
import gym, gym_walk
import numpy as np

import random
import warnings

warnings.filterwarnings('ignore', category=DeprecationWarning)
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)
from builtins import object

In [58]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    print(title)
    arrs = {k:v for k,v in enumerate(action_symbols)}
    for s in range(len(P)):
        a = pi[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [59]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")


In [60]:
def probability_success(env, pi, goal_state, n_episodes=100, max_steps=1000):
    results = []
    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        while not done and steps < max_steps:
            state, _, done, _ = env.step(pi[state])
            steps += 1
        results.append(state == goal_state)
    return np.mean(results)

In [61]:
def mean_return(env, pi, n_episodes=100, max_steps=1000):
    returns = []
    for _ in range(n_episodes):
        state, done, steps, return_ = env.reset(), False, 0, 0
        while not done and steps < max_steps:
            state, reward, done, _ = env.step(pi[state])
            return_ += reward
            steps += 1
        returns.append(return_)
    return np.mean(returns)

In [63]:
env = gym.make('SlipperyWalkFive-v0')
P = env.env.P
init_state = env.reset()
goal_state = 6
LEFT, RIGHT = range(2)

In [64]:
optimal_pi = {}

In [74]:
goal_state = 6

In [75]:
P

{0: {0: [(0.5000000000000001, 0, 0.0, True),
   (0.3333333333333333, 0, 0.0, True),
   (0.16666666666666666, 0, 0.0, True)],
  1: [(0.5000000000000001, 0, 0.0, True),
   (0.3333333333333333, 0, 0.0, True),
   (0.16666666666666666, 0, 0.0, True)]},
 1: {0: [(0.5000000000000001, 0, 0.0, True),
   (0.3333333333333333, 1, 0.0, False),
   (0.16666666666666666, 2, 0.0, False)],
  1: [(0.5000000000000001, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.16666666666666666, 0, 0.0, True)]},
 2: {0: [(0.5000000000000001, 1, 0.0, False),
   (0.3333333333333333, 2, 0.0, False),
   (0.16666666666666666, 3, 0.0, False)],
  1: [(0.5000000000000001, 3, 0.0, False),
   (0.3333333333333333, 2, 0.0, False),
   (0.16666666666666666, 1, 0.0, False)]},
 3: {0: [(0.5000000000000001, 2, 0.0, False),
   (0.3333333333333333, 3, 0.0, False),
   (0.16666666666666666, 4, 0.0, False)],
  1: [(0.5000000000000001, 4, 0.0, False),
   (0.3333333333333333, 3, 0.0, False),
   (0.16666666666666666, 2, 0.0, Fa

In [76]:
def decay_schedule(init_value, min_value, decay_ratio, max_steps, log_start=-2, log_base=10):
  decay_steps = int(max_steps * decay_ratio)
  rem_steps = max_steps - decay_steps
  values = np.logspace(log_start, 0, decay_steps, base=log_base, endpoint=True)[::-1]
  values = (values - values.min()) / (values.max() - values.min())
  values = (init_value - min_value) * values + min_value
  values = np.pad(values, (0, rem_steps), 'edge')
  return values

In [77]:
from itertools import count
def generate_trajectory(select_action, Q, epsilon, env, max_steps=200):
  done, trajectory = False, []
  while not done:
    state = env.reset()
    for t in count():
      action = select_action(state, Q, epsilon)
      next_state, reward, done, _ = env.step(action)
      experience = (state, action, reward, next_state, done)
      trajectory.append(experience)
      if done:
        break
      if t >= max_steps - 1:
        trajectory = []
        break
      state = next_state
  return np.array(trajectory, object)

In [78]:
from numpy.lib.function_base import select
from collections import defaultdict
def mc_control(env, gamma=1.0, init_alpha=0.5, min_alpha=0.01, alpha_decay_ratio=0.5,
               init_epsilon=1.0, min_epsilon=0.1, epsilon_decay_ratio=0.9,
               n_episodes=3000, max_steps=200, first_visit=True):

    nS, nA = env.observation_space.n, env.action_space.n
    Q = defaultdict(lambda: np.zeros(nA))
    V = defaultdict(float)
    pi = defaultdict(lambda: np.random.choice(nA))
    Q_track = []
    pi_track = []
    select_action = lambda state , Q, epsilon:\
    np.argmax(Q[state])\
    if np.random.random() > epsilon\
    else np.random.randint(len(Q[state]))
    for episode in range(n_episodes):
        epsilon = max(init_epsilon * (epsilon_decay_ratio ** episode), min_epsilon)
        alpha = max(init_alpha * (alpha_decay_ratio ** episode), min_alpha)
        trajectory = generate_trajectory(select_action, Q, epsilon, env, max_steps)
        n = len(trajectory)
        G = 0
        for t in range(n - 1, -1, -1):
            state, action, reward, _, _ = trajectory[t]
            G = gamma * G + reward
            if first_visit and (state, action) not in [(s, a) for s, a, _, _, _ in trajectory[:t]]:
                Q[state][action] += alpha * (G - Q[state][action])
                V[state] = np.max(Q[state])
                pi[state] = np.argmax(Q[state])
        Q_track.append(Q.copy())
        pi_track.append(pi.copy)
    return Q, V, pi

In [79]:
optimal_Q, optimal_V, optimal_pi = mc_control(env)
print_state_value_function(optimal_Q, P, n_cols=4, prec=2, title='Action-Value-Function')

Action-Value-Function
|           | 01 [0.2  0.14] | 02 [0.44 0.73] | 03 [0.81 0.92] |
| 04 [0.9  0.97] | 05 [0.92 0.99] |           

In [80]:
print_state_value_function(optimal_V, P, n_cols = 4, prec=2, title='State-Value-Function')

State-Value-Function
|           | 01    0.2 | 02   0.73 | 03   0.92 |
| 04   0.97 | 05   0.99 |           

In [81]:
print_policy(optimal_pi,P)

Policy:
|           | 01      < | 02      v | 03      v |
| 04      v | 05      v |           

In [82]:
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env,optimal_pi, goal_state=goal_state)*100,
    mean_return(env,optimal_pi)))

Reaches goal 93.00%. Obtains an average undiscounted return of 0.9000.
