<a href="https://colab.research.google.com/github/everestso/summer25/blob/main/c166f25_QLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import typing as tt
import gymnasium as gym
from collections import defaultdict, Counter
from torch.utils.tensorboard.writer import SummaryWriter


In [2]:
ENV_NAME = "FrozenLake-v1"
#ENV_NAME = "FrozenLake8x8-v1"      # uncomment for larger version
GAMMA = 0.9
TEST_EPISODES = 20

State = int
Action = int
RewardKey = tt.Tuple[State, Action, State]
TransitKey = tt.Tuple[State, Action]


In [15]:
class Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state, _ = self.env.reset()
        self.rewards: tt.Dict[RewardKey, float] = defaultdict(float)
        self.transits: tt.Dict[TransitKey, Counter] = defaultdict(Counter)
        self.values: tt.Dict[State, float] = defaultdict(float)

    def play_n_random_steps(self, n: int):
        for _ in range(n):
            action = self.env.action_space.sample()
            new_state, reward, is_done, is_trunc, _ = self.env.step(action)
            rw_key = (self.state, action, new_state)
            self.rewards[rw_key] = float(reward)
            tr_key = (self.state, action)
            self.transits[tr_key][new_state] += 1
            if is_done or is_trunc:
                self.state, _ = self.env.reset()
            else:
                self.state = new_state

    def calc_action_value(self, state: State, action: Action) -> float:
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            rw_key = (state, action, tgt_state)
            reward = self.rewards[rw_key]
            val = reward + GAMMA * self.values[tgt_state]
            action_value += (count / total) * val
        return action_value

    def select_action(self, state: State) -> Action:
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env: gym.Env) -> float:
        total_reward = 0.0
        state, _ = env.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, is_trunc, _ = env.step(action)
            rw_key = (state, action, new_state)
            self.rewards[rw_key] = float(reward)
            tr_key = (state, action)
            self.transits[tr_key][new_state] += 1
            total_reward += reward
            if is_done or is_trunc:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.values[state] = max(state_values)

    def print_policy(self):
        desc = self.env.unwrapped.desc  # FrozenLake map as bytes
        size = desc.shape[0]            # assume square grid
        action_symbols = ["←", "↓", "→", "↑"]
        print("Current policy (grid):")
        for row in range(size):
            row_symbols = []
            for col in range(size):
                state = row * size + col
                tile = desc[row][col].decode("utf-8")

                if tile == "G":
                    row_symbols.append("G")
                elif tile == "H":
                    row_symbols.append("H")
                else:
                    action = self.select_action(state)
                    symbol = action_symbols[action] if action is not None else "?"
                    row_symbols.append(symbol)
            print(" ".join(row_symbols))


In [16]:
test_env = gym.make(ENV_NAME)
agent = Agent()
writer = SummaryWriter(comment="-v-iteration")

iter_no = 0
best_reward = 0.0
while True:
    iter_no += 1
    agent.play_n_random_steps(100)
    agent.value_iteration()

    reward = 0.0
    for _ in range(TEST_EPISODES):
        reward += agent.play_episode(test_env)
    reward /= TEST_EPISODES
    writer.add_scalar("reward", reward, iter_no)
    if reward > best_reward:
        print(f"{iter_no}: Best reward updated {best_reward:.3} -> {reward:.3}")
        best_reward = reward
    if reward > 0.80:
        print("Solved in %d iterations!" % iter_no)
        break
writer.close()

31: Best reward updated 0.0 -> 0.1
34: Best reward updated 0.1 -> 0.45
37: Best reward updated 0.45 -> 0.65
41: Best reward updated 0.65 -> 0.7
43: Best reward updated 0.7 -> 0.75
47: Best reward updated 0.75 -> 0.85
Solved in 47 iterations!


# Test Model

In [17]:
# Setup video recording wrapper
env = gym.make(ENV_NAME, render_mode="rgb_array")
env = gym.wrappers.RecordVideo(env, video_folder="video")
state, _ = env.reset()

total_reward = 0.0
total_steps = 0

while True:
    action = agent.select_action(state)
    state, reward, done, truncated, _ = env.step(action)
    total_reward += reward
    total_steps += 1
    if done or truncated or total_steps >= 1000:
        break

print(f"Episode done in {total_steps} steps, total reward {total_reward:.2f}")
env.close()

Episode done in 11 steps, total reward 1.00


  logger.warn(


In [18]:
from IPython.display import Video
import os

In [19]:
!ls video/rl-video-episode-0.mp4
agent.print_policy()
# Display video at its default speed
display(Video("video/rl-video-episode-0.mp4", embed=True))

video/rl-video-episode-0.mp4
Current policy (grid):
← ↑ ← ↑
← H → H
↑ ↓ ← H
H → ↓ G
