<a href="https://colab.research.google.com/github/Vyshnavijulapelly/Reinforcement-Learning/blob/main/RL_Lab_03.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gymnasium as gym
import numpy as np
from collections import defaultdict
from typing import Callable, Tuple

# ---------------------------
# Utilities
# ---------------------------

def epsilon_greedy(Q: np.ndarray, s: int, epsilon: float) -> int:
    """Choose an action for state ``s`` using an epsilon-greedy rule over ``Q``.

    One uniform sample from NumPy's global RNG decides the branch: below
    ``epsilon`` a uniformly random column index of ``Q`` is returned,
    otherwise the first arg-max of row ``Q[s]``.
    """
    explore = np.random.rand() < epsilon
    if not explore:
        return int(np.argmax(Q[s]))
    n_actions = Q.shape[1]
    return np.random.randint(n_actions)

def run_episode(env, policy: Callable[[int], int], gamma: float = 0.99, max_steps: int = 1000) -> Tuple[float, int]:
    """Roll out one episode with ``policy`` and return (discounted return, steps).

    The environment is reset, then stepped until it reports termination or
    truncation, or until ``max_steps`` transitions have been taken. The
    reward of the k-th transition (k starting at 0) is weighted by
    ``gamma ** k``.
    """
    state, _info = env.reset()
    discounted_return = 0.0
    step_count = 0
    while step_count < max_steps:
        action = policy(state)
        state, reward, terminated, truncated, _info = env.step(action)
        discounted_return += reward * (gamma ** step_count)
        step_count += 1
        if terminated or truncated:
            break
    return discounted_return, step_count

# ---------------------------
# TD(0) Policy Evaluation
# ---------------------------

def td0_policy_evaluation(env_id: str = "FrozenLake-v1",
                          is_slippery: bool = True,
                          gamma: float = 0.99,
                          alpha: float = 0.1,
                          epsilon_random_policy: float = 1.0,
                          episodes: int = 20_000,
                          seed: int = 0) -> np.ndarray:
    """
    Estimates V^pi with tabular TD(0), where pi is the uniform-random policy.

    Args:
        env_id: Gymnasium environment id; the env must have discrete
            observation and action spaces (their ``.n`` is read).
        is_slippery: Forwarded to ``gym.make`` as a keyword argument.
        gamma: Discount factor used in the TD target.
        alpha: Step size of the TD(0) update.
        epsilon_random_policy: Accepted for backward compatibility only.
            The non-random branch of the policy was never implemented, so
            the evaluated policy is uniform random whatever this value is.
        episodes: Number of episodes to sample.
        seed: Seeds both the action RNG and the first environment reset.

    Returns:
        Array of shape ``(nS,)`` holding the state-value estimates.
    """
    env = gym.make(env_id, is_slippery=is_slippery)
    try:
        rng = np.random.default_rng(seed)
        # Seed the environment once; the unseeded resets below continue
        # from this seeded state.
        env.reset(seed=seed)

        nS = env.observation_space.n
        nA = env.action_space.n
        V = np.zeros(nS, dtype=np.float64)

        def policy(s: int) -> int:
            # Equiprobable random policy; cast so the env receives a plain int.
            return int(rng.integers(nA))

        for _episode in range(episodes):
            s, _ = env.reset()
            done = False
            while not done:
                a = policy(s)
                s_next, r, terminated, truncated, _ = env.step(a)
                # Only a true terminal state has zero value. On truncation
                # (e.g. a time limit) the next state is still a regular
                # state, so keep bootstrapping from V[s_next].
                td_target = r + (0.0 if terminated else gamma * V[s_next])
                V[s] += alpha * (td_target - V[s])
                s = s_next
                done = terminated or truncated
    finally:
        # Release the environment even if stepping raised.
        env.close()
    return V

# ---------------------------
# SARSA (on-policy TD control)
# ---------------------------

def sarsa_control(env_id: str = "FrozenLake-v1",
                  is_slippery: bool = True,
                  gamma: float = 0.99,
                  alpha: float = 0.1,
                  epsilon_start: float = 1.0,
                  epsilon_end: float = 0.05,
                  epsilon_decay_steps: int = 50_000,
                  episodes: int = 100_000,
                  max_steps_per_ep: int = 200,
                  seed: int = 1) -> Tuple[np.ndarray, np.ndarray]:
    """
    Learns an epsilon-greedy policy for Q using on-policy SARSA.

    Args:
        env_id: Gymnasium environment id; the env must have discrete
            observation and action spaces (their ``.n`` is read).
        is_slippery: Forwarded to ``gym.make`` as a keyword argument.
        gamma: Discount factor.
        alpha: Step size of the SARSA update.
        epsilon_start: Exploration rate at global step 0.
        epsilon_end: Exploration rate once the decay has finished.
        epsilon_decay_steps: Number of environment steps (counted across
            episodes) over which epsilon decays linearly.
        episodes: Number of training episodes.
        max_steps_per_ep: Hard cap on transitions per episode.
        seed: Seeds both the action RNG and the first environment reset.

    Returns:
        ``(Q, policy_greedy)``: the learned ``(nS, nA)`` action-value table
        and the greedy action per state as a 1D array.
    """
    env = gym.make(env_id, is_slippery=is_slippery)
    try:
        env.reset(seed=seed)
        rng = np.random.default_rng(seed)

        nS = env.observation_space.n
        nA = env.action_space.n
        Q = np.zeros((nS, nA), dtype=np.float64)

        def epsilon_by_step(t: int) -> float:
            # Linear decay, clamped to [epsilon_end, epsilon_start] ends.
            frac = min(1.0, max(0.0, t / max(1, epsilon_decay_steps)))
            return epsilon_start + (epsilon_end - epsilon_start) * frac

        def select_action(state: int, eps: float) -> int:
            # Epsilon-greedy over the current Q row; always a plain int.
            if rng.random() < eps:
                return int(rng.integers(nA))
            return int(np.argmax(Q[state]))

        timestep = 0
        for _episode in range(episodes):
            s, _ = env.reset()
            a = select_action(s, epsilon_by_step(timestep))
            for _step in range(max_steps_per_ep):
                s_next, r, terminated, truncated, _ = env.step(a)
                if terminated:
                    # True terminal state: nothing to bootstrap from.
                    td_target = r
                else:
                    # Also taken on truncation: a time limit does not make
                    # s_next terminal, so its value still counts.
                    a_next = select_action(s_next, epsilon_by_step(timestep + 1))
                    td_target = r + gamma * Q[s_next, a_next]
                Q[s, a] += alpha * (td_target - Q[s, a])

                timestep += 1
                if terminated or truncated:
                    break
                s, a = s_next, a_next

        policy_greedy = np.argmax(Q, axis=1)
    finally:
        # Release the environment even if training raised.
        env.close()
    return Q, policy_greedy

# ---------------------------
# Quick demo (optional)
# ---------------------------

if __name__ == "__main__":
    # --- Prediction: TD(0) value estimate under a random policy ---
    td0_settings = dict(
        env_id="FrozenLake-v1",
        is_slippery=True,   # set False for deterministic grid
        gamma=0.99,
        alpha=0.1,
        epsilon_random_policy=1.0,
        episodes=25_000,
        seed=42,
    )
    V = td0_policy_evaluation(**td0_settings)
    print("TD(0) Value function estimate (V):")
    print(V.reshape(4, 4))  # for 4x4 FrozenLake

    # --- Control: SARSA learns Q and a greedy policy ---
    sarsa_settings = dict(
        env_id="FrozenLake-v1",
        is_slippery=True,
        gamma=0.99,
        alpha=0.1,
        epsilon_start=1.0,
        epsilon_end=0.05,
        epsilon_decay_steps=80_000,
        episodes=120_000,
        max_steps_per_ep=200,
        seed=7,
    )
    Q, pi = sarsa_control(**sarsa_settings)
    print("\nGreedy policy from SARSA (actions 0:Left, 1:Down, 2:Right, 3:Up):")
    print(pi.reshape(4, 4))
    print("\nState-action values Q[s,a] (reshaped per action for readability):")
    # Each row of Q.T is the column Q[:, a], i.e. one action's values per state.
    for a, q_column in enumerate(Q.T):
        print(f"Action {a}:")
        print(q_column.reshape(4, 4))

TD(0) Value function estimate (V):
[[0.01154136 0.00756639 0.02747178 0.013248  ]
 [0.01991066 0.         0.0626995  0.        ]
 [0.03684871 0.0758343  0.18880016 0.        ]
 [0.         0.16529731 0.52072376 0.        ]]

Greedy policy from SARSA (actions 0:Left, 1:Down, 2:Right, 3:Up):
[[0 3 1 3]
 [0 0 2 0]
 [3 1 0 0]
 [0 2 1 0]]

State-action values Q[s,a] (reshaped per action for readability):
Action 0:
[[0.35879099 0.256256   0.2354679  0.12917957]
 [0.40523588 0.         0.15157585 0.        ]
 [0.2666601  0.318245   0.56247467 0.        ]
 [0.         0.30634394 0.63039223 0.        ]]
Action 1:
[[0.31427124 0.19830804 0.24873389 0.11229348]
 [0.23471058 0.         0.169121   0.        ]
 [0.26211337 0.56930223 0.30913391 0.        ]
 [0.         0.38829745 0.85720356 0.        ]]
Action 2:
[[0.2981604  0.10287446 0.2361989  0.14233501]
 [0.24663785 0.         0.21868749 0.        ]
 [0.25617631 0.40590125 0.30485473 0.        ]
 [0.         0.70056179 0.62130542 0.        ]]


In [None]:
# Implementing Q-learning for problems with discrete state and action spaces
import numpy as np
import gymnasium as gym
from collections import defaultdict
from typing import Tuple, Callable

def make_epsilon_schedule(
    eps_start: float = 1.0,
    eps_end: float = 0.05,
    decay_steps: int = 20_000,
    mode: str = "cosine"
) -> Callable[[int], float]:
    """
    Returns epsilon(t), decaying from eps_start to eps_end over 'decay_steps'.

    Args:
        eps_start: Value returned at t <= 0.
        eps_end: Value returned at t >= decay_steps.
        decay_steps: Length of the decay; values below 1 are treated as 1.
        mode: 'linear' or 'cosine' (smooth half-cosine ease).

    Returns:
        A function mapping a step index t to a built-in float epsilon.

    Raises:
        ValueError: If mode is not 'linear' or 'cosine'.
    """
    # Explicit raise instead of assert, so the check survives `python -O`.
    if mode not in {"linear", "cosine"}:
        raise ValueError(f"mode must be 'linear' or 'cosine', got {mode!r}")

    horizon = max(1, decay_steps)

    def schedule(t: int) -> float:
        # Clamp progress to [0, 1] so a negative t cannot overshoot eps_start.
        x = min(max(t / horizon, 0.0), 1.0)
        if mode == "linear":
            return float(eps_start + (eps_end - eps_start) * x)
        # cosine ease-out
        c = (1 + np.cos(np.pi * x)) / 2  # 1→0 smoothly
        # float() keeps the return type a built-in float, not np.float64.
        return float(eps_end + (eps_start - eps_end) * c)
    return schedule

def argmax_with_random_tie_breaking(q_row: np.ndarray, rng: np.random.Generator) -> int:
    """Return the index of the largest entry of ``q_row``, breaking ties at random.

    Entries that ``np.isclose`` considers equal to the maximum count as
    tied, and one of them is drawn uniformly with ``rng``. The result is
    converted to a built-in ``int`` to match the annotation.
    """
    max_val = q_row.max()
    max_idxs = np.flatnonzero(np.isclose(q_row, max_val))
    # rng.choice yields a NumPy integer; cast so callers get a plain int.
    return int(rng.choice(max_idxs))

def epsilon_greedy_action(q_table: np.ndarray, state: int, epsilon: float,
                          action_space_n: int, rng: np.random.Generator) -> int:
    """Sample an epsilon-greedy action for ``state`` from ``q_table``.

    With probability ``epsilon`` a uniformly random action in
    ``[0, action_space_n)`` is drawn from ``rng``; otherwise the greedy
    action is taken with random tie-breaking. Both branches return a
    built-in ``int`` to match the annotation.
    """
    if rng.random() < epsilon:
        # rng.integers yields a NumPy integer; cast to a plain int.
        return int(rng.integers(0, action_space_n))
    return int(argmax_with_random_tie_breaking(q_table[state], rng))

def train_q_learning(
    env_id: str = "FrozenLake-v1",
    seed: int = 7,
    episodes: int = 50_000,
    alpha: float = 0.8,       # learning rate
    gamma: float = 0.99,      # discount
    eps_start: float = 1.0,
    eps_end: float = 0.05,
    eps_decay_steps: int = 30_000,
    eps_mode: str = "cosine",
    render_every: int = 0,    # set >0 to watch occasionally (slow)
    map_name: str = "4x4",
    is_slippery: bool = True,
) -> Tuple[np.ndarray, dict]:
    """
    Trains a Q-table on a discrete env with tabular Q-learning.

    Args:
        env_id: Gymnasium environment id. ``map_name`` and ``is_slippery``
            are forwarded only when this is "FrozenLake-v1".
        seed: Seeds the action RNG and the first environment reset.
        episodes: Number of training episodes.
        alpha: Learning rate of the Q update.
        gamma: Discount factor.
        eps_start, eps_end, eps_decay_steps, eps_mode: Passed to
            ``make_epsilon_schedule``; epsilon is indexed by the global
            environment step count.
        render_every: If > 0, ``env.render()`` is called after every step
            of each episode whose index is a multiple of this value.
        map_name, is_slippery: FrozenLake options (see ``env_id``).

    Returns:
        ``(Q, info)`` where ``Q`` is the ``(nS, nA)`` float32 table and
        ``info`` holds per-episode "returns" and "lengths" arrays plus
        "episodes", "nS" and "nA".

    Raises:
        TypeError: If the observation or action space is not Discrete.
    """
    # Create environment
    if env_id == "FrozenLake-v1":
        env = gym.make(env_id, map_name=map_name, is_slippery=is_slippery)
    else:
        env = gym.make(env_id)

    try:
        # Reproducibility
        env.reset(seed=seed)
        rng = np.random.default_rng(seed)

        # Infer sizes. Explicit raises so the checks survive `python -O`.
        if not isinstance(env.observation_space, gym.spaces.Discrete):
            raise TypeError("State must be discrete.")
        if not isinstance(env.action_space, gym.spaces.Discrete):
            raise TypeError("Action space must be discrete.")
        nS = env.observation_space.n
        nA = env.action_space.n

        # Initialize Q-table
        Q = np.zeros((nS, nA), dtype=np.float32)

        eps_fn = make_epsilon_schedule(eps_start, eps_end, eps_decay_steps, eps_mode)

        steps = 0
        returns = []  # episodic (undiscounted) returns
        lengths = []

        for ep in range(episodes):
            state, _ = env.reset(seed=None)
            done = False
            ep_return = 0.0
            ep_len = 0

            while not done:
                epsilon = eps_fn(steps)
                action = epsilon_greedy_action(Q, state, epsilon, nA, rng)

                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                # Q-learning target. Only a true terminal state has zero
                # value; on truncation (e.g. a time limit) next_state is an
                # ordinary state, so keep bootstrapping from it.
                best_next_q = 0.0 if terminated else np.max(Q[next_state])
                td_target = reward + gamma * best_next_q
                td_error = td_target - Q[state, action]
                Q[state, action] += alpha * td_error

                state = next_state
                ep_return += reward
                ep_len += 1
                steps += 1

                # NOTE(review): the env is created without a render_mode, so
                # this call may only emit a warning — confirm before relying on it.
                if render_every and (ep % render_every == 0):
                    env.render()

            returns.append(ep_return)
            lengths.append(ep_len)
    finally:
        # Release the environment even if training raised.
        env.close()

    info = {
        "returns": np.array(returns, dtype=np.float32),
        "lengths": np.array(lengths, dtype=np.int32),
        "episodes": episodes,
        "nS": nS,
        "nA": nA,
    }
    return Q, info

def greedy_policy(Q: np.ndarray) -> np.ndarray:
    """Map each state (row of ``Q``) to the index of its highest-valued action."""
    q_values = np.asarray(Q)
    best_actions = q_values.argmax(axis=1)
    return best_actions

def evaluate_policy(env_id: str, policy: np.ndarray, episodes: int = 100, seed: int = 123,
                    map_name: str = "4x4", is_slippery: bool = True) -> float:
    """Return the average undiscounted episode return of a deterministic policy.

    Args:
        env_id: Gymnasium environment id. ``map_name`` and ``is_slippery``
            are forwarded only when this is "FrozenLake-v1".
        policy: Array indexed by state giving the action to take.
        episodes: Number of evaluation episodes; must be at least 1.
        seed: Seeds the first environment reset.
        map_name, is_slippery: FrozenLake options (see ``env_id``).

    Returns:
        Sum of all rewards collected divided by ``episodes``.

    Raises:
        ValueError: If ``episodes`` is less than 1.
    """
    # Reject this before creating the env: the average is undefined, and the
    # division below would otherwise fail after an env had been opened.
    if episodes < 1:
        raise ValueError(f"episodes must be >= 1, got {episodes}")

    if env_id == "FrozenLake-v1":
        env = gym.make(env_id, map_name=map_name, is_slippery=is_slippery)
    else:
        env = gym.make(env_id)

    total_reward = 0.0
    try:
        env.reset(seed=seed)
        for _ in range(episodes):
            s, _ = env.reset()
            done = False
            while not done:
                a = int(policy[s])  # deterministic greedy; plain int for the env
                s, r, terminated, truncated, _ = env.step(a)
                done = terminated or truncated
                total_reward += r
    finally:
        # Release the environment even if a rollout raised.
        env.close()
    return total_reward / episodes

if __name__ == "__main__":
    # Train on FrozenLake (4x4). For easier learning you can set is_slippery=False.
    training_settings = dict(
        env_id="FrozenLake-v1",
        seed=7,
        episodes=30_000,
        alpha=0.8,
        gamma=0.99,
        eps_start=1.0,
        eps_end=0.05,
        eps_decay_steps=20_000,
        eps_mode="cosine",
        map_name="4x4",
        is_slippery=True,   # try False for a much easier MDP
    )
    Q, info = train_q_learning(**training_settings)

    # Derive the greedy policy from Q and measure its average return.
    pi = greedy_policy(Q)
    avg_return = evaluate_policy(
        "FrozenLake-v1",
        pi,
        episodes=200,
        map_name="4x4",
        is_slippery=True,
    )

    print("Q-table shape:", Q.shape)
    print("Sample of greedy policy (state -> action):")
    # Show at most the first 16 states.
    n_shown = min(16, Q.shape[0])
    for s in range(n_shown):
        print(f"  {s:2d} -> {pi[s]}")
    print(f"Average return over 200 eval episodes: {avg_return:.3f}")

Q-table shape: (16, 4)
Sample of greedy policy (state -> action):
   0 -> 0
   1 -> 3
   2 -> 0
   3 -> 3
   4 -> 0
   5 -> 0
   6 -> 0
   7 -> 0
   8 -> 3
   9 -> 1
  10 -> 0
  11 -> 0
  12 -> 0
  13 -> 2
  14 -> 2
  15 -> 0
Average return over 200 eval episodes: 0.645
