<a href="https://colab.research.google.com/github/obedotto/monte-carlo-control/blob/main/MonteCarloControlExp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [33]:
import warnings ; warnings.filterwarnings('ignore')

import gym, gym_walk
import numpy as np

import random
import warnings
import tqdm

warnings.filterwarnings('ignore', category=DeprecationWarning)
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)

In [34]:
pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk

Collecting gym-walk
  Cloning https://github.com/mimoralea/gym-walk to /tmp/pip-install-lssaiuxj/gym-walk_6fb855d3c4184547ab1441004bd38e80
  Running command git clone --filter=blob:none --quiet https://github.com/mimoralea/gym-walk /tmp/pip-install-lssaiuxj/gym-walk_6fb855d3c4184547ab1441004bd38e80
  Resolved https://github.com/mimoralea/gym-walk to commit 5999016267d6de2f5a63307fb00dfd63de319ac1
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [35]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    print(title)
    arrs = {k:v for k,v in enumerate(action_symbols)}
    for s in range(len(P)):
        a = pi[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [36]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [37]:
env = gym.make('FrozenLake-v1')
P = env.env.P
init_state = env.reset()
#goal_state = 6
#LEFT, RIGHT = range(2)

In [38]:
P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

Exponentially decaying schedule


In [39]:
def decay_schedule(
    init_value, min_value, decay_ratio,
    max_steps, log_start = -2, log_base=10):
  decay_steps = int(max_steps*decay_ratio)
  rem_steps = max_steps - decay_steps

  values = np.logspace(log_start,0,decay_steps,base = log_base,endpoint = True)[::-1]
  values = (values-values.min())/(values.max()-values.min())
  values = (init_value-min_value)*values+min_value
  values = np.pad(values,(0,rem_steps),'edge')
  #Write your code here

  return values

Exploratory Policy Trajectories

In [40]:
from itertools import count
def generate_trajectory(
    select_action, Q, epsilon,
    env, max_steps=200):
  done, trajectory = False, []

  while not done:
    state = env.reset()
    for t in count():
      action = select_action(state,Q,epsilon)
      next_state,reward,done,_=env.step(action)
      experience = (state,action,reward,next_state,done)

      trajectory.append(experience)
      if done:
        break
      if t>=max_steps -1:
        trajectory=[]
        break
      state = next_state

  #Write your code here

  return np.array(trajectory, np.object)

Monte Carlo control

In [41]:
from tqdm import tqdm
def mc_control (env, gamma = 1.0,
                init_alpha = 0.5,min_alpha = 0.01, alpha_decay_ratio = 0.5,
                init_epsilon = 1.0, min_epsilon = 0.1, epsilon_decay_ratio = 0.9,
                n_episodes = 3000, max_steps = 200, first_visit = True):
  nS, nA = env.observation_space.n, env.action_space.n

  #Write your code here
  discounts = np.logspace(0,max_steps,base=gamma,endpoint=False)


  alphas = decay_schedule(init_alpha, min_alpha,alpha_decay_ratio,n_episodes)

  epsilons = decay_schedule(init_epsilon,min_epsilon,epsilon_decay_ratio,n_episodes)

  pi_track = []

  Q = np.zeros((nS,nA),dtype =np.float64)
  Q_track = np.zeros((n_episodes,nS,nA), dtype = np.float64)

  select_action = lambda state,Q,epsilon:np.argmax(Q[state])if np.random.random() > epsilon else np.random.randint(len(Q[state]))

  for e in tqdm(range(n_episodes),leave = False):
    trajectory = generate_trajectory(select_action,Q,epsilons[e],env,max_steps)
    visited = np.zeros((nS,nA),dtype = np.bool)
    for t,(state,action,reward,_,_) in enumerate(trajectory):
      if visited[state][action] and first_visit:
        continue
      visited[state][action] = True
     # n_steps = len(trajectory) - t
      #n_steps = len(trajectory[t:])
      #G = np.sum(discounts[:n_steps]*trajectory[t:,2])
      n_steps = min(len(trajectory) - t, len(discounts))  # Take the minimum of available steps and discounts
      G = np.sum(discounts[:n_steps] * trajectory[t:t+n_steps, 2])

      Q[state][action] = Q[state][action] + alphas[e]*(G-Q[state][action])

    Q_track[e]=Q
    pi_track.append(np.argmax(Q,axis = 1))
  V = np.max(Q,axis=1)
 #pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q,axis=1))}[s]
  pi = {s: a for s, a in enumerate(np.argmax(Q, axis=1))}

  #return Q,V,pi,Q_track,pi_track
  #return Q, V, pi, Q_track, pi_track
  return Q, V, pi

In [42]:
optimal_Q, optimal_V, optimal_pi = mc_control(env)
print_state_value_function(optimal_Q, P, n_cols=4, prec=2, title='Action-value function:')
print_state_value_function(optimal_V, P, n_cols=4, prec=2, title='State-value function:')
print_policy(optimal_pi, P)

                                                     

Action-value function:
| 00 [0.1  0.11 0.1  0.12] | 01 [0.05 0.09 0.08 0.14] | 02 [0.15 0.11 0.09 0.07] | 03 [0.05 0.   0.01 0.  ] |
| 04 [0.11 0.03 0.06 0.05] |           | 06 [0.05 0.06 0.17 0.01] |           |
| 08 [0.02 0.02 0.11 0.07] | 09 [0.11 0.17 0.23 0.1 ] | 10 [0.36 0.1  0.12 0.07] |           |
|           | 13 [0.05 0.14 0.39 0.21] | 14 [0.39 0.34 0.29 0.62] |           |
State-value function:
| 00   0.12 | 01   0.14 | 02   0.15 | 03   0.05 |
| 04   0.11 |           | 06   0.17 |           |
| 08   0.11 | 09   0.23 | 10   0.36 |           |
|           | 13   0.39 | 14   0.62 |           |
Policy:
| 00      ^ | 01      ^ | 02      < | 03      < |
| 04      < |           | 06      > |           |
| 08      > | 09      > | 10      < |           |
|           | 13      > | 14      ^ |           |




In [43]:
import random
import numpy as np

def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200, seed=123):
    random.seed(seed)
    np.random.seed(seed)
    env.seed(seed)
    results = []

    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        while not done and steps < max_steps:
            action = pi[state]
            state, _, done, _ = env.step(action)
            steps += 1
        results.append(state == goal_state)

    success_rate = np.sum(results) / len(results)
    return success_rate

def mean_return(env, pi, n_episodes=100, max_steps=200, seed=123):
    random.seed(seed)
    np.random.seed(seed)
    env.seed(seed)
    results = []

    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        returns = 0.0
        while not done and steps < max_steps:
            action = pi[state]
            state, reward, done, _ = env.step(action)
            returns += reward
            steps += 1
        results.append(returns)

    average_return = np.mean(results)
    return average_return

def results(env, optimal_pi, goal_state, seed=123):
    success_rate = probability_success(env, optimal_pi, goal_state=goal_state, seed=seed)
    avg_return = mean_return(env, optimal_pi, seed=seed)

    print(f'Reaches goal {success_rate:.2%}. Obtains an average undiscounted return of: {avg_return:.4f}.')

goal_state = 15
results(env, optimal_pi, goal_state=goal_state)


Reaches goal 21.00%. Obtains an average undiscounted return of: 0.2100.
