In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
from numpy import average
import random

#### Defining Basic Functions

In [None]:
# Cell written as the goal (code 5) by init_env, given as (row, column).
GOAL_STATE = (1, 6)
# Cell written as the start (code 4) by init_env, given as (row, column).
START_STATE = (7, 2)
# Action names; an action's integer id is its position in this list.
ACTIONS = ["up", "down", "left", "right"]
NUM_ACTIONS = len(ACTIONS)

def init_env() -> np.ndarray:
    """
    Build the 10x10 maze as a matrix of cell-type codes.

    Codes: regular = 0, wall = 1, red = 2, yellow = 3, start = 4, goal = 5.
    The start and goal cells are taken from START_STATE and GOAL_STATE.
    """
    WALL, RED, YELLOW, START, GOAL = 1, 2, 3, 4, 5

    grid = np.zeros((10, 10))

    # The outer ring of the grid is entirely wall
    grid[[0, 9], :] = WALL
    grid[:, [0, 9]] = WALL

    # Special cells inside the maze, as (row, column) pairs
    inner_walls = [
        (2, 4),
        (3, 3), (3, 4), (3, 5), (3, 6), (3, 8),
        (4, 4),
        (5, 6),
        (6, 1), (6, 2), (6, 3), (6, 6),
        (7, 6),
        (8, 6),
    ]
    red_cells = [(3, 2), (6, 4), (7, 3), (7, 7)]
    yellow_cells = [(1, 2), (3, 7), (5, 2), (5, 7)]

    for code, cells in ((WALL, inner_walls), (RED, red_cells), (YELLOW, yellow_cells)):
        for row, col in cells:
            grid[row, col] = code

    # Start and goal are written last so they win over any earlier code
    grid[START_STATE[0], START_STATE[1]] = START
    grid[GOAL_STATE[0], GOAL_STATE[1]] = GOAL

    return grid

def get_all_states(Env):
    """
    Collect the (row, column) index of every non-wall cell, scanning row by row.
    """
    n_rows, n_cols = Env.shape[0], Env.shape[1]
    return [
        (row, col)
        for row in range(n_rows)
        for col in range(n_cols)
        if Env[row, col] != 1  # code 1 marks a wall, which is not a usable state
    ]

def get_as_matrix(values, Env, type):
    """
    Lay out per-state values on an object grid shaped like the environment.

    values[k] is assigned to the k-th state returned by get_all_states(Env).
    - type == "policy": values[k] is an index into ACTIONS; the cell gets the action name.
    - type == "q_values": the cell gets values[k] rounded to 2 decimals, as a string.
    Wall cells, and every cell for any other `type`, keep the empty string.
    """
    grid = np.full(Env.shape, '', dtype=object)

    for idx, cell in enumerate(get_all_states(Env)):
        row, col = cell[0], cell[1]
        if type == "policy":
            grid[row, col] = ACTIONS[values[idx]]
        elif type == "q_values":
            grid[row, col] = str(np.round(values[idx], 2))

    return grid

def plot_env(Env, title, annot_matrix=False, show=True) -> None:
    """
    Draw the environment matrix as a colour-coded heatmap on a new 10x10-inch figure.

    Parameters
    ----------
    Env : matrix of cell-type codes (0..5) as produced by init_env.
    title : title of the plot.
    annot_matrix : False for no annotations, otherwise a matrix shaped like Env
        whose entries are written into the cells (wall cells are blanked).
    show : when True, plt.show() is called; pass False to keep drawing on the figure.
    """
    # One RGB colour per cell-type code
    colors = {
        0: [1, 1, 1],        # regular - white
        1: [0, 0, 0],        # wall    - black
        2: [0.55, 0, 0],     # red     - dark red
        3: [0.96, 0.8, 0.6], # yellow  - light tan
        4: [0, 0, 1],        # start   - blue
        5: [0, 1, 0]         # goal    - green
    }
    palette = sns.color_palette([colors[code] for code in range(6)])

    if annot_matrix is not False:
        # Never annotate walls
        annot_matrix = np.where(Env == 1, '', annot_matrix)

    plt.figure(figsize=(10, 10))
    # vmin/vmax pin code k to palette entry k even when some codes are absent from Env;
    # without them the colour scale is stretched over whatever range Env happens to have.
    sns.heatmap(
        Env,
        fmt="",
        cmap=palette,
        vmin=0,
        vmax=5,
        cbar=False,
        annot=annot_matrix,
        linewidths=0.5,
        linecolor='black',
    )
    plt.xlabel("X-axis")
    plt.ylabel("Y-axis")
    plt.title(title)

    if show:
        plt.show()

def plot_policy(Env, policy, title) -> None:
    """
    Draw the environment and overlay one arrow per cell showing the policy's action.

    `policy` is read as policy[row, col] and compared with the action names
    "up", "right", "down" and "left"; cells holding anything else get no arrow.
    """
    # action name -> (x offset, y offset, dx, dy) of the arrow inside its unit cell
    arrow_geometry = {
        "up": (0.5, 0.85, 0, -0.5),
        "right": (0.15, 0.5, 0.5, 0),
        "down": (0.5, 0.15, 0, 0.50),
        "left": (0.85, 0.5, -0.5, 0),
    }

    plot_env(Env, title, annot_matrix=False, show=False)

    n_rows, n_cols = Env.shape[0], Env.shape[1]
    for row in range(n_rows):
        for col in range(n_cols):
            for action_name, (x_off, y_off, dx, dy) in arrow_geometry.items():
                if policy[row, col] == action_name:
                    plt.arrow(col + x_off, row + y_off, dx, dy, width=0.04, color='black')

    plt.show()

def plot_optimal_path(Env, path, title) -> None:
    """
    Draw the environment and overlay an arrow for every (state, direction) step in `path`.

    Each arrow starts at the centre of the step's cell and is 0.8 cells long;
    steps whose direction is not one of the four action names draw nothing.
    """
    # direction name -> (dx, dy) of the arrow in plot coordinates
    step_vectors = {
        'right': (0.8, 0),
        'left': (-0.8, 0),
        'up': (0, -0.8),
        'down': (0, 0.8),
    }

    plot_env(Env, title, annot_matrix=False, show=False)

    for cell, direction in path:
        row, col = cell[0], cell[1]  # row is drawn along y, column along x
        for name, (dx, dy) in step_vectors.items():
            if direction == name:
                plt.arrow(col + 0.5, row + 0.5, dx, dy, width=0.04, color='black')

    plt.show()

def get_reward(next_state_type, wall_hit) -> float:
    """
    Reward for a single move.

    Every action costs -1; bumping into a wall adds -0.8; the type of the cell
    landed on adds -10 (red, code 2), -5 (yellow, code 3) or +100 (goal, code 5).
    """
    landing_bonus = {2: -10, 3: -5, 5: 100}  # red, yellow, goal

    cell_code = int(next_state_type)
    total = -1  # base cost of taking any action

    if wall_hit:
        total += -0.8

    if cell_code in landing_bonus:
        total += landing_bonus[cell_code]

    return total

def get_next_state(Env, state, action) -> tuple:
    """
    Apply `action` to `state` and return where the agent ends up.

    Parameters
    ----------
    Env : matrix of cell-type codes; code 1 marks a wall.
    state : (row, column) of the current cell.
    action : one of "up", "down", "left", "right".

    Returns
    -------
    (next_row, next_col, wall_hit). If the target cell is a wall or lies outside
    the grid the agent stays where it is and wall_hit is True.

    Raises
    ------
    ValueError : if `action` is not one of the four known action names.
    """
    # action name -> (row delta, column delta)
    moves = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}
    if action not in moves:
        raise ValueError(f"Unknown action: {action!r}")

    i, j = state[0], state[1]
    di, dj = moves[action]
    next_i, next_j = i + di, j + dj

    # Treat leaving the grid like hitting a wall; without this check a negative index
    # would silently wrap around to the opposite edge of the matrix.
    inside = 0 <= next_i < Env.shape[0] and 0 <= next_j < Env.shape[1]
    if not inside or Env[next_i, next_j] == 1:
        return i, j, True

    return next_i, next_j, False

def get_probabilities(action, p) -> list:
    """
    Probability of each actual move, in the order up, down, left, right,
    when the move with index `action` is intended.

    The intended move keeps probability 1 - p, each of the two perpendicular
    moves gets p / 2, and the opposite move gets 0.
    """
    order = ("up", "down", "left", "right")
    # intended move -> the two perpendicular moves it can slip into
    sideways = {
        "up": ("left", "right"),
        "down": ("left", "right"),
        "left": ("up", "down"),
        "right": ("up", "down"),
    }

    intended = order[action]
    chances = dict.fromkeys(order, 0)
    chances[intended] = 1 - p
    for slip in sideways[intended]:
        chances[slip] = p / 2

    return [chances[name] for name in order]

def get_optimal_path(Env, start, end, optimal_policy):
    """
    Follow `optimal_policy` from `start` until `end` is reached.

    Returns the list of (state, action) steps taken. If a state is reached a
    second time, "No optimal path found." is printed and the steps collected
    so far are returned.
    """
    steps = []
    seen = []
    position = start

    while position != end:
        # Revisiting a state means the policy loops and will never reach `end`
        if position in seen:
            print("No optimal path found.")
            return steps
        seen.append(position)

        move = optimal_policy[position[0], position[1]]
        steps.append((position, move))

        next_row, next_col, _ = get_next_state(Env, position, move)
        position = (next_row, next_col)

    return steps

def get_random_state(Env):
    """
    Draw a random interior cell that is neither a wall (code 1) nor the goal (code 5).

    Row and column are sampled uniformly from 1 .. size - 2 and redrawn until
    an acceptable cell is found.
    """
    last_row = Env.shape[0] - 2
    last_col = Env.shape[1] - 2

    picked = None
    while picked is None:
        row = random.randint(1, last_row)
        col = random.randint(1, last_col)
        blocked = Env[row, col] == 1 or Env[row, col] == 5
        if not blocked:
            picked = (row, col)

    return picked

def calculate_epsilon(epsilon, ep_no):
    """
    Exponentially decayed exploration rate: epsilon ** ep_no, never below 0.1.
    """
    floor = 0.1
    decayed = epsilon ** ep_no
    return decayed if decayed > floor else floor

def calculate_moving_avg(metrics, window=25):
    """
    Trailing moving average of a sequence.

    Parameters
    ----------
    metrics : sequence of numbers (e.g. one value per episode).
    window : maximum number of most recent values averaged at each position
        (default 25). Early positions average over however many values exist.

    Returns
    -------
    A list the same length as `metrics`; entry k is the mean of
    metrics[max(0, k + 1 - window) : k + 1]. An empty input gives an empty list.

    Raises
    ------
    ValueError : if `window` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")

    return [
        average(metrics[max(0, end - window):end])
        for end in range(1, len(metrics) + 1)
    ]

def plot_avg(metrics, ylabel):
    """
    Plot a per-episode metric as a line chart on a new figure and return that figure.

    `ylabel` is used both as the y-axis label and inside the title.
    """
    fig, ax = plt.subplots()
    ax.plot(metrics)
    ax.set_xlabel("Episodes")
    ax.set_ylabel(ylabel)
    ax.set_title(f"No of episodes vs {ylabel}")
    return fig

#### DQN

In [None]:
class QNetwork(nn.Module):
    """
    Fully connected Q-network: maps a state to one Q-value per action.

    Layout:
    - input: the 2 state coordinates
    - three hidden Linear layers of width 128, each followed by ReLU
    - output: 4 Q-values
    """
    def __init__(self):
        super().__init__()

        widths = [2, 128, 128, 128]
        layers = []
        # Hidden stack: Linear + ReLU for every consecutive pair of widths
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        # Output head, no activation
        layers.append(nn.Linear(widths[-1], 4))

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values computed by the layer stack for input `x`."""
        q_values = self.net(x)
        return q_values

class DuelingQNetwork(nn.Module):
    """
    Dueling Q-network: a shared feature extractor followed by separate
    state-value and advantage heads that are recombined into Q-values.

    Layout:
    - input: the 2 state coordinates
    - features: two Linear layers of width 128, each followed by ReLU
    - value stream: Linear(128 -> 1)
    - advantage stream: Linear(128 -> 4)
    """
    def __init__(self):
        super(DuelingQNetwork, self).__init__()

        # Feature extractor
        self.features = nn.Sequential(
            nn.Linear(2, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )

        # Value stream (output = 1)
        self.value_stream = nn.Linear(128, 1)

        # Advantage stream (output = 4)
        self.advantage_stream = nn.Linear(128, 4)

    def forward(self, x):
        """
        Compute Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).

        `x` may be a batch of states (..., 2) or a single unbatched state (2,);
        the output has the same leading shape with 4 Q-values in the last dim.
        """
        features = self.features(x)

        value = self.value_stream(features)
        advantage = self.advantage_stream(features)

        # Average over the action (last) dimension so unbatched 1-D inputs work too;
        # for the usual (batch, 4) case this is the same as dim=1.
        q_values = value + (advantage - advantage.mean(dim=-1, keepdim=True))

        return q_values

class ReplayBuffer:
    """
    Fixed-capacity store of past transitions.

    Backed by a deque with maxlen=max_size, so once full the oldest
    transition is dropped for every new one pushed.
    """
    def __init__(self, max_size):
        self.buffer = deque(maxlen=max_size)

    def push(self, state, action, reward, next_state, done):
        """
        Append one (state, action, reward, next_state, done) transition.
        """
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """
        Draw `batch_size` distinct transitions at random and return them column-wise
        as tensors: states (float), actions (long), rewards (float),
        next states (float), done flags (float).
        """
        picked = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*picked)
        return (
            torch.tensor(states).float(),
            torch.tensor(actions).long(),
            torch.tensor(rewards).float(),
            torch.tensor(next_states).float(),
            torch.tensor(dones).float(),
        )

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

class DQNAgent:
    """
    DQN agent: owns an online Q-network, a target network, an Adam optimizer and a
    replay buffer, and learns from sampled transitions.

    Parameters
    ----------
    Env : environment matrix; its non-wall cells become the agent's state list.
    lr : learning rate of the Adam optimizer.
    gamma : discount factor applied to the next state's value in the TD target.
    tau : soft-update rate, target <- tau * online + (1 - tau) * target.
    buffer_size : capacity of the replay buffer.
    batch_size : number of transitions sampled per training step.
    double_DQN : if True, the online network picks the next action and the target
        network evaluates it; otherwise the target network's max is used.
    dueling_DQN : if True, use DuelingQNetwork instead of QNetwork.
    """
    def __init__(self, Env, lr, gamma, tau, buffer_size, batch_size, double_DQN = False, dueling_DQN = False):
        # Every non-wall cell, in get_all_states order, for whole-grid evaluation
        self.states = torch.tensor(get_all_states(Env), dtype=torch.float32)
        self.double_DQN = double_DQN

        network_cls = DuelingQNetwork if dueling_DQN else QNetwork
        self.q_network = network_cls()
        self.target_network = network_cls()

        self.target_network.load_state_dict(self.q_network.state_dict())  # Initial sync

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)

        self.gamma = gamma
        self.tau = tau

        self.replay_buffer = ReplayBuffer(buffer_size)
        self.batch_size = batch_size

    def select_action(self, state, epsilon):
        """
        Epsilon-greedy action selection.

        With probability `epsilon` returns a uniformly random action index,
        otherwise the index of the largest Q-value of the online network for `state`.
        """
        if np.random.rand() < epsilon:
            return np.random.randint(NUM_ACTIONS)  # Random action

        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        # Pure inference: no autograd graph is needed to pick an action
        with torch.no_grad():
            return self.q_network(state).argmax().item()

    def get_policy(self):
        """
        Greedy action index for every state in self.states, as a 1-D numpy array.
        """
        with torch.no_grad():
            q_values = self.q_network(self.states)
            return q_values.argmax(dim=1).numpy()

    def get_q_values(self):
        """
        Q-values of the online network for every state in self.states, flattened
        to 1-D: the values of all actions of the first state come first, then
        those of the second state, and so on.
        """
        with torch.no_grad():
            q_values = self.q_network(self.states)
        return q_values.numpy().flatten()

    def train(self):
        """
        Run one gradient step on a batch sampled from the replay buffer, then
        soft-update the target network.

        Returns the loss as a float, or None when the buffer holds fewer than
        batch_size transitions and no update was made.
        """
        if len(self.replay_buffer) < self.batch_size:
            return None

        # Sample a batch
        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)

        # Current Q-value for taken action
        current_q_values = self.q_network(state).gather(1, action.unsqueeze(1)).squeeze(1)

        # TD target; (1 - done) removes the bootstrap term for terminal transitions
        with torch.no_grad():
            if self.double_DQN:
                argmax_next_action = self.q_network(next_state).argmax(1)
                next_q_values = self.target_network(next_state).gather(1, argmax_next_action.unsqueeze(1)).squeeze(1)
            else:
                next_q_values = self.target_network(next_state).max(1)[0]
            target_q_values = reward + self.gamma * next_q_values * (1 - done)

        loss = nn.MSELoss()(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Soft update target network (in place, outside of autograd)
        with torch.no_grad():
            for target_param, param in zip(self.target_network.parameters(), self.q_network.parameters()):
                target_param.copy_(self.tau * param + (1.0 - self.tau) * target_param)

        return loss.item()

def simulate(Env, num_episodes, max_steps, buffer_size, lr, gamma, tau, batch_size, epsilon, p, steps_update, double_DQN = False, dueling_DQN = False):
    """
    Train a DQN agent on the grid world and summarise what it learned.

    Parameters
    ----------
    Env : environment matrix of cell-type codes.
    num_episodes : number of training episodes; each starts from a random state.
    max_steps : maximum number of steps per episode.
    buffer_size, lr, gamma, tau, batch_size, double_DQN, dueling_DQN : passed to DQNAgent.
    epsilon : base of the exploration decay (see calculate_epsilon).
    p : probability that the executed move differs from the chosen one
        (see get_probabilities).
    steps_update : the agent is trained every `steps_update` steps.

    Returns
    -------
    (policy_matrix, state_value_matrix, avg_rewards, avg_losses) where the matrices
    are shaped like Env, state values are max_a Q(s, a) formatted as strings, and the
    averages are moving averages of per-episode total reward and total loss.
    """
    agent = DQNAgent(Env, lr, gamma, tau, buffer_size, batch_size, double_DQN, dueling_DQN)
    rewards, losses = [], []

    for _ in range(num_episodes):
        state = get_random_state(Env)
        total_reward, total_loss = 0, 0
        step = 0

        # An episode ends when the goal is reached OR the step budget runs out.
        # (With `or` here the loop ignored max_steps and could run forever.)
        while step < max_steps and state != GOAL_STATE:
            step += 1
            # NOTE(review): epsilon decays with the step inside the episode, so every
            # episode restarts near-random exploration; calculate_epsilon's parameter is
            # named ep_no, so per-episode decay may have been intended - confirm.
            epsilon_value = calculate_epsilon(epsilon, step)
            action = agent.select_action(state, epsilon_value) # select action using epsilon greedy policy
            probabilities = get_probabilities(action, p) # get probabilities for each action since env is stochastic
            action_taken = np.random.choice(ACTIONS, p=probabilities) # select actual action based on probabilities
            next_i, next_j, wall_hit = get_next_state(Env, state, action_taken) # get next state and check if wall was hit
            next_state = (next_i, next_j)
            reward = get_reward(Env[next_state], wall_hit) # get reward based on next state and wall hit
            done = 1 if next_state == GOAL_STATE else 0

            # Store transition (the chosen action, not the possibly slipped one)
            agent.replay_buffer.push(state, action, reward, next_state, done)

            # Train agent
            if step % steps_update == 0:
                loss = agent.train()
                if loss is not None:  # None means the buffer is not yet full enough
                    total_loss += loss

            state = next_state
            total_reward += reward

        rewards.append(total_reward)
        losses.append(total_loss)

    optimal_policy = agent.get_policy()
    # get_q_values() is flat with NUM_ACTIONS entries per state; reduce it to one value
    # per state (max over actions) so that entry k really belongs to state k.
    q_values = np.asarray(agent.get_q_values()).reshape(-1, NUM_ACTIONS).max(axis=1)
    optimal_policy_matrix = get_as_matrix(optimal_policy, Env, "policy")
    q_values_matrix = get_as_matrix(q_values, Env, "q_values")
    avg_rewards = calculate_moving_avg(rewards)
    avg_losses = calculate_moving_avg(losses)

    return optimal_policy_matrix, q_values_matrix, avg_rewards, avg_losses

In [None]:
# DQN: baseline run (QNetwork, targets from the target network's max Q-value)
# Hyperparameters, passed positionally to simulate() below
num_episodes = 500   # number of training episodes
max_steps = 50       # per-episode step budget handed to simulate
buffer_size = 5000   # replay buffer capacity
lr=1e-3              # Adam learning rate
gamma=0.99           # discount factor in the TD target
tau=1e-2             # soft-update rate of the target network
batch_size=64        # transitions sampled per training step
epsilon=0.99         # base of the exploration decay (see calculate_epsilon)
p = 0.025            # probability that the executed move differs from the chosen one
steps_update = 1     # train every `steps_update` steps

Env = init_env()
optimal_policy, q_values, avg_rewards, avg_losses = simulate(Env, num_episodes, max_steps, buffer_size, lr, gamma, tau, batch_size, epsilon, p, steps_update)
# Follow the learned policy from the start cell towards the goal
optimal_path = get_optimal_path(Env, START_STATE, GOAL_STATE, optimal_policy)
plot_policy(Env, optimal_policy, "Optimal Policy")
plot_optimal_path(Env, optimal_path, "Optimal Path")
plot_avg(avg_rewards, "Average Rewards")
plot_avg(avg_losses, "Average Losses")
# Annotate each non-wall cell with the value matrix returned by simulate
plot_env(Env, "State Values", q_values)

In [None]:
# Double DQN: same setup as the DQN cell, but with double_DQN=True the online network
# picks the next action and the target network evaluates it
# Hyperparameters, passed positionally to simulate() below
num_episodes = 500   # number of training episodes
max_steps = 50       # per-episode step budget handed to simulate
buffer_size = 5000   # replay buffer capacity
lr=1e-3              # Adam learning rate
gamma=0.99           # discount factor in the TD target
tau=1e-2             # soft-update rate of the target network
batch_size=64        # transitions sampled per training step
epsilon=0.99         # base of the exploration decay (see calculate_epsilon)
p = 0.025            # probability that the executed move differs from the chosen one
steps_update = 1     # train every `steps_update` steps

Env = init_env()
optimal_policy, q_values, avg_rewards, avg_losses = simulate(Env, num_episodes, max_steps, buffer_size, lr, gamma, tau, batch_size, epsilon, p, steps_update, double_DQN=True)
# Follow the learned policy from the start cell towards the goal
optimal_path = get_optimal_path(Env, START_STATE, GOAL_STATE, optimal_policy)
plot_policy(Env, optimal_policy, "Optimal Policy")
plot_optimal_path(Env, optimal_path, "Optimal Path")
plot_avg(avg_rewards, "Average Rewards")
plot_avg(avg_losses, "Average Losses")
# Annotate each non-wall cell with the value matrix returned by simulate
plot_env(Env, "State Values", q_values)

In [None]:
# Dueling DQN: same setup as the DQN cell, but with dueling_DQN=True the agent uses
# DuelingQNetwork (separate value and advantage streams)
# Hyperparameters, passed positionally to simulate() below
num_episodes = 500   # number of training episodes
max_steps = 50       # per-episode step budget handed to simulate
buffer_size = 5000   # replay buffer capacity
lr=1e-3              # Adam learning rate
gamma=0.99           # discount factor in the TD target
tau=1e-2             # soft-update rate of the target network
batch_size=64        # transitions sampled per training step
epsilon=0.99         # base of the exploration decay (see calculate_epsilon)
p = 0.025            # probability that the executed move differs from the chosen one
steps_update = 1     # train every `steps_update` steps

Env = init_env()
optimal_policy, q_values, avg_rewards, avg_losses = simulate(Env, num_episodes, max_steps, buffer_size, lr, gamma, tau, batch_size, epsilon, p, steps_update, dueling_DQN=True)
# Follow the learned policy from the start cell towards the goal
optimal_path = get_optimal_path(Env, START_STATE, GOAL_STATE, optimal_policy)
plot_policy(Env, optimal_policy, "Optimal Policy")
plot_optimal_path(Env, optimal_path, "Optimal Path")
plot_avg(avg_rewards, "Average Rewards")
plot_avg(avg_losses, "Average Losses")
# Annotate each non-wall cell with the value matrix returned by simulate
plot_env(Env, "State Values", q_values)