In [1]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
import matplotlib.pyplot as plt

In [2]:
# Create the FrozenLake environment
random.seed(21)
np.random.seed(21)
torch.manual_seed(21)
env = gym.make("FrozenLake-v1", desc=generate_random_map(size=5), is_slippery=False)
state_space = env.observation_space.n
action_space = env.action_space.n
env.action_space.seed(21)
env.observation_space.seed(21)  # Seed the observation space
env.reset(seed=21)

# Parameters
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.1
epsilon_decay = 0.999
batch_size = 64
buffer_size = 10000
target_update_freq = 10
num_episodes = 500
history_size = 15 # Number of past states to consider

In [3]:
# Replay buffer
class ReplayBuffer:
    def __init__(self, capacity, history_size):
        self.buffer = deque(maxlen=capacity)
        self.history_size = history_size

    def add(self, transition):
        self.buffer.append(transition) #adding a transition to buffer

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)  #Sample a batch of transitions uniformly
        actions, rewards, next_states, dones = zip(*[transition[1:] for transition in batch])

        state_histories, reward_histories = [], []

        for idx in range(batch_size):
            state_history, reward_history = self._get_history(idx)
            state_histories.append(state_history)
            reward_histories.append(reward_history)

        return (np.array(state_histories),  # Histories of states
                np.array(actions),          # Actions
                np.array(rewards),          # Current rewards
                np.array(next_states),      # Next states
                np.array(dones),            # Done flags
                np.array(reward_histories)) # Histories of rewards
    

    def _get_history(self, index):
        state_history = []
        reward_history = []

        for i in range(self.history_size):
            if index - i >= 0:
                state_history.insert(0, self.buffer[index - i][0]) 
                reward_history.insert(0, self.buffer[index - i][2])  
            else:
                state_history.insert(0, np.zeros_like(self.buffer[0][0]))
                reward_history.insert(0, 0)

        return np.array(state_history), np.array(reward_history)

    def __len__(self):
        return len(self.buffer)

In [4]:
replay_buffer = ReplayBuffer(buffer_size, history_size) #Calling the replaybuffer

In [5]:
# Converting state to one-hot encoding
def one_hot_state(state, num_states):
    one_hot = np.zeros(num_states)
    one_hot[state] = 1
    return one_hot

In [6]:
def custom_reward(state, action, reward, done, goal_state, visited_states):
    if int(state.argmax()) == int(goal_state.argmax()):
        return reward+100  # High reward for reaching the goal
    elif done:
        return -20  # Heavy penalty for falling
    if tuple(state) in visited_states:
        return reward-5   # Penalty for revisiting a state (looping)
    else:
        return 1

In [7]:
# Attention mechanism
class AttentionMechanism(nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(AttentionMechanism, self).__init__()
        self.key_layer = nn.Linear(state_dim, hidden_dim)
        self.query_layer = nn.Linear(state_dim, hidden_dim)
        self.value_layer = nn.Linear(1 , hidden_dim)

    def forward(self, query, keys, values):
        query = self.query_layer(query).unsqueeze(1)  # (batch_size, 1, hidden_dim)
        keys = self.key_layer(keys)                   # (batch_size, seq_len, hidden_dim)
        values = self.value_layer(values)             # (batch_size, seq_len, hidden_dim)  

        # Computing attention scores
        scores = torch.bmm(query, keys.transpose(1, 2)/ (keys.size(-1) ** 0.5))  # Dot product - relevance score # (batch_size, 1, seq_len)
        

        include_rewards = values[..., 0]   # Remove last dim, shape: (batch_size, seq_len)
        
        include_rewards = include_rewards/ (include_rewards.norm(dim=-1, keepdim=True) + 1e-8)  # Normalize
        include_rewards= include_rewards.unsqueeze(1)
        scores = scores + include_rewards  # Shape: (batch_size, 1, seq_len)

        attention_weights = torch.softmax(scores, dim=-1)  # (batch_size, 1, seq_len)

        context = torch.bmm(attention_weights, values).squeeze(1)  # Weighted sum  # (batch_size, hidden_dim)
        
        return context

In [8]:
# DQN with Attention
class AttentionDQN(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(AttentionDQN, self).__init__()
        self.attention = AttentionMechanism(state_dim, hidden_dim)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim + state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim))

    def forward(self, current_state, history, rewards):
        """
        current_state: Current state (batch_size, state_dim)
        history: Past states (batch_size, seq_len, state_dim)
        rewards: Past rewards (batch_size, seq_len, 1)
        """
        context = self.attention(current_state, history, rewards)  # Applying attention
        combined = torch.cat([current_state, context], dim=-1)  # Combine context with current state
        return self.fc(combined)

In [9]:
policy_net = AttentionDQN(state_space, action_space)
target_net = AttentionDQN(state_space, action_space)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.Adam(policy_net.parameters(), lr=0.1)

In [10]:
#Training Loop
total_rewards = []
success_count = 0
steps_to_goal = []

for episode in range(num_episodes):
    state, _ = env.reset()
    state = one_hot_state(state, state_space)
    history = deque(maxlen=history_size)
    rewards = deque(maxlen=history_size)
    done = False
    total_reward = 0
    step_count = 0
    goal_state = state_space - 1  # Assumption - the last state corresponds to the goal
    goal_state = one_hot_state(goal_state, state_space)
    visited_states = set()

    while not done:
        while len(history) < history_size:  
            history.append(np.zeros_like(state))  # Padding the history with zeros before enough steps are taken.
            rewards.append(0)

        #converting to tensors
        history_tensor = torch.tensor(np.array(history), dtype=torch.float32).unsqueeze(0)
        rewards_tensor = torch.tensor(np.array(rewards), dtype=torch.float32).unsqueeze(-1).unsqueeze(0)
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

        # Epsilon-greedy action selection
        if random.random() < epsilon:
            action = random.choice(range(action_space))  # Exploration
        else:
            with torch.no_grad():
                action = torch.argmax(policy_net(state_tensor, history_tensor, rewards_tensor)).item()  # Exploitation

        next_state, reward, done, _, _ = env.step(action)
        next_state = one_hot_state(next_state, state_space)

        reward = custom_reward(next_state, action, reward, done, goal_state, visited_states)
        total_reward += reward
        step_count += 1
 
        visited_states.add(tuple(next_state))  #updating visiting states for checking looping

        history.append(state)  
        rewards.append(reward)  
        replay_buffer.add((state, action, reward, next_state, done)) # Storing transition in replay buffer

        state = next_state

        # Training the Q-Network
        if len(replay_buffer) > batch_size:
            state_histories, actions, rewards_batch, next_states, dones, reward_histories = replay_buffer.sample(batch_size)

            state_histories = torch.tensor(state_histories, dtype=torch.float32)
            reward_histories = torch.tensor(reward_histories, dtype=torch.float32).unsqueeze(-1)
            actions = torch.tensor(actions, dtype=torch.long)
            rewards_batch = torch.tensor(rewards_batch, dtype=torch.float32)
            next_states = torch.tensor(next_states, dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32)

            # Computing target Q-values
            with torch.no_grad():
                next_state_histories = torch.cat((state_histories[:, 1:, :], next_states.unsqueeze(1)), dim=1)
                next_reward_histories = torch.cat((reward_histories[:, 1:, :], rewards_batch.unsqueeze(1).unsqueeze(-1)), dim=1)
                target_q_values = rewards_batch + gamma * (1 - dones) * torch.max(target_net(next_states, next_state_histories, next_reward_histories), dim=1)[0]

            # Computing current Q-values
            current_q_values = policy_net(state_histories[:, -1, :], state_histories, reward_histories).gather(1, actions.unsqueeze(1)).squeeze(1)

            loss = nn.MSELoss()(current_q_values, target_q_values)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), max_norm=1.0) #gradient clipping
            optimizer.step()

    epsilon = max(epsilon * epsilon_decay, epsilon_min)  #updating epsilon

    if episode % target_update_freq == 0:  #Updating target_net
        target_net.load_state_dict(policy_net.state_dict())

    total_rewards.append(total_reward)
    if done and int(state.argmax()) == int(goal_state.argmax()):
        success_count += 1
        steps_to_goal.append(step_count)

    print(f"Episode {episode + 1}, Total Reward: {total_reward}, Steps: {step_count}, Goal Reached: {'Yes' if done and int(state.argmax()) == int(goal_state.argmax()) else 'No'}")

# Calculating metrics
avg_reward = np.mean(total_rewards)
success_rate = (success_count / num_episodes) * 100
avg_steps = np.mean(steps_to_goal) if steps_to_goal else float('inf')

print("\n--- Training Metrics ---")
print(f"Average Reward: {avg_reward:.2f}")
print(f"Success Rate: {success_rate:.2f}%")
print(f"Average Steps to Goal (for successful episodes): {avg_steps:.2f}")
print("------------------------")

# Saving the trained model
torch.save(policy_net.state_dict(), "model/Attention_DQN_policy_net.pth")
print("Training complete. Model saved.")


Episode 1, Total Reward: -62.0, Steps: 13, Goal Reached: No
Episode 2, Total Reward: -24.0, Steps: 3, Goal Reached: No
Episode 3, Total Reward: -46.0, Steps: 11, Goal Reached: No
Episode 4, Total Reward: 9.0, Steps: 35, Goal Reached: Yes
Episode 5, Total Reward: -24.0, Steps: 3, Goal Reached: No
Episode 6, Total Reward: -20, Steps: 1, Goal Reached: No
Episode 7, Total Reward: 83.0, Steps: 13, Goal Reached: Yes
Episode 8, Total Reward: -74.0, Steps: 19, Goal Reached: No
Episode 9, Total Reward: -32.0, Steps: 13, Goal Reached: No
Episode 10, Total Reward: -22.0, Steps: 5, Goal Reached: No
Episode 11, Total Reward: -19, Steps: 2, Goal Reached: No
Episode 12, Total Reward: -24.0, Steps: 3, Goal Reached: No
Episode 13, Total Reward: -19, Steps: 2, Goal Reached: No
Episode 14, Total Reward: -27.0, Steps: 12, Goal Reached: No
Episode 15, Total Reward: -17, Steps: 4, Goal Reached: No
Episode 16, Total Reward: -33.0, Steps: 6, Goal Reached: No
Episode 17, Total Reward: -331.0, Steps: 80, Goal R

KeyboardInterrupt: 

In [11]:
#Testing Loop
def test_attention_dqn(env, policy_net, state_space, action_space, history_size, custom_reward, num_episodes=100):
    
    total_rewards_test = []
    success_count_test = 0
    steps_to_goal_test = []

    policy_net.load_state_dict(torch.load("model/Attention_DQN_policy_net.pth"))
    policy_net.eval()

    for episode in range(num_episodes):
        state, _ = env.reset()
        state = one_hot_state(state, state_space)
        history = deque(maxlen=history_size)
        rewards = deque(maxlen=history_size)
        done = False
        total_reward = 0
        step_count = 0
        goal_state = one_hot_state(state_space - 1, state_space)
        visited_states = set()

        while not done:
            while len(history) < history_size:
                history.append(np.zeros_like(state))
                rewards.append(0)

            history_tensor = torch.tensor(np.array(history), dtype=torch.float32).unsqueeze(0)
            rewards_tensor = torch.tensor(np.array(rewards), dtype=torch.float32).unsqueeze(-1).unsqueeze(0)
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

            if random.random() < 0.1:  # 10% chance to explore
                     action = random.choice(range(action_space))
            else: 
                with torch.no_grad():
                    action = torch.argmax(policy_net(state_tensor, history_tensor, rewards_tensor)).item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = one_hot_state(next_state, state_space)

            reward = custom_reward(next_state, action, reward, done, goal_state, visited_states)
            total_reward += reward
            step_count += 1

            visited_states.add(tuple(next_state))

            history.append(state)
            rewards.append(reward)

            state = next_state

        # Updating testing metrics
        total_rewards_test.append(total_reward)
        if done and int(state.argmax()) == int(goal_state.argmax()):
            success_count_test += 1
            steps_to_goal_test.append(step_count)

        print(f"Test Episode {episode + 1}, Total Reward: {total_reward}, Steps: {step_count}, Goal Reached: {'Yes' if done and int(state.argmax()) == int(goal_state.argmax()) else 'No'}")

    # Calculating test metrics
    avg_reward_test = np.mean(total_rewards_test)
    success_rate_test = (success_count_test / num_episodes) * 100
    avg_steps_test = np.mean(steps_to_goal_test) if steps_to_goal_test else float('inf')

    print("\n--- Testing Metrics ---")
    print(f"Average Reward: {avg_reward_test:.2f}")
    print(f"Success Rate: {success_rate_test:.2f}%")
    print(f"Average Steps to Goal (for successful episodes): {avg_steps_test:.2f}")
    print("------------------------")

    # Return metrics as a dictionary
    metrics = {"Average Reward": avg_reward_test,
        "Success Rate (%)": success_rate_test,
        "Average Steps to Goal": avg_steps_test}

    return metrics


In [12]:
print("Testing the trained policy...")
Attention_DQN_metrics = test_attention_dqn(env, policy_net, state_space, action_space, history_size, custom_reward, num_episodes=200)

Testing the trained policy...
Test Episode 1, Total Reward: -205.0, Steps: 73, Goal Reached: Yes


  policy_net.load_state_dict(torch.load("model/Attention_DQN_policy_net.pth"))


Test Episode 2, Total Reward: -2658.0, Steps: 566, Goal Reached: Yes
Test Episode 3, Total Reward: -2674.0, Steps: 568, Goal Reached: Yes
Test Episode 4, Total Reward: -1506.0, Steps: 332, Goal Reached: Yes
Test Episode 5, Total Reward: -829.0, Steps: 199, Goal Reached: Yes
Test Episode 6, Total Reward: -4035.0, Steps: 839, Goal Reached: Yes
Test Episode 7, Total Reward: -2664.0, Steps: 566, Goal Reached: Yes
Test Episode 8, Total Reward: -3429.0, Steps: 719, Goal Reached: Yes
Test Episode 9, Total Reward: -1464.0, Steps: 326, Goal Reached: Yes
Test Episode 10, Total Reward: -847.0, Steps: 199, Goal Reached: Yes
Test Episode 11, Total Reward: -20, Steps: 1, Goal Reached: No
Test Episode 12, Total Reward: -3845.0, Steps: 801, Goal Reached: Yes
Test Episode 13, Total Reward: -1076.0, Steps: 246, Goal Reached: Yes
Test Episode 14, Total Reward: -1203.0, Steps: 275, Goal Reached: Yes
Test Episode 15, Total Reward: -526.0, Steps: 136, Goal Reached: Yes
Test Episode 16, Total Reward: -381.0,

Implementation of Metrics