In [20]:
pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk

Collecting gym-walk
  Cloning https://github.com/mimoralea/gym-walk to /tmp/pip-install-gvv1srt6/gym-walk_fa29fcdef61346ad9162ee438066d47d
  Running command git clone --filter=blob:none --quiet https://github.com/mimoralea/gym-walk /tmp/pip-install-gvv1srt6/gym-walk_fa29fcdef61346ad9162ee438066d47d
  Resolved https://github.com/mimoralea/gym-walk to commit b915b94cf2ad16f8833a1ad92ea94e88159279f5
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [30]:
import warnings ; warnings.filterwarnings('ignore')

import gym, gym_walk
import numpy as np

import random
import warnings

warnings.filterwarnings('ignore', category=DeprecationWarning)
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)

In [31]:
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    print(title)
    arrs = {k: v for k, v in enumerate(action_symbols)}
    for s in range(len(P)):
        a = pi[s]
        print("| ", end="")
        # If all transitions from state s are terminal, leave blank
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0:
            print("|")

In [32]:
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        # If state s is terminal, leave blank
        if np.all([done for action in P[s].values() for _, _, _, done in action]):
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), f"{np.round(v, prec)}".rjust(6), end=" ")
        if (s + 1) % n_cols == 0:
            print("|")

In [33]:
def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
    successes = 0
    env.action_space.seed(123)

    for episode in range(n_episodes):
        reset_return = env.reset(seed=123 + episode)
        if isinstance(reset_return, tuple):
            state, _ = reset_return
        else:
            state = reset_return

        done = False
        steps = 0

        while not done and steps < max_steps:
            action = pi[state]
            nxt_return = env.step(action)
            nxt_state, _, terminated, truncated, _ = nxt_return
            done = terminated or truncated
            state = nxt_state
            steps += 1

        if state == goal_state:
            successes += 1

    return successes / n_episodes

In [34]:
import numpy as np

def mean_return(env, pi, n_episodes=100, max_steps=200):
    returns = []
    env.action_space.seed(123)

    for episode in range(n_episodes):
        reset_return = env.reset(seed=123 + episode)
        if isinstance(reset_return, tuple):
            state, _ = reset_return
        else:
            state = reset_return

        done = False
        steps = 0
        G = 0.0

        while not done and steps < max_steps:
            action = pi[state]
            nxt_return = env.step(action)
            nxt_state, reward, terminated, truncated, _ = nxt_return
            done = terminated or truncated
            G += reward
            state = nxt_state
            steps += 1

        returns.append(G)

    return np.mean(returns)

In [35]:
def value_iteration(P, gamma=1.0, theta=1e-10):
    """
    Perform value iteration on MDP defined by P.
    Returns:
      V: optimal state-value function (numpy array of length |S|)
      pi: optimal policy as a numpy array (length |S|) where pi[s] is best action
    """
    nS = len(P)
    nA = len(P[0])
    V = np.zeros(nS, dtype=np.float64)

    while True:
        Q = np.zeros((nS, nA), dtype=np.float64)
        for s in range(nS):
            for a in range(nA):
                for prob, next_state, reward, done in P[s][a]:
                    Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
        new_V = np.max(Q, axis=1)
        if np.max(np.abs(V - new_V)) < theta:
            break
        V = new_V.copy()

    # Construct deterministic policy from final Q
    pi = np.argmax(Q, axis=1)
    return V, pi

In [36]:
env = gym.make('FrozenLake-v1')
P = env.env.P
init_state = env.reset()
goal_state = 15
#LEFT, RIGHT = range(2)

In [37]:
P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

Exponentially decaying schedule


In [38]:
def mc_control(
    env,
    gamma=1.0,
    init_alpha=0.5,
    min_alpha=0.01,
    alpha_decay_ratio=0.5,
    init_epsilon=1.0,
    min_epsilon=0.1,
    epsilon_decay_ratio=0.9,
    n_episodes=3000,
    max_steps=200,
    first_visit=True
):
    """
    Monte Carlo Control with (first-visit) every-episode updates.
    Returns:
      Q: final action-value estimates, shape (nS, nA)
      v: final state-value estimates (max_a Q[s,a])
      pi: final greedy policy array of length nS
      Q_track: Q over episodes, shape (n_episodes, nS, nA)
      pi_track: list of length n_episodes, each is a policy array (nS,)
    """
    nS = env.observation_space.n
    nA = env.action_space.n

    # Precompute discount factors for up to max_steps
    discounts = np.array([gamma ** i for i in range(max_steps)])

    # Create schedules for alpha and epsilon
    def decay_schedule(init_value, min_value, decay_ratio, total_steps, log_start=-2, log_base=10):
        decay_steps = int(total_steps * decay_ratio)
        rem_steps = total_steps - decay_steps
        values = np.logspace(log_start, 0, decay_steps, base=log_base, endpoint=True)[::-1]
        values = (values - values.min()) / (values.max() - values.min())
        values = (init_value - min_value) * values + min_value
        values = np.pad(values, (0, rem_steps), 'edge')
        return values

    alphas = decay_schedule(init_alpha, min_alpha, alpha_decay_ratio, n_episodes)
    epsilons = decay_schedule(init_epsilon, min_epsilon, epsilon_decay_ratio, n_episodes)

    # Initialize Q, trackers
    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)
    pi_track = []

    # Epsilon-greedy selector
    def select_action(state, Q, epsilon):
        if np.random.random() > epsilon:
            return int(np.argmax(Q[state]))
        else:
            return int(np.random.randint(nA))

    # Main MC loop
    for e in tqdm(range(n_episodes), leave=False):
        epsilon = epsilons[e]
        alpha = alphas[e]

        trajectory = generate_trajectory(select_action, Q, epsilon, env, max_steps)
        visited = np.zeros((nS, nA), dtype=bool)

        # Process each (state, action, reward, next_state, done) in trajectory
        for t, (state, action, reward, _, _) in enumerate(trajectory):
            if first_visit and visited[state][action]:
                continue
            visited[state][action] = True

            # Compute return G from time t onward
            rest = trajectory[t:]
            rewards = np.array([step[2] for step in rest], dtype=np.float64)
            n_steps = len(rewards)
            G = np.dot(discounts[:n_steps], rewards)

            # Update Q
            Q[state][action] += alpha * (G - Q[state][action])

        Q_track[e] = Q.copy()
        pi_track.append(np.argmax(Q, axis=1))

    # Final state-value and policy
    v = np.max(Q, axis=1)
    pi = np.argmax(Q, axis=1)

    return Q, v, pi, Q_track, pi_track

Exploratory Policy Trajectories

In [39]:
from itertools import count
import numpy as np
from tqdm import tqdm

In [40]:

def generate_trajectory(select_action, Q, epsilon, env, max_steps=200):
    # Reset environment once per trajectory
    reset_return = env.reset()
    if isinstance(reset_return, tuple):
        state, _ = reset_return
    else:
        state = reset_return

    trajectory = []
    for t in count():
        action = select_action(state, Q, epsilon)
        nxt_return = env.step(action)
        nxt_state, reward, terminated, truncated, _ = nxt_return
        done = terminated or truncated

        trajectory.append((state, action, reward, nxt_state, done))

        if done or t >= max_steps - 1:
            break

        state = nxt_state

    return np.array(trajectory, dtype=object)

Monte Carlo control

In [41]:
def mc_control(env,
               gamma=1.0,
               init_alpha=0.5,
               min_alpha=0.01,
               alpha_decay_ratio=0.5,
               init_epsilon=1.0,
               min_epsilon=0.1,
               epsilon_decay_ratio=0.9,
               n_episodes=3000,
               max_steps=200,
               first_visit=True):

    nS, nA = env.observation_space.n, env.action_space.n

    discounts = np.logspace(
        0, max_steps,
        num=max_steps, base=gamma,
        endpoint=False)

    alphas = decay_schedule(
        init_alpha, min_alpha,
        alpha_decay_ratio,
        n_episodes)

    epsilons = decay_schedule(
        init_epsilon, min_epsilon,
        epsilon_decay_ratio,
        n_episodes)

    pi_track = []
    Q = np.zeros((nS, nA), dtype=np.float64)
    Q_track = np.zeros((n_episodes, nS, nA), dtype=np.float64)

    select_action = lambda state, Q, epsilon:np.argmax(Q[state]) if np.random.random() > epsilon else np.random.randint(len(Q[state]))

    for e in tqdm(range(n_episodes), leave=False):
        trajectory = generate_trajectory(select_action, Q, epsilons[e], env, max_steps)
        visited = np.zeros((nS, nA), dtype=bool)

        for t, (state, action, reward, _, _) in enumerate(trajectory):
            if visited[state][action] and first_visit:
                continue
            visited[state][action] = True

            n_steps = len(trajectory[t:])
            G = np.sum(discounts[:n_steps] * trajectory[t:, 2])
            Q[state][action] = Q[state][action] + alphas[e] * (G - Q[state][action])

        Q_track[e] = Q
        pi_track.append(np.argmax(Q, axis=1))

    v = np.max(Q, axis=1)
    pi = np.argmax(Q, axis=1)
    return Q, v, pi, Q_track, pi_track


In [43]:
print('Name : Yuvaraj S   Register Number: 212222240119')

optimal_results = mc_control(env, n_episodes=3000)
optimal_Q, optimal_V, optimal_pi, _, _ = optimal_results

print_state_value_function(optimal_V, P, n_cols=4, prec=2, title='Action-value function:')
print_state_value_function(optimal_V, P, n_cols=4, prec=2, title='State-value function:')
print_policy(optimal_pi, P)

Name : Yuvaraj S   Register Number: 212222240119


                                                     

Action-value function:
| 00   0.25 | 01   0.09 | 02   0.09 | 03   0.08 |
| 04   0.28 |           | 06   0.12 |           |
| 08   0.32 | 09   0.39 | 10   0.33 |           |
|           | 13   0.56 | 14   0.71 |           |
State-value function:
| 00   0.25 | 01   0.09 | 02   0.09 | 03   0.08 |
| 04   0.28 |           | 06   0.12 |           |
| 08   0.32 | 09   0.39 | 10   0.33 |           |
|           | 13   0.56 | 14   0.71 |           |
Policy:
| 00      < | 01      v | 02      > | 03      ^ |
| 04      < |           | 06      < |           |
| 08      ^ | 09      v | 10      v |           |
|           | 13      > | 14      > |           |




In [44]:
# Find the probability of success and the mean return of you your policy
print('Name : Yuvaraj S   Register Number: 212222240119')

print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, optimal_pi, goal_state=goal_state)*100,
    mean_return(env, optimal_pi)))


Name : Yuvaraj S   Register Number: 212222240119
Reaches goal 51.00%. Obtains an average undiscounted return of 0.5100.


In [46]:
envdesc = [
    "SFFF",
    "FHFH",
    "FFHF",
    "GFFH"
]
env = gym.make('FrozenLake-v1', desc=envdesc, is_slippery=True)
P = env.env.P  # Transition dictionary: P[state][action] = [(prob, next_state, reward, done), ...]

goal_state = 12  # Index of 'G' in this 4×4 layout

print("Name : Yuvaraj S   Register Number: 212222240119\n")

# Run MC control to learn Q, V, and pi
optimal_Q, optimal_V, optimal_pi, _, _ = mc_control(env, n_episodes=3000)

# Print the final state-value function and derived policy
print_state_value_function(optimal_V, P, n_cols=4, prec=2, title='State-value function:')
print_policy(optimal_pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4)

Name : Yuvaraj S   Register Number: 212222240119



                                                     

State-value function:
| 00   0.86 | 01   0.73 | 02   0.56 | 03   0.42 |
| 04   0.89 |           | 06    0.1 |           |
| 08   0.92 | 09   0.52 |           | 11    0.0 |
|           | 13   0.45 | 14    0.1 |           |
Policy:
| 00      < | 01      ^ | 02      ^ | 03      ^ |
| 04      < |           | 06      < |           |
| 08      < | 09      v |           | 11      < |
|           | 13      ^ | 14      v |           |


