In [1]:
import gymnasium as gym
import numpy as np
import imageio
from PIL import Image, ImageDraw, ImageFont

print("loaded")

loaded


In [2]:
"""
value iteration
high level
1. start with V(s) = 0 for every state
2. repeat sweeps/iterations for all states
       for each state s, look at every action a
       for each action, sum over every possible next-state s' using the known trans prob's P(s'|s,a)to compute
       expected value of taking that actions
       set V(s) = max over all the actions
3. stop when the largest change across all states (delta) is tiny
4. extract greedy policy  pi(s) = argmax_a V(s,a)

"""
def value_iteration(env, discount=0.99, theta=1e-6, max_iters=5000):
  n_states = env.observation_space.n # 16 fro 4x4 grid
  n_actions = env.action_space.n # 4 {0: '←', 1: '↓', 2: '→', 3: '↑'}

  # transition model
  # P[ice][right] = (0.7, 'hole', -20, done=true)
  # P[s][a] is a list of (prob, next_state, reward, done) tuples

  P = env.unwrapped.P

  V = np.zeros(n_states)

  for it in range(max_iters):
    delta = 0.0

    for s in range(n_states):
      v_vals = []
      for a in range(n_actions):
        v = 0.0
        for prob, next_s, reward, done in P[s][a]:
          v += prob * (reward + discount * V[next_s] * (not done))
        v_vals.append(v)

      best = max(v_vals)

      delta = max(delta, abs(best-V[s]))
      V[s] = best

    if delta < theta:
      print("converged in sweeps")
      break

  policy = np.zeros(n_states, dtype=int)
  for s in range(n_states):
    v_vals = []
    for a in range(n_actions):
      v = 0.0
      for prob, next_s, reward, done in P[s][a]:
        v += prob * (reward + discount * V[next_s] * (not done))
      v_vals.append(v)
      policy[s] = int(np.argmax(v_vals)) # best action index

  return V, policy

  print("value itr complete")


In [3]:
env = gym.make('FrozenLake-v1', map_name='4x4', is_slippery=True)

V_opt, policy_opt = value_iteration(env, discount=0.99, theta=1e-8)

action_map = {0: '←', 1: '↓', 2: '→', 3: '↑'}
for s in range(env.observation_space.n):
  arrow = action_map[policy_opt[s]]
  print(f"State {s:2d}: {arrow}")

def run_policy(policy, episodes=30, max_steps=100):
  wins = 0
  for _ in range(episodes):
    state = env.reset()
    if isinstance(state, tuple):
      state = state[0]
    done = False
    steps = 0
    while not done and steps < max_steps:
      action = policy[state]
      state, reward, terminated, truncated, _ = env.step(action)
      done = terminated or truncated
      steps +=1
      if reward > 0:
        wins+=1
        break
  return wins / episodes

sucess_rate = run_policy(policy_opt, episodes=200)
print(f"\nSuccess rate over 200 eval episodes: {sucess_rate*100:.1f}%")

converged in sweeps
State  0: ←
State  1: ↑
State  2: ↑
State  3: ↑
State  4: ←
State  5: ←
State  6: ←
State  7: ←
State  8: ↑
State  9: ↓
State 10: ←
State 11: ←
State 12: ←
State 13: →
State 14: ↓
State 15: ←

Success rate over 200 eval episodes: 71.0%


In [4]:
def create_grid_img(env, state, size=400):
  nrow, ncol = env.unwrapped.desc.shape
  cell_size = size // max(nrow, ncol)


  img = Image.new('RGB', (ncol * cell_size, nrow * cell_size), 'white')
  draw = ImageDraw.Draw(img)

  colors = {

            b'S':(144, 238, 144),
            b'F':(175, 216, 230),
            b'H':(105, 105, 105),
            b'G':(255, 215, 0)
  }

  for i in range(nrow):
    for j in range(ncol):
      cell = env.unwrapped.desc[i][j]
      color = colors.get(cell, (255, 255, 255))
      x1, y1 = j * cell_size, i * cell_size
      x2, y2 = x1 + cell_size, y1 + cell_size
      draw.rectangle([x1, y1, x2, y2], fill=color, outline='black', width=3)

      label = 'START' if cell == b'S' else 'GOAL' if cell == b'G' else 'HOLE' if cell == b'H' else ''
      if label:
        bbox = draw.textbbox((0, 0), label)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        draw.text((x1 + (cell_size - tw) // 2, y1 + (cell_size - th) // 2), label, fill='black')


  row, col = divmod(state, ncol)
  cx = col * cell_size + cell_size // 2
  cy = row * cell_size + cell_size // 2
  r =  cell_size // 4
  draw.ellipse([cx - r, cy - r, cx + r, cy+r], fill='red', outline='black', width=3)

  return np.array(img)

def record_policy_episode(env, policy, max_steps=100):
  frames = []

  state = env.reset()
  if isinstance(state, tuple):
    state = state[0]
  frames.append(create_grid_img(env, state))

  done = False
  steps = 0
  reached, hole = False, False

  while not done and steps <max_steps:
    action = policy[state]
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    frames.append(create_grid_img(env, state))
    steps +=1

    if done:
      if reward > 0:
        reached = True
      else:
        hole = True
      for _ in range(5):
        frames.append(frames[-1])
      break
  return frames, steps, reached, hole

frames_dp, steps_dp, reached_dp, hole_dp = record_policy_episode(env, policy_opt)

In [5]:
def create_gif(frames, filename, text, steps, reached_goal, fell_in_hole):
  font = ImageFont.load_default()
  processed = []

  for i, frame in enumerate(frames):
    img = Image.fromarray(frame)
    draw = ImageDraw.Draw(img)

    stat = 'REACHED GOAL!' if reached_goal else 'FELL IN HOLE' if fell_in_hole else f'Step {i}/steps'
    title = f"{text}\n{stat}"

    y = 10

    for line in title.split('\n'):
      bbox = draw.textbbox((0, 0), line, font=font)
      tx = (img.width - (bbox[2]-bbox[0])) // 2
      draw.text((tx, y), line, fill='black', font=font)
      y += bbox[3] - bbox[1] +4

    processed.append(np.array(img))

  imageio.mimsave(filename, processed, fps=5, loop=1)
  print("saved file")

create_gif(frames_dp, 'froz.gif', 'DP frozen lake', steps_dp, reached_dp, hole_dp)

saved file
