In [None]:
!pip install numpy tensorflow faiss-cpu matplotlib

In [None]:
import imageio
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import faiss
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from matplotlib.patches import Patch
from matplotlib.colors import ListedColormap
import random
from collections import deque
import time
from PIL import Image
import imageio
import glob
import os

In [None]:
# Set random seeds for reproducibility.
# The agent draws exploration actions and replay minibatches from the stdlib
# `random` module, so it has to be seeded alongside NumPy and PyTorch.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Maze Environment
class MazeEnvironment:
    """Square grid maze with randomly scattered walls, a fixed start and a fixed goal.

    Cells hold 0 (free) or 1 (wall). The agent starts at ``(1, 1)`` and the
    goal sits at ``(size - 2, size - 2)``. Observations are an 11-element
    float32 vector: the agent's ``(x, y)`` grid indices followed by the
    flattened 3x3 neighbourhood of cell values, where cells outside the grid
    are reported as walls.
    """

    # (dx, dy) index deltas, addressed by action id 0..3.
    MOVES = ((-1, 0), (0, 1), (1, 0), (0, -1))

    def __init__(self, size=10):
        self.size = size
        self.maze = np.zeros((size, size), dtype=int)  # 0: free, 1: wall
        self.start = (1, 1)
        self.goal = (size-2, size-2)
        self.agent_pos = self.start
        self.generate_maze()

    def generate_maze(self, max_attempts=100):
        """Fill the grid with random walls, guaranteeing the goal is reachable.

        Purely random scattering can wall the goal off from the start, which
        makes the task unsolvable. Layouts are therefore re-drawn until a path
        exists. The first attempt consumes exactly the random numbers a single
        scatter pass needs, so a layout that is already solvable is kept as is.

        Args:
            max_attempts: Number of random layouts to try before falling back
                to carving a guaranteed corridor through the last layout.
        """
        for _ in range(max_attempts):
            self.maze.fill(0)
            self._scatter_walls()
            if self._goal_reachable():
                return
        # Extremely unlikely at 25% density, but never hand back a dead maze.
        self._carve_corridor()

    def _scatter_walls(self):
        """Drop up to 25% of the cells as walls, never on the start or the goal."""
        num_walls = int(0.25 * self.size * self.size)
        for _ in range(num_walls):
            x, y = np.random.randint(0, self.size, 2)
            if (x, y) != self.start and (x, y) != self.goal:
                self.maze[x, y] = 1

    def _goal_reachable(self):
        """Breadth-first search from the start; True if the goal can be reached."""
        frontier = deque([self.start])
        visited = {self.start}
        while frontier:
            cell = frontier.popleft()
            if cell == self.goal:
                return True
            for dx, dy in self.MOVES:
                nxt = (cell[0] + dx, cell[1] + dy)
                if (nxt not in visited
                        and 0 <= nxt[0] < self.size and 0 <= nxt[1] < self.size
                        and self.maze[nxt] != 1):
                    visited.add(nxt)
                    frontier.append(nxt)
        return False

    def _carve_corridor(self):
        """Clear an L-shaped corridor: along the start's first index, then to the goal."""
        (x0, y0), (x1, y1) = self.start, self.goal
        self.maze[x0, min(y0, y1):max(y0, y1) + 1] = 0
        self.maze[min(x0, x1):max(x0, x1) + 1, y1] = 0

    def get_state(self):
        """Return the observation for the current agent position.

        Returns:
            float32 array of length 11: ``[x, y]`` followed by the flattened
            3x3 window of maze values centred on the agent. Window cells that
            fall outside the grid are set to 1 so they look like walls.
        """
        x, y = self.agent_pos
        state = np.zeros((3, 3))
        for i in range(-1, 2):
            for j in range(-1, 2):
                xi, yj = x + i, y + j
                if 0 <= xi < self.size and 0 <= yj < self.size:
                    state[i+1, j+1] = self.maze[xi, yj]
                else:
                    state[i+1, j+1] = 1
        return np.array([x, y] + state.flatten().tolist(), dtype=np.float32)

    def step(self, action):
        """Apply one action and return ``(next_state, reward, done)``.

        Args:
            action: Index into ``MOVES``.

        Returns:
            The new observation, the reward (10.0 on reaching the goal,
            otherwise -0.1) and whether the episode has finished. A move into
            a wall or off the grid leaves the agent where it is but still
            costs the step penalty.
        """
        dx, dy = self.MOVES[action]
        new_pos = (self.agent_pos[0] + dx, self.agent_pos[1] + dy)

        reward = -0.1
        done = False

        if (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size and
                self.maze[new_pos] != 1):
            self.agent_pos = new_pos

        if self.agent_pos == self.goal:
            reward = 10.0
            done = True

        return self.get_state(), reward, done

    def reset(self):
        """Move the agent back to the start and return the initial observation."""
        self.agent_pos = self.start
        return self.get_state()

# DQN Model
class DQN(nn.Module):
    """Fully connected Q-network mapping a state vector to one value per action.

    Two hidden layers of 64 ReLU units sit between the input and the linear
    output layer. The layers live in ``self.network`` as an ``nn.Sequential``.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 64
        layers = [
            nn.Linear(state_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_size),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for the input batch ``x``."""
        q_values = self.network(x)
        return q_values

# DQN Agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a uniform replay buffer and a FAISS side-memory.

    Two transition stores are kept:

    * ``memory`` - a bounded deque sampled uniformly for minibatch updates.
    * ``faiss_memory`` / ``faiss_index`` - a bounded deque of transitions and
      a flat L2 index over their states. Row ``i`` of the index always
      describes ``faiss_memory[i]``; ``remember`` maintains that invariant.
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=5000)
        self.gamma = 0.99            # discount factor
        self.epsilon = 1.0           # current exploration probability
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995   # multiplicative decay applied per replay() call
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = DQN(state_size, action_size).to(self.device)
        self.target_model = DQN(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0005)
        self.loss_fn = nn.MSELoss()
        self.update_target_model()
        self.faiss_index = faiss.IndexFlatL2(state_size)
        self.faiss_memory = deque(maxlen=10000)
        self.faiss_id = 0            # running count of transitions ever stored

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer and in the FAISS memory.

        When the FAISS deque is full, appending evicts its oldest entry, so
        the matching vector (row 0) is removed from the index first. Removing
        from a flat index shifts the remaining rows down, which keeps index
        row ``i`` aligned with ``faiss_memory[i]`` without rebuilding the
        whole index on every call.
        """
        self.memory.append((state, action, reward, next_state, done))

        capacity = self.faiss_memory.maxlen
        if capacity is not None and len(self.faiss_memory) >= capacity:
            self.faiss_index.remove_ids(np.array([0], dtype=np.int64))

        state_vec = np.ascontiguousarray(state, dtype=np.float32).reshape(1, -1)
        self.faiss_index.add(state_vec)
        self.faiss_memory.append((self.faiss_id, state, action, reward, next_state, done))
        self.faiss_id += 1

    def act(self, state):
        """Pick an action: random with probability ``epsilon``, otherwise greedy."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.model(state)
        return q_values.argmax().item()

    def replay(self, batch_size):
        """Run one minibatch update plus one FAISS nearest-neighbour update.

        Does nothing until ``memory`` holds at least ``batch_size``
        transitions. Epsilon is decayed once per call that performs an update.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        states = torch.as_tensor(np.array([t[0] for t in minibatch]),
                                 dtype=torch.float32, device=self.device)
        actions = torch.as_tensor([t[1] for t in minibatch],
                                  dtype=torch.long, device=self.device)
        rewards = torch.as_tensor([t[2] for t in minibatch],
                                  dtype=torch.float32, device=self.device)
        next_states = torch.as_tensor(np.array([t[3] for t in minibatch]),
                                      dtype=torch.float32, device=self.device)
        dones = torch.as_tensor([t[4] for t in minibatch],
                                dtype=torch.float32, device=self.device)

        q_values = self.model(states)
        # Targets are constants: build them without autograd so backward()
        # never deposits gradients on the frozen target network.
        with torch.no_grad():
            next_q_max = self.target_model(next_states).max(dim=1).values
            td_targets = rewards + self.gamma * next_q_max * (1 - dones)
            # Start from the current predictions so only the taken action's
            # entry contributes to the loss; the MSE is still averaged over
            # every (sample, action) entry.
            targets = q_values.detach().clone()
            targets.scatter_(1, actions.unsqueeze(1), td_targets.unsqueeze(1))

        self.optimizer.zero_grad()
        loss = self.loss_fn(q_values, targets)
        loss.backward()
        self.optimizer.step()

        # FAISS-based replay: take one state from the minibatch, look up its
        # nearest stored state, and do an extra update on that transition.
        if self.faiss_index.ntotal > 0:
            probe = states[np.random.randint(0, batch_size)].cpu().numpy()
            query = np.ascontiguousarray(probe.reshape(1, -1), dtype=np.float32)
            _, neighbours = self.faiss_index.search(query, k=1)
            idx = int(neighbours[0][0])
            # A negative id means "no result"; the upper bound guards against
            # an index/deque mismatch instead of raising IndexError.
            if 0 <= idx < len(self.faiss_memory):
                _, s, a, r, ns, d = self.faiss_memory[idx]
                s = torch.as_tensor(s, dtype=torch.float32, device=self.device).unsqueeze(0)
                ns = torch.as_tensor(ns, dtype=torch.float32, device=self.device).unsqueeze(0)
                with torch.no_grad():
                    next_q = self.target_model(ns).max().item()
                target = r if d else r + self.gamma * next_q
                q_val = self.model(s)
                target_q = q_val.detach().clone()
                target_q[0, a] = target
                self.optimizer.zero_grad()
                loss = self.loss_fn(q_val, target_q)
                loss.backward()
                self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Training
def train_dqn(episodes=500, maze_size=10, max_steps=200, batch_size=64,
              target_update_every=10):
    """Train a DQN agent on a freshly generated maze.

    Args:
        episodes: Maximum number of training episodes.
        maze_size: Side length of the square maze.
        max_steps: Step cap per episode.
        batch_size: Minibatch size passed to ``agent.replay``.
        target_update_every: Sync the target network every this many
            episodes; must be a positive integer.

    Returns:
        ``(env, agent)`` - the environment that was trained on and the agent.

    Training stops early once at least 18 of the last 20 episodes finished
    with a total reward above 5.
    """
    env = MazeEnvironment(size=maze_size)
    state_size = 2 + 3*3  # agent position plus the flattened 3x3 neighbourhood
    action_size = 4
    agent = DQNAgent(state_size, action_size)
    success_count = 0
    last_rewards = deque(maxlen=20)
    start_time = time.time()

    for e in range(episodes):
        state = env.reset()
        total_reward = 0
        for _ in range(max_steps):
            action = agent.act(state)
            next_state, reward, done = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            if done:
                success_count += 1
                break

        # NOTE(review): the network is updated (and epsilon decayed) once per
        # episode, not once per environment step - confirm this is intended.
        agent.replay(batch_size)
        if (e + 1) % target_update_every == 0:
            agent.update_target_model()

        last_rewards.append(total_reward)
        elapsed_time = time.time() - start_time
        print(f"Episode: {e+1}/{episodes}, Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.2f}, "
              f"Successes: {success_count}, Time: {elapsed_time:.0f}s")

        if len(last_rewards) == 20 and sum(1 for r in last_rewards if r > 5) >= 18:
            print("Early stopping: Agent consistently reaches goal.")
            break

    print(f"Training completed in {time.time() - start_time:.0f} seconds.")
    return env, agent

# Visualization
def visualize_path(env, agent):
    """Roll the agent out once from the start and plot the cells it visited.

    The rollout uses ``agent.act``, so it is still epsilon-greedy and may
    contain random moves. The environment's maze array is left untouched:
    goal, path and start markers are painted onto a copy used only for
    display, so the agent sees the same observations as during training and
    later renders start from a clean 0/1 grid.

    Args:
        env: Maze environment; it is reset before the rollout.
        agent: Agent providing ``act(state)``.
    """
    # Display-only cell codes; 0 (free) and 1 (wall) come from the maze itself.
    goal_code, path_code, start_code = 2, 3, 4

    path = []
    state = env.reset()
    done = False
    steps = 0
    max_steps = 200

    while not done and steps < max_steps:
        path.append(env.agent_pos)
        action = agent.act(state)
        state, _, done = env.step(action)
        steps += 1

    maze_display = env.maze.copy()
    maze_display[env.goal] = goal_code
    for pos in path:
        if maze_display[pos] != goal_code:
            maze_display[pos] = path_code
    maze_display[env.start] = start_code

    cmap = ListedColormap(['white', 'black', 'red', 'blue', 'green'])
    plt.figure(figsize=(8, 8))
    # Pin the colour range so each code maps to its colour even when some
    # codes are absent from the grid.
    plt.imshow(maze_display, cmap=cmap, interpolation='none', vmin=0, vmax=4)
    start_row, start_col = env.start
    goal_row, goal_col = env.goal
    plt.text(start_col, start_row, 'S', ha='center', va='center', color='black', fontsize=12, fontweight='bold')
    plt.text(goal_col, goal_row, 'G', ha='center', va='center', color='white', fontsize=12, fontweight='bold')
    legend_elements = [
        Patch(facecolor='white', label='Free'),
        Patch(facecolor='black', label='Wall'),
        Patch(facecolor='red', label='Goal'),
        Patch(facecolor='blue', label='Path'),
        Patch(facecolor='green', label='Start')
    ]
    plt.legend(handles=legend_elements, loc='upper right')
    plt.grid(color='gray', linestyle='--', linewidth=0.5)
    plt.title(f"{env.size}x{env.size} Maze with Agent's Path")
    plt.show()

# Run
if __name__ == "__main__":
    # Train the agent, then plot one rollout. `env` and `agent` are bound at
    # top level so the later cells can reuse them.
    env, agent = train_dqn(1000)
    visualize_path(env=env, agent=agent)

In [None]:
# Function to compute grid cell size in pixels
def get_grid_cell_size(fig_width_inch=8, grid_size=10, dpi=None):
    """Return the width in whole pixels of one grid cell in a square figure.

    Args:
        fig_width_inch: Figure width (and height) in inches.
        grid_size: Number of cells along one side of the grid.
        dpi: Dots per inch. When omitted, a throwaway matplotlib figure is
            created and closed to read the DPI matplotlib would use.

    Returns:
        ``fig_width_inch * dpi / grid_size`` truncated to an int.
    """
    if dpi is None:
        probe_fig = plt.figure(figsize=(fig_width_inch, fig_width_inch))
        dpi = probe_fig.dpi
        plt.close(probe_fig)
    total_px = fig_width_inch * dpi
    return int(total_px / grid_size)

# Function to resize PNG image to grid cell size
def resize_image(image_path, cell_size_px):
    """Load an image and scale it to a ``cell_size_px`` x ``cell_size_px`` RGBA square.

    The image is stretched to the square; the aspect ratio is not preserved
    and no padding is added.

    Args:
        image_path: Path of the image file to load.
        cell_size_px: Target width and height in pixels.

    Returns:
        A new RGBA ``PIL.Image.Image`` that no longer depends on the file.
    """
    # The context manager closes the file handle once the pixels have been
    # read; resize() returns an independent in-memory image.
    with Image.open(image_path) as source:
        resized = source.resize((cell_size_px, cell_size_px), Image.LANCZOS)
    # Convert to RGBA if not already
    if resized.mode != 'RGBA':
        resized = resized.convert('RGBA')
    return resized

def _draw_maze_frame(ax, wall_grid, cmap, env, start_img, goal_img, agent_img, step_no):
    """Redraw ``ax`` with the maze, the start/goal sprites and the agent's current cell."""
    size = env.size

    def cell_extent(cell):
        # (left, right, bottom, top) of one grid cell in imshow coordinates.
        row, col = cell
        return (col - 0.5, col + 0.5, row + 0.5, row - 0.5)

    ax.clear()
    # Fixed colour range: 0 is always white and 1 always black.
    ax.imshow(wall_grid, cmap=cmap, interpolation='none', vmin=0, vmax=1)

    # Start and goal sprites use a higher zorder than the agent sprite.
    ax.imshow(start_img, extent=cell_extent(env.start), zorder=2)
    ax.imshow(goal_img, extent=cell_extent(env.goal), zorder=2)
    ax.imshow(agent_img, extent=cell_extent(env.agent_pos), zorder=1)

    # Each imshow call with an explicit extent can re-fit the view to that
    # single sprite, so pin the limits back to the full grid (row 0 on top).
    ax.set_xlim(-0.5, size - 0.5)
    ax.set_ylim(size - 0.5, -0.5)

    # Add grid and labels
    ax.grid(color='gray', linestyle='--', linewidth=0.5)
    ax.set_xticks(np.arange(0, size, 1))
    ax.set_yticks(np.arange(0, size, 1))
    ax.set_title(f"Agent Navigation in {size}x{size} Maze - Step {step_no}")


def save_frames_as_images(env, agent, output_dir="frames", start_img_path='start.png',
                          goal_img_path='goal.png', agent_img_path='agent.png'):
    """Roll the agent out once and save one PNG per step into ``output_dir``.

    Frames are named ``frame_000.png``, ``frame_001.png``, ... Frame ``k``
    shows the agent after ``k`` actions, so the last frame shows where the
    rollout ended (the goal, or wherever the 200-step cap was hit). Files
    already present in ``output_dir`` are overwritten by name but never
    deleted.

    Args:
        env: Maze environment; it is reset before the rollout.
        agent: Agent providing ``act(state)``.
        output_dir: Directory for the frames; created if missing.
        start_img_path: Image drawn on the start cell.
        goal_img_path: Image drawn on the goal cell.
        agent_img_path: Image drawn on the agent's current cell.
    """
    os.makedirs(output_dir, exist_ok=True)

    state = env.reset()
    # Binary wall mask: stays 0/1 even if the maze array carries other markers.
    wall_grid = (env.maze == 1).astype(int)
    done = False
    steps = 0
    max_steps = 200

    # Compute grid cell size
    cell_size_px = get_grid_cell_size(fig_width_inch=8, grid_size=env.size)

    # Load and resize images
    start_img = resize_image(start_img_path, cell_size_px)
    goal_img = resize_image(goal_img_path, cell_size_px)
    agent_img = resize_image(agent_img_path, cell_size_px)

    # Set up plot
    cmap = ListedColormap(['white', 'black'])  # Only for maze (free/walls)
    fig, ax = plt.subplots(figsize=(8, 8))

    try:
        while True:
            _draw_maze_frame(ax, wall_grid, cmap, env, start_img, goal_img, agent_img, steps)

            # Save through the figure object so another "current" figure
            # cannot be written by mistake.
            frame_path = os.path.join(output_dir, f"frame_{steps:03d}.png")
            fig.savefig(frame_path, bbox_inches='tight', dpi=100)
            print(f"Saved frame: {frame_path}")

            # Stop only after the final position has been rendered.
            if done or steps >= max_steps:
                break

            # Take action
            action = agent.act(state)
            state, _, done = env.step(action)
            steps += 1
    finally:
        # Release the figure even if rendering or saving fails.
        plt.close(fig)

    print(f"All frames saved in {output_dir}")


In [None]:
# Save each step of the agent's run as an image
save_frames_as_images(
    env=env,
    agent=agent,
    output_dir="/content/drive/MyDrive/maze_frames",
    start_img_path='/content/drive/MyDrive/merry_go-removebg-preview.png',
    goal_img_path='/content/drive/MyDrive/gomu_2-removebg-preview.png',
    agent_img_path='/content/drive/MyDrive/luffy_agent.png',
)

In [None]:
# Assemble the saved frame images into an animated GIF
output_dir = "/content/drive/MyDrive/maze_frames"
gif_path = "/content/drive/MyDrive/maze_animation_new.gif"
# Frame numbers are zero-padded, so sorting by name gives step order.
frame_paths = sorted(glob.glob(os.path.join(output_dir, "frame_*.png")))
if not frame_paths:
    # Fail with a clear message instead of handing an empty list to imageio.
    raise FileNotFoundError(f"No frame_*.png files found in {output_dir}")
images = [imageio.imread(frame_path) for frame_path in frame_paths]
imageio.mimsave(gif_path, images, fps=2)
print(f"GIF saved as {gif_path}")