### Monte Carlo Methods for Reinforcement Learning

**Monte Carlo (MC)** methods are model-free approaches that learn optimal policies through experience sampling, without requiring knowledge of environment dynamics.

**Key Principles:**
- Learn from complete episodes (trajectories)
- Estimate the action-value function $Q(s,a)$ from sample returns
- Use first-visit or every-visit averaging
- Apply $\epsilon$-greedy exploration for policy improvement

**Algorithm:** Sample episodes → Compute returns → Update Q-values → Improve policy

#### Environment Setup

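We use the stochastic **FrozenLake** environment, where, with `is_slippery=True`, the agent does not always move in the direction it chose. Because the outcome of each action is random and the transition probabilities are never given to the learner, it is a good fit for demonstrating model-free learning.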

In [1]:
import numpy as np
import gymnasium as gym

env = gym.make("FrozenLake-v1", is_slippery=True)
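
To make the stochasticity concrete: by default, a slippery FrozenLake moves the agent in the intended direction only a third of the time and otherwise slips to one of the two perpendicular directions. The following is a minimal pure-Python sketch that mimics this rule; `slippery_action` is a hypothetical helper written for illustration and is not part of Gymnasium.

```python
import random
from collections import Counter

# FrozenLake action encoding.
ACTION_NAMES = {0: "left", 1: "down", 2: "right", 3: "up"}

def slippery_action(intended, rng):
    """Pick the direction actually taken: the intended one or either
    perpendicular neighbour, each with probability 1/3 (assumed rule)."""
    candidates = [(intended - 1) % 4, intended, (intended + 1) % 4]
    return rng.choice(candidates)

rng = random.Random(0)
n_trials = 3000
counts = Counter(slippery_action(2, rng) for _ in range(n_trials))  # intend "right"

for action, n in sorted(counts.items()):
    print(f"{ACTION_NAMES[action]:>5}: {n / n_trials:.2f}")
```

Intending to move right therefore produces down, right, and up in roughly equal shares, and never left.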

#### Episode Generation

Generate episodes using **$\epsilon$-greedy exploration**: with probability $\epsilon$, select a random action (explore); otherwise, follow the current policy (exploit). Each episode yields a sequence of state-action-reward transitions to learn from.

#### Sample Episode

Test trajectory generation with a random policy. Each trajectory is a list of $(s_t, a_t, r_{t+1}, s_{t+1}, \text{done}_{t+1})$ tuples.

In [2]:
def sample_trajectory(pi, env, max_steps=50, epsilon=0.1):
    """Roll out one episode with an epsilon-greedy version of policy `pi`."""
    trajectory = []
    state, _ = env.reset()

    # Cap the episode length at max_steps.
    for _ in range(max_steps):
        if np.random.rand() < epsilon:
            action = env.action_space.sample()  # explore: uniform random action
        else:
            action = int(pi[state])  # exploit: follow the current policy

        # Gymnasium returns separate `terminated` and `truncated` flags.
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        trajectory.append((state, int(action), reward, next_state, done))

        if done:
            break

        state = next_state

    return trajectory

In [3]:
policy = np.random.randint(env.action_space.n, size=(env.observation_space.n,))
trajectory = sample_trajectory(policy, env)

trajectory

[(0, 1, 0.0, 1, False),
 (1, 3, 0.0, 2, False),
 (2, 1, 0.0, 3, False),
 (3, 3, 0.0, 3, False),
 (3, 0, 0.0, 7, True)]

#### Return Calculation

Compute discounted returns $G_t = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$ by working backwards from the end of the episode with the recursion $G_t = R_{t+1} + \gamma G_{t+1}$. Under the **first-visit** Monte Carlo approach, each $(s,a)$ pair is assigned the return that follows its first occurrence in the episode. The function is then applied to the episode sampled earlier.
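
As a worked example of the backward recursion $G_t = R_{t+1} + \gamma G_{t+1}$, the sketch below uses a hypothetical three-step reward sequence and $\gamma = 0.5$ (chosen only so the numbers are easy to check by hand). `discounted_returns` is an illustrative helper.

```python
def discounted_returns(rewards, gamma):
    """Return [G_0, G_1, ...] for one episode, computed back to front."""
    G = 0.0
    out = []
    for r in reversed(rewards):
        G = r + gamma * G  # G_t = R_{t+1} + gamma * G_{t+1}
        out.append(G)
    out.reverse()  # restore chronological order
    return out

# Hypothetical episode: reward 1 only on the final step.
print(discounted_returns([0.0, 0.0, 1.0], gamma=0.5))  # [0.25, 0.5, 1.0]
```

The final reward is worth 1 at the last step, 0.5 one step earlier, and 0.25 at the start.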

In [4]:
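def compute_returns(trajectory, gamma=0.99):
    """Return {(state, action): G} with the first-visit return for each pair."""
    returns = {}
    G = 0.0
    # Walk backwards so that G accumulates the discounted future reward.
    for state, action, reward, _, _ in reversed(trajectory):
        G = reward + gamma * G

        # First visit: overwrite on every occurrence. Because we iterate in
        # reverse, the value left at the end comes from the earliest visit.
        # (Storing only when the key is absent would keep the last visit.)
        returns[(state, action)] = G

    return returns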

#### Returns for the Sampled Episode

Apply `compute_returns` to the sampled episode. The episode ended in a hole (state 7) without reaching the goal, so every reward is 0 and every return is 0.

In [5]:
compute_returns(trajectory)

{(3, 0): 0.0, (3, 3): 0.0, (2, 1): 0.0, (1, 3): 0.0, (0, 1): 0.0}
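
The sampled episode never repeats a state-action pair, so first-visit and every-visit returns coincide for it. A minimal sketch with a hypothetical hand-written trajectory that revisits `(0, 1)` shows where the two differ; `visit_returns` is an illustrative helper that works on simplified `(state, action, reward)` steps.

```python
from collections import defaultdict

def visit_returns(trajectory, gamma):
    """Map each (state, action) to the returns from all its visits, in time order."""
    per_visit = defaultdict(list)
    G = 0.0
    for state, action, reward in reversed(trajectory):
        G = reward + gamma * G
        per_visit[(state, action)].insert(0, G)  # prepend to keep chronological order
    return dict(per_visit)

# Hypothetical (state, action, reward) steps; the pair (0, 1) occurs twice.
toy = [(0, 1, 0.0), (1, 2, 0.0), (0, 1, 1.0)]
all_returns = visit_returns(toy, gamma=0.5)

# First-visit keeps only the earliest return; every-visit averages all of them.
first_visit = {sa: gs[0] for sa, gs in all_returns.items()}
every_visit = {sa: sum(gs) / len(gs) for sa, gs in all_returns.items()}

print(first_visit)
print(every_visit)
```

For `(0, 1)` the first-visit return is 0.25, while the every-visit average of 0.25 and 1.0 is 0.625.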

#### Q-Value Estimation

**Monte Carlo estimation**: $Q(s,a)$ is the average of the returns following first visits to $(s,a)$. Collect returns from many episodes and average them for each state-action pair; pairs that are never visited keep their initial value of 0. The estimator is first applied to the random policy.

This estimate is the evaluation step of **Monte Carlo policy iteration**, which alternates between policy evaluation (estimate Q-values) and policy improvement (greedy update) until the policy stops changing. The remaining sections build that loop and run it on the stochastic FrozenLake environment.
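
Averaging does not require a stored list of returns. A running mean updated as $Q \leftarrow Q + (G - Q)/N$ gives the same value; the sketch below checks this on hypothetical return samples. It is an alternative bookkeeping scheme shown for comparison, not the approach taken by the code in this section.

```python
def running_mean(samples):
    """Incrementally average samples without storing them."""
    q, n = 0.0, 0
    for g in samples:
        n += 1
        q += (g - q) / n  # move the estimate toward the new return
    return q

# Hypothetical first-visit returns observed for one (s, a) pair.
returns_for_pair = [0.0, 1.0, 0.0, 0.0, 1.0]
batch_mean = sum(returns_for_pair) / len(returns_for_pair)

print(running_mean(returns_for_pair), batch_mean)
```

The incremental form needs only a count and a current estimate per state-action pair, which matters when the number of episodes is large.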

In [6]:
def monte_carlo_estimate(pi, env, gamma=0.99, max_steps=50, num_episodes=5000):
    """Estimate Q for the epsilon-greedy version of `pi` from first-visit returns."""
    n_states, n_actions = env.observation_space.n, env.action_space.n
    Q = np.zeros((n_states, n_actions))
    returns = {(s, a): [] for s in range(n_states) for a in range(n_actions)}

    # Collect one first-visit return per visited (state, action) pair per episode.
    for _ in range(num_episodes):
        trajectory = sample_trajectory(pi, env, max_steps)

        for (state, action), G in compute_returns(trajectory, gamma).items():
            returns[(state, action)].append(G)

    # Average the collected returns; unvisited pairs keep Q = 0.
    for (state, action), returns_list in returns.items():
        if returns_list:
            Q[state, action] = np.mean(returns_list)

    return Q

In [7]:
monte_carlo_estimate(policy, env)

array([[0.00332493, 0.00992774, 0.00643614, 0.02329738],
       [0.01791337, 0.02646487, 0.01079234, 0.01779288],
       [0.03384885, 0.02789411, 0.04023248, 0.03281185],
       [0.02208107, 0.02071262, 0.02101518, 0.03204948],
       [0.0059296 , 0.00190535, 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.11359636, 0.03988523, 0.0306771 , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.00167689, 0.01033685, 0.03358163, 0.02283056],
       [0.        , 0.        , 0.        , 0.07182285],
       [0.21436083, 0.0825    , 0.1321796 , 0.07924917],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.740025  , 0.        , 0.08839286],
       [0.33      , 0.398     , 0.56437086, 0.39082782],
       [0.        , 0.        , 0.        , 0.        ]])

#### Policy Improvement

**Greedy policy improvement**: $\pi'(s) = \arg\max_a Q(s,a)$. For each state, select the action with the highest estimated Q-value.

In [8]:
def policy_improvement(Q):
    return np.argmax(Q, axis=-1)
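
One detail worth knowing: `np.argmax` returns the first index when several entries tie, so a state whose Q-values are all zero (for example, one that was never visited) is assigned action 0. A small sketch with a hypothetical 3-state Q-table:

```python
import numpy as np

# Hypothetical Q-table: 3 states x 4 actions.
Q_demo = np.array([
    [0.1, 0.4, 0.2, 0.0],  # clear winner: action 1
    [0.0, 0.0, 0.0, 0.0],  # never visited: every action ties
    [0.3, 0.0, 0.3, 0.1],  # tie between actions 0 and 2
])

greedy = np.argmax(Q_demo, axis=-1)
print(greedy)  # [1 0 0]
```

An action of 0 in the learned policy therefore does not always mean "left is best"; it can also mean the state has no informative estimate.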

In [9]:
def monte_carlo_policy_iteration(env, gamma=0.99, max_steps=50, num_episodes=10000):
    policy = np.random.randint(env.action_space.n, size=(env.observation_space.n,))

    while True:
        Q = monte_carlo_estimate(policy, env, gamma, max_steps, num_episodes)

        new_policy = policy_improvement(Q)

        if np.array_equal(policy, new_policy):
            break

        policy = new_policy

    return policy, Q

In [10]:
optimal_policy, optimal_Q = monte_carlo_policy_iteration(env)
print(optimal_policy)

[1 3 3 3 0 0 0 0 3 1 0 0 0 2 1 0]
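
The action indices are easier to read as arrows on the grid. The sketch below assumes the default 4x4 FrozenLake map and the action encoding 0 = left, 1 = down, 2 = right, 3 = up; `render_policy` is a hypothetical helper, and the policy is the one printed above.

```python
# Default 4x4 layout: S start, F frozen, H hole, G goal.
MAP_4X4 = ["SFFF", "FHFH", "FFFH", "HFFG"]
ARROWS = {0: "←", 1: "↓", 2: "→", 3: "↑"}

def render_policy(policy, grid=MAP_4X4):
    """Return one string per grid row, with an arrow for each non-terminal cell."""
    n_cols = len(grid[0])
    rows = []
    for r, line in enumerate(grid):
        cells = []
        for c, tile in enumerate(line):
            if tile in "HG":
                cells.append(tile)  # terminal cells have no meaningful action
            else:
                cells.append(ARROWS[int(policy[r * n_cols + c])])
        rows.append(" ".join(cells))
    return rows

learned = [1, 3, 3, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0]
print("\n".join(render_policy(learned)))
```

Holes and the goal are printed as `H` and `G` because the episode ends there, so the action stored for those states is never used.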


#### Policy Evaluation

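Test the learned policy by measuring its success rate, the fraction of episodes that reach the goal, over multiple episodes of purely greedy play (no exploration). A high success rate on the slippery lake indicates that Monte Carlo learning found a useful policy.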

In [11]:
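def test_policy(policy, env, num_episodes=500):
    """Run the greedy policy and print the fraction of episodes that reach the goal."""
    success_count = 0

    for _ in range(num_episodes):
        state, _ = env.reset()
        done = False

        while not done:
            action = int(policy[state])
            state, reward, terminated, truncated, _ = env.step(action)
            # Stop on a terminal state or when the environment's time limit is hit.
            done = terminated or truncated

            if terminated and reward == 1.0:  # Reached the goal
                success_count += 1

    success_rate = success_count / num_episodes
    print(f"Policy Success Rate: {success_rate * 100:.2f}%")

test_policy(optimal_policy, env)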

Policy Success Rate: 81.80%
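
A success rate measured over 500 episodes carries sampling noise. As a rough gauge, a normal-approximation 95% interval for a binomial proportion can be sketched as follows, under the assumption that episodes are independent; `proportion_interval` is a hypothetical helper.

```python
import math

def proportion_interval(successes, n, z=1.96):
    """Normal-approximation confidence interval for a success probability."""
    p = successes / n
    half_width = z * math.sqrt(p * (1 - p) / n)
    return max(0.0, p - half_width), min(1.0, p + half_width)

# 81.80% of 500 episodes corresponds to 409 successes.
low, high = proportion_interval(409, 500)
print(f"{low:.3f} to {high:.3f}")
```

Differences of a few percentage points between runs are therefore within the expected noise at this sample size.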
