## Value Iteration Algorithm

In [1]:
import typing as tt
import gymnasium as gym
from collections import defaultdict, Counter
from torch.utils.tensorboard.writer import SummaryWriter
from gymnasium.wrappers import RecordVideo

ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
TEST_EPISODES = 100

State = int
Action = int
RewardKey = tt.Tuple[State, Action, State]
TransitKey = tt.Tuple[State, Action]

In [2]:
class Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state, _ = self.env.reset()
        self.rewards: tt.Dict[RewardKey, float] = defaultdict(float)
        self.transits: tt.Dict[TransitKey, Counter] = defaultdict(Counter)
        self.values: tt.Dict[State, float] = defaultdict(float)

    def play_n_random_steps(self, n: int):
        for _ in range(n):
            action = self.env.action_space.sample()
            new_state, reward, is_done, is_trunc, _ = self.env.step(action)
            rw_key = (self.state, action, new_state)
            self.rewards[rw_key] = float(reward)
            tr_key = (self.state, action)
            self.transits[tr_key][new_state] += 1
            if is_done or is_trunc:
                self.state, _ = self.env.reset()
            else:
                self.state = new_state

    def calc_action_value(self, state: State, action: Action) -> float:
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            rw_key = (state, action, tgt_state)
            reward = self.rewards[rw_key]
            val = reward + GAMMA * self.values[tgt_state]
            action_value += (count / total) * val
        return action_value

    def select_action(self, state: State) -> Action:
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env: gym.Env) -> float:
        total_reward = 0.0
        state, _ = env.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, is_trunc, _ = env.step(action)
            rw_key = (state, action, new_state)
            self.rewards[rw_key] = float(reward)
            tr_key = (state, action)
            self.transits[tr_key][new_state] += 1
            total_reward += reward
            if is_done or is_trunc:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.values[state] = max(state_values)


In [3]:
if __name__ == "__main__":
    test_env = gym.make(ENV_NAME)
    agent = Agent()
    writer = SummaryWriter(comment="-v-iteration")
    iter_no = 0
    best_reward = 0.0
    while True:
        iter_no += 1
        agent.play_n_random_steps(100)
        agent.value_iteration()
        reward = 0.0
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES
        writer.add_scalar("reward", reward, iter_no)
        if reward > best_reward:
            print(f"{iter_no}: Best reward updated: {best_reward:.3f} -> {reward:.3f}")
            best_reward = reward
        if reward > 0.8:
            print(f"Solved in {iter_no} iterations!")
            break
    writer.close()

4: Best reward updated: 0.000 -> 0.010
5: Best reward updated: 0.010 -> 0.080
6: Best reward updated: 0.080 -> 0.120
10: Best reward updated: 0.120 -> 0.300
12: Best reward updated: 0.300 -> 0.430
13: Best reward updated: 0.430 -> 0.440
17: Best reward updated: 0.440 -> 0.490
18: Best reward updated: 0.490 -> 0.640
19: Best reward updated: 0.640 -> 0.740
20: Best reward updated: 0.740 -> 0.770
21: Best reward updated: 0.770 -> 0.800
67: Best reward updated: 0.800 -> 0.810
Solved in 67 iterations!


In [4]:
# -------- Testing + Recording Loop --------
num_episodes = 5
print("\n=== Testing & Recording trained agent ===")
video_env = RecordVideo(
        gym.make(ENV_NAME, render_mode="rgb_array"),
        video_folder="./videos",
        episode_trigger=lambda ep_id: True  # record every test episode
    )

for ep in range(num_episodes):
    state, _ = video_env.reset()
    total_reward = 0
    while True:
        action = agent.select_action(state)
        new_state, reward, is_done, is_trunc, _ = video_env.step(action)
        total_reward += reward
        state = new_state
        if is_done or is_trunc:
            break
    print(f"Test Episode {ep+1}: reward={total_reward}")

video_env.close()
print(f"All {num_episodes} test episodes recorded in ./videos folder")


=== Testing & Recording trained agent ===


  logger.warn(


Test Episode 1: reward=1.0
Test Episode 2: reward=1.0
Test Episode 3: reward=0.0
Test Episode 4: reward=1.0
Test Episode 5: reward=1.0
All 5 test episodes recorded in ./videos folder


In [4]:
agent.select_action(1), agent.transits, agent.rewards, agent.values

(3,
 defaultdict(collections.Counter,
             {(0, np.int64(2)): Counter({4: 1935, 0: 1925, 1: 1917}),
              (0, np.int64(3)): Counter({0: 492, 1: 248}),
              (1, np.int64(1)): Counter({2: 112, 0: 99, 5: 71}),
              (1, np.int64(3)): Counter({1: 1471, 0: 1402, 2: 1322}),
              (0, np.int64(1)): Counter({4: 368, 0: 347, 1: 346}),
              (1, np.int64(0)): Counter({5: 99, 1: 98, 0: 98}),
              (1, np.int64(2)): Counter({2: 100, 1: 97, 5: 91}),
              (0, np.int64(0)): Counter({0: 46137, 4: 23157}),
              (2, np.int64(2)): Counter({2: 1500, 3: 1499, 6: 1428}),
              (2, np.int64(1)): Counter({1: 47, 3: 46, 6: 45}),
              (3, np.int64(3)): Counter({3: 3028, 2: 1474}),
              (3, np.int64(0)): Counter({3: 30, 7: 22, 2: 19}),
              (2, np.int64(3)): Counter({3: 51, 2: 47, 1: 43}),
              (2, np.int64(0)): Counter({2: 843, 6: 836, 1: 795}),
              (6, np.int64(2)): Counter({2: 1717,

In [None]:
# Transition probabilities and rewards
env = gym.make(ENV_NAME)
env.unwrapped.P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

In [2]:
import numpy as np
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

In [3]:
env = gym.make("FrozenLake-v1")

In [4]:
num_states = env.observation_space.n
num_actions = env.action_space.n
num_states, num_actions

(np.int64(16), np.int64(4))

In [None]:
def value_iteration(num_states, num_actions, gamma=0.9):
    """Function to perform value iteration algorithm."""
    policy = np.zeros(num_states)
    V = np.zeros(num_states)
    V_old = -1 * np.ones(num_states)
    while np.max(np.abs(V - V_old)) > 1e-3:
        V_old = V.copy()
        for s in range(num_states):
            q = np.zeros(num_actions)
            for a in range(num_actions):
                for prob, next_s, reward, is_done in env.unwrapped.P[s][a]:
                    q[a] += prob * (reward + gamma * V_old[next_s]) # Computing the Q Value using the bellman equation of optimality
            V[s] = np.max(q) # Comouting the value(estimate) of a state
            policy[s] = np.argmax(q)
    return V, policy

In [6]:
value, policy = value_iteration(num_states, num_actions)

In [7]:
value.reshape(4, 4), policy.reshape(4, 4)

(array([[0.06253203, 0.05606969, 0.07051342, 0.05159888],
        [0.08603505, 0.        , 0.11007757, 0.        ],
        [0.14066756, 0.24422494, 0.29731176, 0.        ],
        [0.        , 0.3775274 , 0.6377414 , 0.        ]]),
 array([[0., 3., 0., 3.],
        [0., 0., 0., 0.],
        [3., 1., 0., 0.],
        [0., 2., 1., 0.]]))

In [10]:
num_episodes = 5
env_name = "FrozenLake-v1"
print("\n=== Testing & Recording trained agent ===")
video_env = RecordVideo(
        gym.make(env_name, render_mode="rgb_array"),
        video_folder="./videos_frozenlake_viter",
        episode_trigger=lambda ep_id: True  # record every test episode
    )

for ep in range(num_episodes):
    state, _ = video_env.reset()
    total_reward = 0
    while True:
        action = int(policy[state])
        new_state, reward, is_done, is_trunc, _ = video_env.step(action)
        total_reward += reward
        state = new_state
        if is_done or is_trunc:
            break
    print(f"Test Episode {ep+1}: reward={total_reward}")

video_env.close()
print(f"All {num_episodes} test episodes recorded in ./videos_frozenlake_viter folder")


=== Testing & Recording trained agent ===
Test Episode 1: reward=1.0
Test Episode 2: reward=1.0


  logger.warn(


Test Episode 3: reward=1.0
Test Episode 4: reward=0.0
Test Episode 5: reward=1.0
All 5 test episodes recorded in ./videos_frozenlake_viter folder


## Value Iteration on the gymnasium Taxi environment

In [1]:
import numpy as np
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

In [2]:
env = gym.make("Taxi-v3")

In [3]:
num_states = env.observation_space.n
num_actions = env.action_space.n
num_states, num_actions

(np.int64(500), np.int64(6))

In [4]:
def value_iteration(num_states, num_actions, gamma=0.9):
    """Function to perform value iteration algorithm."""
    policy = np.zeros(num_states)
    V = np.zeros(num_states)
    V_old = -1 * np.ones(num_states)
    while np.max(np.abs(V - V_old)) > 1e-3:
        V_old = V.copy()
        for s in range(num_states):
            q = np.zeros(num_actions)
            for a in range(num_actions):
                for prob, next_s, reward, is_done in env.unwrapped.P[s][a]:
                    q[a] += prob * (reward + gamma * V_old[next_s]) # Computing the Q Value using the bellman equation of optimality
            V[s] = np.max(q) # Comouting the value(estimate) of a state
            policy[s] = np.argmax(q)
    return V, policy

In [5]:
value, policy = value_iteration(num_states, num_actions)

In [6]:
value, policy

(array([ 89.46916234,  32.81563744,  55.26016234,  37.57393009,
          8.42815264,  32.81563744,   8.42815264,  15.28085117,
         32.81563744,  18.08978465,  55.26016234,  21.21187144,
         12.75186641,  18.08978465,  12.75186641,  37.57393009,
        100.52229109,  37.57393009,  62.51229109,  42.85987234,
         79.52229109,  28.53411868,  48.73419109,  32.81563744,
         10.47672475,  37.57393009,  10.47672475,  18.08978465,
         28.53411868,  15.28085117,  48.73419109,  18.08978465,
         15.28085117,  21.21187144,  15.28085117,  42.85987234,
         89.46916234,  42.85987234,  55.26016234,  48.73419109,
         42.85987234,  12.75186641,  24.67980717,  15.28085117,
         24.67980717,  70.56916234,  24.67980717,  37.57393009,
         24.67980717,  12.75186641,  42.85987234,  15.28085117,
         18.08978465,  24.67980717,  18.08978465,  48.73419109,
         48.73419109,  79.52229109,  48.73419109,  55.26016234,
         37.57393009,  10.47672475,  21.

In [7]:
num_episodes = 5
env_name = "Taxi-v3"
print("\n=== Testing & Recording trained agent ===")
video_env = RecordVideo(
        gym.make(env_name, render_mode="rgb_array"),
        video_folder="./videos_taxi_viter",
        episode_trigger=lambda ep_id: True  # record every test episode
    )

for ep in range(num_episodes):
    state, _ = video_env.reset()
    total_reward = 0
    while True:
        action = int(policy[state])
        new_state, reward, is_done, is_trunc, _ = video_env.step(action)
        total_reward += reward
        state = new_state
        if is_done or is_trunc:
            break
    print(f"Test Episode {ep+1}: reward={total_reward}")

video_env.close()
print(f"All {num_episodes} test episodes recorded in ./videos_taxi_viter folder")


=== Testing & Recording trained agent ===


  logger.warn(


Test Episode 1: reward=11
Test Episode 2: reward=4
Test Episode 3: reward=7
Test Episode 4: reward=7
Test Episode 5: reward=12
All 5 test episodes recorded in ./videos_taxi_viter folder


## Tabular Q Learning on the FrozenLake Environment

In [4]:
import typing as tt
import gymnasium as gym
from collections import defaultdict
from torch.utils.tensorboard.writer import SummaryWriter
import random

In [2]:
ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
ALPHA = 0.2
EPSILON = 0.1
TEST_EPISODES = 20

In [3]:
State = int
Action = int
ValuesKey = tt.Tuple[State, Action]

In [None]:
class Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state, _ = self.env.reset()
        self.values: tt.Dict[ValuesKey, float] = defaultdict(float) 

    def sample_env(self) -> tt.Tuple[State, Action, float, State]:
        action = self.env.action_space.sample()
        old_state = self.state
        new_state, reward, is_done, is_trunc, _ = self.env.step(action)
        if is_done or is_trunc:
            self.state, _ = self.env.reset()
        else:
            self.state = new_state
        return old_state, action, reward, new_state
    
    def best_value_and_action(self, state: State) -> tt.Tuple[float, Action]:
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        if best_action is None:
            best_action = self.env.action_space.sample()
            best_value = self.values[(state, best_action)]
        return best_value, best_action

    def value_update(self, state: State, action: Action, reward: float, next_state: State):
        best_value, _ = self.best_value_and_action(next_state)
        # Q learning update
        self.values[(state, action)] = (1 - ALPHA) * self.values[(state, action)] + ALPHA * (reward + GAMMA * best_value)

    def play_episode(self, env: gym.Env) -> float:
        total_reward = 0.0
        state, _ = env.reset()
        while True:
            _, action = self.best_value_and_action(state)
            new_state, reward, is_done, is_trunc, _ = env.step(action)
            total_reward += reward
            if is_done or is_trunc:
                break
            state = new_state
        return total_reward


In [9]:
if __name__ == "__main__":
    test_env = gym.make(ENV_NAME)
    agent = Agent()
    writer = SummaryWriter(comment="-q-learning")
    iter_no = 0
    best_reward = 0.0
    while True:
        iter_no += 1
        state, action, reward, next_state = agent.sample_env()
        agent.value_update(state, action, reward, next_state)

        test_reward = 0.0
        for _ in range(TEST_EPISODES):
            test_reward += agent.play_episode(test_env)
        test_reward /= TEST_EPISODES
        writer.add_scalar("reward", test_reward, iter_no)
        if test_reward > best_reward:
            print(f"{iter_no}: Best reward updated: {best_reward:.3f} -> {test_reward:.3f}")
            best_reward = test_reward
        if test_reward > 0.8:
            print(f"Solved in {iter_no} iterations!")
            break
    writer.close()


2216: Best reward updated: 0.000 -> 0.050
2219: Best reward updated: 0.050 -> 0.100
2249: Best reward updated: 0.100 -> 0.200
2466: Best reward updated: 0.200 -> 0.250
2499: Best reward updated: 0.250 -> 0.400
4732: Best reward updated: 0.400 -> 0.450
4741: Best reward updated: 0.450 -> 0.500
4742: Best reward updated: 0.500 -> 0.700
4760: Best reward updated: 0.700 -> 0.850
Solved in 4760 iterations!


## Sarsa Algorithm on FrozenLake

In [1]:
import typing as tt
import gymnasium as gym
from collections import defaultdict
from torch.utils.tensorboard.writer import SummaryWriter

In [2]:
ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
ALPHA = 0.2
TEST_EPISODES = 20  

In [3]:
State = int
Action = int
ValuesKey = tt.Tuple[State, Action]