<a href="https://colab.research.google.com/github/chaitanya18c/monte-carlo-control/blob/main/RL_05.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk

Collecting gym-walk
  Cloning https://github.com/mimoralea/gym-walk to /tmp/pip-install-6qdrkbz8/gym-walk_40d910933ac144758cce07b6523c13b4
  Running command git clone --filter=blob:none --quiet https://github.com/mimoralea/gym-walk /tmp/pip-install-6qdrkbz8/gym-walk_40d910933ac144758cce07b6523c13b4
  Resolved https://github.com/mimoralea/gym-walk to commit 5999016267d6de2f5a63307fb00dfd63de319ac1
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: gym-walk
  Building wheel for gym-walk (setup.py) ... [?25l[?25hdone
  Created wheel for gym-walk: filename=gym_walk-0.0.2-py3-none-any.whl size=4059 sha256=2bbbc0e6640d28d53177e5c8e3ab8750d8d01dc4da8a83e1a73675c5a654de3a
  Stored in directory: /tmp/pip-ephem-wheel-cache-38arw8c3/wheels/24/fe/c4/0cbc7511d29265bad7e28a09311db3f87f0cafba74af54d530
Successfully built gym-walk
Installing collected packages: gym-walk
Successfully installed gym-walk-0.0.2


In [2]:
import warnings ; warnings.filterwarnings('ignore')

import gym, gym_walk
import numpy as np

import random
import warnings

warnings.filterwarnings('ignore', category=DeprecationWarning)
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)

In [3]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    print(title)
    arrs = {k:v for k,v in enumerate(action_symbols)}
    for s in range(len(P)):
        a = pi[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [4]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        if (s + 1) % n_cols == 0: print("|")

In [5]:
env = gym.make('FrozenLake-v1')
P = env.env.P
init_state = env.reset()
goal_state = env.observation_space.n - 1
LEFT, RIGHT = range(2)

In [6]:
P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

Exponentially decaying schedule


In [7]:
def decay_schedule(init_value,
                   min_value,
                   decay_ratio,
                   max_steps,
                   log_start = -2,
                   log_base=10):

  decay_steps = int(max_steps*decay_ratio)
  rem_steps=max_steps - decay_steps
  values = np.logspace(log_start,0,decay_steps,base=log_base, endpoint=True)[::-1]
  values=(values-values.min())/(values.max()-values.min())
  values=(init_value- min_value)*values + min_value
  values=np.pad(values, (0,rem_steps),"edge")
  return values

Exploratory Policy Trajectories

In [8]:
from itertools import count
import numpy as np

def gen_traj(select_action,Q,epsilon,env,max_steps=200):
    done,traj = False,[]

    while not done:
        state = env.reset()

        for t in count():
            action = select_action(state,Q,epsilon)
            next,reward,done,_ = env.step(action)
            exp = (state,action,reward,next,done)
            traj.append(exp)

            if done:
                break
            if t>=max_steps-1:
                traj = []
                break

            state = next
        return np.array(traj, object)

Monte Carlo control

In [9]:
import numpy as np
from tqdm import tqdm

def mc_control(env,
               gamma=1.0,
               init_alpha=0.5,
               min_alpha=0.01,
               alpha_decay_ratio=0.5,
               init_epsilon=1.0,
               min_epsilon=0.1,
               epsilon_decay_ratio=0.3,
               n_episodes=3000,
               max_steps=200,
               first_visit=True):

    nS, nA = env.observation_space.n, env.action_space.n

    disc = np.logspace(0, max_steps, num=max_steps, base=gamma, endpoint=False)

    def decay_schedule(init_value, min_value, decay_ratio, n):
        return np.maximum(min_value, init_value * (decay_ratio ** np.arange(n)))

    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    def select_action(state, Q, epsilon):
        return np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(nA)

    for e in tqdm(range(n_episodes), leave=False):
        traj = gen_traj(select_action, Q, epsilons[e], env, max_steps)
        visited = np.zeros((nS, nA), dtype = bool)

        for t, (state, action, reward, _, _) in enumerate(traj):
            if visited[state][action] and first_visit:
                continue
            visited[state][action] = True

            n_steps = len(traj[t:])
            G = np.sum(disc[:n_steps] * traj[t:, 2])
            Q[state][action] = Q[state][action] + alphas[e] * (G - Q[state][action])

        Q_track[e] = Q

    V = np.max(Q, axis=1)
    pi = {s: np.argmax(Q[s]) for s in range(nS)}

    return Q, V, pi

In [10]:
print("Name: CHAITANYA P S\n Reg No:212222230024\n")
optimal_Q, optimal_V, optimal_pi = mc_control(env, n_episodes=5000)
print_state_value_function(optimal_Q, P, n_cols=4, prec=2, title='Action-value function:')
print()
print_state_value_function(optimal_V, P, n_cols=4, prec=2, title='State-value function:')
print()
print_policy(optimal_pi, P)

Name: CHAITANYA P S
 Reg No:212222230024



                                                    

Action-value function:
| 00 [0.25 0.18 0.22 0.19] | 01 [0.01 0.12 0.02 0.03] | 02 [0.15 0.05 0.01 0.02] | 03 [0.02 0.   0.   0.  ] |
| 04 [0.26 0.14 0.16 0.17] |           | 06 [0.14 0.04 0.03 0.  ] |           |
| 08 [0.13 0.18 0.13 0.3 ] | 09 [0.15 0.34 0.17 0.14] | 10 [0.39 0.09 0.04 0.05] |           |
|           | 13 [0.04 0.08 0.11 0.35] | 14 [0.14 0.14 0.65 0.1 ] |           |

State-value function:
| 00   0.25 | 01   0.12 | 02   0.15 | 03   0.02 |
| 04   0.26 |           | 06   0.14 |           |
| 08    0.3 | 09   0.34 | 10   0.39 |           |
|           | 13   0.35 | 14   0.65 |           |

Policy:
| 00      < | 01      v | 02      < | 03      < |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      < |           |
|           | 13      ^ | 14      > |           |




In [11]:
import random
import numpy as np

def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200, seed=123):
    random.seed(seed)
    np.random.seed(seed)
    env.seed(seed)
    results = []

    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        while not done and steps < max_steps:
            action = pi[state]
            state, _, done, _ = env.step(action)
            steps += 1
        results.append(state == goal_state)

    success_rate = np.sum(results) / len(results)
    return success_rate

In [12]:
def mean_return(env, pi, n_episodes=100, max_steps=200, seed=123):
    random.seed(seed)
    np.random.seed(seed)
    env.seed(seed)
    results = []

    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        returns = 0.0
        while not done and steps < max_steps:
            action = pi[state]
            state, reward, done, _ = env.step(action)
            returns += reward
            steps += 1
        results.append(returns)

    average_return = np.mean(results)
    return average_return

In [13]:
def results(env, optimal_pi, goal_state, seed=123):
    success_rate = probability_success(env, optimal_pi, goal_state=goal_state, seed=seed)
    avg_return = mean_return(env, optimal_pi, seed=seed)

    print(f'Reaches goal {success_rate:.2%}.Obtains an average undiscounted return of: {avg_return:.4f}.')

results(env, optimal_pi, goal_state=goal_state)

Reaches goal 37.00%.Obtains an average undiscounted return of: 0.3700.
