<a href="https://colab.research.google.com/github/2303a51420/reinforcement/blob/main/lab%203.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
import gymnasium as gym

import numpy as np

from collections import defaultdict

from typing import Dict, Tuple, List, Callable

# ---------- Helpers ----------

def make_epsilon_greedy_policy(Q: Dict, nA: int, epsilon: float) -> Callable:
    """Build an epsilon-greedy policy on top of the action-value table ``Q``.

    Args:
        Q: Mapping from state to an array of ``nA`` action values. It is
            indexed with ``Q[state]`` on every call, so a ``defaultdict``
            will create (and keep) an entry for a state it has not seen.
        nA: Number of actions; length of the returned probability vector.
        epsilon: Exploration rate, fixed at the value passed in here.

    Returns:
        A function ``policy_fn(state)`` returning a float array of length
        ``nA``: every action gets ``epsilon / nA`` and the arg-max action
        of ``Q[state]`` gets an extra ``1 - epsilon``.
    """

    def policy_fn(state):
        # Looked up at call time, so later updates to Q are reflected.
        action_values = Q[state]

        # Uniform exploration mass shared by all actions.
        distribution = np.full(nA, epsilon / nA, dtype=float)

        # Remaining mass goes to the currently greedy action
        # (np.argmax breaks ties in favour of the lowest index).
        greedy_action = int(np.argmax(action_values))
        distribution[greedy_action] += 1.0 - epsilon
        return distribution

    return policy_fn

def generate_episode(env, policy_fn: Callable) -> List[Tuple[Tuple, int, float]]:
    """Roll out a single episode by sampling actions from ``policy_fn``.

    Args:
        env: Environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(next_state, reward, terminated, truncated, info)``.
        policy_fn: Maps a state to a sequence of action probabilities.

    Returns:
        The trajectory as a list of ``(state, action, reward)`` tuples, one
        per step taken, where ``state`` is the state the action was chosen in.
    """
    trajectory: List[Tuple[Tuple, int, float]] = []
    current_state, _info = env.reset()

    while True:
        action_probs = policy_fn(current_state)
        # Sample an action index according to the policy's distribution.
        chosen_action = int(np.random.choice(len(action_probs), p=action_probs))

        successor, reward, terminated, truncated, _info = env.step(chosen_action)
        trajectory.append((current_state, chosen_action, reward))

        # Either flag ends the episode.
        if terminated or truncated:
            break
        current_state = successor

    return trajectory

# ---------- First-Visit MC Policy Evaluation ----------

def first_visit_mc_policy_evaluation(env, policy_fn: Callable, gamma: float = 1.0, num_episodes: int = 10000):
    """
    First-Visit MC Policy Evaluation. Estimates V for a given policy.

    For every episode, the discounted return following the first occurrence
    of each state is averaged into that state's value estimate.

    Args:
        env: The Gymnasium environment.
        policy_fn: A function that takes a state and returns action probabilities.
        gamma: Discount factor.
        num_episodes: Number of episodes to generate.

    Returns:
        A ``defaultdict(float)`` mapping states to estimated values. States
        never visited are absent (and read as 0.0 if looked up).
    """

    # State-Value function
    V = defaultdict(float)

    # Sum of returns for each state, and count of visits for each state
    returns_sum = defaultdict(float)
    returns_count = defaultdict(float)

    for _ in range(num_episodes):
        # Generate an episode
        episode = generate_episode(env, policy_fn)

        # Discounted return from every time step, computed in one backward
        # pass (G_t = r_t + gamma * G_{t+1}) instead of re-summing the tail
        # of the episode for each state.
        returns_from = [0.0] * len(episode)
        G = 0.0
        for t in range(len(episode) - 1, -1, -1):
            G = episode[t][2] + gamma * G
            returns_from[t] = G

        # Index of the first visit to each state, in visiting order
        state_first_visit = {}
        for idx, (s, _, _) in enumerate(episode):
            if s not in state_first_visit:
                state_first_visit[s] = idx

        # Average the first-visit return into the running estimate
        for state, first_idx in state_first_visit.items():
            returns_sum[state] += returns_from[first_idx]
            returns_count[state] += 1.0
            V[state] = returns_sum[state] / returns_count[state]

    return V

# ---------- On-Policy MC Control with ε-greedy ----------

def mc_control_epsilon_greedy(
    env,
    num_episodes: int = 500000,
    gamma: float = 1.0,
    epsilon_start: float = 1.0,
    epsilon_min: float = 0.05,
    epsilon_decay: float = 0.9995,
):
    """
    On-policy first-visit MC control with a decaying ε-greedy policy.

    Each episode is generated with an ε-greedy policy over the current Q,
    then Q(s, a) is set to the average first-visit return observed for every
    (state, action) pair in the episode. ε is multiplied by ``epsilon_decay``
    after every episode and never drops below ``epsilon_min``.

    Args:
        env: Environment with a discrete action space (``env.action_space.n``).
        num_episodes: Number of training episodes to generate.
        gamma: Discount factor.
        epsilon_start: Exploration rate used for the first episode.
        epsilon_min: Lower bound for the exploration rate.
        epsilon_decay: Multiplicative decay applied to ε after each episode.

    Returns:
        A tuple ``(Q, learned_policy, epsilon)`` where ``Q`` is a defaultdict
        mapping state -> array of action values, ``learned_policy`` is an
        ε-greedy policy over ``Q`` using the final ε, and ``epsilon`` is that
        final ε (already decayed once more after the last episode).
    """
    nA = env.action_space.n

    Q = defaultdict(lambda: np.zeros(nA, dtype=float))
    returns_sum = defaultdict(float)          # keyed by (state, action)
    returns_count = defaultdict(float)        # keyed by (state, action)

    epsilon = epsilon_start

    for _ in range(num_episodes):
        # Behaviour policy for this episode: ε-greedy w.r.t. the live Q table
        # at the current ε (shared helper instead of a duplicated closure).
        behaviour_policy = make_epsilon_greedy_policy(Q, nA, epsilon)
        episode = generate_episode(env, behaviour_policy)

        # Discounted return from every time step, computed in one backward
        # pass (G_t = r_t + gamma * G_{t+1}) instead of re-summing the tail
        # of the episode for each (s, a) pair.
        returns_from = [0.0] * len(episode)
        G = 0.0
        for t in range(len(episode) - 1, -1, -1):
            G = episode[t][2] + gamma * G
            returns_from[t] = G

        # Track first-visit indices for (s,a)
        sa_first_visit = {}
        for idx, (s, a, _) in enumerate(episode):
            if (s, a) not in sa_first_visit:
                sa_first_visit[(s, a)] = idx

        # For each first-visited (s,a), average its return into Q
        for (s, a), first_idx in sa_first_visit.items():
            returns_sum[(s, a)] += returns_from[first_idx]
            returns_count[(s, a)] += 1.0
            Q[s][a] = returns_sum[(s, a)] / returns_count[(s, a)]

        # Decay epsilon
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

    # Return a policy function tied to the learned Q with the final epsilon
    learned_policy = make_epsilon_greedy_policy(Q, nA, epsilon)

    return Q, learned_policy, epsilon

# ---------- Utilities ----------

def greedy_policy_from_Q(Q: Dict, nA: int) -> Callable:
    """Build a deterministic policy that always picks the arg-max action of Q.

    Args:
        Q: Mapping from state to an array of action values. It is indexed
            with ``Q[state]`` on every call, so a ``defaultdict`` will create
            an entry for an unseen state.
        nA: Number of actions; length of the returned vector.

    Returns:
        A function ``policy_fn(state)`` returning a one-hot float array of
        length ``nA`` with 1.0 at the greedy action.
    """

    def policy_fn(state):
        action_values = Q[state]
        one_hot = np.zeros(nA, dtype=float)
        # np.argmax breaks ties in favour of the lowest action index.
        one_hot[int(np.argmax(action_values))] = 1.0
        return one_hot

    return policy_fn

def evaluate_policy_avg_return(env, policy_fn: Callable, episodes: int = 10000) -> float:
    """Monte Carlo estimate of the mean undiscounted return of a policy.

    Args:
        env: Environment passed through to ``generate_episode``.
        policy_fn: Maps a state to action probabilities.
        episodes: How many episodes to roll out and average over.

    Returns:
        The sum of all rewards collected across the rollouts divided by
        ``episodes``.
    """
    cumulative_reward = 0.0

    for _rollout in range(episodes):
        trajectory = generate_episode(env, policy_fn)
        # Undiscounted return of this rollout.
        episode_return = sum(reward for (_state, _action, reward) in trajectory)
        cumulative_reward += episode_return

    return cumulative_reward / episodes

# ---------- Main ----------

if __name__ == "__main__":
    # Gymnasium's toy-text Blackjack environment.
    # Actions: 0 = Stick, 1 = Hit.
    env = gym.make("Blackjack-v1")
    nA = env.action_space.n

    # (A) Hand-written baseline: stick once the player's sum reaches 20,
    #     otherwise hit.
    def baseline_policy(state):
        """Return a one-hot action distribution for the fixed baseline rule."""
        player_total, _dealer_card, _usable_ace = state
        distribution = np.zeros(nA, dtype=float)
        distribution[0 if player_total >= 20 else 1] = 1.0
        return distribution

    print("Evaluating baseline policy (stick on 20+, else hit)...")
    V_baseline = first_visit_mc_policy_evaluation(
        env, baseline_policy, gamma=1.0, num_episodes=20000
    )
    avg_return_baseline = evaluate_policy_avg_return(
        env, baseline_policy, episodes=10000
    )
    print(f"Baseline policy average return (approx): {avg_return_baseline:.4f}")

    # (B) Learn action values with on-policy MC control, then score the
    #     purely greedy policy derived from them.
    print("Learning with MC control (ε-greedy)... this may take a bit for many episodes.")
    Q, learned_eps_policy, final_eps = mc_control_epsilon_greedy(
        env,
        num_episodes=200000,  # raise for stronger results
        gamma=1.0,
        epsilon_start=1.0,
        epsilon_min=0.05,
        epsilon_decay=0.9995,
    )

    greedy_pi = greedy_policy_from_Q(Q, nA)
    avg_return_greedy = evaluate_policy_avg_return(env, greedy_pi, episodes=20000)

    print(f"Final epsilon after training: {final_eps:.4f}")
    print(f"Greedy policy (from learned Q) average return (approx): {avg_return_greedy:.4f}")

    # Spot-check a handful of learned action values.
    # Each state is (player sum, dealer card, usable ace).
    sample_states = [
        (20, 10, False),
        (13, 2, False),
        (12, 6, True),
        (18, 9, True),
    ]
    for s in sample_states:
        # Indexing Q here also creates the entry if the state was never seen.
        print(f"Q{str(s)} = {Q[s]}")

Evaluating baseline policy (stick on 20+, else hit)...
Baseline policy average return (approx): -0.3648
Learning with MC control (ε-greedy)... this may take a bit for many episodes.
Final epsilon after training: 0.0500
Greedy policy (from learned Q) average return (approx): -0.0555
Q(20, 10, False) = [ 0.43704922 -0.87336245]
Q(13, 2, False) = [-0.53846154 -0.39742765]
Q(12, 6, True) = [-0.26666667 -1.        ]
Q(18, 9, True) = [-0.21568627 -0.5       ]
