In [1]:

# Parameters (papermill overrides these when run from the UI)
rows = 5
cols = 5
start = (0, 0)
goal = (4, 4)
holes = [(0, 3), (1, 0), (2, 2), (3, 0), (3, 2)]
slippery = False
slip_prob = 0.2
gamma = 0.99
theta = 1e-8

# Policy Iteration on FrozenLake-v1 (parameterized)
import numpy as np
import time

# Prefer gymnasium if present
try:
    import gymnasium as gym
except ImportError:
    import gym
import ast
# Policy Iteration on FrozenLake-v1 (parameterized)
import numpy as np
import time

# Parse parameters that may have been injected as strings by papermill

def _to_tuple_2(x):
    if isinstance(x, tuple) and len(x) == 2:
        return x
    if isinstance(x, str):
        return tuple(ast.literal_eval(x))
    # assume sequence length 2
    t = tuple(x)
    return (int(t[0]), int(t[1]))

def _to_tuple_list(x):
    if isinstance(x, str):
        v = ast.literal_eval(x)
    else:
        v = x
    return [tuple(map(int, t)) for t in v]

start = _to_tuple_2(start)
goal = _to_tuple_2(goal)
holes = _to_tuple_list(holes)

# Build desc from parameters

def build_desc(rows, cols, start, goal, holes):
    arr = np.full((rows, cols), 'F', dtype='<U1')
    sr, sc = start
    gr, gc = goal
    arr[sr, sc] = 'S'
    arr[gr, gc] = 'G'
    for hr, hc in holes:
        arr[hr, hc] = 'H'
    return ["".join(row) for row in arr]

custom_map = build_desc(rows, cols, start, goal, holes)
map_desc = custom_map

# Create env with human rendering so a window appears when stepping
# Note: slip_prob is not directly configurable in standard FrozenLake-v1.
env = gym.make(
    "FrozenLake-v1",
    is_slippery=bool(slippery),
    desc=map_desc,
    render_mode="human",
)

# Parse parameters that may have been injected as strings by papermill

def _to_tuple_2(x):
    if isinstance(x, tuple) and len(x) == 2:
        return x
    if isinstance(x, str):
        return tuple(ast.literal_eval(x))
    # assume sequence length 2
    t = tuple(x)
    return (int(t[0]), int(t[1]))

def _to_tuple_list(x):
    if isinstance(x, str):
        v = ast.literal_eval(x)
    else:
        v = x
    return [tuple(map(int, t)) for t in v]

start = _to_tuple_2(start)
goal = _to_tuple_2(goal)
holes = _to_tuple_list(holes)

# Build desc from parameters

def build_desc(rows, cols, start, goal, holes):
    arr = np.full((rows, cols), 'F', dtype='<U1')
    sr, sc = start
    gr, gc = goal
    arr[sr, sc] = 'S'
    arr[gr, gc] = 'G'
    for hr, hc in holes:
        arr[hr, hc] = 'H'
    return ["".join(row) for row in arr]

custom_map = build_desc(rows, cols, start, goal, holes)
map_desc = custom_map



In [2]:
#file: Frozen_POLICY_ITERATION.py

# Policy Iteration on FrozenLake-v1
import numpy as np
import gym
import time

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


Personnalise et initialise une version déterministe de l'environnement FrozenLake.

In [3]:

custom_map = [
    "SFFHF",
    "HFFFF",
    "FFHFF",
    "HHFHF",
    "FFFGF"
]

map_desc = custom_map

env = gym.make(
    "FrozenLake-v1",
    is_slippery=False,
    desc=map_desc,
    render_mode="human"
)

In [4]:
# Access MDP transitions and spaces
P = (env.unwrapped.P).copy() # dict: P[s][a] -> list of (prob, next_state, reward, terminated)
nS = env.observation_space.n
nA = env.action_space.n

# Hyperparameters
gamma = 0.99
theta = 1e-8

.P → this is a dictionary describing the Markov Decision Process (MDP) transitions.

P[s][a] gives a list of possible transitions from state s when taking action a.

Each element in P[s][a] is a tuple:

(probability, next_state, reward, terminated)

In [5]:
print (f"Number of states: {nS}, Number of actions: {nA}")

Number of states: 25, Number of actions: 4


In [6]:
print(f"P= {P}")

P= {0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 5, 0.0, True)], 2: [(1.0, 1, 0.0, False)], 3: [(1.0, 0, 0.0, False)]}, 1: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 6, 0.0, False)], 2: [(1.0, 2, 0.0, False)], 3: [(1.0, 1, 0.0, False)]}, 2: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 7, 0.0, False)], 2: [(1.0, 3, 0.0, True)], 3: [(1.0, 2, 0.0, False)]}, 3: {0: [(1.0, 3, 0, True)], 1: [(1.0, 3, 0, True)], 2: [(1.0, 3, 0, True)], 3: [(1.0, 3, 0, True)]}, 4: {0: [(1.0, 3, 0.0, True)], 1: [(1.0, 9, 0.0, False)], 2: [(1.0, 4, 0.0, False)], 3: [(1.0, 4, 0.0, False)]}, 5: {0: [(1.0, 5, 0, True)], 1: [(1.0, 5, 0, True)], 2: [(1.0, 5, 0, True)], 3: [(1.0, 5, 0, True)]}, 6: {0: [(1.0, 5, 0.0, True)], 1: [(1.0, 11, 0.0, False)], 2: [(1.0, 7, 0.0, False)], 3: [(1.0, 1, 0.0, False)]}, 7: {0: [(1.0, 6, 0.0, False)], 1: [(1.0, 12, 0.0, True)], 2: [(1.0, 8, 0.0, False)], 3: [(1.0, 2, 0.0, False)]}, 8: {0: [(1.0, 7, 0.0, False)], 1: [(1.0, 13, 0.0, False)], 2: [(1.0, 9, 0.0, False)], 3: [(1.0, 3, 0.0, True)]}, 9: {0:

In [7]:
def policy_evaluation(pi, V=None, gamma: float = gamma, theta: float = theta):
    if V is None:
        V = np.zeros(nS, dtype=np.float64)
    else:
        V = np.array(V, dtype=np.float64, copy=True)

    while True:
        delta = 0.0
        for s in range(nS):
            v_old = V[s]
            a = pi[s]
            v_new = 0.0
            for (prob, ns, r, done) in P[s][a]:
                v_new += prob * (r + gamma * (0.0 if done else V[ns]))
            V[s] = v_new
            delta = max(delta, abs(v_old - v_new))
        if delta < theta:
            break
    return V

In [8]:
def policy_improvement(V, gamma: float = gamma):
    pi = np.zeros(nS, dtype=int)
    for s in range(nS):
        q = np.zeros(nA, dtype=np.float64)
        for a in range(nA):
            for (prob, ns, r, done) in P[s][a]:
                q[a] += prob * (r + gamma * (0.0 if done else V[ns]))
        pi[s] = int(np.argmax(q))
    return pi

In [9]:
# Full policy iteration loop

def policy_iteration(gamma: float = gamma, theta: float = theta):
    # Step 1: Initialize policy randomly and value function
    pi = np.random.randint(0, nA, size=nS, dtype=int)
    V = np.zeros(nS, dtype=np.float64)

    iteration = 0
    while True:
        iteration += 1

        # Step 2: Evaluate current policy
        V = policy_evaluation(pi, V, gamma=gamma, theta=theta)

        # Step 3: Improve the policy using the current value function
        new_pi = policy_improvement(V, gamma=gamma)

        # Step 4: Check if policy has changed
        policy_stable = np.array_equal(pi, new_pi)

        # Step 5: Update policy
        pi = new_pi

        # Step 6: Stop if stable (converged)
        if policy_stable:
            break

    return pi, V, iteration



In [10]:
pi_opt, V_opt, iters = policy_iteration()

In [11]:
def run_episode(env, pi):
    obs, info = env.reset()
    terminated = False
    truncated = False
    total_reward = 0.0
    steps = 0
    while not (terminated or truncated):
        a = int(pi[obs])
        obs, r, terminated, truncated, info = env.step(a)
        total_reward += r
        steps += 1
    return total_reward, steps, terminated, truncated


In [12]:
total_reward, steps, terminated, truncated = run_episode(env, pi_opt)

  from pkg_resources import resource_stream, resource_exists
  if not isinstance(terminated, (bool, np.bool8)):


In [13]:
print(f"Episode -> reward: {total_reward}, steps: {steps}, terminated: {terminated}, truncated: {truncated}")


Episode -> reward: 1.0, steps: 9, terminated: True, truncated: False


In [14]:
r= np.random.randint(0, nA, size=nS, dtype=int)
print(r)

[0 2 1 0 1 0 2 3 2 0 1 1 2 1 0 3 3 2 3 2 0 3 0 2 3]


In [15]:
# Execute and display results


print(f"Converged in {iters} iterations")
side = int(np.sqrt(nS))
print("Optimal V (reshaped if square):")
if side * side == nS:
    print(np.round(V_opt.reshape(side, side), 3))
else:
    print(np.round(V_opt, 3))

arrow_map = {0: "←", 1: "↓", 2: "→", 3: "↑"}
print("\nOptimal Policy:")
if side * side == nS:
    grid = np.array([arrow_map[a] for a in pi_opt]).reshape(side, side)
    for r in range(side):
        print(" ".join(grid[r]))
else:
    print(pi_opt)

#Run one episode to ensure a window appears




Converged in 10 iterations
Optimal V (reshaped if square):
[[0.923 0.932 0.941 0.    0.961]
 [0.    0.941 0.951 0.961 0.97 ]
 [0.923 0.932 0.    0.97  0.98 ]
 [0.    0.    0.99  0.    0.99 ]
 [0.98  0.99  1.    0.    1.   ]]

Optimal Policy:
→ ↓ ↓ ← ↓
← → → ↓ ↓
→ ↑ ← → ↓
← ← ↓ ← ↓
→ → → ← ←


In [16]:
# Keep window visible briefly and pump events to avoid "Not Responding"
try:
    import pygame
    for _ in range(100):  # ~3 seconds
        pygame.event.pump()
        time.sleep(0.03)
except Exception:
    time.sleep(3.0)

env.close()