# FrozenLake [8x8 | random map | on slip]

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map

# Seed the PyTorch, NumPy and Python global RNGs with one shared value.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Pytorch device:", device)

Pytorch device: cuda


## Model

In [2]:
class FrozenLake8x8V0(nn.Module):
    """Fully connected network producing one output value per action.

    The input of width ``state_size`` passes through two hidden layers of
    ``hidden_size`` units, each followed by a ReLU, and a final linear layer
    of width ``action_size``.
    """

    def __init__(self, state_size, action_size, hidden_size=64):
        super().__init__()
        # Layers are listed in input -> output order and stored under the
        # ``network`` attribute, so parameters are named ``network.<index>.*``.
        layers = [
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the output of the layer stack for input ``x``."""
        outputs = self.network(x)
        return outputs

## Replay Buffer

In [3]:
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with ``maxlen`` discards its oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct stored transitions, stacked per field.

        Returns:
            A 5-tuple of NumPy arrays (states, actions, rewards, next states,
            done flags); the batch is the leading axis of each array.
        """
        chosen = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        columns = zip(*chosen)
        states, actions, rewards, next_states, dones = (
            np.stack(column) for column in columns
        )
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

## Agent

In [4]:
class DQNAgent:
    """DQN learner with an online Q-network, a target network and a replay memory.

    The online network is trained by ``replay``; the target network is a copy
    that is re-synchronised every ``target_update`` optimisation steps.
    """

    def __init__(self, modelClass, state_size, action_size, lr=1e-3, gamma=0.99,
                 buffer_size=10000, batch_size=64, target_update=100):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.batch_size = batch_size
        self.target_update = target_update

        # Experience storage and the counter of optimisation steps taken so far.
        self.memory = ReplayBuffer(buffer_size)
        self.step_count = 0

        # Online network first, then the target network, both on ``device``.
        self.q_net = modelClass(state_size, action_size).to(device)
        self.target_net = modelClass(state_size, action_size).to(device)
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)

        # The target network starts as an exact copy and is kept in eval mode.
        self.target_net.load_state_dict(self.q_net.state_dict())
        self.target_net.eval()

    def act(self, state, epsilon=0.0):
        """Choose an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest online-network output
        for ``state``.
        """
        explore = random.random() < epsilon
        if explore:
            return random.randrange(self.action_size)

        with torch.no_grad():
            # Add a leading batch axis of size 1 before the forward pass.
            batch = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            q_values = self.q_net(batch)
        return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory."""
        self.memory.push(state, action, reward, next_state, done)

    def replay(self):
        """Run one optimisation step on a sampled mini-batch.

        Does nothing until the memory holds at least ``batch_size``
        transitions. After the step, the target network is refreshed whenever
        the step counter is a multiple of ``target_update``.
        """
        if len(self.memory) < self.batch_size:
            return

        # Sample a batch and move every field onto ``device``.
        sampled = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = sampled
        states = torch.tensor(states, dtype=torch.float32, device=device)
        actions = torch.tensor(actions, dtype=torch.long, device=device)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
        next_states = torch.tensor(next_states, dtype=torch.float32, device=device)
        dones = torch.tensor(dones, dtype=torch.bool, device=device)

        # Online-network output for the action that was actually taken.
        chosen = actions.unsqueeze(1)
        current_q_values = self.q_net(states).gather(1, chosen).squeeze(1)

        # Next q-values from the target network; the (~dones) factor zeroes
        # the bootstrap term for transitions flagged as done.
        with torch.no_grad():
            best_next = self.target_net(next_states).max(dim=1).values
            target_q_values = rewards + (self.gamma * best_next * (~dones))

        # Mean squared error between predictions and targets.
        criterion = nn.MSELoss()
        loss = criterion(current_q_values, target_q_values)

        # Gradient step on the online network.
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Periodically copy the online weights into the target network.
        self.step_count += 1
        if self.step_count % self.target_update == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())


## Training Loop

In [5]:
def one_hot_state(s, size):
    """Encode index ``s`` as a float vector of length ``size`` holding a single 1.0.

    The position is set with ordinary NumPy indexing, so a negative ``s``
    counts from the end and an out-of-range ``s`` raises ``IndexError``.
    """
    encoded = np.zeros(size, dtype=np.float64)
    encoded[s] = 1.0
    return encoded

def train_dqn(modelClass, env_name="FrozenLake-v1", mapsize=4, is_slippery=False, episodes=2000, max_steps=100,
              epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.999995):
    """Train a DQN agent on FrozenLake, using a fresh random map for every episode.

    Args:
        modelClass: Network class; instantiated by ``DQNAgent`` as
            ``modelClass(state_size, action_size)``.
        env_name: Gymnasium environment id passed to ``gym.make``.
        mapsize: Side length given to ``generate_random_map``.
        is_slippery: Forwarded to the environment constructor.
        episodes: Number of training episodes.
        max_steps: Upper bound on the number of steps taken per episode.
        epsilon_start: Exploration rate used for the first episode.
        epsilon_end: Lower bound of the exploration rate.
        epsilon_decay: Factor applied to epsilon once per episode.

    Returns:
        ``(agent, env)``: the trained agent and the most recently created
        environment. That environment has already been closed and was never
        trained on.
    """
    def make_env():
        # Each call draws a new random map layout.
        return gym.make(env_name, desc=generate_random_map(size=mapsize), is_slippery=is_slippery, render_mode=None)

    env = make_env()
    state_size = env.observation_space.n
    action_size = env.action_space.n  # 4 (left, down, right, up)

    agent = DQNAgent(modelClass=modelClass, state_size=state_size, action_size=action_size)

    scores = deque(maxlen=100)  # for moving average

    # NOTE(review): with the default per-episode decay of 0.999995, epsilon is
    # still about 0.99 after 2000 episodes. The recorded training log in this
    # notebook (epsilon 0.995 after episode 0) looks like it came from a
    # faster decay - confirm which value is intended.
    epsilon = epsilon_start

    print("Starting training...")
    try:
        for episode in range(episodes):
            state, _ = env.reset()
            state = one_hot_state(state, state_size)
            total_reward = 0

            for _step in range(max_steps):
                action = agent.act(state, epsilon)
                next_state, reward, terminated, truncated, _ = env.step(action)

                # FrozenLake gives sparse reward: 1 only if goal reached
                reward = reward - 0.01  # optional shaping to encourage speed

                next_state = one_hot_state(next_state, state_size)
                # Store only true termination as the done flag. A time-limit
                # truncation is not a terminal state, so the TD target must
                # still bootstrap from next_state in that case.
                agent.remember(state, action, reward, next_state, terminated)
                agent.replay()

                state = next_state
                total_reward += reward

                # The episode itself stops on either signal.
                if terminated or truncated:
                    break

            scores.append(total_reward)
            epsilon = max(epsilon_end, epsilon_decay * epsilon)

            if episode % 100 == 0:
                avg_score = np.mean(scores)
                print(f"Episode {episode}, Avg Reward (last 100): {avg_score:.3f}, Epsilon: {epsilon:.3f}")

            # change the env, aka new random map
            env.close()
            env = make_env()
    finally:
        # Release the current environment even if training is interrupted.
        env.close()

    return agent, env


def evaluate_agent(agent, env, episodes=10, max_steps=100):
    """Run greedy episodes in ``env`` and report how often the goal is reached.

    An episode counts as a success when it ends (terminated or truncated)
    on a step whose reward is exactly 1.0. Episodes that are still running
    after ``max_steps`` steps count as failures.

    Args:
        agent: Object whose ``act(state, epsilon=0.0)`` returns an action.
        env: Environment exposing ``observation_space.n``, ``reset`` and
            ``step``. It is not closed here.
        episodes: Number of evaluation episodes.
        max_steps: Upper bound on the number of steps per episode.

    Returns:
        The fraction of successful episodes, or 0.0 when ``episodes`` is not
        positive.
    """
    state_size = env.observation_space.n

    success = 0
    for _ in range(episodes):
        state, _ = env.reset()
        state = one_hot_state(state, state_size)
        for _step in range(max_steps):
            action = agent.act(state, epsilon=0.0)  # greedy
            next_state, reward, terminated, truncated, _ = env.step(action)
            state = one_hot_state(next_state, state_size)
            if terminated or truncated:
                if reward == 1.0:
                    success += 1
                break

    # Guard the divisions so a zero-episode run reports 0% instead of raising.
    has_episodes = episodes > 0
    percent = 100 * success / episodes if has_episodes else 0.0
    success_rate = success / episodes if has_episodes else 0.0
    print(f"\nSuccess rate over {episodes} episodes: {success}/{episodes} ({percent:.1f}%)")
    return success_rate

In [6]:
# Train on randomly generated 4x4 non-slippery maps. The returned env has
# already been closed by train_dqn.
agent, env = train_dqn(
    modelClass=FrozenLake8x8V0,
    mapsize=4,
    is_slippery=False,
    episodes=2000,
)

Starting training...
Episode 0, Avg Reward (last 100): 0.000, Epsilon: 0.995
Episode 100, Avg Reward (last 100): 0.140, Epsilon: 0.603
Episode 200, Avg Reward (last 100): 0.220, Epsilon: 0.365
Episode 300, Avg Reward (last 100): 0.300, Epsilon: 0.221
Episode 400, Avg Reward (last 100): 0.090, Epsilon: 0.134
Episode 500, Avg Reward (last 100): 0.170, Epsilon: 0.081
Episode 600, Avg Reward (last 100): 0.190, Epsilon: 0.049
Episode 700, Avg Reward (last 100): 0.310, Epsilon: 0.030
Episode 800, Avg Reward (last 100): 0.210, Epsilon: 0.018
Episode 900, Avg Reward (last 100): 0.280, Epsilon: 0.011
Episode 1000, Avg Reward (last 100): 0.300, Epsilon: 0.010
Episode 1100, Avg Reward (last 100): 0.150, Epsilon: 0.010
Episode 1200, Avg Reward (last 100): 0.300, Epsilon: 0.010
Episode 1300, Avg Reward (last 100): 0.260, Epsilon: 0.010
Episode 1400, Avg Reward (last 100): 0.320, Epsilon: 0.010
Episode 1500, Avg Reward (last 100): 0.300, Epsilon: 0.010
Episode 1600, Avg Reward (last 100): 0.340, Eps

In [7]:
# Evaluate the trained agent on a new random 4x4 map with on-screen rendering.
eval_env = gym.make("FrozenLake-v1", desc=generate_random_map(size=4), is_slippery=False, render_mode="human")
# Alternative: the default fixed map instead of a random one.
# eval_env = gym.make("FrozenLake-v1", is_slippery=False, render_mode="human")
try:
    evaluate_agent(agent, eval_env, episodes=2)
finally:
    # Close the environment even if evaluation fails or is interrupted
    # (e.g. KeyboardInterrupt), so the render window is released.
    eval_env.close()

  from pkg_resources import resource_stream, resource_exists


KeyboardInterrupt: 

np.int64(16)