In [None]:
import numpy as np
import pandas as pd
import random
import heapq
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt

# A* algorithm to find the shortest path between two points
def astar(maze, start, goal):
    """Return a shortest 4-connected path from ``start`` to ``goal`` using A*.

    Cells equal to 1 are blocked; any other value is walkable. The start cell
    itself is never tested for being blocked, only the cells stepped onto.

    Args:
        maze: 2-D grid indexable as ``maze[row][col]``.
        start: ``(row, col)`` tuple to start from.
        goal: ``(row, col)`` tuple to reach.

    Returns:
        List of ``(row, col)`` tuples from ``start`` to ``goal`` inclusive, or
        an empty list when the goal cannot be reached.
    """
    row_count = len(maze)
    moves = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def manhattan(p, q):
        # Manhattan distance between two cells.
        return abs(p[0] - q[0]) + abs(p[1] - q[1])

    def walkable_neighbors(row, col):
        # In-bounds, non-blocked cells one step up/down/left/right.
        candidates = ((row + d_row, col + d_col) for d_row, d_col in moves)
        return [
            (r, c)
            for r, c in candidates
            if 0 <= r < row_count and 0 <= c < len(maze[0]) and maze[r][c] != 1
        ]

    best_cost = {start: 0}
    parent = {}
    # Heap entries are (f = g + h, g, cell).
    frontier = [(manhattan(start, goal), 0, start)]

    while frontier:
        _, cost_so_far, node = heapq.heappop(frontier)

        if node == goal:
            # Walk the parent links back to the start, then flip the trail.
            trail = []
            cursor = node
            while cursor in parent:
                trail.append(cursor)
                cursor = parent[cursor]
            trail.append(start)
            return trail[::-1]

        next_cost = cost_so_far + 1
        for nxt in walkable_neighbors(*node):
            known = best_cost.get(nxt)
            if known is None or next_cost < known:
                parent[nxt] = node
                best_cost[nxt] = next_cost
                heapq.heappush(frontier, (next_cost + manhattan(nxt, goal), next_cost, nxt))

    return []









In [None]:
# Environment creation: builds a maze containing pins, a start point, and an end point
def create_environment(n, num_pins):
    """Create a random ``n`` x ``n`` maze with a start point, an end point and pins.

    Cell values written into the maze: 0 = empty, 1 = pin, 2 = start point or
    end point (both share the value 2).

    Args:
        n: Side length of the square maze; must be greater than 1.
        num_pins: Number of pins to place on distinct empty cells. Values
            below 1 place no pins.

    Returns:
        Tuple ``(maze, start_point, end_point, pins)`` where ``maze`` is an
        integer NumPy array of shape ``(n, n)``, the two points are
        ``(row, col)`` tuples and ``pins`` is a list of ``(row, col)`` tuples.

    Raises:
        ValueError: If ``n <= 1`` or if there are not enough free cells for
            ``num_pins`` pins.
    """
    if n <= 1:
        raise ValueError("Maze size must be greater than 1.")
    # Two cells are reserved for the start and end points, so at most
    # n*n - 2 cells can hold pins. The previous bound (num_pins >= n*n) let
    # n*n - 1 through and the placement loop then never terminated.
    if num_pins > n * n - 2:
        raise ValueError("Too many pins for the maze size. Reduce the number of pins.")

    maze = np.zeros((n, n), dtype=int)

    start_point = (random.randint(0, n - 1), random.randint(0, n - 1))
    end_point = start_point
    while end_point == start_point:
        end_point = (random.randint(0, n - 1), random.randint(0, n - 1))
    maze[start_point[0]][start_point[1]] = 2
    maze[end_point[0]][end_point[1]] = 2

    # Sample pins without replacement from the free cells: this always
    # terminates, unlike retrying random cells until an empty one is hit.
    reserved = {start_point, end_point}
    free_cells = [(row, col) for row in range(n) for col in range(n) if (row, col) not in reserved]
    pins = random.sample(free_cells, max(num_pins, 0))
    for pin_row, pin_col in pins:
        maze[pin_row][pin_col] = 1

    return maze, start_point, end_point, pins

In [None]:
#Deep Q network
class DQN(nn.Module):
    """Fully connected Q-network: flat state vector in, one Q-value per action out."""

    def __init__(self, state_dims, num_actions):
        """Build a ``state_dims -> 128 -> 64 -> num_actions`` MLP with ReLU activations."""
        super().__init__()
        self.state_dims = state_dims
        self.num_actions = num_actions

        # Layer widths from input to output; a ReLU follows every linear
        # layer except the final one.
        widths = [self.state_dims, 128, 64, self.num_actions]
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        layers.pop()  # drop the activation after the output layer
        self.q_network = nn.Sequential(*layers)

    def forward(self, state):
        """Return the Q-values produced by the network for ``state``."""
        q_values = self.q_network(state)
        return q_values


In [None]:
class MazeEnvironment:
    """Pin-routing environment: every action routes one remaining pin to the end point.

    Maze cell values: 0 = empty, 1 = blocked (a pin or a previously routed
    path), 2 = start or end point.

    Attributes set by ``__init__``/``reset``: ``maze``, ``start_point``,
    ``end_point``, ``pins`` (pins not yet routed), ``state`` and
    ``agent_path`` (concatenation of every path routed so far).
    ``state_dims`` and ``num_actions`` are fixed at construction time.
    """

    def __init__(self, maze_size, num_pins):
        """Create a random ``maze_size`` x ``maze_size`` maze holding ``num_pins`` pins."""
        self.maze, self.start_point, self.end_point, self.pins = create_environment(maze_size, num_pins)
        self.state_dims = maze_size * maze_size
        self.num_actions = len(self.pins)
        self.state = self.start_point
        self.agent_path = []

    def reset(self):
        """Generate a fresh random maze of the same size and pin count; return its state vector."""
        # Use the pin count stored on the instance rather than a notebook-level
        # global, so the environment works no matter which cells were run.
        self.maze, self.start_point, self.end_point, self.pins = create_environment(
            len(self.maze), self.num_actions
        )
        self.state = self.start_point
        self.agent_path = []
        return self._get_state()

    def step(self, action, epsilon=None):
        """Route the pin at index ``action`` to the end point and block the cells it uses.

        Args:
            action: Index into the list of remaining pins. An out-of-range
                index is replaced by a random valid one.
            epsilon: Unused; accepted so existing callers keep working.

        Returns:
            Tuple ``(state, reward, done)``. ``reward`` is
            ``1 - 0.01 * len(path)`` when a path exists and ``-0.1`` otherwise;
            ``done`` is True once no pins remain.

        Raises:
            ValueError: If there are no pins left to route.
        """
        if len(self.pins) == 0:
            raise ValueError("No pins left to select action.")
        if action < 0 or action >= len(self.pins):
            action = random.randint(0, len(self.pins) - 1)

        pair = self.pins[action]

        # An earlier route may have marked the start/end cells as blocked;
        # reopen them (assignment, not comparison) so the goal stays reachable.
        start_row, start_col = self.start_point
        end_row, end_col = self.end_point
        if self.maze[start_row][start_col] == 1 or self.maze[end_row][end_col] == 1:
            self.maze[start_row][start_col] = 2
            self.maze[end_row][end_col] = 2

        path = astar(self.maze, pair, self.end_point)

        if path:
            # Dynamic route blockages: cells used by this route are closed on
            # this environment's own maze for all later routes.
            for x, y in path:
                self.maze[x][y] = 1
            # Shorter routes earn more than merely successful ones.
            reward = 1 - 0.01 * len(path)
        else:
            reward = -0.1

        self.pins.remove(pair)
        done = len(self.pins) == 0
        self.agent_path.extend(path)
        return self._get_state(), reward, done

    def _get_state(self):
        """Return a flattened one-hot grid marking ``self.state``."""
        # NOTE(review): self.state is only assigned in __init__ and reset, so
        # this vector does not change between steps of an episode.
        state = np.zeros_like(self.maze)
        state[self.state[0]][self.state[1]] = 1
        return state.flatten()

def visualize_routing_after_episode(maze, start, end, agent_path, pins):
    """Show the maze twice: with start/end/pins marked, then with the routed path overlaid.

    Works on a copy of ``maze``. Marker values written into the copy:
    2 = start, 3 = end, 4 = pin, 5 = cell on ``agent_path``.
    """
    canvas = maze.copy()

    def render(title):
        # Draw the current canvas as an image with the given title.
        plt.imshow(canvas)
        plt.title(title)
        plt.show()

    start_row, start_col = start[0], start[1]
    end_row, end_col = end[0], end[1]
    canvas[start_row, start_col] = 2
    canvas[end_row, end_col] = 3
    for pin in pins:
        canvas[pin[0], pin[1]] = 4
    render("Maze After Episode")

    for (row, col) in agent_path:
        canvas[row, col] = 5
    render("Maze with Routing Path After Episode")


In [None]:
def train(agent, env, num_episodes=50, gamma=0.9, batch_size=30, epsilon=1.0, epsilon_decay=0.995, min_epsilon=0.01, visualize=True):
    """Train ``agent`` with epsilon-greedy DQN on freshly generated mazes.

    Args:
        agent: Q-network mapping a ``(batch, state_dims)`` float tensor to
            ``(batch, num_actions)`` Q-values.
        env: Template environment. Only its maze size and ``num_actions`` are
            read; the object itself is never stepped or reset.
        num_episodes: Number of episodes to run.
        gamma: Discount factor for the bootstrap target.
        batch_size: Minibatch size sampled from the replay buffer.
        epsilon: Initial exploration rate.
        epsilon_decay: Multiplicative decay applied to ``epsilon`` per episode.
        min_epsilon: Lower bound for ``epsilon``.
        visualize: When True, plot the routed maze after every finished episode.

    Returns:
        Tuple ``(episode_rewards, losses)``: total reward per episode, and the
        minibatch loss of every optimisation step (one per environment step
        once the replay buffer holds ``batch_size`` transitions).
    """
    optimizer = optim.Adam(agent.parameters(), lr=0.001)
    loss_fn = nn.MSELoss()
    replay_buffer = deque(maxlen=3000)

    # Read the configuration once, up front. env.pins is emptied as an episode
    # progresses, so re-deriving the pin count from len(env.pins) at the start
    # of later episodes would yield 0.
    maze_size = env.maze.shape[0]
    num_pins = env.num_actions
    episode_env = MazeEnvironment(maze_size, num_pins)

    episode_rewards = []
    losses = []

    for episode in range(num_episodes):
        state = torch.tensor(episode_env.reset(), dtype=torch.float32).unsqueeze(0)
        total_reward = 0
        done = False

        while not done:
            remaining = len(episode_env.pins)
            if remaining == 0:
                print(f"Episode {episode}: No pins left, ending episode.")
                done = True
                break

            if random.random() < epsilon:
                action = random.randint(0, remaining - 1)
            else:
                with torch.no_grad():
                    q_values = agent(state)
                    # Only indices of pins that are still unrouted are valid.
                    action = torch.argmax(q_values[:, :remaining]).item()

            next_state, reward, done = episode_env.step(action, epsilon)
            next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
            total_reward += reward

            replay_buffer.append((state, action, reward, next_state, done))
            state = next_state

            if len(replay_buffer) >= batch_size:
                batch = random.sample(replay_buffer, batch_size)
                states, actions, rewards, next_states, dones = zip(*batch)

                state_batch = torch.cat(states)
                next_state_batch = torch.cat(next_states)
                action_batch = torch.tensor(actions, dtype=torch.long).unsqueeze(1)
                reward_batch = torch.tensor(rewards, dtype=torch.float32)
                not_done = 1.0 - torch.tensor(dones, dtype=torch.float32)

                predicted = agent(state_batch).gather(1, action_batch).squeeze(1)
                # The bootstrap target is a constant: no gradient may flow
                # through the next-state Q-values.
                with torch.no_grad():
                    best_next = agent(next_state_batch).max(dim=1).values
                targets = reward_batch + gamma * best_next * not_done

                # One optimiser step on the mean minibatch loss.
                loss = loss_fn(predicted, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                losses.append(loss.item())

        episode_rewards.append(total_reward)
        if done and visualize:
            print(f"Episode {episode} done! Visualizing the path...")
            visualize_routing_after_episode(
                episode_env.maze,
                episode_env.start_point,
                episode_env.end_point,
                episode_env.agent_path,
                episode_env.pins,
            )
        epsilon = max(min_epsilon, epsilon * epsilon_decay)
        print(f"Episode {episode}/{num_episodes} - Reward: {total_reward}")

    return episode_rewards, losses


def visualize_training(episode_rewards, losses):
    """Plot the training loss, the raw episode rewards and their moving average.

    Args:
        episode_rewards: Sequence with the total reward of each episode.
        losses: Sequence of loss values, one per optimisation step (not one
            per episode).
    """
    # Losses are recorded once per training step, so label the axis that way.
    plt.figure(figsize=(12, 6))
    plt.plot(losses, label='Loss per Training Step')
    plt.xlabel('Training step')
    plt.ylabel('Loss')
    plt.title('Loss per Training Step during Training')
    plt.legend()
    plt.show()

    plt.figure(figsize=(12, 6))
    plt.plot(episode_rewards, label='rewards')
    plt.xlabel('Episode')
    plt.ylabel('Total reward')
    plt.title('rewards ')
    plt.legend()
    plt.show()

    # min_periods=1 keeps the first nine points visible instead of NaN, so
    # short runs still produce a curve.
    rewards_smoothed = pd.Series(episode_rewards).rolling(window=10, min_periods=1).mean()
    plt.figure(figsize=(12, 6))
    plt.plot(rewards_smoothed, label="Smoothed Rewards")
    plt.xlabel('Episode')
    plt.ylabel('Total reward (10-episode moving average)')
    plt.title('Smoothed Rewards')
    plt.legend()
    plt.show()






In [None]:
# Seed every random number generator so that Restart Kernel -> Run All gives
# the same mazes, exploration choices and initial network weights.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Experiment configuration.
maze_size = 100
num_pins = 25
env = MazeEnvironment(maze_size, num_pins)

# The network takes the flattened maze-sized state and emits one Q-value per pin.
state_dims = env.state_dims
num_actions = env.num_actions
agent = DQN(state_dims, num_actions)

episode_rewards, losses = train(agent, env)
visualize_training(episode_rewards, losses)