![Logo](https://raw.githubusercontent.com/BartaZoltan/deep-reinforcement-learning-course/main/notebooks/shared_assets/logo.png)


**Developers:** Domonkos Nagy, Balazs Nagy, Zoltan Barta  
**Date:** 2026-02-23  
**Version:** 2025-26/2

[<img src="https://colab.research.google.com/assets/colab-badge.svg">](https://colab.research.google.com/github/BartaZoltan/deep-reinforcement-learning-course/blob/main/notebooks/sessions/session_02_mdp_dynamic_programming/session2_mdp_dp_dev.ipynb)

# Practice 2: MDP Dynamic Programming

## Summary

This notebook introduces **tabular Markov Decision Processes (MDPs)** and **Dynamic Programming** control methods in a practical, coding-first format.

Content outline:
- agent-environment interface and reusable GridWorld MDP design,
- Value Iteration implementation and convergence analysis,
- Policy Iteration implementation and behavior analysis,
- larger-map stress tests for scalability and robustness,
- transfer to Gymnasium FrozenLake,
- optional extension: Gambler's Problem.


## Introduction

This practical session develops tabular Dynamic Programming from first principles and then tests it across increasingly realistic environments. Following Sutton and Barto (Ch. 3-4), we start with a fully transparent custom GridWorld to formalize the agent-environment interface, state and action spaces, transition-reward dynamics, and the Markov property.

On top of this model, we implement and analyze the two classical planning algorithms for finite MDPs: Value Iteration and Policy Iteration. The focus is not only on obtaining a final policy, but on understanding algorithmic behavior through convergence curves, value-function evolution, policy snapshots, sensitivity studies (e.g., $\gamma$, stopping thresholds, stochasticity), and rollout-based validation.

After baseline experiments on small maps, we stress-test both methods on larger layouts to study scalability, robustness, and computational trade-offs. We then transfer the same workflow to Gymnasium FrozenLake to connect custom tabular implementations with standard RL tooling and interface conventions (`reset`, `step`, `terminated`, `truncated`, and transition model access in toy-text environments). As an optional extension, we include Gambler's Problem to broaden intuition beyond GridWorld and show how the same DP ideas apply to a different tabular structure.

By the end of the notebook, you should be able to build a clean tabular MDP, implement both DP control methods correctly, interpret their dynamics with meaningful diagnostics, and move confidently between custom and Gymnasium-based environments.

This notebook follows Chapters 3-4 of Sutton & Barto {cite}`sutton2018`.

## Markov Decision Process, Agent-Environment Interface, and GridWorld


In reinforcement learning, the interaction loop is:
- agent observes state $S_t$
- agent takes action $A_t$
- environment returns reward $R_{t+1}$ and next state $S_{t+1}$

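The **MDP assumption** (the Markov property) is that the next state and reward depend only on the current state-action pair, not on the full history:
$$
P(S_{t+1}, R_{t+1} \mid S_t, A_t) = P(S_{t+1}, R_{t+1} \mid S_0, A_0, R_1, \ldots, S_t, A_t)
$$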

For planning with Dynamic Programming later, we need an explicit model:
- state space $\mathcal{S}$
- action space $\mathcal{A}$
- transition-reward dynamics $p(s', r \mid s, a)$
- discount factor $\gamma$

So in this part we build a GridWorld that supports both:
- a simulator-like API (`reset`, `step`) for rollouts,
- and a tabular model (`P[s][a]`) for Value/Policy Iteration.
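
As a minimal sketch of the tabular format, assuming the `(prob, next_state, reward, done)` tuple layout used later in this notebook, a hand-written two-state model could look like the following. The names `toy_P`, `sample_step`, and `toy_rng` are illustrative only and are not part of the notebook's API.

```python
import numpy as np

# Hypothetical 2-state, 2-action model: P[s][a] -> list of (prob, next_state, reward, done).
toy_P = {
    0: {
        0: [(1.0, 0, -1.0, False)],                         # action 0: stay in state 0
        1: [(0.8, 1, 10.0, True), (0.2, 0, -1.0, False)],   # action 1: usually reach state 1
    },
    1: {
        0: [(1.0, 1, 0.0, True)],  # state 1 is terminal and absorbing
        1: [(1.0, 1, 0.0, True)],
    },
}


def sample_step(P, s, a, rng):
    """Sample one transition from the tabular model (simulator-style use)."""
    transitions = P[s][a]
    probs = [p for p, _, _, _ in transitions]
    idx = int(rng.choice(len(transitions), p=probs))
    _, s_next, reward, done = transitions[idx]
    return s_next, reward, done


toy_rng = np.random.default_rng(0)
s_next, reward, done = sample_step(toy_P, 0, 1, toy_rng)
print(s_next, reward, done)
```

The same dictionary serves both purposes: planning algorithms iterate over every entry of `toy_P[s][a]`, while a simulator draws a single entry according to its probability.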


In [None]:
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

SEED = 42
rng = np.random.default_rng(SEED)

# Action encoding for consistency across all later parts
UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3
ACTIONS = {
    UP: (-1, 0),
    RIGHT: (0, 1),
    DOWN: (1, 0),
    LEFT: (0, -1),
}
ACTION_SYMBOLS = {UP: "↑", RIGHT: "→", DOWN: "↓", LEFT: "←"}
Transition = Tuple[float, int, float, bool]  # (prob, next_state, reward, done)


### Task 1 
**Build a reusable GridWorld MDP (10 min)**

We use a character map:
- `S`: start
- `.`: free cell
- `#`: wall (blocked)
- `G`: terminal goal
- `H`: terminal hole

Design choice (important for later DP):
- terminal states are absorbing in the model (`P[s_terminal][a] -> s_terminal`),
- walls are not states,
- invalid moves (off-grid / into wall) keep the agent in place.
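
The design choices above can be sketched on a tiny map before reading the full class. This is an illustration under the stated conventions, not the notebook's implementation; `toy_map`, `TOY_MOVES`, `index_states`, and `move` are hypothetical names.

```python
# Illustrative 2x2 map: start, free cell, wall, goal.
toy_map = ["S.", "#G"]
TOY_MOVES = {0: (-1, 0), 1: (0, 1), 2: (1, 0), 3: (0, -1)}  # UP, RIGHT, DOWN, LEFT


def index_states(char_map):
    """Assign consecutive state ids to non-wall cells in row-major order."""
    pos_to_state = {}
    for r, row in enumerate(char_map):
        for c, ch in enumerate(row):
            if ch != "#":  # walls are not states
                pos_to_state[(r, c)] = len(pos_to_state)
    return pos_to_state


def move(char_map, r, c, action):
    """Return the next cell; off-grid or wall moves leave the agent in place."""
    dr, dc = TOY_MOVES[action]
    nr, nc = r + dr, c + dc
    n_rows, n_cols = len(char_map), len(char_map[0])
    if not (0 <= nr < n_rows and 0 <= nc < n_cols) or char_map[nr][nc] == "#":
        return r, c
    return nr, nc


toy_states = index_states(toy_map)
print(toy_states)              # the wall at (1, 0) gets no state id
print(move(toy_map, 0, 0, 2))  # DOWN into the wall: position unchanged
print(move(toy_map, 0, 1, 2))  # DOWN into the goal cell
```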


In [None]:
@dataclass
class GridWorldConfig:
    char_map: List[str]
    step_reward: float = -1.0
    goal_reward: float = 10.0
    hole_reward: float = -10.0
    slip_prob: float = 0.0  # 0.0 = deterministic dynamics


@dataclass
class TabularMDP:
    nS: int
    nA: int
    P: Dict[int, Dict[int, List[Transition]]]
    state_to_pos: Dict[int, Tuple[int, int]]
    pos_to_state: Dict[Tuple[int, int], int]
    terminal_states: set
    start_state: int
    grid_chars: np.ndarray


### GridWorld implementation details

This class exposes:
- `reset()` and `step(action)` for interaction,
- `as_mdp()` that returns a full tabular model for planning algorithms.

This keeps one source of truth for dynamics and avoids mismatch bugs later.


In [None]:
class GridWorldEnv:
    def __init__(self, cfg: GridWorldConfig):
        self.cfg = cfg
        self.grid = np.array([list(row) for row in cfg.char_map], dtype="<U1")
        self.H, self.W = self.grid.shape

        self.pos_to_state: Dict[Tuple[int, int], int] = {}
        self.state_to_pos: Dict[int, Tuple[int, int]] = {}

        s_idx = 0
        for r in range(self.H):
            for c in range(self.W):
                if self.grid[r, c] != "#":
                    self.pos_to_state[(r, c)] = s_idx
                    self.state_to_pos[s_idx] = (r, c)
                    s_idx += 1

        self.nS = s_idx
        self.nA = 4

        self.terminal_states = set()
        self.start_state: Optional[int] = None

        for (r, c), s in self.pos_to_state.items():
            cell = self.grid[r, c]
            if cell in ("G", "H"):
                self.terminal_states.add(s)
            if cell == "S":
                if self.start_state is not None:
                    raise ValueError("Map must contain exactly one start cell 'S'.")
                self.start_state = s

        if self.start_state is None:
            raise ValueError("Map must contain one start cell 'S'.")

        self._P = self._build_transition_model()
        self._state = self.start_state

    def _move(self, r: int, c: int, a: int) -> Tuple[int, int]:
        dr, dc = ACTIONS[a]
        nr, nc = r + dr, c + dc
        if nr < 0 or nr >= self.H or nc < 0 or nc >= self.W or self.grid[nr, nc] == "#":
            return r, c
        return nr, nc

    def _cell_reward_done(self, r: int, c: int) -> Tuple[float, bool]:
        cell = self.grid[r, c]
        if cell == "G":
            return self.cfg.goal_reward, True
        if cell == "H":
            return self.cfg.hole_reward, True
        return self.cfg.step_reward, False

    def _build_transition_model(self) -> Dict[int, Dict[int, List[Transition]]]:
        left_of = {UP: LEFT, RIGHT: UP, DOWN: RIGHT, LEFT: DOWN}
        right_of = {UP: RIGHT, RIGHT: DOWN, DOWN: LEFT, LEFT: UP}

        P: Dict[int, Dict[int, List[Transition]]] = {
            s: {a: [] for a in range(self.nA)} for s in range(self.nS)
        }

        for s, (r, c) in self.state_to_pos.items():
            if s in self.terminal_states:
                for a in range(self.nA):
                    P[s][a] = [(1.0, s, 0.0, True)]
                continue

            for a in range(self.nA):
                outcomes = [(1.0 - self.cfg.slip_prob, a)]
                if self.cfg.slip_prob > 0:
                    outcomes.append((self.cfg.slip_prob / 2.0, left_of[a]))
                    outcomes.append((self.cfg.slip_prob / 2.0, right_of[a]))

                acc: Dict[Tuple[int, float, bool], float] = {}
                for p, a_eff in outcomes:
                    nr, nc = self._move(r, c, a_eff)
                    s_next = self.pos_to_state[(nr, nc)]
                    reward, done = self._cell_reward_done(nr, nc)
                    key = (s_next, reward, done)
                    acc[key] = acc.get(key, 0.0) + p

                P[s][a] = [(p, s_next, reward, done) for (s_next, reward, done), p in acc.items()]

        return P

    def reset(self) -> int:
        self._state = self.start_state
        return self._state

    def step(self, action: int) -> Tuple[int, float, bool, dict]:
        if action not in ACTIONS:
            raise ValueError(f"Invalid action {action}. Must be in {list(ACTIONS.keys())}.")

        transitions = self._P[self._state][action]
        probs = np.array([p for p, _, _, _ in transitions], dtype=float)
        idx = int(rng.choice(len(transitions), p=probs))
        _, s_next, reward, done = transitions[idx]

        self._state = s_next
        return s_next, reward, done, {}

    def as_mdp(self) -> TabularMDP:
        return TabularMDP(
            nS=self.nS,
            nA=self.nA,
            P=self._P,
            state_to_pos=self.state_to_pos,
            pos_to_state=self.pos_to_state,
            terminal_states=self.terminal_states,
            start_state=self.start_state,
            grid_chars=self.grid,
        )


### GridWorld diagnostics and sanity checks

Before running any algorithms, we should verify the environment logic visually:
- map rendering,
- greedy/random rollout trace,
- transition probability sanity check (`sum_p = 1`).


In [None]:
from pathlib import Path
from IPython.display import Image, display
from matplotlib.collections import LineCollection


def _draw_cell_grid(ax, H: int, W: int) -> None:
    # Draw grid lines on cell borders (not through cell centers)
    ax.set_xticks(np.arange(-0.5, W, 1), minor=True)
    ax.set_yticks(np.arange(-0.5, H, 1), minor=True)
    ax.grid(which="minor", color="gray", linewidth=0.8)
    ax.tick_params(which="minor", bottom=False, left=False)
    ax.set_xlim(-0.5, W - 0.5)
    ax.set_ylim(H - 0.5, -0.5)


def _map_code_array(grid_chars: np.ndarray) -> np.ndarray:
    # Supports custom GridWorld and FrozenLake chars
    code_map = {"#": 0, ".": 1, "F": 1, "S": 2, "G": 3, "H": 4}
    arr = np.full(grid_chars.shape, 1, dtype=int)
    for r in range(grid_chars.shape[0]):
        for c in range(grid_chars.shape[1]):
            arr[r, c] = code_map.get(grid_chars[r, c], 1)
    return arr


def values_to_grid(mdp: TabularMDP, V: np.ndarray) -> np.ndarray:
    H, W = mdp.grid_chars.shape
    grid = np.full((H, W), np.nan, dtype=float)
    for s, (r, c) in mdp.state_to_pos.items():
        grid[r, c] = V[s]
    return grid


def _text_color_for_value(v: float, vmin: float, vmax: float, threshold: float = 0.55) -> str:
    if not np.isfinite(v):
        return "black"
    if vmax <= vmin:
        return "black"
    norm = (v - vmin) / (vmax - vmin)
    return "white" if norm < threshold else "black"


def render_map(env: GridWorldEnv, title: str = "GridWorld map") -> None:
    arr = _map_code_array(env.grid)
    cmap = ListedColormap(["black", "white", "#A7D3F5", "#B7E4C7", "#F8B4B4"])

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(arr, cmap=cmap, vmin=0, vmax=4)
    ax.set_title(title)
    ax.set_xticks(range(env.W))
    ax.set_yticks(range(env.H))
    _draw_cell_grid(ax, env.H, env.W)

    for r in range(env.H):
        for c in range(env.W):
            txt = " " if env.grid[r, c] in (".", "F") else env.grid[r, c]
            ax.text(c, r, txt, ha="center", va="center", color="black", fontsize=12, fontweight="bold")

    plt.tight_layout()
    plt.show()


def render_policy_arrows(mdp: TabularMDP, greedy_actions: np.ndarray, title: str = "Policy arrows") -> None:
    H, W = mdp.grid_chars.shape
    bg = np.zeros((H, W), dtype=float)

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(bg, cmap=ListedColormap(["#f7f7f7"]))
    ax.set_title(title)
    ax.set_xticks(range(W))
    ax.set_yticks(range(H))
    _draw_cell_grid(ax, H, W)

    for r in range(H):
        for c in range(W):
            ch = mdp.grid_chars[r, c]
            if ch == "#":
                txt = "#"
            elif ch in ("G", "H"):
                txt = ch
            else:
                # Every non-wall, non-terminal cell (free cells and the start) shows its action
                s = mdp.pos_to_state[(r, c)]
                txt = ACTION_SYMBOLS[int(greedy_actions[s])]
            ax.text(c, r, txt, ha="center", va="center", color="black", fontsize=12, fontweight="bold")

    plt.tight_layout()
    plt.show()


def validate_transition_model(mdp: TabularMDP) -> None:
    for s in range(mdp.nS):
        for a in range(mdp.nA):
            p_sum = sum(p for p, _, _, _ in mdp.P[s][a])
            if not np.isclose(p_sum, 1.0):
                raise AssertionError(f"Transition probabilities do not sum to 1 at state={s}, action={a}: {p_sum}")
    print("Transition model sanity check passed (all action distributions sum to 1).")


def plot_visitation_heatmap(env: GridWorldEnv, counts: np.ndarray, title: str = "Random policy visitation"):
    grid = np.full((env.H, env.W), np.nan, dtype=float)
    for s, (r, c) in env.state_to_pos.items():
        grid[r, c] = counts[s]

    fig, ax = plt.subplots(figsize=(5, 5))
    im = ax.imshow(grid, cmap="magma")
    ax.set_title(title)
    ax.set_xticks(range(env.W))
    ax.set_yticks(range(env.H))
    _draw_cell_grid(ax, env.H, env.W)

    max_v = np.nanmax(grid) if np.any(~np.isnan(grid)) else 1.0
    for r in range(env.H):
        for c in range(env.W):
            ch = env.grid[r, c]
            if ch == "#":
                ax.text(c, r, "#", ha="center", va="center", color="white", fontweight="bold")
            elif not np.isnan(grid[r, c]):
                color = "black" if grid[r, c] > 0.45 * max_v else "white"
                ax.text(c, r, f"{int(grid[r, c])}", ha="center", va="center", color=color, fontsize=9)

    fig.colorbar(im, ax=ax, shrink=0.8, label="visit count")
    plt.tight_layout()
    plt.show()


def plot_value_heatmap(mdp: TabularMDP, V: np.ndarray, title: str = "Value heatmap"):
    grid = values_to_grid(mdp, V)
    H, W = grid.shape

    fig, ax = plt.subplots(figsize=(5, 5))
    im = ax.imshow(grid, cmap="viridis")
    ax.set_title(title)
    ax.set_xticks(range(W))
    ax.set_yticks(range(H))
    _draw_cell_grid(ax, H, W)

    finite_vals = grid[np.isfinite(grid)]
    vmin = float(np.min(finite_vals)) if finite_vals.size else 0.0
    vmax = float(np.max(finite_vals)) if finite_vals.size else 1.0

    for r in range(H):
        for c in range(W):
            ch = mdp.grid_chars[r, c]
            if ch == "#":
                ax.text(c, r, "#", ha="center", va="center", color="white", fontweight="bold")
            elif np.isfinite(grid[r, c]):
                color = _text_color_for_value(grid[r, c], vmin, vmax, threshold=0.55)
                ax.text(c, r, f"{grid[r, c]:.1f}", ha="center", va="center", color=color, fontsize=9)

    fig.colorbar(im, ax=ax, shrink=0.8, label="V(s)")
    plt.tight_layout()
    plt.show()


def plot_vi_convergence(deltas: list, title: str = "Value Iteration convergence"):
    fig, ax = plt.subplots(figsize=(6, 3.5))
    ax.plot(deltas, lw=2)
    ax.set_yscale("log")
    ax.set_xlabel("Sweep")
    ax.set_ylabel("max update delta")
    ax.set_title(title)
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


def plot_vi_snapshots(mdp: TabularMDP, history: list, k_list=None, title_prefix: str = "VI snapshot"):
    if not history:
        return
    if k_list is None:
        last = len(history) - 1
        mid = last // 2
        k_list = sorted(set([0, mid, last]))

    fig, axes = plt.subplots(1, len(k_list), figsize=(4 * len(k_list), 4))
    if len(k_list) == 1:
        axes = [axes]

    all_grids = [values_to_grid(mdp, history[k]) for k in k_list]
    all_vals = np.concatenate([g[np.isfinite(g)] for g in all_grids if np.any(np.isfinite(g))])
    vmin = float(np.min(all_vals)) if all_vals.size else 0.0
    vmax = float(np.max(all_vals)) if all_vals.size else 1.0

    for ax, k in zip(axes, k_list):
        grid = values_to_grid(mdp, history[k])
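        # Share one color scale across snapshots so the common colorbar is valid for all panels
        im = ax.imshow(grid, cmap="viridis", vmin=vmin, vmax=vmax)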
        ax.set_title(f"{title_prefix} k={k+1}")
        ax.set_xticks(range(grid.shape[1]))
        ax.set_yticks(range(grid.shape[0]))
        _draw_cell_grid(ax, grid.shape[0], grid.shape[1])

        for r in range(grid.shape[0]):
            for c in range(grid.shape[1]):
                ch = mdp.grid_chars[r, c]
                if ch == "#":
                    ax.text(c, r, "#", ha="center", va="center", color="white", fontweight="bold")
                elif np.isfinite(grid[r, c]):
                    color = _text_color_for_value(grid[r, c], vmin, vmax, threshold=0.55)
                    ax.text(c, r, f"{grid[r, c]:.1f}", ha="center", va="center", color=color, fontsize=8)

    fig.colorbar(im, ax=axes, shrink=0.75)
    plt.tight_layout()
    plt.show()


def _draw_policy_on_axis(ax, mdp: TabularMDP, actions: np.ndarray, title: str):
    arr = np.zeros(mdp.grid_chars.shape, dtype=float)
    ax.imshow(arr, cmap=ListedColormap(["#f7f7f7"]))
    ax.set_title(title, fontsize=11)
    ax.set_xticks(range(arr.shape[1]))
    ax.set_yticks(range(arr.shape[0]))
    _draw_cell_grid(ax, arr.shape[0], arr.shape[1])

    for r in range(arr.shape[0]):
        for c in range(arr.shape[1]):
            ch = mdp.grid_chars[r, c]
            if ch == "#":
                txt = "#"
            elif ch in ("G", "H"):
                txt = ch
            else:
                # Every non-wall, non-terminal cell (free cells and the start) shows its action
                s = mdp.pos_to_state[(r, c)]
                txt = ACTION_SYMBOLS[int(actions[s])]
            ax.text(c, r, txt, ha="center", va="center", color="black", fontsize=11, fontweight="bold")


def _draw_value_on_axis(ax, mdp: TabularMDP, V: np.ndarray, title: str):
    grid = values_to_grid(mdp, V)
    im = ax.imshow(grid, cmap="viridis")
    ax.set_title(title, fontsize=11)
    ax.set_xticks(range(grid.shape[1]))
    ax.set_yticks(range(grid.shape[0]))
    _draw_cell_grid(ax, grid.shape[0], grid.shape[1])

    finite_vals = grid[np.isfinite(grid)]
    vmin = float(np.min(finite_vals)) if finite_vals.size else 0.0
    vmax = float(np.max(finite_vals)) if finite_vals.size else 1.0

    for r in range(grid.shape[0]):
        for c in range(grid.shape[1]):
            ch = mdp.grid_chars[r, c]
            if ch == "#":
                ax.text(c, r, "#", ha="center", va="center", color="white", fontweight="bold")
            elif np.isfinite(grid[r, c]):
                color = _text_color_for_value(grid[r, c], vmin, vmax, threshold=0.55)
                ax.text(c, r, f"{grid[r, c]:.1f}", ha="center", va="center", color=color, fontsize=8)

    return im


def save_vi_value_gif(
    mdp: TabularMDP,
    history: list,
    gif_path: Path,
    title_prefix: str = "VI value evolution",
    show_after_save: bool = True,
):
    if not history:
        print("No history available; GIF was not created.")
        return

    from matplotlib.animation import FuncAnimation, PillowWriter

    gif_path.parent.mkdir(parents=True, exist_ok=True)
    grids = [values_to_grid(mdp, V) for V in history]
    H, W = grids[0].shape

    fig, ax = plt.subplots(figsize=(5, 5))
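    # Fix the color scale over all frames; set_data does not rescale the colormap
    vmin = float(np.nanmin(grids))
    vmax = float(np.nanmax(grids))
    im = ax.imshow(grids[0], cmap="viridis", vmin=vmin, vmax=vmax)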
    txt = ax.set_title(f"{title_prefix} | sweep=1")
    ax.set_xticks(range(W))
    ax.set_yticks(range(H))
    _draw_cell_grid(ax, H, W)

    def update(frame_idx):
        im.set_data(grids[frame_idx])
        txt.set_text(f"{title_prefix} | sweep={frame_idx + 1}")
        return im, txt

    ani = FuncAnimation(fig, update, frames=len(grids), interval=180, blit=False)
    ani.save(gif_path, writer=PillowWriter(fps=6))
    plt.close(fig)
    print(f"Saved GIF: {gif_path}")

    if show_after_save:
        display(Image(filename=str(gif_path)))


def save_vi_convergence_png(
    deltas: list,
    png_path: Path,
    title: str = "VI convergence",
    show_after_save: bool = True,
):
    if not deltas:
        print("No deltas available; PNG was not created.")
        return

    png_path.parent.mkdir(parents=True, exist_ok=True)

    fig, ax = plt.subplots(figsize=(6, 3.5))
    ax.plot(deltas, lw=2)
    ax.set_yscale("log")
    ax.set_xlabel("Sweep")
    ax.set_ylabel("max update delta")
    ax.set_title(title)
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(png_path, dpi=160, bbox_inches="tight")
    plt.close(fig)
    print(f"Saved PNG: {png_path}")

    if show_after_save:
        display(Image(filename=str(png_path)))


def save_trajectory_gif(
    env,
    states: List[int],
    gif_path: Path,
    title: str = "Trajectory evolution",
    fps: int = 6,
    interval_ms: int = 170,
    trail_len: int = 18,
    show_after_save: bool = True,
):
    from matplotlib.animation import FuncAnimation, PillowWriter

    gif_path.parent.mkdir(parents=True, exist_ok=True)
    pts = np.array([env.state_to_pos[s] for s in states], dtype=float)
    ys, xs = pts[:, 0], pts[:, 1]

    arr = _map_code_array(env.grid)
    cmap = ListedColormap(["black", "white", "#A7D3F5", "#B7E4C7", "#F8B4B4"])

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(arr, cmap=cmap, vmin=0, vmax=4)
    ax.set_title(title)
    ax.set_xticks(range(env.W))
    ax.set_yticks(range(env.H))
    _draw_cell_grid(ax, env.H, env.W)

    for r in range(env.H):
        for c in range(env.W):
            txt = " " if env.grid[r, c] in (".", "F") else env.grid[r, c]
            ax.text(c, r, txt, ha="center", va="center", color="black", fontsize=10, fontweight="bold")

    lc = LineCollection([], linewidths=2.5, capstyle="round")
    ax.add_collection(lc)

    ax.scatter(xs[0], ys[0], color="orange", s=90, zorder=6, label="start")
    head = ax.scatter([], [], color="red", s=75, zorder=7, label="current")
    ax.legend(loc="upper right")

    base_rgb = np.array([31/255, 119/255, 180/255])

    def update(i):
        if i == 0:
            lc.set_segments([])
            head.set_offsets([[xs[0], ys[0]]])
            return lc, head

        start_idx = max(0, i - trail_len)
        segs, cols = [], []
        window = i - start_idx

        for j in range(start_idx, i):
            segs.append([(xs[j], ys[j]), (xs[j + 1], ys[j + 1])])
            age = i - j
            alpha = max(0.08, 1.0 - (age - 1) / max(1, window))
            cols.append((base_rgb[0], base_rgb[1], base_rgb[2], alpha))

        lc.set_segments(segs)
        lc.set_color(cols)
        head.set_offsets([[xs[i], ys[i]]])
        return lc, head

    ani = FuncAnimation(fig, update, frames=len(states), interval=interval_ms, blit=False)
    ani.save(gif_path, writer=PillowWriter(fps=fps))
    plt.close(fig)
    print(f"Saved GIF: {gif_path}")

    if show_after_save:
        display(Image(filename=str(gif_path)))


In [None]:
# Example map intended for later Value/Policy Iteration experiments
char_map = [
    "S...",
    ".#..",
    "..H.",
    "...G",
]

cfg = GridWorldConfig(
    char_map=char_map,
    step_reward=-1.0,
    goal_reward=10.0,
    hole_reward=-10.0,
    slip_prob=0.10,
)

env = GridWorldEnv(cfg)
mdp = env.as_mdp()

render_map(env, "GridWorld")
validate_transition_model(mdp)
print(f"nS={mdp.nS}, nA={mdp.nA}, start={mdp.start_state}, terminal_states={sorted(mdp.terminal_states)}")


In [None]:
# Quick rollout demo with a random policy to validate reset/step interface
state = env.reset()
trajectory = [state]
rewards = []

for t in range(25):
    action = int(rng.integers(0, env.nA))
    next_state, reward, done, _ = env.step(action)
    trajectory.append(next_state)
    rewards.append(reward)
    if done:
        break

print("Rollout length:", len(trajectory) - 1)
print("Total reward:", float(np.sum(rewards)))
print("Visited states:", trajectory)


### Why this implementation is suitable for later DP

- **Single source of dynamics truth**: simulator and tabular model are derived from the same transition logic.
- **Tabular structure ready**: `P[s][a] -> [(prob, s_next, reward, done), ...]` is exactly what Value/Policy Iteration needs.
- **Configurable stochasticity**: `slip_prob` lets us move from deterministic GridWorld to FrozenLake-like behavior.
- **Debuggable**: map rendering + rollout + transition checks catch modeling errors early.
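
To illustrate how the `P[s][a]` structure feeds into planning, here is a minimal sketch of a one-step lookahead that turns a state-value estimate into action values. The function name `one_step_lookahead`, the toy model, and the choice to skip bootstrapping from `done=True` successors are assumptions made for this example, not the notebook's later implementation.

```python
import numpy as np


def one_step_lookahead(P, V, s, gamma):
    """Return q(s, a) for every action a, given a state-value estimate V."""
    q = np.zeros(len(P[s]))
    for a, transitions in P[s].items():
        for prob, s_next, reward, done in transitions:
            # Assumption: do not bootstrap from terminal successors (done=True).
            q[a] += prob * (reward + gamma * V[s_next] * (not done))
    return q


# Hypothetical 2-state model in the same (prob, next_state, reward, done) format.
toy_P = {
    0: {
        0: [(1.0, 0, -1.0, False)],
        1: [(0.9, 1, 10.0, True), (0.1, 0, -1.0, False)],
    },
    1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},
}
toy_V = np.array([2.0, 0.0])
toy_q = one_step_lookahead(toy_P, toy_V, s=0, gamma=0.9)
print(toy_q)  # action values for state 0
```

Because terminal states are absorbing with zero reward in this notebook's model, their value stays at zero under DP, so the `done` mask is a safeguard rather than a requirement.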


### Experiments - Random walk and diagnostics

These experiments build intuition about the environment before we introduce the dynamic programming algorithms.

1. Random-walk baseline performance.
2. Monte Carlo visitation heatmap.
3. Transition probability sanity probes.
4. Reward sensitivity sweep.
5. Single-episode trajectory visualization.


In [None]:
def run_random_episode(env: GridWorldEnv, max_steps: int = 200):
    state = env.reset()
    states = [state]
    actions = []
    rewards = []

    done = False
    for _ in range(max_steps):
        a = int(rng.integers(0, env.nA))
        s_next, r, done, _ = env.step(a)
        actions.append(a)
        rewards.append(r)
        states.append(s_next)
        if done:
            break

    return {
        "states": states,
        "actions": actions,
        "rewards": rewards,
        "done": done,
        "return": float(np.sum(rewards)),
        "length": len(actions),
        "terminal_state": states[-1],
    }


def evaluate_random_policy(env: GridWorldEnv, n_episodes: int = 2000, max_steps: int = 200):
    success = 0
    holes = 0
    timeouts = 0
    returns = []
    lengths = []

    for _ in range(n_episodes):
        ep = run_random_episode(env, max_steps=max_steps)
        returns.append(ep["return"])
        lengths.append(ep["length"])

        if ep["done"]:
            r, c = env.state_to_pos[ep["terminal_state"]]
            cell = env.grid[r, c]
            if cell == "G":
                success += 1
            elif cell == "H":
                holes += 1
        else:
            timeouts += 1

    return {
        "episodes": n_episodes,
        "success_rate": success / n_episodes,
        "hole_rate": holes / n_episodes,
        "timeout_rate": timeouts / n_episodes,
        "mean_return": float(np.mean(returns)),
        "std_return": float(np.std(returns)),
        "mean_length": float(np.mean(lengths)),
    }


def print_baseline_report(title: str, report: dict):
    print(title)
    print(f"  episodes:     {report['episodes']}")
    print(f"  success_rate: {report['success_rate']*100:6.2f}%")
    print(f"  hole_rate:    {report['hole_rate']*100:6.2f}%")
    print(f"  timeout_rate: {report['timeout_rate']*100:6.2f}%")
    print(f"  mean_return:  {report['mean_return']:7.3f} ± {report['std_return']:.3f}")
    print(f"  mean_length:  {report['mean_length']:7.2f}")


cfg_det = GridWorldConfig(char_map=char_map, step_reward=-1.0, goal_reward=10.0, hole_reward=-10.0, slip_prob=0.0)
cfg_slip = GridWorldConfig(char_map=char_map, step_reward=-1.0, goal_reward=10.0, hole_reward=-10.0, slip_prob=0.20)

env_det = GridWorldEnv(cfg_det)
env_slip = GridWorldEnv(cfg_slip)

rep_det = evaluate_random_policy(env_det, n_episodes=1500, max_steps=120)
rep_slip = evaluate_random_policy(env_slip, n_episodes=1500, max_steps=120)

print_baseline_report("Random policy baseline | deterministic", rep_det)
print()
print_baseline_report("Random policy baseline | slippery (slip_prob=0.20)", rep_slip)


**Monte Carlo visitation heatmap**

State visitation frequencies show which areas are naturally explored under random behavior.
This is useful context before policy optimization.


In [None]:
def visitation_counts_random(env: GridWorldEnv, n_episodes: int = 2000, max_steps: int = 200) -> np.ndarray:
    counts = np.zeros(env.nS, dtype=float)

    for _ in range(n_episodes):
        s = env.reset()
        counts[s] += 1
        for _ in range(max_steps):
            a = int(rng.integers(0, env.nA))
            s_next, _, done, _ = env.step(a)
            counts[s_next] += 1
            if done:
                break

    return counts


counts_det = visitation_counts_random(env_det, n_episodes=2000, max_steps=120)
counts_slip = visitation_counts_random(env_slip, n_episodes=2000, max_steps=120)

plot_visitation_heatmap(env_det, counts_det, "Visitation heatmap | deterministic")
plot_visitation_heatmap(env_slip, counts_slip, "Visitation heatmap | slippery")


**Transition sanity probes**

For selected states/actions, inspect the full $p(s', r, \text{done} \mid s, a)$ entries.
This validates edge handling, wall collisions, and slip dynamics.
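
As a reference for reading the slippery probes, the slip rule used in `_build_transition_model` can be sketched in isolation: the intended action is executed with probability `1 - slip_prob`, and each perpendicular action with probability `slip_prob / 2`. The helper name `slip_outcomes` is hypothetical; the action encoding (`UP=0, RIGHT=1, DOWN=2, LEFT=3`) follows the notebook.

```python
def slip_outcomes(action, slip_prob):
    """Distribution over executed actions under the perpendicular-slip rule."""
    left_of = {0: 3, 1: 0, 2: 1, 3: 2}   # UP->LEFT, RIGHT->UP, DOWN->RIGHT, LEFT->DOWN
    right_of = {0: 1, 1: 2, 2: 3, 3: 0}  # UP->RIGHT, RIGHT->DOWN, DOWN->LEFT, LEFT->UP
    outcomes = {action: 1.0 - slip_prob}
    if slip_prob > 0:
        outcomes[left_of[action]] = slip_prob / 2.0
        outcomes[right_of[action]] = slip_prob / 2.0
    return outcomes


toy_dist = slip_outcomes(action=1, slip_prob=0.20)  # intended action: RIGHT
print(toy_dist)
```

On the example map, this predicts that `RIGHT` from `(0, 0)` with `slip_prob=0.20` reaches `(0, 1)` with probability 0.8, stays in place with probability 0.1 (the `UP` slip is off-grid), and moves to `(1, 0)` with probability 0.1.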


In [None]:
def print_transition_probe(env: GridWorldEnv, state_pos: Tuple[int, int], action: int):
    s = env.pos_to_state[state_pos]
    print(f"Probe at state={s}, pos={state_pos}, action={action} ({ACTION_SYMBOLS[action]})")
    rows = sorted(env.as_mdp().P[s][action], key=lambda x: (-x[0], x[1]))
    for p, s_next, r, done in rows:
        print(f"  p={p:.3f} -> s'={s_next:2d}, pos'={env.state_to_pos[s_next]}, reward={r:5.1f}, done={done}")
    print()


print("Deterministic probes")
print_transition_probe(env_det, (0, 0), RIGHT)
print_transition_probe(env_det, (0, 0), UP)
print_transition_probe(env_det, (1, 0), RIGHT)  # attempts to move into wall at (1,1)

print("Slippery probes")
print_transition_probe(env_slip, (0, 0), RIGHT)
print_transition_probe(env_slip, (1, 0), RIGHT)


**Reward sensitivity sweep**

Without changing dynamics, vary `step_reward` and evaluate random-policy return.
Even before optimization, reward scale influences expected return statistics.
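
One way to see why, as a sketch under the assumption that the return is the undiscounted reward sum computed by `run_random_episode`: the random policy ignores rewards, so the trajectory distribution is the same for every `step_reward`. Writing $N$ for the number of steps that land in a non-terminal cell (notation introduced here for illustration), the expected return is affine in the step reward:

$$
\mathbb{E}[G] = r_{\text{step}}\,\mathbb{E}[N] + r_{\text{goal}}\,\Pr(\text{goal}) + r_{\text{hole}}\,\Pr(\text{hole})
$$

The plotted curve should therefore be close to a straight line, up to Monte Carlo noise, while the success and hole rates should stay roughly constant across the sweep.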


In [None]:
def reward_sensitivity_experiment(step_rewards=(-0.1, -0.5, -1.0, -2.0), slip_prob=0.10):
    rows = []
    for sr in step_rewards:
        cfg_tmp = GridWorldConfig(
            char_map=char_map,
            step_reward=float(sr),
            goal_reward=10.0,
            hole_reward=-10.0,
            slip_prob=slip_prob,
        )
        env_tmp = GridWorldEnv(cfg_tmp)
        rep = evaluate_random_policy(env_tmp, n_episodes=1200, max_steps=120)
        rows.append((sr, rep["mean_return"], rep["success_rate"], rep["hole_rate"]))

    print("step_reward | mean_return | success_rate | hole_rate")
    for sr, mr, suc, hol in rows:
        print(f"{sr:10.2f} | {mr:11.3f} | {suc*100:10.2f}% | {hol*100:8.2f}%")

    x = [r[0] for r in rows]
    y = [r[1] for r in rows]
    plt.figure(figsize=(6, 3.5))
    plt.plot(x, y, marker="o")
    plt.xlabel("step_reward")
    plt.ylabel("mean random-policy return")
    plt.title("Reward sensitivity under random policy")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


reward_sensitivity_experiment(step_rewards=(-0.1, -0.5, -1.0, -2.0), slip_prob=0.10)


**Trajectory visualization**

Animate a single random episode as a GIF drawn on top of the grid, with a fading trail behind the agent. This is useful for debugging and teaching.


In [None]:

from pathlib import Path
from matplotlib.animation import FuncAnimation, PillowWriter
from matplotlib.collections import LineCollection
from IPython.display import Image, display

def save_random_walk_gif_fading(
    env: GridWorldEnv,
    states: List[int],
    gif_path: str = "random_walk_fading.gif",
    title: str = "Random walk trajectory (fading trail)",
    fps: int = 6,
    trail_len: int = 18,
    show_after_save: bool = True,  # display the saved GIF inline after writing it
):
    # Reuse the shared helper so unknown characters (e.g. FrozenLake's "F") map to a free cell
    arr = _map_code_array(env.grid)
    cmap = ListedColormap(["black", "white", "#A7D3F5", "#B7E4C7", "#F8B4B4"])

    pts = np.array([env.state_to_pos[s] for s in states], dtype=float)
    ys, xs = pts[:, 0], pts[:, 1]

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(arr, cmap=cmap, vmin=0, vmax=4)
    ax.set_title(title)
    ax.set_xticks(range(env.W))
    ax.set_yticks(range(env.H))
    _draw_cell_grid(ax, env.H, env.W)

    for r in range(env.H):
        for c in range(env.W):
            txt = " " if env.grid[r, c] == "." else env.grid[r, c]
            ax.text(c, r, txt, ha="center", va="center", color="black", fontsize=10, fontweight="bold")

    lc = LineCollection([], linewidths=2.5, capstyle="round")
    ax.add_collection(lc)

    head = ax.scatter([], [], color="red", s=80, zorder=6)
    ax.scatter(xs[0], ys[0], color="orange", s=90, zorder=6, label="start")
    ax.legend(loc="upper right")

    base_rgb = np.array([31/255, 119/255, 180/255])

    def update(i):
        if i == 0:
            lc.set_segments([])
            head.set_offsets([[xs[0], ys[0]]])
            return lc, head

        start_idx = max(0, i - trail_len)
        segs, cols = [], []
        window = i - start_idx

        for j in range(start_idx, i):
            segs.append([(xs[j], ys[j]), (xs[j + 1], ys[j + 1])])
            age = i - j
            alpha = max(0.08, 1.0 - (age - 1) / max(1, window))
            cols.append((base_rgb[0], base_rgb[1], base_rgb[2], alpha))

        lc.set_segments(segs)
        lc.set_color(cols)
        head.set_offsets([[xs[i], ys[i]]])
        return lc, head

    ani = FuncAnimation(fig, update, frames=len(states), interval=170, blit=False)

    out = Path(gif_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    ani.save(out, writer=PillowWriter(fps=fps))
    plt.close(fig)
    print(f"Saved GIF: {out}")

    if show_after_save:
        display(Image(filename=str(out)))


# Example
ep = run_random_episode(env_slip, max_steps=60)
print(f"Trajectory length={ep['length']}, return={ep['return']:.2f}, done={ep['done']}")
save_random_walk_gif_fading(
    env_slip,
    ep["states"],
    gif_path="notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs/random_walk_slippery_fading.gif",
    title="Random walk trajectory (slippery, fading trail)",
    fps=6,
    trail_len=18,
    show_after_save=True,
)


## Value Iteration

### Short theory recap (Sutton & Barto, Ch. 4)

Value Iteration repeatedly applies the Bellman optimality operator:
$$
V_{k+1}(s) = \max_a \sum_{s',r} p(s',r\mid s,a) \left[r + \gamma V_k(s')\right]
$$

Intuition:
- each sweep improves the approximation of the optimal value function,
- the `max` operation already performs policy improvement implicitly,
- once $V$ converges, an optimal greedy policy can be extracted from it.
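
To make the backup concrete, here is a minimal sketch on a hypothetical two-state problem. The table `P` is an illustrative stand-in that mirrors the `(p, s_next, r, done)` tuples used in this notebook; the states, rewards, and probabilities are assumptions chosen for easy arithmetic, not the GridWorld model.

```python
import numpy as np

# Hypothetical toy MDP: state 0 is non-terminal, state 1 is terminal.
# P[s][a] is a list of (probability, next_state, reward, done) tuples.
P = {
    0: {
        0: [(1.0, 0, -1.0, False)],                        # action 0: stay and pay a step cost
        1: [(0.8, 1, 10.0, True), (0.2, 0, -1.0, False)],  # action 1: usually reach the goal
    },
}
gamma = 0.9


def backup(V, s):
    """One Bellman optimality backup for state s; returns (max_a q, list of q-values)."""
    q = []
    for a in sorted(P[s]):
        q.append(sum(p * (r + gamma * (0.0 if done else V[s2])) for p, s2, r, done in P[s][a]))
    return max(q), q


V = np.zeros(2)  # V[1] stays 0 because state 1 is terminal
first_q = None
for sweep in range(1, 201):
    v_new, q = backup(V, 0)
    if first_q is None:
        first_q = q  # q-values of the very first sweep, computed from V = 0
    delta = abs(v_new - V[0])
    V[0] = v_new
    if delta < 1e-12:
        break

print("q-values in the first sweep:", first_q)
print("converged V[0]:", V[0], "after", sweep, "sweeps")
```

In this toy case the fixed point can be checked by hand: action 1 is optimal, so $V(0) = 7.8 + 0.9 \cdot 0.2 \cdot V(0)$, which gives $V(0) = 7.8 / 0.82$.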


### Task 2
**Implement Value Iteration (10-20 min)**

Goal:
- implement a Bellman optimality backup,
- run the full Value Iteration loop (`delta < theta`),
- extract the greedy policy.


In [None]:
def bellman_optimality_backup(mdp: TabularMDP, V: np.ndarray, s: int, gamma: float) -> float:
    if s in mdp.terminal_states:
        return 0.0

    q_vals = np.zeros(mdp.nA, dtype=float)
    for a in range(mdp.nA):
        for p, s_next, r, done in mdp.P[s][a]:
            q_vals[a] += p * (r + gamma * (0.0 if done else V[s_next]))
    return float(np.max(q_vals))


def greedy_actions_from_values(mdp: TabularMDP, V: np.ndarray, gamma: float) -> np.ndarray:
    actions = np.zeros(mdp.nS, dtype=int)
    for s in range(mdp.nS):
        if s in mdp.terminal_states:
            actions[s] = 0
            continue

        q_vals = np.zeros(mdp.nA, dtype=float)
        for a in range(mdp.nA):
            for p, s_next, r, done in mdp.P[s][a]:
                q_vals[a] += p * (r + gamma * (0.0 if done else V[s_next]))
        actions[s] = int(np.argmax(q_vals))
    return actions


def value_iteration(
    mdp: TabularMDP,
    gamma: float = 0.95,
    theta: float = 1e-10,
    max_sweeps: int = 10_000,
    return_history: bool = False,
):
    V = np.zeros(mdp.nS, dtype=float)
    deltas = []
    history = []

    for sweep in range(1, max_sweeps + 1):
        V_old = V.copy()
        delta = 0.0

        for s in range(mdp.nS):
            v_new = bellman_optimality_backup(mdp, V_old, s, gamma)
            V[s] = v_new
            delta = max(delta, abs(V[s] - V_old[s]))

        deltas.append(delta)
        if return_history:
            history.append(V.copy())

        if delta < theta:
            break

    greedy_actions = greedy_actions_from_values(mdp, V, gamma)
    result = {
        "V": V,
        "greedy_actions": greedy_actions,
        "sweeps": sweep,
        "deltas": deltas,
    }
    if return_history:
        result["history"] = history
    return result


# Run on the two Part 1 environments
res_det = value_iteration(env_det.as_mdp(), gamma=0.95, theta=1e-10, return_history=True)
res_slip = value_iteration(env_slip.as_mdp(), gamma=0.95, theta=1e-10, return_history=True)

print(f"Deterministic: sweeps={res_det['sweeps']}, final_delta={res_det['deltas'][-1]:.3e}")
print(f"Slippery:      sweeps={res_slip['sweeps']}, final_delta={res_slip['deltas'][-1]:.3e}")


### Value Iteration visual diagnostics

We visualize:
- value-function heatmaps,
- greedy-policy arrow maps,
- convergence curves (`delta` per sweep),
- intermediate Value Iteration snapshots.


In [None]:
mdp_det = env_det.as_mdp()
mdp_slip = env_slip.as_mdp()

plot_value_heatmap(mdp_det, res_det["V"], title="VI value heatmap | deterministic")
render_policy_arrows(mdp_det, res_det["greedy_actions"], title="VI greedy policy | deterministic")
plot_vi_convergence(res_det["deltas"], title="VI convergence | deterministic")
plot_vi_snapshots(mdp_det, res_det["history"], title_prefix="Det VI")

plot_value_heatmap(mdp_slip, res_slip["V"], title="VI value heatmap | slippery")
render_policy_arrows(mdp_slip, res_slip["greedy_actions"], title="VI greedy policy | slippery")
plot_vi_convergence(res_slip["deltas"], title="VI convergence | slippery")
plot_vi_snapshots(mdp_slip, res_slip["history"], title_prefix="Slip VI")


try:
    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")

    # GIFs only for gridworld-like spatial evolution
    save_vi_value_gif(
        mdp_det,
        res_det["history"],
        out_dir / "session2_vi_value_det.gif",
        title_prefix="VI value evolution (deterministic)",
        show_after_save=True,
    )
    save_vi_value_gif(
        mdp_slip,
        res_slip["history"],
        out_dir / "session2_vi_value_slip.gif",
        title_prefix="VI value evolution (slippery)",
        show_after_save=True,
    )

    # Line plots as static PNG
    save_vi_convergence_png(
        res_det["deltas"],
        out_dir / "session2_vi_conv_det.png",
        title="VI convergence (deterministic)",
        show_after_save=True,
    )
    save_vi_convergence_png(
        res_slip["deltas"],
        out_dir / "session2_vi_conv_slip.png",
        title="VI convergence (slippery)",
        show_after_save=True,
    )

except Exception as e:
    print(f"Asset export skipped: {e}")


### Value Iteration experiment A: `gamma` sensitivity

What we analyze:
- number of convergence sweeps,
- value at the start state,
- changes in the greedy policy.
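
A quick way to build intuition for the effect of $\gamma$ on the start-state value is to discount a fixed reward sequence by hand. The sketch below uses a hypothetical path (seven steps at -1 each, then +10 at the goal); these numbers are placeholders and are not read from the GridWorld configuration.

```python
def discounted_return(rewards, gamma):
    """Return sum_t gamma**t * rewards[t], accumulated backwards."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g


# Hypothetical episode: 7 step penalties followed by a goal reward.
rewards = [-1.0] * 7 + [10.0]

for gamma in (0.70, 0.85, 0.95, 0.99):
    goal_weight = gamma ** (len(rewards) - 1)  # how strongly the final reward counts
    print(f"gamma={gamma:4.2f} | goal weight={goal_weight:5.3f} | return={discounted_return(rewards, gamma):7.3f}")
```

A smaller $\gamma$ shrinks the weight of the distant goal reward, so the same path looks less attractive from the start state.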


In [None]:
def compare_policies(actions_a: np.ndarray, actions_b: np.ndarray, terminal_states: set) -> float:
    mask = np.ones_like(actions_a, dtype=bool)
    for s in terminal_states:
        mask[s] = False
    return float(np.mean(actions_a[mask] == actions_b[mask]))


gammas = [0.70, 0.85, 0.95, 0.99]
rows = []
policies = {}

for g in gammas:
    out = value_iteration(mdp_det, gamma=g, theta=1e-10, return_history=False)
    s0 = mdp_det.start_state
    rows.append((g, out["sweeps"], out["V"][s0]))
    policies[g] = out["greedy_actions"]

print("gamma | sweeps | V(start)")
for g, sw, vs in rows:
    print(f"{g:4.2f} | {sw:6d} | {vs:8.3f}")

print("\nPolicy agreement vs gamma=0.95")
base = policies[0.95]
for g in gammas:
    agree = compare_policies(base, policies[g], mdp_det.terminal_states)
    print(f"gamma={g:4.2f} -> agreement={agree*100:6.2f}%")

plt.figure(figsize=(6, 3.5))
plt.plot([r[0] for r in rows], [r[2] for r in rows], marker="o")
plt.xlabel("gamma")
plt.ylabel("V(start)")
plt.title("Gamma sensitivity (deterministic GridWorld)")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


### Value Iteration experiment B: `theta` (stopping threshold) sensitivity

What we analyze:
- number of sweeps and runtime,
- solution accuracy compared to a strict reference solution.
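
A standard contraction argument (a general result, not something specific to this notebook) relates the stopping threshold to the remaining error. For $\gamma<1$ and synchronous sweeps, the Bellman optimality operator $T$ is a $\gamma$-contraction in the max norm, with $V_{k+1}=TV_k$ and $V^*=TV^*$. Then

$$
\|V_{k+1}-V^*\|_\infty \le \gamma\,\|V_k-V^*\|_\infty \le \gamma\left(\|V_k-V_{k+1}\|_\infty+\|V_{k+1}-V^*\|_\infty\right)
$$

and rearranging gives

$$
\|V_{k+1}-V^*\|_\infty \le \frac{\gamma}{1-\gamma}\,\|V_{k+1}-V_k\|_\infty < \frac{\gamma\,\theta}{1-\gamma}
$$

whenever the loop stops with `delta < theta`. For $\gamma=0.95$ the factor is $19$, so $\theta=10^{-4}$ guarantees a max-norm error below $1.9\times 10^{-3}$. This is a worst-case bound, so the measured distance to the reference solution can be much smaller.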


In [None]:
import time

thetas = [1e-4, 1e-6, 1e-8, 1e-10]
ref = value_iteration(mdp_det, gamma=0.95, theta=1e-12, return_history=False)
V_ref = ref["V"]

results = []
for th in thetas:
    t0 = time.perf_counter()
    out = value_iteration(mdp_det, gamma=0.95, theta=th, return_history=False)
    dt_ms = (time.perf_counter() - t0) * 1e3
    linf = float(np.max(np.abs(out["V"] - V_ref)))
    results.append((th, out["sweeps"], dt_ms, linf))

print("theta | sweeps | runtime_ms | L_inf_to_ref")
for th, sw, dt, linf in results:
    print(f"{th:>5.0e} | {sw:6d} | {dt:10.3f} | {linf:11.3e}")

plt.figure(figsize=(6, 3.5))
plt.plot([r[0] for r in results], [r[1] for r in results], marker="o")
plt.xscale("log")
plt.xlabel("theta")
plt.ylabel("sweeps")
plt.title("Stopping threshold vs sweep count")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


### Value Iteration example: greedy-policy rollouts

We execute episodes with the greedy policy returned by Value Iteration to inspect behavior, not only value tables.
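
In the slippery environment the rollouts are random, so a success rate measured over a finite number of episodes is itself an estimate. As a rough sketch, the binomial standard error indicates how much that estimate may fluctuate; `success_rate_with_se` is a hypothetical helper and the counts below are made-up placeholders, not results from this notebook.

```python
import math


def success_rate_with_se(successes: int, n_episodes: int):
    """Return the empirical success rate and its binomial standard error."""
    p = successes / n_episodes
    se = math.sqrt(p * (1.0 - p) / n_episodes)
    return p, se


# Hypothetical outcome: 170 successful episodes out of 200.
p, se = success_rate_with_se(170, 200)
print(f"success_rate={p*100:.1f}% +/- {se*100:.1f}% (one standard error)")
```

With a few hundred episodes, differences of one or two percentage points between two policies are usually within noise.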


In [None]:
def rollout_with_greedy_actions(env: GridWorldEnv, greedy_actions: np.ndarray, max_steps: int = 100):
    s = env.reset()
    states = [s]
    rewards = []
    done = False

    for _ in range(max_steps):
        a = int(greedy_actions[s])
        s_next, r, done, _ = env.step(a)
        states.append(s_next)
        rewards.append(r)
        s = s_next
        if done:
            break

    return {
        "states": states,
        "return": float(np.sum(rewards)),
        "length": len(rewards),
        "done": done,
        "terminal_state": states[-1],
    }


def evaluate_greedy_policy(env: GridWorldEnv, greedy_actions: np.ndarray, n_episodes: int = 200):
    returns = []
    success = 0

    for _ in range(n_episodes):
        ep = rollout_with_greedy_actions(env, greedy_actions, max_steps=120)
        returns.append(ep["return"])
        if ep["done"]:
            r, c = env.state_to_pos[ep["terminal_state"]]
            if env.grid[r, c] == "G":
                success += 1

    return float(np.mean(returns)), success / n_episodes


mean_ret_det, succ_det = evaluate_greedy_policy(env_det, res_det["greedy_actions"], n_episodes=200)
mean_ret_slip, succ_slip = evaluate_greedy_policy(env_slip, res_slip["greedy_actions"], n_episodes=200)

print(f"Deterministic greedy policy: mean_return={mean_ret_det:.3f}, success_rate={succ_det*100:.1f}%")
print(f"Slippery greedy policy:      mean_return={mean_ret_slip:.3f}, success_rate={succ_slip*100:.1f}%")

ep_det = rollout_with_greedy_actions(env_det, res_det["greedy_actions"], max_steps=60)
ep_slip = rollout_with_greedy_actions(env_slip, res_slip["greedy_actions"], max_steps=60)





try:
    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
    gif_det = out_dir / "session2_vi_greedy_traj_det.gif"
    gif_slip = out_dir / "session2_vi_greedy_traj_slip.gif"

    save_trajectory_gif(
        env_det,
        ep_det["states"],
        gif_det,
        title="Greedy rollout evolution (deterministic)",
        show_after_save=True,
    )
    save_trajectory_gif(
        env_slip,
        ep_slip["states"],
        gif_slip,
        title="Greedy rollout evolution (slippery)",
        show_after_save=True,
    )

except Exception as e:
    print(f"VI trajectory GIF export skipped: {e}")


## Policy Evaluation + Policy Iteration

### Short theory recap (Sutton & Barto, Ch. 4)

Policy Iteration alternates between two operators:

1. **Policy Evaluation**: compute $V^{\pi}$ for the current policy $\pi$.
2. **Policy Improvement**: update policy greedily with respect to the evaluated value function.

Policy evaluation uses the Bellman expectation equation:
$$
V^{\pi}(s)=\sum_a \pi(a\mid s)\sum_{s',r}p(s',r\mid s,a)\left[r+\gamma V^{\pi}(s')\right]
$$

Policy improvement is greedy:
$$
\pi_{\text{new}}(s)\in\arg\max_a \sum_{s',r}p(s',r\mid s,a)\left[r+\gamma V^{\pi}(s')\right]
$$

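For finite tabular MDPs with $\gamma<1$, alternating evaluation and improvement converges to an optimal policy after finitely many improvement steps: each improvement yields a strictly better policy unless the current one is already optimal, and there are only finitely many deterministic policies.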
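
For a fixed policy the Bellman expectation equation is linear in $V^{\pi}$, so on small problems it can also be solved directly as $V^{\pi} = (I-\gamma P_{\pi})^{-1} r_{\pi}$. The sketch below compares the direct solve with iterative evaluation on a hypothetical three-state chain; `P_pi` and `r_pi` are illustrative assumptions, not quantities extracted from the notebook's `TabularMDP`.

```python
import numpy as np

# Hypothetical chain under a fixed policy: 0 -> 1 -> 2, where state 2 is terminal.
# P_pi[s, s'] is the transition probability under the policy. The terminal row is zero,
# so no value is bootstrapped once the episode has ended.
P_pi = np.array([
    [0.1, 0.9, 0.0],
    [0.0, 0.1, 0.9],
    [0.0, 0.0, 0.0],
])
# Expected one-step reward per state: -1 per ordinary step, +10 on entering the terminal state.
r_pi = np.array([-1.0, 0.1 * (-1.0) + 0.9 * 10.0, 0.0])
gamma = 0.95

# Direct solution of V = r_pi + gamma * P_pi @ V.
V_exact = np.linalg.solve(np.eye(3) - gamma * P_pi, r_pi)

# Iterative policy evaluation with synchronous sweeps.
V_iter = np.zeros(3)
for sweep in range(1, 10_001):
    V_next = r_pi + gamma * P_pi @ V_iter
    delta = np.max(np.abs(V_next - V_iter))
    V_iter = V_next
    if delta < 1e-12:
        break

print("direct solve:", V_exact)
print("iterative   :", V_iter, f"({sweep} sweeps)")
```

The direct solve costs roughly cubic time in the number of states, which is why iterative sweeps are the usual choice once the state space grows.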


### Task 3
**Implement Policy Iteration (10-20 min)**

Goal:
- implement iterative policy evaluation,
- implement greedy policy improvement,
- run full policy iteration until the policy is stable.


In [None]:
def policy_evaluation(
    mdp: TabularMDP,
    policy: np.ndarray,
    gamma: float = 0.95,
    theta: float = 1e-10,
    max_sweeps: int = 50_000,
):
    V = np.zeros(mdp.nS, dtype=float)
    deltas = []

    for sweep in range(1, max_sweeps + 1):
        V_old = V.copy()
        delta = 0.0

        for s in range(mdp.nS):
            if s in mdp.terminal_states:
                V[s] = 0.0
                continue

            v_new = 0.0
            for a in range(mdp.nA):
                pi_sa = policy[s, a]
                if pi_sa == 0.0:
                    continue
                for p, s_next, r, done in mdp.P[s][a]:
                    v_new += pi_sa * p * (r + gamma * (0.0 if done else V_old[s_next]))

            V[s] = v_new
            delta = max(delta, abs(V[s] - V_old[s]))

        deltas.append(delta)
        if delta < theta:
            return V, sweep, deltas

    return V, max_sweeps, deltas


def greedy_policy_from_values(mdp: TabularMDP, V: np.ndarray, gamma: float = 0.95) -> np.ndarray:
    policy = np.zeros((mdp.nS, mdp.nA), dtype=float)

    for s in range(mdp.nS):
        if s in mdp.terminal_states:
            policy[s, :] = 1.0 / mdp.nA
            continue

        q_vals = np.zeros(mdp.nA, dtype=float)
        for a in range(mdp.nA):
            for p, s_next, r, done in mdp.P[s][a]:
                q_vals[a] += p * (r + gamma * (0.0 if done else V[s_next]))

        best_a = int(np.argmax(q_vals))
        policy[s, best_a] = 1.0

    return policy


def policy_iteration(
    mdp: TabularMDP,
    gamma: float = 0.95,
    eval_theta: float = 1e-10,
    max_outer_loops: int = 1_000,
    return_history: bool = False,
):
    policy = np.ones((mdp.nS, mdp.nA), dtype=float) / mdp.nA

    history = []
    eval_sweeps_list = []
    eval_delta_curves = []

    for outer in range(1, max_outer_loops + 1):
        V, eval_sweeps, eval_deltas = policy_evaluation(
            mdp,
            policy,
            gamma=gamma,
            theta=eval_theta,
        )

        eval_sweeps_list.append(eval_sweeps)
        eval_delta_curves.append(eval_deltas)

        improved_policy = greedy_policy_from_values(mdp, V, gamma=gamma)

        stable = np.array_equal(np.argmax(policy, axis=1), np.argmax(improved_policy, axis=1))

        if return_history:
            history.append(
                {
                    "outer": outer,
                    "V": V.copy(),
                    "policy": improved_policy.copy(),
                    "eval_sweeps": eval_sweeps,
                    "eval_deltas": eval_deltas,
                    "stable": stable,
                }
            )

        policy = improved_policy
        if stable:
            break

    result = {
        "policy": policy,
        "V": V,
        "outer_loops": outer,
        "eval_sweeps": eval_sweeps_list,
        "eval_delta_curves": eval_delta_curves,
    }
    if return_history:
        result["history"] = history
    return result


pi_det = policy_iteration(mdp_det, gamma=0.95, eval_theta=1e-10, return_history=True)
pi_slip = policy_iteration(mdp_slip, gamma=0.95, eval_theta=1e-10, return_history=True)

print(f"Policy Iteration (deterministic): outer_loops={pi_det['outer_loops']}, eval_sweeps_total={sum(pi_det['eval_sweeps'])}")
print(f"Policy Iteration (slippery):      outer_loops={pi_slip['outer_loops']}, eval_sweeps_total={sum(pi_slip['eval_sweeps'])}")


### Policy Iteration visual diagnostics

We visualize:
- final policy and value maps,
- policy-iteration progress across outer loops,
- policy-evaluation sweep counts per outer iteration,
- policy-evaluation convergence curves.


In [None]:
def plot_policy_iteration_summary(mdp: TabularMDP, result: dict, title_prefix: str = "PI"):
    plot_value_heatmap(mdp, result["V"], title=f"{title_prefix} value heatmap")
    final_actions = np.argmax(result["policy"], axis=1)
    render_policy_arrows(mdp, final_actions, title=f"{title_prefix} greedy policy")

    plt.figure(figsize=(6, 3.5))
    plt.plot(result["eval_sweeps"], marker="o")
    plt.xlabel("Outer loop")
    plt.ylabel("Policy-evaluation sweeps")
    plt.title(f"{title_prefix} evaluation sweeps per outer loop")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(6, 3.5))
    for i, curve in enumerate(result["eval_delta_curves"], start=1):
        plt.plot(curve, label=f"outer={i}")
    plt.yscale("log")
    plt.xlabel("Evaluation sweep")
    plt.ylabel("max update delta")
    plt.title(f"{title_prefix} policy-evaluation convergence")
    plt.grid(True, alpha=0.3)
    if len(result["eval_delta_curves"]) <= 8:
        plt.legend(loc="best", fontsize=8)
    plt.tight_layout()
    plt.show()


plot_policy_iteration_summary(mdp_det, pi_det, title_prefix="PI | deterministic")
plot_policy_iteration_summary(mdp_slip, pi_slip, title_prefix="PI | slippery")


### Policy Iteration snapshots and GIF

We capture intermediate policies and values across outer loops to show how policy improvement progresses.


In [None]:
from IPython.display import Image, display


def plot_policy_iteration_snapshots(mdp: TabularMDP, history: list, title_prefix: str = "PI snapshot"):
    if not history:
        print("No policy-iteration history available.")
        return

    idxs = sorted(set([0, len(history)//2, len(history)-1]))
    for idx in idxs:
        h = history[idx]
        outer = h["outer"]
        V = h["V"]
        actions = np.argmax(h["policy"], axis=1)
        plot_value_heatmap(mdp, V, title=f"{title_prefix} value | outer={outer}")
        render_policy_arrows(mdp, actions, title=f"{title_prefix} policy | outer={outer}")


def save_policy_iteration_gif(
    mdp: TabularMDP,
    history: list,
    gif_path: Path,
    title_prefix: str = "Policy Iteration",
    show_after_save: bool = True,  # display the saved GIF inline
):
    if not history:
        print("No history available; GIF was not created.")
        return

    from matplotlib.animation import FuncAnimation, PillowWriter

    gif_path.parent.mkdir(parents=True, exist_ok=True)

    value_grids = [values_to_grid(mdp, h["V"]) for h in history]
    action_grids = []
    for h in history:
        a = np.argmax(h["policy"], axis=1)
        g = np.full(mdp.grid_chars.shape, -1, dtype=int)
        for s, (r, c) in mdp.state_to_pos.items():
            if mdp.grid_chars[r, c] not in ("G", "H"):
                g[r, c] = a[s]
        action_grids.append(g)

    vec = {
        UP: (0.0, -0.35),
        RIGHT: (0.35, 0.0),
        DOWN: (0.0, 0.35),
        LEFT: (-0.35, 0.0),
    }

    fig, ax = plt.subplots(figsize=(5, 5))
    im = ax.imshow(value_grids[0], cmap="viridis")
    txt = ax.set_title(f"{title_prefix} | outer=1")
    ax.set_xticks(range(value_grids[0].shape[1]))
    ax.set_yticks(range(value_grids[0].shape[0]))
    _draw_cell_grid(ax, value_grids[0].shape[0], value_grids[0].shape[1])  # optional nice alignment

    q = None

    def update(frame_idx):
        nonlocal q
        grid = value_grids[frame_idx]
        actions = action_grids[frame_idx]
        im.set_data(grid)

        if q is not None:
            q.remove()

        X, Y, U, V = [], [], [], []
        for r in range(actions.shape[0]):
            for c in range(actions.shape[1]):
                if mdp.grid_chars[r, c] in ("#", "G", "H"):
                    continue
                a = actions[r, c]
                if a < 0:
                    continue
                u, v = vec[a]
                X.append(c); Y.append(r); U.append(u); V.append(v)

        if X:
            q = ax.quiver(X, Y, U, V, angles="xy", scale_units="xy", scale=1, color="white", width=0.008)

        outer = history[frame_idx]["outer"]
        txt.set_text(f"{title_prefix} | outer={outer}")
        return (im, txt)

    ani = FuncAnimation(fig, update, frames=len(history), interval=550, blit=False)
    ani.save(gif_path, writer=PillowWriter(fps=2))
    plt.close(fig)
    print(f"Saved GIF: {gif_path}")

    if show_after_save:
        display(Image(filename=str(gif_path)))


plot_policy_iteration_snapshots(mdp_det, pi_det["history"], title_prefix="PI deterministic")
plot_policy_iteration_snapshots(mdp_slip, pi_slip["history"], title_prefix="PI slippery")

try:
    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
    save_policy_iteration_gif(
        mdp_det, pi_det["history"], out_dir / "session2_pi_evolution_det.gif",
        title_prefix="Policy Iteration deterministic", show_after_save=True
    )
    save_policy_iteration_gif(
        mdp_slip, pi_slip["history"], out_dir / "session2_pi_evolution_slip.gif",
        title_prefix="Policy Iteration slippery", show_after_save=True
    )
except Exception as e:
    print(f"PI GIF export skipped: {e}")


### Policy Iteration experiment A: `gamma` sensitivity

What we analyze:
- number of outer loops,
- total policy-evaluation sweeps,
- value at the start state,
- policy agreement relative to $\gamma=0.95$.


In [None]:
gammas_pi = [0.70, 0.85, 0.95, 0.99]
rows_pi = []
policies_pi = {}

for g in gammas_pi:
    out = policy_iteration(mdp_det, gamma=g, eval_theta=1e-10, return_history=False)
    start_v = out["V"][mdp_det.start_state]
    rows_pi.append((g, out["outer_loops"], int(sum(out["eval_sweeps"])), start_v))
    policies_pi[g] = np.argmax(out["policy"], axis=1)

print("gamma | outer_loops | eval_sweeps_total | V(start)")
for g, ol, es, vs in rows_pi:
    print(f"{g:4.2f} | {ol:11d} | {es:17d} | {vs:8.3f}")

print("\nPolicy agreement vs gamma=0.95")
base = policies_pi[0.95]
for g in gammas_pi:
    agree = compare_policies(base, policies_pi[g], mdp_det.terminal_states)
    print(f"gamma={g:4.2f} -> agreement={agree*100:6.2f}%")

plt.figure(figsize=(6, 3.5))
plt.plot([r[0] for r in rows_pi], [r[3] for r in rows_pi], marker="o")
plt.xlabel("gamma")
plt.ylabel("V(start)")
plt.title("Policy Iteration gamma sensitivity")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


### Policy Iteration experiment B: evaluation-threshold sensitivity

What we analyze:
- how `eval_theta` affects computational effort,
- whether the final greedy policy changes.


In [None]:
eval_thetas = [1e-4, 1e-6, 1e-8, 1e-10]
rows_theta = []
pol_by_theta = {}

for th in eval_thetas:
    out = policy_iteration(mdp_det, gamma=0.95, eval_theta=th, return_history=False)
    actions = np.argmax(out["policy"], axis=1)
    rows_theta.append((th, out["outer_loops"], int(sum(out["eval_sweeps"]))))
    pol_by_theta[th] = actions

print("eval_theta | outer_loops | eval_sweeps_total")
for th, ol, es in rows_theta:
    print(f"{th:>9.0e} | {ol:11d} | {es:17d}")

base = pol_by_theta[1e-10]
print("\nPolicy agreement vs eval_theta=1e-10")
for th in eval_thetas:
    agree = compare_policies(base, pol_by_theta[th], mdp_det.terminal_states)
    print(f"eval_theta={th:>9.0e} -> agreement={agree*100:6.2f}%")

plt.figure(figsize=(6, 3.5))
plt.plot([r[0] for r in rows_theta], [r[2] for r in rows_theta], marker="o")
plt.xscale("log")
plt.xlabel("eval_theta")
plt.ylabel("total evaluation sweeps")
plt.title("Policy Iteration threshold sensitivity")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


### Policy Iteration example: greedy-policy rollouts

We run episodes with the final PI policy in deterministic and slippery environments.


In [None]:
pi_actions_det = np.argmax(pi_det["policy"], axis=1)
pi_actions_slip = np.argmax(pi_slip["policy"], axis=1)

mean_ret_det_pi, succ_det_pi = evaluate_greedy_policy(env_det, pi_actions_det, n_episodes=200)
mean_ret_slip_pi, succ_slip_pi = evaluate_greedy_policy(env_slip, pi_actions_slip, n_episodes=200)

print(f"PI deterministic policy: mean_return={mean_ret_det_pi:.3f}, success_rate={succ_det_pi*100:.1f}%")
print(f"PI slippery policy:      mean_return={mean_ret_slip_pi:.3f}, success_rate={succ_slip_pi*100:.1f}%")

ep_det_pi = rollout_with_greedy_actions(env_det, pi_actions_det, max_steps=60)
ep_slip_pi = rollout_with_greedy_actions(env_slip, pi_actions_slip, max_steps=60)

# plot_trajectory(env_det, ep_det_pi["states"], title="Greedy rollout (PI) | deterministic")
# plot_trajectory(env_slip, ep_slip_pi["states"], title="Greedy rollout (PI) | slippery")

try:
    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
    gif_det = out_dir / "session2_pi_greedy_traj_det.gif"
    gif_slip = out_dir / "session2_pi_greedy_traj_slip.gif"

    # Save without inline display, then show each GIF exactly once below.
    save_trajectory_gif(
        env_det,
        ep_det_pi["states"],
        gif_det,
        title="Greedy rollout evolution (PI deterministic)",
        show_after_save=False,
    )
    save_trajectory_gif(
        env_slip,
        ep_slip_pi["states"],
        gif_slip,
        title="Greedy rollout evolution (PI slippery)",
        show_after_save=False,
    )

    display(Image(filename=str(gif_det)))
    display(Image(filename=str(gif_slip)))

except Exception as e:
    print(f"PI trajectory GIF export skipped: {e}")


## Large-map experiments

Before switching to Gymnasium FrozenLake, we stress-test Value Iteration and Policy Iteration on larger custom GridWorld maps.

Focus:
- scalability with increasing state count,
- policy quality and agreement,
- runtime/sweep trade-offs,
- more visual intuition on complex layouts.


### Task 4
**Scale to larger maps and benchmark VI vs PI (10-20 min)**

We create multiple larger maps and run both algorithms under the same hyperparameters.
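
When designing larger maps by hand, it helps to confirm that the goal is reachable at all before benchmarking. The following sketch runs a breadth-first search over a character map, assuming the notebook's conventions (`#` wall, `H` hole, `S` start, `G` goal); `goal_reachable` is a hypothetical helper and the two tiny maps are made up for illustration.

```python
from collections import deque


def goal_reachable(char_map):
    """Return True if 'G' can be reached from 'S' without entering '#' or 'H' cells."""
    n_rows, n_cols = len(char_map), len(char_map[0])
    assert all(len(row) == n_cols for row in char_map), "all rows must have equal width"

    start = next((r, c) for r in range(n_rows) for c in range(n_cols) if char_map[r][c] == "S")
    seen = {start}
    queue = deque([start])

    while queue:
        r, c = queue.popleft()
        if char_map[r][c] == "G":
            return True
        for dr, dc in ((-1, 0), (0, 1), (1, 0), (0, -1)):  # up, right, down, left
            nr, nc = r + dr, c + dc
            inside = 0 <= nr < n_rows and 0 <= nc < n_cols
            if inside and (nr, nc) not in seen and char_map[nr][nc] not in "#H":
                seen.add((nr, nc))
                queue.append((nr, nc))
    return False


tiny_ok = ["S.#", ".H.", "..G"]       # a hole-free path exists along the left and bottom edge
tiny_blocked = ["S#.", "##.", "..G"]  # the start cell is walled in

print(goal_reachable(tiny_ok), goal_reachable(tiny_blocked))
```

The same check could be applied to each entry of `large_maps` before running the benchmark; it only tests connectivity and says nothing about whether reaching the goal is worth the step penalties.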


In [None]:
import time
large_maps = {
    "maze_8x8": [
        "S..#....",
        ".##.#H#.",
        "...#.#..",
        ".#...##.",
        ".#.H....",
        ".###.#..",
        "...#..#.",
        ".H....G.",
    ],
    "corridor_10x10": [
        "S...#.....",
        ".##.#.###.",
        ".#..#...#.",
        ".#.###.#..",
        ".H....#.#.",
        "###.#.#.#.",
        "...#...#..",
        ".#.###.##.",
        ".#...H...#",
        "...##...G.",
    ],
    "rooms_12x12": [
        "S...#......G",
        ".##.#..##H#H",
        "...#.#...#..",
        ".#.#.###.#..",
        ".#...#......",
        ".###.#H###..",
        "...#...#....",
        ".#.###.#.##.",
        ".#...H.#...#",
        ".###H#.###.#",
        "...#...#....",
        ".H...#...#..",
    ],
}



def make_large_env(char_map, slip_prob=0.10, step_reward=-1.0, goal_reward=12.0, hole_reward=-12.0):
    cfg = GridWorldConfig(
        char_map=char_map,
        step_reward=step_reward,
        goal_reward=goal_reward,
        hole_reward=hole_reward,
        slip_prob=slip_prob,
    )
    return GridWorldEnv(cfg)


def benchmark_vi_pi_on_env(env: GridWorldEnv, gamma: float = 0.95, theta: float = 1e-10):
    mdp = env.as_mdp()

    t0 = time.perf_counter()
    vi = value_iteration(mdp, gamma=gamma, theta=theta, return_history=True)
    t_vi_ms = (time.perf_counter() - t0) * 1e3

    t0 = time.perf_counter()
    pi = policy_iteration(mdp, gamma=gamma, eval_theta=theta, return_history=True)
    t_pi_ms = (time.perf_counter() - t0) * 1e3

    vi_actions = vi["greedy_actions"]
    pi_actions = np.argmax(pi["policy"], axis=1)

    agreement = compare_policies(vi_actions, pi_actions, mdp.terminal_states)
    value_linf = float(np.max(np.abs(vi["V"] - pi["V"])))

    vi_ret, vi_succ = evaluate_greedy_policy(env, vi_actions, n_episodes=200)
    pi_ret, pi_succ = evaluate_greedy_policy(env, pi_actions, n_episodes=200)

    return {
        "mdp": mdp,
        "vi": vi,
        "pi": pi,
        "t_vi_ms": t_vi_ms,
        "t_pi_ms": t_pi_ms,
        "value_linf": value_linf,
        "policy_agreement": agreement,
        "vi_mean_return": vi_ret,
        "vi_success_rate": vi_succ,
        "pi_mean_return": pi_ret,
        "pi_success_rate": pi_succ,
    }


**Run VI vs PI on all larger maps**

Metrics:
- states/actions,
- VI and PI runtime,
- VI sweeps,
- PI outer loops and total evaluation sweeps,
- value-function $L_\infty$ difference,
- greedy-policy agreement,
- rollout return and success rate.
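
One caveat when reading the agreement metric: if several actions are tied in a state, two optimal policies can pick different actions and exact agreement drops below 100% even though both are greedy. A tie-aware check might look like the sketch below; `both_greedy_fraction` and the small `Q` table are hypothetical and only illustrate the idea.

```python
import numpy as np


def both_greedy_fraction(Q, actions_a, actions_b, tol=1e-9):
    """Fraction of states where both chosen actions are within tol of the best Q-value."""
    idx = np.arange(Q.shape[0])
    best = Q.max(axis=1)
    a_ok = Q[idx, actions_a] >= best - tol
    b_ok = Q[idx, actions_b] >= best - tol
    return float(np.mean(a_ok & b_ok))


# Hypothetical Q-table: state 0 has a tie between actions 0 and 1.
Q = np.array([
    [1.0, 1.0, 0.0],
    [0.5, 0.2, 0.1],
])
actions_a = np.array([0, 0])
actions_b = np.array([1, 0])

exact = float(np.mean(actions_a == actions_b))
tie_aware = both_greedy_fraction(Q, actions_a, actions_b)
print(f"exact agreement={exact:.2f}, tie-aware agreement={tie_aware:.2f}")
```

Here the exact comparison reports disagreement in the tied state, while the tie-aware version treats both choices as equally good.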


In [None]:
large_results = {}

for name, char_map in large_maps.items():
    # With step_reward=-1.0 the long walk to the goal can cost more than ending early in a hole,
    # so the greedy policy may head for a hole. A smaller step penalty (-0.1) keeps the goal attractive.
    env = make_large_env(char_map, slip_prob=0.0, step_reward=-0.1)

    res = benchmark_vi_pi_on_env(env, gamma=0.95, theta=1e-10)
    large_results[name] = {"env": env, **res}

print(
    "map | nS | VI_ms | PI_ms | VI_sweeps | PI_outer | PI_eval_total | L_inf(VI-PI) | policy_agree | VI_succ | PI_succ"
)
for name, data in large_results.items():
    mdp = data["mdp"]
    vi = data["vi"]
    pi = data["pi"]
    print(
        f"{name:12s} | {mdp.nS:3d} | {data['t_vi_ms']:6.1f} | {data['t_pi_ms']:6.1f} | "
        f"{vi['sweeps']:9d} | {pi['outer_loops']:8d} | {sum(pi['eval_sweeps']):13d} | "
        f"{data['value_linf']:11.2e} | {data['policy_agreement']*100:11.1f}% | "
        f"{data['vi_success_rate']*100:6.1f}% | {data['pi_success_rate']*100:6.1f}%"
    )


**Experiment L3 - Visual deep dive on selected maps**

We inspect three maps in detail:
- value heatmaps,
- greedy policies,
- convergence diagnostics,
- policy-iteration snapshots.


In [None]:
def showcase_pi_collage(name: str, data: dict):
    mdp = data["mdp"]
    pi = data["pi"]
    pi_actions = np.argmax(pi["policy"], axis=1)

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    fig.suptitle(f"Policy Iteration Showcase | {name}", fontsize=16, y=1.02)

    map_arr = _map_code_array(mdp.grid_chars)
    map_cmap = ListedColormap(["black", "white", "#A7D3F5", "#B7E4C7", "#F8B4B4"])

    axes[0].imshow(map_arr, cmap=map_cmap, vmin=0, vmax=4)
    axes[0].set_title("Map layout", fontsize=11)
    axes[0].set_xticks(range(map_arr.shape[1]))
    axes[0].set_yticks(range(map_arr.shape[0]))
    _draw_cell_grid(axes[0], map_arr.shape[0], map_arr.shape[1])
    for r in range(map_arr.shape[0]):
        for c in range(map_arr.shape[1]):
            txt = " " if mdp.grid_chars[r, c] in (".", "F") else mdp.grid_chars[r, c]
            axes[0].text(c, r, txt, ha="center", va="center", color="black", fontsize=10, fontweight="bold")

    im_pi = _draw_value_on_axis(axes[1], mdp, pi["V"], "PI final values")
    _draw_policy_on_axis(axes[2], mdp, pi_actions, "PI greedy policy")

    fig.colorbar(im_pi, ax=axes[1], fraction=0.046, pad=0.04)
    plt.tight_layout()
    plt.show()


selected_maps = ["maze_8x8", "corridor_10x10", "rooms_12x12"]

for name in selected_maps:
    data = large_results[name]
    env = data["env"]
    pi = data["pi"]

    print(f"\n=== {name} | Policy Iteration ===")
    showcase_pi_collage(name, data)

    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
    out_dir.mkdir(parents=True, exist_ok=True)

    ep_pi = rollout_with_greedy_actions(env, np.argmax(pi["policy"], axis=1), max_steps=220)
    gif_pi = out_dir / f"session2_showcase_{name}_pi_traj.gif"

    save_trajectory_gif(
        env,
        ep_pi["states"],
        gif_pi,
        title=f"PI greedy rollout ({name})",
        trail_len=22,
        show_after_save=False,
    )

    print(f"Saved: {gif_pi}")
    display(Image(filename=str(gif_pi)))


In [None]:
def _ensure_large_showcase_assets(name: str, data: dict, out_dir: Path):
    """Generate all large-map GIF assets for one map."""
    env = data["env"]
    mdp = data["mdp"]
    vi = data["vi"]
    pi = data["pi"]

    p_vi_value = out_dir / f"session2_large_{name}_vi_value.gif"
    p_pi_evo = out_dir / f"session2_large_{name}_pi_evolution.gif"
    p_vi_traj = out_dir / f"session2_large_{name}_vi_traj.gif"
    p_pi_traj = out_dir / f"session2_large_{name}_pi_traj.gif"

    save_vi_value_gif(mdp, vi["history"], p_vi_value, title_prefix=f"VI value evolution ({name})", show_after_save=False)
    save_policy_iteration_gif(mdp, pi["history"], p_pi_evo, title_prefix=f"PI evolution ({name})", show_after_save=False)

    ep_vi = rollout_with_greedy_actions(env, vi["greedy_actions"], max_steps=220)
    ep_pi = rollout_with_greedy_actions(env, np.argmax(pi["policy"], axis=1), max_steps=220)

    save_trajectory_gif(env, ep_vi["states"], p_vi_traj, title=f"VI greedy rollout ({name})", trail_len=22, show_after_save=False)
    save_trajectory_gif(env, ep_pi["states"], p_pi_traj, title=f"PI greedy rollout ({name})", trail_len=22, show_after_save=False)

    return {
        "vi_value": p_vi_value,
        "pi_evolution": p_pi_evo,
        "vi_traj": p_vi_traj,
        "pi_traj": p_pi_traj,
    }


def show_large_map_merged_showcase(selected_maps, large_results):
    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
    out_dir.mkdir(parents=True, exist_ok=True)

    for name in selected_maps:
        data = large_results[name]
        print(f"\n=== {name} ===")

        # Reuse compact static collage helper
        showcase_pi_collage(name, data)

        paths = _ensure_large_showcase_assets(name, data, out_dir)

        print("Evolution GIFs")
        display(Image(filename=str(paths["vi_value"])))
        display(Image(filename=str(paths["pi_evolution"])))

        print("Trajectory showcase GIFs")
        display(Image(filename=str(paths["vi_traj"])))
        display(Image(filename=str(paths["pi_traj"])))

    print(f"\nSaved large-map GIFs into: {out_dir}")


selected_maps = ["maze_8x8", "corridor_10x10", "rooms_12x12"]
show_large_map_merged_showcase(selected_maps, large_results)


### Experiment L5 - Robustness stress-test (stochasticity sweep)

We sweep `slip_prob` over $\{0.00, 0.05, 0.10, 0.20, 0.30\}$ on one larger map (`rooms_12x12`) and compare success rate, policy agreement, and runtime for VI and PI.
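
Success rates in a sweep like this are estimated from a finite number of rollouts, so a small gap between VI and PI may be sampling noise rather than a real difference. The following is a minimal sketch of a binomial standard error, assuming independent episodes. The helper name `success_rate_with_se` and the episode counts are hypothetical and are not part of the notebook's benchmark code.

```python
import math


def success_rate_with_se(successes: int, episodes: int) -> tuple[float, float]:
    """Return (success rate, binomial standard error) for independent episodes."""
    if episodes <= 0:
        raise ValueError("episodes must be positive")
    rate = successes / episodes
    se = math.sqrt(rate * (1.0 - rate) / episodes)
    return rate, se


# Hypothetical counts: 180 successful rollouts out of 200.
rate, se = success_rate_with_se(180, 200)
print(f"success = {rate:.3f} +/- {1.96 * se:.3f} (approx. 95% interval)")
```

If the intervals of two methods overlap heavily, more rollouts are probably needed before ranking them.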


In [None]:
stress_map_name = "rooms_12x12"
stress_map = large_maps[stress_map_name]
slips = [0.00, 0.05, 0.10, 0.20, 0.30]

rows = []
for sp in slips:
    env = make_large_env(stress_map, slip_prob=sp)
    data = benchmark_vi_pi_on_env(env, gamma=0.95, theta=1e-10)
    rows.append((sp, data["vi_success_rate"], data["pi_success_rate"], data["policy_agreement"], data["t_vi_ms"], data["t_pi_ms"]))

print("slip_prob | VI_success | PI_success | policy_agreement | VI_ms | PI_ms")
for sp, vi_s, pi_s, ag, tvi, tpi in rows:
    print(f"{sp:8.2f} | {vi_s*100:9.1f}% | {pi_s*100:9.1f}% | {ag*100:15.1f}% | {tvi:5.1f} | {tpi:5.1f}")

plt.figure(figsize=(6, 3.5))
plt.plot([r[0] for r in rows], [r[1] for r in rows], marker="o", label="VI success")
plt.plot([r[0] for r in rows], [r[2] for r in rows], marker="s", label="PI success")
plt.xlabel("slip_prob")
plt.ylabel("success rate")
plt.title(f"Robustness to stochasticity | {stress_map_name}")
plt.grid(True, alpha=0.3)
plt.legend()
plt.tight_layout()
plt.show()


## Part 4 - FrozenLake validation and Gymnasium interface

This section transfers the same DP ideas from the custom GridWorld to a standard Gymnasium environment: **FrozenLake-v1**.

Goal:
- show how Gymnasium environments are structured,
- convert FrozenLake into tabular MDP form,
- run Value Iteration and Policy Iteration,
- compare deterministic and slippery dynamics.


### Gymnasium quick tips (interface essentials)

Core API pattern:
- `env = gym.make(...)`
- `obs, info = env.reset(seed=...)`
- `obs, reward, terminated, truncated, info = env.step(action)`

Important fields:
- `obs`: current observation (for FrozenLake: discrete state id)
- `reward`: scalar reward from the transition
- `terminated`: true terminal condition of the task
- `truncated`: episode stopped by external limit (e.g., time limit)
- `info`: optional debug metadata

For tabular planning in toy-text environments, `env.unwrapped.P` exposes transition dynamics in the form:
- `P[s][a] = [(prob, s_next, reward, done), ...]`

FrozenLake action indexing in Gymnasium:
- `0: Left`, `1: Down`, `2: Right`, `3: Up`
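
To make the transition format concrete, here is a minimal sketch of a one-step Bellman backup over a table laid out like `env.unwrapped.P`. The three-state table and the helper name `q_value` are hypothetical and only illustrate the `(prob, s_next, reward, done)` layout; they are not taken from FrozenLake.

```python
# Toy transition table in the same layout as `env.unwrapped.P`:
#   P[s][a] = [(prob, s_next, reward, done), ...]
# Hypothetical 3-state chain; state 2 is the terminal goal.
P = {
    0: {0: [(1.0, 0, 0.0, False)], 1: [(0.8, 1, 0.0, False), (0.2, 0, 0.0, False)]},
    1: {0: [(1.0, 0, 0.0, False)], 1: [(0.8, 2, 1.0, True), (0.2, 1, 0.0, False)]},
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},
}


def q_value(P, V, s, a, gamma):
    """Expected one-step return of action `a` in state `s` under estimates `V`."""
    total = 0.0
    for prob, s_next, reward, done in P[s][a]:
        # Do not bootstrap from a successor that ends the episode.
        bootstrap = 0.0 if done else gamma * V[s_next]
        total += prob * (reward + bootstrap)
    return total


V = [0.0, 0.0, 0.0]
q_right = q_value(P, V, s=1, a=1, gamma=0.9)
print(q_right)
```

The `done` flag is what prevents value from leaking out of terminal states during planning.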


In [None]:
try:
    import gymnasium as gym
except ImportError:
    # Keep the notebook runnable without Gymnasium; later cells check `gym is None`.
    gym = None

if gym is None:
    print('Gymnasium is not installed. Install with: pip install "gymnasium[toy-text]"')
else:
    print("Gymnasium import successful.")


### Task 5 (10-20 min) - Transfer to FrozenLake with Gymnasium

We build an adapter from Gymnasium's transition dictionary (`env.unwrapped.P`) to `TabularMDP`.
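
The adapter relies on FrozenLake numbering its states row by row, so a state id and a grid position can be converted with integer division. A minimal sketch of that mapping follows; the helper names `state_to_pos` and `pos_to_state` are illustrative stand-ins for the dictionaries built in the next cell.

```python
def state_to_pos(s: int, width: int) -> tuple[int, int]:
    """Map a row-major state id to (row, col)."""
    return divmod(s, width)


def pos_to_state(row: int, col: int, width: int) -> int:
    """Map (row, col) back to the row-major state id."""
    return row * width + col


width = 4  # the 4x4 map has 16 states, ids 0..15
for s in (0, 5, 15):
    r, c = state_to_pos(s, width)
    print(s, "->", (r, c), "->", pos_to_state(r, c, width))
```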


In [None]:
GYM_ACTION_SYMBOLS = {0: "←", 1: "↓", 2: "→", 3: "↑"}
GYM_ACTION_VECTORS = {
    0: (-0.35, 0.0),   # LEFT
    1: (0.0, 0.35),    # DOWN
    2: (0.35, 0.0),    # RIGHT
    3: (0.0, -0.35),   # UP
}


def frozenlake_desc_to_grid(desc) -> np.ndarray:
    # desc is byte array in Gymnasium toy-text envs
    arr = np.array(
        [[ch.decode("utf-8") if isinstance(ch, (bytes, bytearray)) else str(ch) for ch in row] for row in desc],
        dtype="<U1",
    )
    return arr


def mdp_from_frozenlake_env(env) -> TabularMDP:
    p_raw = env.unwrapped.P
    nS = env.observation_space.n
    nA = env.action_space.n

    desc = frozenlake_desc_to_grid(env.unwrapped.desc)
    H, W = desc.shape

    state_to_pos = {s: (s // W, s % W) for s in range(nS)}
    pos_to_state = {(s // W, s % W): s for s in range(nS)}

    terminal_states = set()
    start_state = 0

    for s, (r, c) in state_to_pos.items():
        cell = desc[r, c]
        if cell in ("H", "G"):
            terminal_states.add(s)
        if cell == "S":
            start_state = s

    P = {s: {a: [] for a in range(nA)} for s in range(nS)}
    for s in range(nS):
        for a in range(nA):
            for prob, s_next, reward, done in p_raw[s][a]:
                P[s][a].append((float(prob), int(s_next), float(reward), bool(done)))

    return TabularMDP(
        nS=nS,
        nA=nA,
        P=P,
        state_to_pos=state_to_pos,
        pos_to_state=pos_to_state,
        terminal_states=terminal_states,
        start_state=start_state,
        grid_chars=desc,
    )


def render_frozenlake_map(mdp: TabularMDP, title: str = "FrozenLake map"):
    code_map = {"S": 0, "F": 1, "H": 2, "G": 3}
    arr = np.vectorize(code_map.get)(mdp.grid_chars)
    cmap = ListedColormap(["#A7D3F5", "#EAF6FF", "#F8B4B4", "#B7E4C7"])

    fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(arr, cmap=cmap, vmin=0, vmax=3)
    ax.set_title(title)
    ax.set_xticks(range(arr.shape[1]))
    ax.set_yticks(range(arr.shape[0]))
    _draw_cell_grid(ax, arr.shape[0], arr.shape[1])

    for r in range(arr.shape[0]):
        for c in range(arr.shape[1]):
            ax.text(c, r, mdp.grid_chars[r, c], ha="center", va="center", color="black", fontsize=12, fontweight="bold")

    plt.tight_layout()
    plt.show()


def render_frozenlake_policy(mdp: TabularMDP, actions: np.ndarray, title: str = "FrozenLake policy"):
    bg = np.zeros(mdp.grid_chars.shape)
    fig, ax = plt.subplots(figsize=(5, 5))
    ax.imshow(bg, cmap=ListedColormap(["#f7f7f7"]))
    ax.set_title(title)
    ax.set_xticks(range(bg.shape[1]))
    ax.set_yticks(range(bg.shape[0]))
    _draw_cell_grid(ax, bg.shape[0], bg.shape[1])

    for s, (r, c) in mdp.state_to_pos.items():
        cell = mdp.grid_chars[r, c]
        if cell in ("H", "G"):
            # Terminal cells show their map symbol instead of an action.
            txt = cell
        else:
            # Both start ("S") and frozen ("F") cells are non-terminal and need an arrow.
            txt = GYM_ACTION_SYMBOLS[int(actions[s])]
        ax.text(c, r, txt, ha="center", va="center", color="black", fontsize=12, fontweight="bold")

    plt.tight_layout()
    plt.show()


def save_frozenlake_policy_iteration_gif(
    mdp: TabularMDP,
    history: list,
    gif_path: Path,
    title_prefix: str = "FrozenLake PI",
    show_after_save: bool = True,
):
    if not history:
        print("No history available; GIF was not created.")
        return

    from matplotlib.animation import FuncAnimation, PillowWriter

    gif_path.parent.mkdir(parents=True, exist_ok=True)
    value_grids = [values_to_grid(mdp, h["V"]) for h in history]

    # policy per outer iteration
    action_grids = []
    for h in history:
        a = np.argmax(h["policy"], axis=1)
        g = np.full(mdp.grid_chars.shape, -1, dtype=int)
        for s, (r, c) in mdp.state_to_pos.items():
            if mdp.grid_chars[r, c] not in ("G", "H"):
                g[r, c] = a[s]
        action_grids.append(g)

    fig, ax = plt.subplots(figsize=(5, 5))
    im = ax.imshow(value_grids[0], cmap="viridis")
    txt = ax.set_title(f"{title_prefix} | outer=1")
    ax.set_xticks(range(value_grids[0].shape[1]))
    ax.set_yticks(range(value_grids[0].shape[0]))
    _draw_cell_grid(ax, value_grids[0].shape[0], value_grids[0].shape[1])

    q = None

    def update(frame_idx):
        nonlocal q
        im.set_data(value_grids[frame_idx])

        if q is not None:
            q.remove()

        X, Y, U, V = [], [], [], []
        actions = action_grids[frame_idx]
        for r in range(actions.shape[0]):
            for c in range(actions.shape[1]):
                if mdp.grid_chars[r, c] in ("#", "G", "H"):
                    continue
                a = actions[r, c]
                if a < 0:
                    continue
                u, v = GYM_ACTION_VECTORS[a]
                X.append(c); Y.append(r); U.append(u); V.append(v)

        if X:
            q = ax.quiver(X, Y, U, V, angles="xy", scale_units="xy", scale=1, color="white", width=0.008)

        outer = history[frame_idx]["outer"]
        txt.set_text(f"{title_prefix} | outer={outer}")
        return (im, txt)

    ani = FuncAnimation(fig, update, frames=len(history), interval=550, blit=False)
    ani.save(gif_path, writer=PillowWriter(fps=2))
    plt.close(fig)
    print(f"Saved GIF: {gif_path}")

    if show_after_save:
        display(Image(filename=str(gif_path)))


### FrozenLake experiment A: deterministic vs slippery (4x4)

We compare:
- `is_slippery=False` (deterministic transitions)
- `is_slippery=True` (stochastic transitions)

for both Value Iteration and Policy Iteration.
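
To see what "stochastic" means here, the sketch below lists the directions that can actually be executed for an intended action. It assumes the default FrozenLake-v1 slip model, in which the intended direction and the two perpendicular directions are each taken with probability 1/3; some Gymnasium releases may expose parameters that change these probabilities. The helper name `slippery_action_distribution` is hypothetical.

```python
ACTION_NAMES = {0: "Left", 1: "Down", 2: "Right", 3: "Up"}


def slippery_action_distribution(action: int) -> dict[int, float]:
    """Distribution over executed directions under the assumed 1/3 slip model."""
    # With the Left/Down/Right/Up ordering, the neighbours (a - 1) and (a + 1)
    # modulo 4 are exactly the two directions perpendicular to `action`.
    outcomes = [(action - 1) % 4, action, (action + 1) % 4]
    return {a: 1.0 / 3.0 for a in outcomes}


dist = slippery_action_distribution(2)  # intended action: Right
for a, p in sorted(dist.items()):
    print(f"{ACTION_NAMES[a]:>5}: {p:.3f}")
```

Note that the agent never moves opposite to the intended direction under this model, which is why planning on the slippery map often prefers actions that point away from holes.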


In [None]:
def run_vi_pi_on_frozenlake(map_name: str = "4x4", is_slippery: bool = True, gamma: float = 0.99, theta: float = 1e-10):
    env = gym.make("FrozenLake-v1", map_name=map_name, is_slippery=is_slippery)
    mdp = mdp_from_frozenlake_env(env)

    t0 = time.perf_counter()
    vi = value_iteration(mdp, gamma=gamma, theta=theta, return_history=True)
    t_vi_ms = (time.perf_counter() - t0) * 1e3

    t0 = time.perf_counter()
    pi = policy_iteration(mdp, gamma=gamma, eval_theta=theta, return_history=True)
    t_pi_ms = (time.perf_counter() - t0) * 1e3

    vi_actions = vi["greedy_actions"]
    pi_actions = np.argmax(pi["policy"], axis=1)

    agreement = compare_policies(vi_actions, pi_actions, mdp.terminal_states)
    value_linf = float(np.max(np.abs(vi["V"] - pi["V"])))

    # Quick PI consistency check: final policy should be greedy w.r.t. returned V
    pi_greedy_from_V = np.argmax(greedy_policy_from_values(mdp, pi["V"], gamma=gamma), axis=1)
    pi_consistent = np.array_equal(pi_actions, pi_greedy_from_V)

    return {
        "env": env,
        "mdp": mdp,
        "vi": vi,
        "pi": pi,
        "t_vi_ms": t_vi_ms,
        "t_pi_ms": t_pi_ms,
        "agreement": agreement,
        "value_linf": value_linf,
        "pi_consistent": pi_consistent,
    }


def evaluate_policy_in_gym(env, actions: np.ndarray, n_episodes: int = 500, max_steps: int = 200, seed: int = 123):
    returns = []
    success = 0

    for ep in range(n_episodes):
        obs, _ = env.reset(seed=seed + ep)
        total_r = 0.0

        for _ in range(max_steps):
            a = int(actions[obs])
            obs, reward, terminated, truncated, _ = env.step(a)
            total_r += reward
            if terminated or truncated:
                if reward > 0:
                    success += 1
                break

        returns.append(total_r)

    return float(np.mean(returns)), success / n_episodes


if gym is None:
    print("Skip: Gymnasium not available.")
else:
    fl_det = run_vi_pi_on_frozenlake(map_name="4x4", is_slippery=False)
    fl_slip = run_vi_pi_on_frozenlake(map_name="4x4", is_slippery=True)

    for name, data in [("4x4 deterministic", fl_det), ("4x4 slippery", fl_slip)]:
        mdp = data["mdp"]
        vi = data["vi"]
        pi = data["pi"]

        print(f"\n{name}")
        print(f"  nS={mdp.nS}, VI_sweeps={vi['sweeps']}, PI_outer={pi['outer_loops']}, PI_eval_total={sum(pi['eval_sweeps'])}")
        print(f"  VI_ms={data['t_vi_ms']:.2f}, PI_ms={data['t_pi_ms']:.2f}")
        print(f"  L_inf(VI-PI)={data['value_linf']:.3e}, policy_agreement={data['agreement']*100:.1f}%")
        print(f"  PI greedy-consistency check: {data['pi_consistent']}")

        vi_ret, vi_succ = evaluate_policy_in_gym(data["env"], vi["greedy_actions"], n_episodes=500)
        pi_ret, pi_succ = evaluate_policy_in_gym(data["env"], np.argmax(pi["policy"], axis=1), n_episodes=500)
        print(f"  VI eval: mean_return={vi_ret:.3f}, success_rate={vi_succ*100:.1f}%")
        print(f"  PI eval: mean_return={pi_ret:.3f}, success_rate={pi_succ*100:.1f}%")

        render_frozenlake_map(mdp, title=f"FrozenLake map | {name}")
        plot_value_heatmap(mdp, vi["V"], title=f"VI value heatmap | {name}")
        render_frozenlake_policy(mdp, vi["greedy_actions"], title=f"VI greedy policy | {name}")
        render_frozenlake_policy(mdp, np.argmax(pi["policy"], axis=1), title=f"PI greedy policy | {name}")


### FrozenLake experiment B: 8x8 slippery

This gives a larger Gymnasium benchmark (64 states instead of 16) with the same algorithms.


In [None]:
if gym is None:
    print("Skip: Gymnasium not available.")
else:
    fl8 = run_vi_pi_on_frozenlake(map_name="8x8", is_slippery=True, gamma=0.99, theta=1e-10)

    mdp = fl8["mdp"]
    vi = fl8["vi"]
    pi = fl8["pi"]

    print("8x8 slippery")
    print(f"  nS={mdp.nS}, VI_sweeps={vi['sweeps']}, PI_outer={pi['outer_loops']}, PI_eval_total={sum(pi['eval_sweeps'])}")
    print(f"  VI_ms={fl8['t_vi_ms']:.2f}, PI_ms={fl8['t_pi_ms']:.2f}")
    print(f"  L_inf(VI-PI)={fl8['value_linf']:.3e}, policy_agreement={fl8['agreement']*100:.1f}%")

    vi_ret, vi_succ = evaluate_policy_in_gym(fl8["env"], vi["greedy_actions"], n_episodes=300)
    pi_ret, pi_succ = evaluate_policy_in_gym(fl8["env"], np.argmax(pi["policy"], axis=1), n_episodes=300)
    print(f"  VI eval: mean_return={vi_ret:.3f}, success_rate={vi_succ*100:.1f}%")
    print(f"  PI eval: mean_return={pi_ret:.3f}, success_rate={pi_succ*100:.1f}%")

    plot_value_heatmap(mdp, vi["V"], title="VI value heatmap | FrozenLake 8x8 slippery")
    render_frozenlake_policy(mdp, vi["greedy_actions"], title="VI greedy policy | FrozenLake 8x8 slippery")


### FrozenLake GIFs (time-evolving views)

We export:
- Value Iteration value evolution GIF,
- Policy Iteration evolution GIF,
- greedy rollout trajectory GIF.


In [None]:
if gym is None:
    print("Skip: Gymnasium not available.")
else:
    out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
    out_dir.mkdir(parents=True, exist_ok=True)

    frozenlake_cases = {
        "fl4_det": fl_det,
        "fl4_slip": fl_slip,
    }

    for tag, data in frozenlake_cases.items():
        mdp = data["mdp"]
        vi = data["vi"]
        pi = data["pi"]

        gif_vi_value = out_dir / f"session2_{tag}_vi_value.gif"
        gif_pi_evo = out_dir / f"session2_{tag}_pi_evolution.gif"
        gif_vi_traj = out_dir / f"session2_{tag}_vi_traj.gif"
        gif_pi_traj = out_dir / f"session2_{tag}_pi_traj.gif"

        save_vi_value_gif(
            mdp,
            vi["history"],
            gif_vi_value,
            title_prefix=f"VI value evolution ({tag})",
            show_after_save=True,
        )
        save_frozenlake_policy_iteration_gif(
            mdp,
            pi["history"],
            gif_pi_evo,
            title_prefix=f"FrozenLake PI evolution ({tag})",
            show_after_save=True,
        )

        # VI greedy trajectory from Gym rollouts projected to state ids
        env = data["env"]
        obs, _ = env.reset(seed=7)
        states_vi = [int(obs)]
        for _ in range(120):
            a = int(vi["greedy_actions"][obs])
            obs, _, terminated, truncated, _ = env.step(a)
            states_vi.append(int(obs))
            if terminated or truncated:
                break

        # PI greedy trajectory from Gym rollouts projected to state ids
        obs, _ = env.reset(seed=11)
        pi_actions = np.argmax(pi["policy"], axis=1)
        states_pi = [int(obs)]
        for _ in range(120):
            a = int(pi_actions[obs])
            obs, _, terminated, truncated, _ = env.step(a)
            states_pi.append(int(obs))
            if terminated or truncated:
                break

        # Lightweight adapter for trajectory renderer
        class _Adapter:
            pass

        adapter = _Adapter()
        adapter.grid = np.where(mdp.grid_chars == "F", ".", mdp.grid_chars)
        adapter.H, adapter.W = adapter.grid.shape
        adapter.state_to_pos = mdp.state_to_pos

        save_trajectory_gif(
            adapter,
            states_vi,
            gif_vi_traj,
            title=f"VI greedy trajectory ({tag})",
            trail_len=20,
            show_after_save=True,
        )
        save_trajectory_gif(
            adapter,
            states_pi,
            gif_pi_traj,
            title=f"PI greedy trajectory ({tag})",
            trail_len=20,
            show_after_save=True,
        )

    print(f"FrozenLake GIFs saved in: {out_dir}")


In [None]:
import gymnasium as gym
import numpy as np
from PIL import Image as PILImage  # aliased so IPython's `Image` (used in earlier cells) is not shadowed
from IPython.display import display, Image as IPyImage
import imageio.v2 as imageio
from pathlib import Path

# 1) Build env + MDP
env_train = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)
mdp_fl = mdp_from_frozenlake_env(env_train)

# 2) Train with YOUR Policy Iteration
pi_res = policy_iteration(mdp_fl, gamma=0.99, eval_theta=1e-10, return_history=True)
pi_actions = np.argmax(pi_res["policy"], axis=1)  # learned greedy policy

print(f"PI outer loops: {pi_res['outer_loops']}, eval sweeps total: {sum(pi_res['eval_sweeps'])}")

# 3) Render rollout with learned policy
env_render = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True, render_mode="rgb_array")
obs, info = env_render.reset(seed=7)

frames = [env_render.render()]
for _ in range(100):
    a = int(pi_actions[obs])              # <- learned policy action
    obs, reward, terminated, truncated, info = env_render.step(a)
    frames.append(env_render.render())
    if terminated or truncated:
        break

print("Episode finished. Final reward:", reward)

# Show last frame
display(PILImage.fromarray(frames[-1]))

# 4) Save + show GIF
out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
out_dir.mkdir(parents=True, exist_ok=True)
gif_path = out_dir / "frozenlake_pi_policy_rollout.gif"
imageio.mimsave(gif_path, frames, duration=0.22)
display(IPyImage(filename=str(gif_path)))
print("Saved:", gif_path)


**FrozenLake GIF outputs** are saved under `notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs/`.

## Gambler's Problem (tabular MDP example)

Before moving to homework, we add one more classic DP example from Sutton & Barto: **Gambler's Problem**.

Setup:
- state $s \in \{0,1,\dots,100\}$ is current capital,
- terminal states: $0$ (loss) and $100$ (goal),
- action $a$ is the stake, with $a \in \{1,\dots,\min(s,100-s)\}$,
- with probability $p_h$ the coin is heads and capital increases by $a$,
- with probability $1-p_h$ capital decreases by $a$,
- reward is $1$ only when transitioning into state $100$.

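This is a pure planning problem: the dynamics are fully known, so we solve it here with Value Iteration. With $\gamma = 1$ and a reward of $1$ only on reaching the goal, $V(s)$ is the probability of reaching $100$ from capital $s$.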
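
The update used for this problem can be written as a Bellman optimality backup. The following is a sketch in the notation of the setup above, where $r(s')$ is shorthand introduced here for the reward ($1$ if $s' = 100$, otherwise $0$), $\gamma$ is the discount factor, and $V_k$ is the value estimate after sweep $k$:

$$
V_{k+1}(s) = \max_{1 \le a \le \min(s,\,100-s)} \Big[\, p_h \big( r(s+a) + \gamma\, V_k(s+a) \big) + (1-p_h)\, \gamma\, V_k(s-a) \Big],
\qquad V_k(0) = V_k(100) = 0 .
$$

The terminal values stay at $0$ because the goal reward is paid on the transition into state $100$, not by the state itself.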


### Extension - Implement Value Iteration for Gambler's Problem

In [None]:
def gambler_value_iteration(
    p_heads: float = 0.5,
    theta: float = 1e-12,
    gamma: float = 1.0,
    target: int = 100,
):
    V = np.zeros(target + 1, dtype=float)
    policy = np.zeros(target + 1, dtype=int)
    history = []
    deltas = []

    sweeps = 0
    while True:
        delta = 0.0
        V_old = V.copy()

        for s in range(1, target):
            max_stake = min(s, target - s)
            action_values = np.zeros(max_stake + 1, dtype=float)

            for a in range(1, max_stake + 1):
                s_win = s + a
                s_lose = s - a
                reward_win = 1.0 if s_win == target else 0.0

                action_values[a] = (
                    p_heads * (reward_win + gamma * V_old[s_win])
                    + (1.0 - p_heads) * gamma * V_old[s_lose]
                )

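            # Index 0 is an unused placeholder (stake 0 is not a legal action),
            # so search over stakes 1..max_stake only.
            best_a = int(np.argmax(action_values[1:])) + 1
            best_v = float(action_values[best_a])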

            V[s] = best_v
            policy[s] = best_a
            delta = max(delta, abs(V[s] - V_old[s]))

        history.append(V.copy())
        deltas.append(delta)
        sweeps += 1

        if delta < theta:
            break

    return {
        "V": V,
        "policy": policy,
        "history": history,
        "deltas": deltas,
        "sweeps": sweeps,
        "p_heads": p_heads,
        "theta": theta,
        "gamma": gamma,
    }


gambler_res = gambler_value_iteration(p_heads=0.5, theta=1e-12, gamma=1.0)
print(
    f"Converged in {gambler_res['sweeps']} sweeps | "
    f"final delta={gambler_res['deltas'][-1]:.3e}"
)


### Extension - Visualize value function and optimal policy

In [None]:
def plot_gambler_values_and_policy(result: dict, title_prefix: str = "Gambler"):
    V = result["V"]
    policy = result["policy"]
    target = len(V) - 1

    plt.figure(figsize=(10, 4))
    plt.plot(np.arange(1, target), V[1:target], linewidth=2)
    plt.xlabel("Capital")
    plt.ylabel("Value")
    plt.title(f"{title_prefix}: Optimal value function")
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(10, 4))
    plt.bar(np.arange(1, target), policy[1:target])
    plt.xlabel("Capital")
    plt.ylabel("Stake")
    plt.title(f"{title_prefix}: Greedy stake policy")
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()


def plot_gambler_convergence(result: dict, title: str = "Gambler Value Iteration convergence"):
    deltas = result["deltas"]
    plt.figure(figsize=(7, 3.5))
    plt.plot(deltas)
    plt.yscale("log")
    plt.xlabel("Sweep")
    plt.ylabel("max update delta")
    plt.title(title)
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()


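# Take the label from the result itself so the title matches the run (p_heads=0.5 above).
plot_gambler_values_and_policy(gambler_res, title_prefix=f"Gambler (p_heads={gambler_res['p_heads']})")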
plot_gambler_convergence(gambler_res)


### Extension - Value-evolution snapshots and GIF

In [None]:
def plot_gambler_snapshots(result: dict, k_list=None):
    hist = result["history"]
    if not hist:
        return

    if k_list is None:
        last = len(hist) - 1
        mid = last // 2
        k_list = sorted(set([0, mid, last]))

    plt.figure(figsize=(10, 4))
    for k in k_list:
        plt.plot(np.arange(1, len(hist[k]) - 1), hist[k][1:-1], label=f"sweep={k+1}")
    plt.xlabel("Capital")
    plt.ylabel("Value")
    plt.title("Gambler value evolution snapshots")
    plt.grid(alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()


def save_gambler_value_gif(result: dict, gif_path: Path, title_prefix: str = "Gambler value evolution"):
    hist = result["history"]
    if not hist:
        print("No history available; GIF was not created.")
        return

    from matplotlib.animation import FuncAnimation, PillowWriter

    gif_path.parent.mkdir(parents=True, exist_ok=True)

    x = np.arange(1, len(hist[0]) - 1)
    fig, ax = plt.subplots(figsize=(9, 4))
    line, = ax.plot([], [], linewidth=2)
    ax.set_xlim(x[0], x[-1])
    ax.set_ylim(0.0, max(float(np.max(v)) for v in hist) * 1.05)
    ax.set_xlabel("Capital")
    ax.set_ylabel("Value")
    title = ax.set_title(f"{title_prefix} | sweep=1")
    ax.grid(alpha=0.3)

    def update(i):
        y = hist[i][1:-1]
        line.set_data(x, y)
        title.set_text(f"{title_prefix} | sweep={i+1}")
        return (line, title)

    ani = FuncAnimation(fig, update, frames=len(hist), interval=160, blit=False)
    ani.save(gif_path, writer=PillowWriter(fps=8))
    plt.close(fig)
    print(f"Saved GIF: {gif_path}")


plot_gambler_snapshots(gambler_res)

out_dir = Path("notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs")
out_dir.mkdir(parents=True, exist_ok=True)
save_gambler_value_gif(gambler_res, out_dir / "session2_gambler_value_evolution.gif")


### Extension - Probability sensitivity experiments

We compare $p_h \in \{0.25, 0.40, 0.55\}$ and inspect how value and policy structure changes.
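
As a reference point for these experiments, the classical gambler's-ruin formula gives the success probability of a fixed policy that always stakes 1. Because it is the value of one particular policy, it is a lower bound on the optimal $V(s)$ computed by Value Iteration. A minimal sketch follows; the helper name `unit_stake_win_probability` is hypothetical and not part of the notebook.

```python
def unit_stake_win_probability(s: int, target: int, p_heads: float) -> float:
    """Probability of reaching `target` from capital `s` when always staking 1."""
    if not 0.0 < p_heads < 1.0:
        raise ValueError("p_heads must lie strictly between 0 and 1")
    if p_heads == 0.5:
        # Fair coin: the probability is linear in the starting capital.
        return s / target
    ratio = (1.0 - p_heads) / p_heads
    return (1.0 - ratio ** s) / (1.0 - ratio ** target)


for p in (0.25, 0.40, 0.55):
    prob = unit_stake_win_probability(50, 100, p)
    print(f"p_heads={p:.2f} | unit-stake success from s=50: {prob:.3e}")
```

Comparing these numbers with `V(50)` from the table printed by the next cell shows how much the choice of stake matters when the coin is unfavorable.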


In [None]:
p_list = [0.25, 0.40, 0.55]
results = {p: gambler_value_iteration(p_heads=p, theta=1e-12, gamma=1.0) for p in p_list}

print("p_heads | sweeps | V(50) | suggested stake at s=50")
for p in p_list:
    r = results[p]
    print(f"{p:6.2f} | {r['sweeps']:6d} | {r['V'][50]:6.3f} | {r['policy'][50]:24d}")

plt.figure(figsize=(10, 4))
for p in p_list:
    r = results[p]
    plt.plot(np.arange(1, 100), r["V"][1:100], label=f"p_heads={p:.2f}")
plt.xlabel("Capital")
plt.ylabel("Value")
plt.title("Gambler value function sensitivity to coin bias")
plt.grid(alpha=0.3)
plt.legend()
plt.tight_layout()
plt.show()

plt.figure(figsize=(10, 4))
for p in p_list:
    r = results[p]
    plt.step(np.arange(1, 100), r["policy"][1:100], where="mid", label=f"p_heads={p:.2f}")
plt.xlabel("Capital")
plt.ylabel("Stake")
plt.title("Gambler policy sensitivity to coin bias")
plt.grid(alpha=0.3)
plt.legend()
plt.tight_layout()
plt.show()


**Gambler GIF output** is saved under `notebooks/sessions/session_02_mdp_dynamic_programming/assets/web_outputs/`.
