In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt
import os

# Define Maze Environment with Curriculum Learning
class CustomMazeEnv:
    """Square grid maze with a randomly placed agent, goal and obstacles.

    The observation is a float32 array of shape ``(3, size, size)`` with one
    plane each for the agent, the goal and the obstacles. The number of
    obstacles is controlled by ``current_obstacles``, which can be raised over
    time with :meth:`increase_difficulty` (curriculum learning).
    """

    # (row, col) offsets used when searching the grid for a path.
    _NEIGHBOUR_OFFSETS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self, size=15, initial_obstacles=5, max_obstacles=50):
        """
        Initialize the maze environment and build a first maze.

        Args:
            size (int): Size of the maze (NxN grid).
            initial_obstacles (int): Initial number of obstacles.
            max_obstacles (int): Maximum number of obstacles.
        """
        self.size = size
        self.state_dim = (3, size, size)  # Channels for agent, goal, obstacles
        self.action_dim = 4  # Actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.initial_obstacles = initial_obstacles
        self.max_obstacles = max_obstacles
        self.current_obstacles = initial_obstacles
        self.reset()

    def reset(self):
        """
        Reset the maze environment by repositioning the agent and goal
        and regenerating obstacles.

        Agent and goal are redrawn until they differ and the generated maze
        contains a path between them.

        Returns:
            state (array): Initial state of the environment.
        """
        while True:
            self.agent_pos = np.random.randint(0, self.size, size=2)
            self.goal_pos = np.random.randint(0, self.size, size=2)
            if not np.array_equal(self.agent_pos, self.goal_pos):
                self._generate_maze()
                if self._is_path_possible():
                    break

        self.steps = 0
        self.prev_agent_pos = self.agent_pos.copy()
        return self._get_state()

    def increase_difficulty(self):
        """
        Increase the number of obstacles by one, capped at ``max_obstacles``.

        The new value takes effect the next time a maze is generated.
        """
        self.current_obstacles = min(self.current_obstacles + 1, self.max_obstacles)

    def _generate_maze(self):
        """
        Generate random obstacles until the goal is reachable from the agent.

        Every free cell becomes an obstacle independently with probability
        ``current_obstacles / size**2``, so ``current_obstacles`` is the
        expected obstacle count rather than an exact one. The agent and goal
        cells are never blocked.
        """
        reserved = {
            (int(self.agent_pos[0]), int(self.agent_pos[1])),
            (int(self.goal_pos[0]), int(self.goal_pos[1])),
        }
        obstacle_probability = self.current_obstacles / (self.size ** 2)

        while True:
            self.obstacles = []
            for i in range(self.size):
                for j in range(self.size):
                    if (i, j) not in reserved and np.random.rand() < obstacle_probability:
                        self.obstacles.append((i, j))

            if self._is_path_possible():
                break

    def _is_path_possible(self):
        """
        Check if there is a valid path from the agent's position to the goal.

        Uses an iterative depth-first search with an explicit stack, so the
        search depth is not bounded by Python's recursion limit, and a set of
        obstacles so each membership test is O(1).

        Returns:
            bool: True if a path exists, False otherwise.
        """
        start = (int(self.agent_pos[0]), int(self.agent_pos[1]))
        goal = (int(self.goal_pos[0]), int(self.goal_pos[1]))
        blocked = set(self.obstacles)

        visited = {start}
        stack = [start]
        while stack:
            x, y = stack.pop()
            if (x, y) == goal:
                return True
            for dx, dy in self._NEIGHBOUR_OFFSETS:
                neighbour = (x + dx, y + dy)
                inside = 0 <= neighbour[0] < self.size and 0 <= neighbour[1] < self.size
                if inside and neighbour not in visited and neighbour not in blocked:
                    visited.add(neighbour)
                    stack.append(neighbour)
        return False

    def _get_state(self):
        """
        Return the current state of the environment as a 3D array.
        - Channel 0: Agent position.
        - Channel 1: Goal position.
        - Channel 2: Obstacles.

        Returns:
            state (array): 3D array representation of the environment.
        """
        state = np.zeros(self.state_dim, dtype=np.float32)
        state[0, self.agent_pos[0], self.agent_pos[1]] = 1.0
        state[1, self.goal_pos[0], self.goal_pos[1]] = 1.0
        for obs in self.obstacles:
            state[2, obs[0], obs[1]] = 1.0
        return state

    def step(self, action):
        """
        Execute an action in the maze.

        Moves are clamped to the grid; a move into an obstacle leaves the
        agent where it is. The reward is ``1.0`` on reaching the goal and
        ``-0.01 * distance`` otherwise, plus ``0.1`` times the reduction in
        Euclidean distance to the goal achieved by this step.

        Args:
            action (int): Action to take (0=up, 1=down, 2=left, 3=right).

        Returns:
            state (array): Updated state after action.
            reward (float): Reward for the action.
            done (bool): Whether the goal was reached.
            info (dict): Additional information.
        """
        # Snapshot the position held at the start of THIS step. Doing it only
        # on successful moves would leave a stale position behind whenever an
        # obstacle blocks the move, paying the last shaping bonus again.
        self.prev_agent_pos = self.agent_pos.copy()

        next_pos = self.agent_pos.copy()
        if action == 0:
            next_pos[0] = max(self.agent_pos[0] - 1, 0)
        elif action == 1:
            next_pos[0] = min(self.agent_pos[0] + 1, self.size - 1)
        elif action == 2:
            next_pos[1] = max(self.agent_pos[1] - 1, 0)
        elif action == 3:
            next_pos[1] = min(self.agent_pos[1] + 1, self.size - 1)

        if tuple(next_pos) not in self.obstacles:
            self.agent_pos = next_pos
        self.steps += 1

        done = np.array_equal(self.agent_pos, self.goal_pos)
        dist_to_goal = np.linalg.norm(self.agent_pos - self.goal_pos)
        prev_dist_to_goal = np.linalg.norm(self.prev_agent_pos - self.goal_pos)
        reward = 1.0 if done else -0.01 * dist_to_goal
        reward += 0.1 * (prev_dist_to_goal - dist_to_goal)  # Reward for getting closer to the goal
        return self._get_state(), reward, done, {}

# Define the Actor-Critic Networks
class ActorCNN(nn.Module):
    """Convolutional policy network mapping a maze state to action probabilities.

    Three 3x3 convolutions (stride 1, padding 1, so the spatial size is
    preserved) are followed by two fully connected layers and a softmax.
    """

    def __init__(self, input_channels, action_dim, maze_size=15):
        """
        Initialize the Actor CNN for policy generation.

        Args:
            input_channels (int): Number of input channels (e.g., for CNN).
            action_dim (int): Dimension of the action space.
            maze_size (int): Side length of the square input grid. Defaults to
                15, the size this network was originally hard-coded for, so
                existing two-argument callers and saved weights still work.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, 64, 3, 1, 1)
        self.conv2 = nn.Conv2d(64, 128, 3, 1, 1)
        self.conv3 = nn.Conv2d(128, 256, 3, 1, 1)
        # The convolutions keep height/width, so the flattened feature size is
        # channels * maze_size * maze_size.
        self.fc1 = nn.Linear(256 * maze_size * maze_size, 512)
        self.fc2 = nn.Linear(512, action_dim)

    def forward(self, state):
        """
        Forward pass for the actor network.

        Args:
            state (Tensor): Input state tensor of shape
                (batch, input_channels, maze_size, maze_size).

        Returns:
            Tensor: Action probabilities of shape (batch, action_dim).
        """
        x = torch.relu(self.conv1(state))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        return torch.softmax(self.fc2(x), dim=-1)

class DDPGAgent:
    """Agent built from an ActorCNN and a slowly tracking target copy of it.

    Despite the name there is no separate critic network here: ``train``
    treats the actor's softmax output for the taken action as the value
    estimate and regresses it towards a one-step bootstrapped target computed
    with the target network.

    The class reads the module-level ``device`` global, which must be defined
    before an instance is created.
    """

    def __init__(self, input_channels, action_dim, gamma=0.99, tau=0.005, lr=1e-4):
        """
        Initialize the DDPG Agent.

        Args:
            input_channels (int): Number of input channels for the Actor network.
            action_dim (int): Dimension of the action space.
            gamma (float): Discount factor.
            tau (float): Soft update parameter for target network.
            lr (float): Learning rate for the optimizer.
        """
        self.actor = ActorCNN(input_channels, action_dim).to(device)
        self.target_actor = ActorCNN(input_channels, action_dim).to(device)
        # Start the target as an exact copy of the online network; two
        # independent random initialisations would make the first bootstrapped
        # targets unrelated to the network being trained.
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.gamma, self.tau = gamma, tau

    def _soft_update(self, target, source):
        """
        Perform soft update of target network parameters:
        ``target <- tau * source + (1 - tau) * target``.

        Args:
            target (nn.Module): Target network.
            source (nn.Module): Source network.
        """
        with torch.no_grad():
            for target_param, source_param in zip(target.parameters(), source.parameters()):
                target_param.copy_(self.tau * source_param + (1.0 - self.tau) * target_param)

    def act(self, state, epsilon=0.1):
        """
        Select an action using the actor network or explore randomly.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action is sampled from the actor's output distribution
        (it is not the arg-max).

        Args:
            state (array): Current state of the environment.
            epsilon (float): Exploration probability.

        Returns:
            int: Selected action.
        """
        if np.random.rand() < epsilon:  # Explore with probability epsilon
            return np.random.choice(self.actor.fc2.out_features)
        state = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        with torch.no_grad():
            action_probs = self.actor(state).cpu().numpy()[0]
        return np.random.choice(len(action_probs), p=action_probs)

    def train(self, replay_buffer, batch_size):
        """
        Train the actor network using sampled transitions from the replay buffer.

        Minimises the mean squared error between the actor output for the
        taken action and ``reward + gamma * (1 - done) * max target output``,
        then soft-updates the target network.

        Args:
            replay_buffer (HGRReplayBuffer): Replay buffer containing transitions.
            batch_size (int): Number of transitions to sample per training step.
        """
        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

        def as_float(values):
            return torch.as_tensor(values, dtype=torch.float32, device=device)

        states, rewards, dones = as_float(states), as_float(rewards), as_float(dones)
        next_states = as_float(next_states)
        actions = torch.as_tensor(actions, dtype=torch.int64, device=device)

        current_q = self.actor(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
        with torch.no_grad():
            target_q = rewards + self.gamma * (1 - dones) * self.target_actor(next_states).max(dim=1)[0]
        loss = nn.functional.mse_loss(current_q, target_q)

        self.actor_optimizer.zero_grad()
        loss.backward()
        self.actor_optimizer.step()
        self._soft_update(self.target_actor, self.actor)

    def save_model(self, filepath):
        """
        Save the actor network parameters.

        Args:
            filepath (str): Path to save the model.
        """
        torch.save(self.actor.state_dict(), filepath)

    def load_model(self, filepath):
        """
        Load the actor network parameters.

        The checkpoint is mapped onto the current ``device`` so that weights
        saved on a GPU can be loaded on a CPU-only machine, and the target
        network is synchronised with the loaded weights.

        Args:
            filepath (str): Path to the model file.
        """
        self.actor.load_state_dict(torch.load(filepath, map_location=device))
        self.target_actor.load_state_dict(self.actor.state_dict())

# Define the HGR Replay Buffer
class HGRReplayBuffer:
    """Fixed-capacity replay buffer sampled in proportion to stored TD errors.

    Each transition keeps the TD error it was inserted with; the priorities
    are not updated afterwards. Once ``capacity`` is reached the oldest
    transition (and its priority) is dropped.
    """

    # Added to every priority so that transitions with zero TD error remain
    # sampleable and the normalisation never divides by zero.
    _PRIORITY_EPS = 1e-6

    def __init__(self, capacity, state_shape):
        """
        Initialize the HGR Replay Buffer.

        Args:
            capacity (int): Maximum size of the replay buffer.
            state_shape (tuple): Shape of the state space.
        """
        self.capacity = capacity
        self.memory = deque(maxlen=capacity)
        self.td_errors = deque(maxlen=capacity)
        self.state_shape = state_shape

    def add(self, state, action, reward, next_state, done, td_error):
        """
        Add a new transition to the replay buffer.

        Args:
            state (array): Current state.
            action (int): Action taken.
            reward (float): Reward received.
            next_state (array): Next state.
            done (bool): Whether the episode ended.
            td_error (float): Temporal difference error.
        """
        self.memory.append((state, action, reward, next_state, done))
        self.td_errors.append(td_error)

    def sample(self, batch_size):
        """
        Sample a batch of transitions based on TD error priorities.

        Transitions are drawn with replacement, with probability proportional
        to ``|td_error| + eps``.

        Args:
            batch_size (int): Number of transitions to sample.

        Returns:
            tuple: Sampled transitions (states, actions, rewards, next_states, dones),
            each as a NumPy array with ``batch_size`` entries.

        Raises:
            ValueError: If the buffer is empty.
        """
        if not self.memory:
            raise ValueError("Cannot sample from an empty replay buffer")

        priorities = np.abs(np.asarray(self.td_errors, dtype=np.float64)) + self._PRIORITY_EPS
        probabilities = priorities / priorities.sum()
        indices = np.random.choice(len(self.memory), batch_size, p=probabilities)
        batch = [self.memory[i] for i in indices]
        # Materialise the columns so the result can be indexed and reused,
        # not just unpacked once.
        return tuple(np.array(column) for column in zip(*batch))

# Compute TD Error for HGR

def compute_td_error(agent, state, action, reward, next_state, done):
    """
    Return the absolute one-step TD error of a single transition.

    The prediction is the online network's output for ``action`` in ``state``;
    the target is ``reward + gamma * (1 - done) * max`` of the target
    network's output for ``next_state``. Nothing is back-propagated.

    Args:
        agent (DDPGAgent): Supplies ``actor``, ``target_actor`` and ``gamma``.
        state (array): Current state.
        action (int): Action taken.
        reward (float): Reward received.
        next_state (array): Next state.
        done (bool): Whether the episode ended.

    Returns:
        float: TD error value.
    """
    def _as_batch(value, dtype):
        # Wrap a single sample as a batch of one on the global device.
        return torch.tensor(value, dtype=dtype).to(device).unsqueeze(0)

    state_b = _as_batch(state, torch.float32)
    action_b = _as_batch(action, torch.int64)
    reward_b = _as_batch(reward, torch.float32)
    next_state_b = _as_batch(next_state, torch.float32)
    done_b = _as_batch(done, torch.float32)

    with torch.no_grad():
        predicted = agent.actor(state_b).gather(1, action_b.unsqueeze(-1))
        bootstrap = agent.target_actor(next_state_b).max(1)[0].unsqueeze(1)
        target = reward_b + agent.gamma * (1 - done_b) * bootstrap

    return (predicted - target).abs().item()

# Training Function with HGR
def train_agent_with_hgr(env, agent, episodes, max_steps, replay_buffer, batch_size):
    """
    Run the training loop, storing every transition with its TD error.

    Per step the agent acts epsilon-greedily (epsilon decays linearly from 1
    to a floor of 0.1 over the first 90 episodes), the transition is added to
    the buffer together with its current TD error, and one training update is
    performed once the buffer holds at least ``batch_size`` transitions.
    Progress is printed every 10 episodes, difficulty is raised every 20, and
    a checkpoint plus plots are written every 50 and at the end.

    Args:
        env (CustomMazeEnv): The maze environment.
        agent (DDPGAgent): The DDPG agent.
        episodes (int): Number of training episodes.
        max_steps (int): Maximum steps per episode.
        replay_buffer (HGRReplayBuffer): Replay buffer to store experiences.
        batch_size (int): Batch size for training.
    """
    rewards_per_episode, successes_per_episode = [], []
    steps_per_episode, obstacles_per_episode = [], []

    for ep in range(episodes):
        state = env.reset()
        total_reward = 0
        success = 0
        steps = 0

        for step in range(max_steps):
            exploration = max(0.1, 1 - ep / 100)
            action = agent.act(state, epsilon=exploration)
            next_state, reward, done, _ = env.step(action)

            priority = compute_td_error(agent, state, action, reward, next_state, done)
            replay_buffer.add(state, action, reward, next_state, done, priority)

            buffer_ready = len(replay_buffer.memory) >= batch_size
            if buffer_ready:
                agent.train(replay_buffer, batch_size)

            state = next_state
            total_reward += reward
            steps = step + 1
            if done:
                success = 1
                break

        rewards_per_episode.append(total_reward)
        successes_per_episode.append(success)
        steps_per_episode.append(steps)
        obstacles_per_episode.append(env.current_obstacles)

        if ep % 10 == 0:
            print(f"Episode {ep}, Reward: {total_reward:.2f}, Success: {success}, Steps: {steps}, Obstacles: {env.current_obstacles}")

        # Curriculum: one more obstacle every 20 episodes (never at episode 0).
        if ep > 0 and ep % 20 == 0:
            env.increase_difficulty()

        # Periodic checkpoint and plots (never at episode 0).
        if ep > 0 and ep % 50 == 0:
            agent.save_model(f"model_episode_{ep}.pth")
            save_performance_plots(rewards_per_episode, successes_per_episode, steps_per_episode, obstacles_per_episode, ep)

    # Final plots
    save_performance_plots(rewards_per_episode, successes_per_episode, steps_per_episode, obstacles_per_episode, "final")
    agent.save_model("final_model.pth")

def save_performance_plots(rewards, successes, steps, obstacles, suffix):
    """
    Save training performance plots as PNG files in the ``plots`` directory.

    One line plot is written per metric, named
    ``plots/<metric>_plot_episode_<suffix>.png``.

    Args:
        rewards (list): Rewards per episode.
        successes (list): Successes per episode.
        steps (list): Steps per episode.
        obstacles (list): Obstacles per episode.
        suffix (str | int): Suffix for the plot filenames (an episode number
            or a label such as "final").
    """
    # exist_ok avoids the check-then-create race of testing os.path.exists first.
    os.makedirs("plots", exist_ok=True)

    # (series, legend label, title, y-axis label, filename stem)
    figures = (
        (rewards, "Rewards", "Rewards per Episode", "Reward", "reward"),
        (successes, "Success", "Success per Episode", "Success", "success"),
        (steps, "Steps", "Steps per Episode", "Steps", "steps"),
        (obstacles, "Obstacles", "Obstacles per Episode", "Number of Obstacles", "obstacles"),
    )

    for series, label, title, ylabel, stem in figures:
        plt.figure()
        plt.plot(series, label=label)
        plt.title(title)
        plt.xlabel("Episode")
        plt.ylabel(ylabel)
        plt.legend()
        plt.savefig(f"plots/{stem}_plot_episode_{suffix}.png")
        plt.close()  # release the figure so repeated calls do not accumulate figures
# Device configuration
# `device` is a module-level global: DDPGAgent and compute_td_error read it by
# name, so it must be assigned before either of them is used.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Initialize the environment, replay buffer, and agent
# CustomMazeEnv() uses its defaults (15x15 grid, 5 initial obstacles, at most 50).
env = CustomMazeEnv()
# The buffer keeps at most 10000 transitions; env.state_dim is (3, size, size).
replay_buffer = HGRReplayBuffer(10000, env.state_dim)
# The actor takes one input channel per state plane (env.state_dim[0]).
agent = DDPGAgent(env.state_dim[0], env.action_dim)

# Training
# Positional arguments: episodes=500, max_steps=300, batch_size=64.
train_agent_with_hgr(env, agent, 500, 300, replay_buffer, 64)


Episode 0, Reward: -25.84, Success: 0, Steps: 300, Obstacles: 5


KeyboardInterrupt: 

In [3]:
def test_model_with_multiple_runs_and_save(model_path, env_size=15, obstacles=50, max_steps=800, visualize_every=10, num_runs=3):
    """
    Evaluate a saved actor on several freshly generated mazes and store the results.

    For every run a new maze is created, snapshots of the maze are written to
    ``benchmark/Train_with_changing_position_test_different/Run_<k>/`` and the
    cumulative reward is tracked. Afterwards one plot with the cumulative
    reward of all runs is saved and the result directory is zipped into
    ``benchmark.zip``. If the model cannot be loaded, the error is printed and
    the function returns without running anything.

    Args:
        model_path (str): Path to the saved model.
        env_size (int): Size of the maze (default: 15x15).
        obstacles (int): Used as both the initial and the maximum obstacle setting.
        max_steps (int): Maximum number of steps allowed per run.
        visualize_every (int): Save a maze image every n steps.
        num_runs (int): Number of runs with different start and goal positions.

    Returns:
        None
    """
    import torch
    import os
    import shutil
    import numpy as np
    from matplotlib import pyplot as plt

    def _save_maze_image(maze_env, out_dir, title, step):
        """Draw obstacles, agent and goal and write ``Step_<step>.png`` into out_dir."""
        plt.figure(figsize=(8, 8))
        grid = np.zeros((maze_env.size, maze_env.size))
        for cell in maze_env.obstacles:
            grid[cell[0], cell[1]] = 1  # Obstacles
        plt.imshow(grid, cmap="binary")
        # scatter takes (x, y), i.e. (column, row)
        plt.scatter(maze_env.agent_pos[1], maze_env.agent_pos[0], c="blue", label="Agent")
        plt.scatter(maze_env.goal_pos[1], maze_env.goal_pos[0], c="green", label="Goal")
        plt.title(title)
        plt.legend(loc="upper right")
        plt.savefig(os.path.join(out_dir, f"Step_{step}.png"))
        plt.close()

    # Result directories: one base folder plus one sub-folder per run
    base_dir = "benchmark/Train_with_changing_position_test_different"
    os.makedirs(base_dir, exist_ok=True)

    run_dirs = []
    for run_number in range(1, num_runs + 1):
        run_path = os.path.join(base_dir, f"Run_{run_number}")
        os.makedirs(run_path, exist_ok=True)
        run_dirs.append(run_path)

    # Load the model
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    agent = DDPGAgent(input_channels=3, action_dim=4)
    try:
        weights = torch.load(model_path, map_location=device)
        agent.actor.load_state_dict(weights)
        agent.actor.eval()  # Set the model to evaluation mode
        print(f"Model loaded successfully from {model_path}")
    except Exception as e:
        print(f"Error loading model: {e}")
        return

    all_rewards = []  # Cumulative-reward curve of every run

    for run_number, run_dir in enumerate(run_dirs, start=1):
        print(f"\nStarting run {run_number}/{num_runs}")

        # Fresh maze for this run
        env = CustomMazeEnv(size=env_size, initial_obstacles=obstacles, max_obstacles=obstacles)
        env.reset()

        _save_maze_image(env, run_dir, title=f"Run {run_number} - Initial Maze", step="initial")

        state = env._get_state()
        total_reward = 0
        cumulative = []

        for step in range(max_steps):
            action = agent.act(state, epsilon=0)  # No exploration during testing
            state, reward, done, _ = env.step(action)
            total_reward += reward
            cumulative.append(total_reward)

            # Snapshot every `visualize_every` steps and on the final step
            if step % visualize_every == 0 or done:
                _save_maze_image(env, run_dir, title=f"Run {run_number} - Step {step}", step=step)

            if done:
                print(f"Goal reached in {step + 1} steps with total reward {total_reward:.2f}")
                break

        all_rewards.append(cumulative)

    # One figure with the cumulative reward of every run
    plt.figure(figsize=(10, 5))
    for run_number, curve in enumerate(all_rewards, start=1):
        plt.plot(curve, label=f"Run {run_number}")
    plt.xlabel("Step")
    plt.ylabel("Cumulative Reward")
    plt.title("Agent Performance Across Runs")
    plt.legend()
    plt.grid()
    plt.savefig(os.path.join(base_dir, "Performance_Plot.png"))
    plt.close()

    # Compress the benchmark directory into a zip file for easier download
    shutil.make_archive("benchmark", 'zip', base_dir)
    print("Results saved and compressed into benchmark.zip.")


# Evaluate the saved actor on three random mazes. Apart from model_path, every
# argument repeats the function's default value and is spelled out for clarity.
test_model_with_multiple_runs_and_save(
    model_path="/kaggle/input/test1234/final_model (1).pth",
    env_size=15,
    obstacles=50,
    max_steps=800,
    visualize_every=10,
    num_runs=3
)


  agent.actor.load_state_dict(torch.load(model_path, map_location=device))


Model loaded successfully from /kaggle/input/test1234/final_model (1).pth

Starting run 1/3
Goal reached in 68 steps with total reward -2.33

Starting run 2/3
Goal reached in 338 steps with total reward -23.40

Starting run 3/3
Goal reached in 10 steps with total reward 1.03
Results saved and compressed into benchmark.zip.


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt
import os

# Define Maze Environment with Curriculum Learning
class CustomMazeEnv:
    """Square grid maze whose agent has to visit a list of goals in order.

    The observation is a float32 array of shape ``(3, size, size)``:
    channel 0 marks the agent, channel 1 the goals and channel 2 the
    obstacles. ``reset`` must be called before ``step`` or ``_get_state``,
    because the constructor does not build a maze.
    """

    # (row, col) offsets used when searching the grid for a path.
    _NEIGHBOUR_OFFSETS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def __init__(self, size=15, initial_obstacles=5, max_obstacles=50):
        """
        Store the maze configuration (no maze is generated here).

        Args:
            size (int): Size of the maze (NxN grid).
            initial_obstacles (int): Initial number of obstacles.
            max_obstacles (int): Maximum number of obstacles.
        """
        self.size = size
        self.state_dim = (3, size, size)  # Channels for agent, goal, obstacles
        self.action_dim = 4  # Actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.initial_obstacles = initial_obstacles
        self.max_obstacles = max_obstacles
        self.current_obstacles = initial_obstacles

    def reset(self):
        """
        Reset the maze environment to a new configuration with random agent and goal positions.

        Returns:
            state (np.ndarray): Initial state of the environment.
        """
        while True:
            self.agent_pos = np.random.randint(0, self.size, size=2)
            self.goals = [tuple(np.random.randint(0, self.size, size=2))]
            self.current_goal_index = 0

            if not any(np.array_equal(self.agent_pos, goal) for goal in self.goals):
                self._generate_maze()
                if self._is_path_possible():
                    break

        return self._get_state()

    def increase_difficulty(self):
        """Raise the obstacle setting by one, capped at ``max_obstacles``."""
        self.current_obstacles = min(self.current_obstacles + 1, self.max_obstacles)

    def _generate_maze(self):
        """
        Generate random obstacles until the current goal is reachable.

        Every cell that holds neither the agent nor a goal becomes an obstacle
        independently with probability ``current_obstacles / size**2``.
        """
        reserved = {(int(self.agent_pos[0]), int(self.agent_pos[1]))}
        reserved.update((int(goal[0]), int(goal[1])) for goal in self.goals)
        obstacle_probability = self.current_obstacles / (self.size ** 2)

        while True:
            self.obstacles = []
            for i in range(self.size):
                for j in range(self.size):
                    if (i, j) not in reserved and np.random.rand() < obstacle_probability:
                        self.obstacles.append((i, j))

            if self._is_path_possible():
                break

    def _is_path_possible(self):
        """
        Check whether the current goal can be reached from the agent position.

        Uses an iterative depth-first search with an explicit stack, so large
        grids cannot exceed Python's recursion limit.

        Returns:
            bool: True if a path exists, False otherwise.
        """
        start = (int(self.agent_pos[0]), int(self.agent_pos[1]))
        target = self.goals[self.current_goal_index]
        target = (int(target[0]), int(target[1]))
        blocked = set(self.obstacles)

        visited = {start}
        stack = [start]
        while stack:
            x, y = stack.pop()
            if (x, y) == target:
                return True
            for dx, dy in self._NEIGHBOUR_OFFSETS:
                neighbour = (x + dx, y + dy)
                inside = 0 <= neighbour[0] < self.size and 0 <= neighbour[1] < self.size
                if inside and neighbour not in visited and neighbour not in blocked:
                    visited.add(neighbour)
                    stack.append(neighbour)
        return False

    def _get_state(self):
        """
        Return the current state as a ``(3, size, size)`` float32 array.

        Returns:
            state (np.ndarray): Channel 0 = agent, channel 1 = goals,
            channel 2 = obstacles.
        """
        state = np.zeros(self.state_dim, dtype=np.float32)
        state[0, self.agent_pos[0], self.agent_pos[1]] = 1.0
        # Every goal belongs in the goal plane. Alternating between channels 1
        # and 2 would draw every second goal into the obstacle plane.
        for goal in self.goals:
            state[1, goal[0], goal[1]] = 1.0
        for obs in self.obstacles:
            state[2, obs[0], obs[1]] = 1.0
        return state

    def step(self, action):
        """
        Execute an action in the maze.

        Moves are clamped to the grid; a move into an obstacle leaves the
        agent in place. Reaching the current goal yields a reward of 1.0 and
        advances to the next goal; otherwise the reward is ``-0.01`` times the
        Euclidean distance to the current goal.

        Args:
            action (int): Action to take (0=up, 1=down, 2=left, 3=right).

        Returns:
            state (np.ndarray): Updated state after the action.
            reward (float): Reward for the action.
            done (bool): True once the last goal in ``goals`` has been reached.
            info (dict): Always empty.
        """
        next_pos = self.agent_pos.copy()
        if action == 0:
            next_pos[0] = max(self.agent_pos[0] - 1, 0)
        elif action == 1:
            next_pos[0] = min(self.agent_pos[0] + 1, self.size - 1)
        elif action == 2:
            next_pos[1] = max(self.agent_pos[1] - 1, 0)
        elif action == 3:
            next_pos[1] = min(self.agent_pos[1] + 1, self.size - 1)
        if tuple(next_pos) not in self.obstacles:
            self.agent_pos = next_pos

        target = self.goals[self.current_goal_index]
        reached = np.array_equal(self.agent_pos, target)
        reward = 1.0 if reached else -0.01 * np.linalg.norm(self.agent_pos - np.asarray(target))

        done = False
        if reached:
            self.current_goal_index += 1
            # The episode only ends after the final goal in the list.
            done = self.current_goal_index >= len(self.goals)
        return self._get_state(), reward, done, {}

class MultiLevelMazeEnv(CustomMazeEnv):
    """Maze environment made of several consecutive levels.

    Each level is a freshly generated single-goal maze. Reaching the goal
    advances to the next level, and the episode ends only after ``levels``
    mazes have been solved.
    """

    def __init__(self, size=15, initial_obstacles=5, max_obstacles=50, levels=3):
        """Store the number of levels and the base maze configuration."""
        self.levels = levels
        super().__init__(size, initial_obstacles, max_obstacles)
        self.current_level = 0

    def reset(self):
        """Start a new episode at level 0 and return its initial state."""
        self.current_level = 0
        self.current_goal_index = 0  # Initialize goal index
        return self._reset_level()

    def _reset_level(self):
        """Build a new solvable maze for the current level and return its state."""
        solvable = False
        while not solvable:
            self.agent_pos = np.random.randint(0, self.size, size=2)
            self.goals = [tuple(np.random.randint(0, self.size, size=2))]
            self.current_goal_index = 0  # Initialize goal index

            agent_on_goal = any(np.array_equal(self.agent_pos, goal) for goal in self.goals)
            if agent_on_goal:
                continue  # draw new positions

            self._generate_maze()
            solvable = self._is_path_possible()

        return self._get_state()

    def step(self, action):
        """
        Apply ``action``; when a level is solved either move on to a fresh
        level (``done`` stays False) or finish the episode after the last one.
        """
        state, reward, done, info = super().step(action)
        if not done:
            return state, reward, done, info

        self.current_level += 1
        if self.current_level >= self.levels:
            return state, reward, True, info

        # More levels remain: hand back the first state of the next maze.
        return self._reset_level(), reward, False, info


def benchmark_multi_level_with_model(env, agent, model_path, episodes, max_steps):
    """
    Evaluate a pretrained agent on a multi-level maze and plot the results.

    Args:
        env (MultiLevelMazeEnv): Environment exposing ``current_level`` and ``levels``.
        agent (DDPGAgent): Agent to evaluate; its weights are loaded from ``model_path``.
        model_path (str): Path to the saved model.
        episodes (int): Number of evaluation episodes.
        max_steps (int): Maximum steps per episode.

    Returns:
        None
    """
    agent.load_model(model_path)
    print(f"Model loaded from {model_path}")

    rewards_per_episode = []
    successes_per_episode = []
    levels_completed = []

    for ep in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False

        for _ in range(max_steps):
            # No exploration during testing, consistent with the other
            # evaluation routines in this file.
            action = agent.act(state, epsilon=0)
            state, reward, done, _info = env.step(action)
            total_reward += reward
            if done:
                break

        # Read the level counter after the loop so that episodes which run out
        # of steps still report the levels they did complete.
        completed_levels = env.current_level
        success = 1 if done and completed_levels >= env.levels else 0

        rewards_per_episode.append(total_reward)
        successes_per_episode.append(success)
        levels_completed.append(completed_levels)

        print(f"Episode {ep}, Reward: {total_reward:.2f}, Success: {success}, Levels Completed: {completed_levels}")

    # Plot results
    plot_multi_level_results(rewards_per_episode, successes_per_episode, levels_completed)

def plot_multi_level_results(rewards, successes, levels_completed):
    """
    Save the multi-level benchmark plots as PNG files in the ``plots`` directory.

    Args:
        rewards (list): Total reward per episode.
        successes (list): 1 for a successful episode, 0 otherwise.
        levels_completed (list): Number of levels completed per episode.

    Returns:
        None
    """
    # exist_ok avoids the check-then-create race of testing os.path.exists first.
    os.makedirs("plots", exist_ok=True)

    # (series, legend label, title, y-axis label, output path)
    figures = (
        (rewards, "Rewards", "Rewards per Episode", "Total Reward",
         "plots/multi_level_rewards_plot.png"),
        (successes, "Success Rate", "Success per Episode", "Success (1 = Success, 0 = Failure)",
         "plots/multi_level_successes_plot.png"),
        (levels_completed, "Levels Completed", "Levels Completed per Episode", "Levels Completed",
         "plots/levels_completed_plot.png"),
    )

    for series, label, title, ylabel, path in figures:
        plt.figure()
        plt.plot(series, label=label)
        plt.title(title)
        plt.xlabel("Episode")
        plt.ylabel(ylabel)
        plt.legend()
        plt.savefig(path)
        plt.close()

    print("Plots saved in the 'plots' directory.")

# Device configuration
# DDPGAgent reads this module-level `device` by name when it is constructed.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Initialize environment and agent
# DDPGAgent is not defined in this cell; it comes from the earlier training
# cell, which therefore has to be executed first.
multi_level_env = MultiLevelMazeEnv(size=15, initial_obstacles=5, max_obstacles=50, levels=3)
agent = DDPGAgent(input_channels=3, action_dim=4)

# Path to the saved model
model_path = "/kaggle/input/test1234/final_model (1).pth"  # Update with the correct path to your model file

# Run the benchmark
# The weights are loaded inside the benchmark function via agent.load_model.
benchmark_multi_level_with_model(multi_level_env, agent, model_path, episodes=300, max_steps=400)


In [None]:
def benchmark_multi_objective_with_model(env, agent, model_path, episodes=10, max_steps=500):
    """
    Test the agent on a multi-objective maze environment with a pretrained model.
    Saves reward and goal completion plots.

    Args:
        env (MultiObjectiveMazeEnv): Multi-objective maze environment; must
            expose ``current_goal_index``.
        agent (DDPGAgent): Agent to be tested.
        model_path (str): Path to the pretrained model file.
        episodes (int): Number of episodes to test.
        max_steps (int): Maximum steps per episode.

    Returns:
        None
    """
    agent.load_model(model_path)
    print(f"Model loaded successfully from {model_path}")

    rewards_per_episode = []
    goals_reached_per_episode = []

    for ep in range(episodes):
        state = env.reset()
        total_reward = 0

        for _ in range(max_steps):
            action = agent.act(state, epsilon=0)  # No exploration during testing
            state, reward, done, _info = env.step(action)
            total_reward += reward
            if done:
                break

        # Read the counter after the loop: an episode that runs out of steps
        # after reaching some of the goals must not be reported as zero.
        goals_reached = env.current_goal_index  # Number of goals reached

        rewards_per_episode.append(total_reward)
        goals_reached_per_episode.append(goals_reached)

        print(f"Episode {ep + 1}/{episodes}: Total Reward = {total_reward:.2f}, Goals Reached = {goals_reached}")

    # Plot results
    plot_multi_objective_results(rewards_per_episode, goals_reached_per_episode)


def plot_multi_objective_results(rewards, goals_reached):
    """
    Plot rewards and goals reached during testing.

    Both figures are written as PNG files into the ``plots`` directory.

    Args:
        rewards (list): Total rewards per episode.
        goals_reached (list): Number of goals reached per episode.

    Returns:
        None
    """
    # exist_ok avoids the check-then-create race of testing os.path.exists first.
    os.makedirs("plots", exist_ok=True)

    # (series, legend label, title, y-axis label, output path)
    figures = (
        (rewards, "Rewards", "Rewards per Episode", "Total Reward",
         "plots/multi_objective_rewards_plot_HER.png"),
        (goals_reached, "Goals Reached", "Goals Reached per Episode", "Number of Goals Reached",
         "plots/multi_objective_goals_reached_plot_HER.png"),
    )

    for series, label, title, ylabel, path in figures:
        plt.figure()
        plt.plot(series, label=label)
        plt.title(title)
        plt.xlabel("Episode")
        plt.ylabel(ylabel)
        plt.legend()
        plt.savefig(path)
        plt.close()

    print("Plots saved in the 'plots' directory.")

# Main setup
# NOTE(review): MultiObjectiveMazeEnv is not defined anywhere in the visible
# part of this notebook - confirm it is defined in another cell before running.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
multi_objective_env = MultiObjectiveMazeEnv(size=15, initial_obstacles=5, max_obstacles=50, num_goals=3)
agent = DDPGAgent(input_channels=3, action_dim=4)

# Same checkpoint path as used by the other benchmark cells.
model_path = "/kaggle/input/test1234/final_model (1).pth"
benchmark_multi_objective_with_model(multi_objective_env, agent, model_path, episodes=10, max_steps=500)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
import matplotlib.pyplot as plt
import os

# Actor Network for SAC
class ActorSAC(nn.Module):
    """
    Policy network for the Soft Actor-Critic (SAC) algorithm.

    Three convolutional layers encode the maze state; two fully connected
    layers turn the flattened features into a probability vector over the
    available actions.

    Attributes:
        conv1, conv2, conv3: 3x3 convolutions (stride 1, padding 1) with
            64, 128 and 256 output channels.
        fc1, fc2: Fully connected layers producing the action scores.
    """
    def __init__(self, input_channels, action_dim, maze_size):
        super().__init__()
        # Kernel 3 with stride 1 and padding 1 keeps the spatial size, so the
        # flattened feature length below is 256 * maze_size * maze_size.
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        flat_features = 256 * maze_size * maze_size
        self.fc1 = nn.Linear(flat_features, 512)
        self.fc2 = nn.Linear(512, action_dim)

    def forward(self, state):
        """
        Compute the action distribution for a batch of states.
        Args:
            state: Input maze state.
        Returns:
            Softmax probabilities over actions.
        """
        features = state
        for conv in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(conv(features))
        # Flatten everything except the batch dimension.
        flat = features.view(features.size(0), -1)
        hidden = torch.relu(self.fc1(flat))
        logits = self.fc2(hidden)
        return torch.softmax(logits, dim=-1)

# Critic Network for SAC
class CriticSAC(nn.Module):
    """
    Q-value network used by SAC.

    The maze state is encoded by three convolutional layers; the flattened
    features are concatenated with the action vector and mapped to a single
    Q-value by two fully connected layers.

    Attributes:
        conv1, conv2, conv3: 3x3 convolutions (stride 1, padding 1) with
            64, 128 and 256 output channels.
        fc1, fc2: Fully connected layers for Q-value estimation.
    """
    def __init__(self, input_channels, action_dim, maze_size):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        # The first linear layer sees the flattened conv features plus the
        # action vector appended in forward().
        joint_features = 256 * maze_size * maze_size + action_dim
        self.fc1 = nn.Linear(joint_features, 512)
        self.fc2 = nn.Linear(512, 1)

    def forward(self, state, action):
        """
        Estimate the Q-value of each state-action pair in the batch.
        Args:
            state: Input maze state.
            action: Action vector with one entry per action, concatenated
                to the flattened state features along the last dimension.
        Returns:
            Q-value for the state-action pair.
        """
        features = state
        for conv in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(conv(features))
        flat = features.view(features.size(0), -1)
        joint = torch.cat((flat, action), dim=-1)  # State features + action
        hidden = torch.relu(self.fc1(joint))
        return self.fc2(hidden)

# SAC Agent
class SACAgent:
    """
    Soft Actor-Critic (SAC) agent for maze-solving tasks with discrete actions.

    The actor outputs a probability vector over the actions. The critics take
    a state together with an action vector: a one-hot vector for replayed
    actions, or the actor's probability vector when the current policy is
    evaluated (which keeps the actor loss differentiable).

    All networks are placed on the module-level ``device``, which must be
    defined before an agent is constructed.

    Attributes:
        actor: The policy network.
        critic1, critic2: Q-value estimators.
        target_critic1, target_critic2: Target networks for stability.
        actor_optimizer, critic1_optimizer, critic2_optimizer: Adam
            optimizers for the actor and the two online critics.
        action_dim: Number of discrete actions.
        gamma: Discount factor for rewards.
        tau: Soft update factor for target networks.
        alpha: Entropy coefficient for exploration.
    """
    def __init__(self, input_channels, action_dim, maze_size, lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2):
        self.actor = ActorSAC(input_channels, action_dim, maze_size).to(device)
        self.critic1 = CriticSAC(input_channels, action_dim, maze_size).to(device)
        self.critic2 = CriticSAC(input_channels, action_dim, maze_size).to(device)
        self.target_critic1 = CriticSAC(input_channels, action_dim, maze_size).to(device)
        self.target_critic2 = CriticSAC(input_channels, action_dim, maze_size).to(device)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic1_optimizer = optim.Adam(self.critic1.parameters(), lr=lr)
        self.critic2_optimizer = optim.Adam(self.critic2.parameters(), lr=lr)

        self.action_dim = action_dim
        self.gamma = gamma
        self.tau = tau
        self.alpha = alpha

        # Start the targets as exact copies of the online critics. A soft
        # update here would leave them almost entirely at their own, unrelated
        # random initialisation.
        self.update_targets(tau=1.0)

    def update_targets(self, tau=None):
        """
        Move the target critics towards the online critics.

        Each target parameter becomes ``tau * online + (1 - tau) * target``.

        Args:
            tau: Interpolation factor. Defaults to ``self.tau``; ``1.0``
                copies the online weights verbatim.
        """
        if tau is None:
            tau = self.tau
        pairs = (
            (self.target_critic1, self.critic1),
            (self.target_critic2, self.critic2),
        )
        for target_net, online_net in pairs:
            for target_param, param in zip(target_net.parameters(), online_net.parameters()):
                target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

    def act(self, state):
        """
        Select an action based on the current policy.

        The action is sampled from the actor's output distribution, so
        repeated calls with the same state may return different actions.

        Args:
            state: Current state.
        Returns:
            Chosen action.
        """
        state = torch.tensor(state, dtype=torch.float32).to(device).unsqueeze(0)
        with torch.no_grad():
            action_probs = self.actor(state).cpu().numpy()[0]
        action = np.random.choice(len(action_probs), p=action_probs)
        return action

    def train(self, replay_buffer, batch_size):
        """
        Train the agent using the replay buffer.

        Performs one gradient step on each critic, one on the actor, and a
        soft update of the target critics.

        Args:
            replay_buffer: The replay buffer storing experiences.
            batch_size: Number of samples for each training step.
        """
        # Sample a batch from the replay buffer
        states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
        states = torch.tensor(states, dtype=torch.float32).to(device)
        actions = torch.tensor(actions, dtype=torch.long).to(device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device).unsqueeze(-1)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(device)
        dones = torch.tensor(dones, dtype=torch.float32).to(device).unsqueeze(-1)

        # Compute target Q values
        with torch.no_grad():
            next_action_probs = self.actor(next_states)
            next_log_probs = torch.log(next_action_probs + 1e-10)
            # Expected log-probability under the policy, i.e. the negative
            # entropy: sum_a p(a) * log p(a). Shape (batch, 1).
            next_neg_entropy = (next_action_probs * next_log_probs).sum(dim=-1, keepdim=True)
            next_q1 = self.target_critic1(next_states, next_action_probs)
            next_q2 = self.target_critic2(next_states, next_action_probs)
            next_q = torch.min(next_q1, next_q2) - self.alpha * next_neg_entropy
            # (1 - dones) removes the bootstrap term for terminal transitions.
            target_q = rewards + self.gamma * (1 - dones) * next_q

        # Update critics
        actions_one_hot = nn.functional.one_hot(actions, num_classes=self.action_dim).float()
        current_q1 = self.critic1(states, actions_one_hot)
        current_q2 = self.critic2(states, actions_one_hot)
        critic1_loss = nn.functional.mse_loss(current_q1, target_q)
        critic2_loss = nn.functional.mse_loss(current_q2, target_q)
        self.critic1_optimizer.zero_grad()
        critic1_loss.backward()
        self.critic1_optimizer.step()
        self.critic2_optimizer.zero_grad()
        critic2_loss.backward()
        self.critic2_optimizer.step()

        # Update actor
        action_probs = self.actor(states)
        log_probs = torch.log(action_probs + 1e-10)
        # keepdim=True keeps this at (batch, 1) so it lines up with the
        # critics' output instead of broadcasting to (batch, batch).
        neg_entropy = (action_probs * log_probs).sum(dim=-1, keepdim=True)
        q1 = self.critic1(states, action_probs)
        q2 = self.critic2(states, action_probs)
        # Minimising alpha * sum(p log p) - Q maximises entropy and Q-value.
        actor_loss = (self.alpha * neg_entropy - torch.min(q1, q2)).mean()
        self.actor_optimizer.zero_grad()
        # This backward pass also leaves gradients on the critics' parameters;
        # they are cleared by the critics' zero_grad() calls on the next step.
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft update targets
        self.update_targets()

# Replay Buffer
class ReplayBuffer:
    """
    Bounded store of transitions with uniform random sampling.

    Once ``capacity`` transitions are held, adding another one drops the
    oldest entry.

    Attributes:
        capacity: Maximum number of experiences to store.
        buffer: Deque (``maxlen=capacity``) holding the transitions.
    """
    def __init__(self, capacity, state_dim):
        # state_dim is accepted for the callers' convenience but is not used.
        self.buffer = deque(maxlen=capacity)
        self.capacity = capacity

    def add(self, state, action, reward, next_state, done):
        """
        Append one (state, action, reward, next_state, done) transition.
        """
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """
        Draw ``batch_size`` distinct transitions at random and return them
        column-wise as five NumPy arrays.
        """
        picked = random.sample(self.buffer, batch_size)
        # zip(*picked) regroups the tuples field by field.
        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*picked)
        )
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

# Training Function
def train_sac_with_metrics(env, agent, replay_buffer, episodes, max_steps, batch_size, output_dir):
    """
    Train the SAC agent and log metrics.

    Runs ``episodes`` episodes of at most ``max_steps`` steps. Every
    transition is stored in the replay buffer and, once the buffer holds
    more than ``batch_size`` transitions, the agent is trained after each
    step. Every 20 episodes the environment's difficulty is increased.
    Reward and success curves are saved as PNG files in ``output_dir``.

    Args:
        env: The environment.
        agent: The SAC agent.
        replay_buffer: Replay buffer storing experiences.
        episodes: Number of training episodes.
        max_steps: Maximum steps per episode.
        batch_size: Number of samples per training step.
        output_dir: Directory to save metrics.
    """
    # Create the output directory before training so that an unusable path
    # fails immediately rather than after the whole run has finished.
    os.makedirs(output_dir, exist_ok=True)

    rewards_per_episode = []
    successes_per_episode = []

    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        success = 0

        for step in range(max_steps):
            action = agent.act(state)  # Select action
            next_state, reward, done, _ = env.step(action)  # Execute action
            replay_buffer.add(state, action, reward, next_state, done)  # Store transition

            if len(replay_buffer) > batch_size:
                agent.train(replay_buffer, batch_size)  # Train agent

            state = next_state
            total_reward += reward

            if done:
                # NOTE(review): `done` is counted as success; this assumes the
                # environment only ends an episode when the goal is reached.
                success = 1
                break

        # Log metrics
        rewards_per_episode.append(total_reward)
        successes_per_episode.append(success)

        print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward}, Success: {success}")

        if (episode + 1) % 20 == 0:
            env.increase_difficulty()  # Curriculum learning

    # Save metrics as plots
    _save_training_curve(
        rewards_per_episode, "Reward", "Rewards per Episode", "Total Reward",
        os.path.join(output_dir, "reward_plot.png"),
    )
    _save_training_curve(
        successes_per_episode, "Success", "Success Rate per Episode", "Success (1/0)",
        os.path.join(output_dir, "success_plot.png"),
    )


def _save_training_curve(values, label, title, ylabel, path):
    """Draw ``values`` against the episode index as one line chart and save it to ``path``."""
    plt.figure()
    plt.plot(values, label=label)
    plt.title(title)
    plt.xlabel("Episode")
    plt.ylabel(ylabel)
    plt.legend()
    plt.savefig(path)
    plt.close()

# Main Script
if __name__ == "__main__":
    # Training configuration and output location.
    env_size = 15
    obstacles = 5
    max_steps = 300
    episodes = 200
    batch_size = 64
    output_dir = "sac_results"

    env = CustomMazeEnv(size=env_size, initial_obstacles=obstacles, max_obstacles=50)
    # The buffer keeps at most 10000 transitions; env.state_dim is passed
    # through, but ReplayBuffer.__init__ does not use it.
    replay_buffer = ReplayBuffer(10000, env.state_dim)
    # `device` is read as a module-level global inside SACAgent, so it has to
    # be assigned before the agent is constructed.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    sac_agent = SACAgent(input_channels=3, action_dim=4, maze_size=env_size)

    train_sac_with_metrics(env, sac_agent, replay_buffer, episodes, max_steps, batch_size, output_dir)

    # Save the SAC model
    # Only the actor's weights are written; the critics are not saved.
    torch.save(sac_agent.actor.state_dict(), "sac_model.pth")
    print("SAC model saved as sac_model.pth")


In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import os


class CustomMazeEnv:
    """
    Base class for a customizable maze environment.

    This class only provides maze generation, the reachability check and the
    state encoding. Its methods read ``agent_pos``, ``goals`` and
    ``current_goal_index``, which are not set here and must be provided by a
    subclass before those methods are called.

    Attributes:
        size (int): Size of the maze (NxN grid).
        state_dim (tuple): Shape of the state (channels, size, size).
        action_dim (int): Number of possible actions (4: up, down, left, right).
        initial_obstacles (int): Initial number of obstacles in the maze.
        max_obstacles (int): Maximum number of obstacles allowed in the maze.
        current_obstacles (int): Current number of obstacles in the maze.
    """

    def __init__(self, size=15, initial_obstacles=5, max_obstacles=50):
        self.size = size
        self.state_dim = (3, size, size)  # Channels for agent, goal, and obstacles
        self.action_dim = 4  # Actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.initial_obstacles = initial_obstacles
        self.max_obstacles = max_obstacles
        self.current_obstacles = initial_obstacles

    def _generate_maze(self):
        """
        Generates a maze with obstacles while ensuring a valid path exists
        between the agent's position and the current goal.

        Every free cell becomes an obstacle with probability
        ``current_obstacles / size**2``, so ``current_obstacles`` is the
        expected, not the exact, obstacle count. Layouts without a path are
        discarded and redrawn. The result is stored in ``self.obstacles`` as
        a list of ``(row, col)`` tuples.
        """
        # Cells that must stay free: the agent's cell and every goal cell.
        reserved = np.zeros((self.size, self.size), dtype=bool)
        reserved[self.agent_pos[0], self.agent_pos[1]] = True
        for goal in self.goals:
            reserved[goal[0], goal[1]] = True

        density = self.current_obstacles / (self.size ** 2)
        while True:
            # Start every attempt from an empty layout. Keeping the obstacles
            # of a rejected attempt would stack layouts on top of each other,
            # raising the density with each retry.
            self.obstacles = []
            for i in range(self.size):
                for j in range(self.size):
                    if not reserved[i, j] and np.random.rand() < density:
                        self.obstacles.append((i, j))

            if self._is_path_possible():
                break  # Ensure a valid path exists before proceeding

    def _is_path_possible(self):
        """
        Checks if there is a valid path from the agent's position to the current goal.

        Uses an iterative depth-first search over the four-connected grid, so
        the check does not depend on Python's recursion limit.

        Returns:
            bool: True if the current goal can be reached without entering
            an obstacle cell, False otherwise.
        """
        goal = tuple(int(c) for c in self.goals[self.current_goal_index])
        start = (int(self.agent_pos[0]), int(self.agent_pos[1]))
        blocked = set(map(tuple, self.obstacles))  # O(1) membership tests

        visited = {start}
        stack = [start]
        while stack:
            x, y = stack.pop()
            if (x, y) == goal:
                return True
            for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
                nx, ny = x + dx, y + dy
                inside = 0 <= nx < self.size and 0 <= ny < self.size
                if inside and (nx, ny) not in visited and (nx, ny) not in blocked:
                    visited.add((nx, ny))
                    stack.append((nx, ny))
        return False

    def _get_state(self):
        """
        Constructs the current state representation as a 3D tensor.

        Returns:
            numpy.ndarray: float32 array of shape ``state_dim`` with separate
            channels for agent position (0), goals (1), and obstacles (2).
        """
        state = np.zeros(self.state_dim, dtype=np.float32)
        state[0, self.agent_pos[0], self.agent_pos[1]] = 1.0  # Agent position
        for goal in self.goals:
            state[1, goal[0], goal[1]] = 1.0  # Goals
        for obs in self.obstacles:
            state[2, obs[0], obs[1]] = 1.0  # Obstacles
        return state


class MultiObjectiveMazeEnv(CustomMazeEnv):
    """
    Maze environment in which several goals have to be visited in order.

    Attributes:
        num_goals (int): Number of goals in the maze.
        goals (list): List of goal positions in the maze.
        current_goal_index (int): Index of the current goal the agent is pursuing.
    """

    def __init__(self, size=15, initial_obstacles=5, max_obstacles=50, num_goals=3):
        super().__init__(size, initial_obstacles, max_obstacles)
        self.num_goals = num_goals
        self.goals = []
        self.current_goal_index = 0
        self.reset()

    def reset(self):
        """
        Starts a new episode with a fresh agent position, goals and maze.

        The agent position is redrawn until it does not coincide with any
        goal. Goal positions are drawn independently of each other.

        Returns:
            The initial state as produced by ``_get_state``.
        """
        self.agent_pos = np.random.randint(0, self.size, size=2)

        drawn_goals = []
        for _ in range(self.num_goals):
            drawn_goals.append(tuple(np.random.randint(0, self.size, size=2)))
        self.goals = drawn_goals

        while True:
            starts_on_goal = any(np.array_equal(self.agent_pos, goal) for goal in self.goals)
            if not starts_on_goal:
                break
            self.agent_pos = np.random.randint(0, self.size, size=2)

        self.current_goal_index = 0
        self._generate_maze()
        return self._get_state()

    def step(self, action):
        """
        Applies one action and advances the goal counter when the current
        goal is reached.

        A move that would leave the grid is clamped to the border; a move
        into an obstacle leaves the agent where it is.

        Args:
            action (int): The action to perform (0 = up, 1 = down, 2 = left, 3 = right).

        Returns:
            tuple: (state, reward, done, info)
            - state: The next state after the action.
            - reward: 1.0 when the current goal is reached, otherwise -0.01
              times the Euclidean distance to the current goal.
            - done: True once every goal has been reached.
            - info: Additional info (empty for now).
        """
        row, col = self.agent_pos
        last_index = self.size - 1

        candidate = self.agent_pos.copy()
        if action == 0:  # Move up
            candidate[0] = max(row - 1, 0)
        elif action == 1:  # Move down
            candidate[0] = min(row + 1, last_index)
        elif action == 2:  # Move left
            candidate[1] = max(col - 1, 0)
        elif action == 3:  # Move right
            candidate[1] = min(col + 1, last_index)

        hits_obstacle = tuple(candidate) in self.obstacles
        if not hits_obstacle:
            self.agent_pos = candidate

        target = self.goals[self.current_goal_index]
        reached = np.array_equal(self.agent_pos, target)
        if reached:
            reward = 1.0
            self.current_goal_index += 1
            # The episode ends only after the last goal in the list.
            done = self.current_goal_index >= len(self.goals)
        else:
            reward = -0.01 * np.linalg.norm(self.agent_pos - target)
            done = reached

        return self._get_state(), reward, done, {}


def benchmark_multi_objective_with_model(env, agent, model_path, episodes=10, max_steps=500):
    """
    Benchmarks a pretrained SAC agent on a multi-objective maze environment.

    Loads the actor weights from ``model_path``, runs ``episodes`` episodes
    of at most ``max_steps`` steps, prints one summary line per episode and
    passes the collected metrics to ``plot_multi_objective_results``.

    Args:
        env (MultiObjectiveMazeEnv): The multi-objective maze environment.
        agent (SACAgent): The SAC agent to be benchmarked.
        model_path (str): Path to the pretrained model file.
        episodes (int): Number of episodes to run the benchmark.
        max_steps (int): Maximum steps per episode.

    Returns:
        None
    """
    agent.actor.load_state_dict(torch.load(model_path, map_location=device))
    print(f"Model loaded successfully from {model_path}")

    rewards_per_episode = []
    goals_reached_per_episode = []

    for ep in range(episodes):
        state = env.reset()
        total_reward = 0

        for step in range(max_steps):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            state = next_state

            total_reward += reward
            if done:
                break

        # Read the counter after the loop: `done` is only set once all goals
        # are reached, so reading it inside the `if done` branch would report
        # 0 for episodes that reached some goals before running out of steps.
        goals_reached = env.current_goal_index

        rewards_per_episode.append(total_reward)
        goals_reached_per_episode.append(goals_reached)

        print(f"Episode {ep + 1}/{episodes}: Total Reward = {total_reward:.2f}, Goals Reached = {goals_reached}")

    plot_multi_objective_results(rewards_per_episode, goals_reached_per_episode)


def plot_multi_objective_results(rewards, goals_reached):
    """
    Saves plots for rewards and goals reached per episode.

    Each series is drawn as its own line chart and saved as a PNG file in
    the "plots" directory, which is created when it does not exist yet.

    Args:
        rewards (list): Total rewards per episode.
        goals_reached (list): Number of goals reached per episode.
    """
    # exist_ok=True makes the call idempotent and avoids the race between a
    # separate os.path.exists() check and the directory creation.
    os.makedirs("plots", exist_ok=True)

    # One entry per chart: (series, legend label, title, y-axis label, file).
    charts = [
        (rewards, "Rewards", "Rewards per Episode", "Total Reward",
         "plots/sac_multi_objective_rewards_plot.png"),
        (goals_reached, "Goals Reached", "Goals Reached per Episode",
         "Number of Goals Reached",
         "plots/sac_multi_objective_goals_reached_plot.png"),
    ]
    for series, label, title, ylabel, path in charts:
        plt.figure()
        plt.plot(series, label=label)
        plt.title(title)
        plt.xlabel("Episode")
        plt.ylabel(ylabel)
        plt.legend()
        plt.savefig(path)
        plt.close()  # Release the figure so repeated calls do not pile up

    print("Plots saved in the 'plots' directory.")




# Build a three-goal 15x15 maze and a new SAC agent, then benchmark the actor
# weights stored in "sac_model.pth" for 10 episodes of at most 500 steps each.
multi_objective_env = MultiObjectiveMazeEnv(size=15, initial_obstacles=5, max_obstacles=50, num_goals=3)
sac_agent = SACAgent(input_channels=3, action_dim=4, maze_size=15)
benchmark_multi_objective_with_model(multi_objective_env, sac_agent, "sac_model.pth", episodes=10, max_steps=500)


In [None]:
def benchmark_multi_level_with_model(env, agent, model_path, episodes, max_steps):
    """
    Benchmarks a pretrained SAC agent on a multi-level maze environment.

    Loads the actor weights from ``model_path``, runs ``episodes`` episodes
    of at most ``max_steps`` steps, prints one summary line per episode and
    passes the collected metrics to ``plot_multi_level_results``.

    Args:
        env (MultiLevelMazeEnv): The multi-level maze environment.
        agent (SACAgent): The SAC agent to be benchmarked.
        model_path (str): Path to the pretrained model file.
        episodes (int): Number of episodes to run the benchmark.
        max_steps (int): Maximum steps allowed per episode.

    Returns:
        None
    """
    # Load the pretrained model
    agent.actor.load_state_dict(torch.load(model_path, map_location=device))
    print(f"Model loaded from {model_path}")

    # Lists to store metrics for each episode
    rewards_per_episode = []  # Total rewards per episode
    successes_per_episode = []  # Success (1/0) for each episode
    levels_completed = []  # Number of levels completed per episode

    for ep in range(episodes):
        # Reset the environment at the start of each episode
        state = env.reset()
        total_reward, success = 0, 0

        for step in range(max_steps):
            # Select and execute an action
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            state = next_state

            total_reward += reward  # Accumulate rewards

            if done:
                # Check if the final level is reached
                success = 1 if env.current_level >= env.levels else 0
                break

        # Read the level counter after the loop so that episodes which run
        # out of steps still report the levels they got through, instead of 0.
        # NOTE(review): assumes env.current_level counts the levels finished
        # so far - confirm against MultiLevelMazeEnv.
        completed_levels = env.current_level

        # Record episode metrics
        rewards_per_episode.append(total_reward)
        successes_per_episode.append(success)
        levels_completed.append(completed_levels)

        # Print episode summary
        print(f"Episode {ep + 1}/{episodes}: Reward = {total_reward:.2f}, "
              f"Success = {success}, Levels = {completed_levels}")

    # Generate plots for results
    plot_multi_level_results(rewards_per_episode, successes_per_episode, levels_completed)


def plot_multi_level_results(rewards, successes, levels):
    """
    Generates and saves plots for benchmarking results in a multi-level environment.

    Each series is drawn as its own line chart and saved as a PNG file in
    the "plots" directory, which is created when it does not exist yet.

    Args:
        rewards (list): Total rewards per episode.
        successes (list): Success (1/0) for each episode.
        levels (list): Number of levels completed per episode.

    Returns:
        None
    """
    # exist_ok=True makes the call idempotent and avoids the race between a
    # separate os.path.exists() check and the directory creation.
    os.makedirs("plots", exist_ok=True)

    # One entry per chart: (series, legend label, title, y-axis label, file).
    charts = [
        (rewards, "Rewards", "Rewards per Episode", "Total Reward",
         "plots/multi_level_rewards_plot.png"),
        (successes, "Success", "Success Rate per Episode", "Success (1/0)",
         "plots/multi_level_success_plot.png"),
        (levels, "Levels Completed", "Levels Completed per Episode",
         "Number of Levels Completed",
         "plots/multi_level_levels_completed_plot.png"),
    ]
    for series, label, title, ylabel, path in charts:
        plt.figure()
        plt.plot(series, label=label)
        plt.title(title)
        plt.xlabel("Episode")
        plt.ylabel(ylabel)
        plt.legend()
        plt.savefig(path)
        plt.close()  # Release the figure so repeated calls do not pile up

    print("Plots saved in the 'plots' directory.")



# MultiLevelMazeEnv and `sac_agent` are not defined in this cell; both are
# expected to exist already from cells that were run earlier.
multi_level_env = MultiLevelMazeEnv(size=15, initial_obstacles=5, max_obstacles=50, levels=3)

# Benchmark the weights in "sac_model.pth" for 300 episodes of at most 400 steps.
benchmark_multi_level_with_model(multi_level_env, sac_agent, "sac_model.pth", episodes=300, max_steps=400)


In [None]:
def test_sac_model_with_multiple_runs_and_save(model_path, env_size=15, obstacles=50, max_steps=800, num_runs=3):
    """
    Runs a pretrained SAC actor in several independent mazes and plots the
    cumulative reward of every run in one chart.

    One ``Run_<k>`` subdirectory is created per run under
    ``benchmark/sac_multi_run_test``; the combined chart is written to
    ``Performance_Plot.png`` in that base directory.

    Args:
        model_path (str): Path to the pretrained SAC model file.
        env_size (int): Size of the maze environment (NxN grid).
        obstacles (int): Number of obstacles in the maze.
        max_steps (int): Maximum number of steps allowed per run.
        num_runs (int): Number of independent runs to perform.

    Returns:
        None. Saves the performance plot to the `benchmark` directory.
    """
    # Output location: base directory plus one folder per run.
    base_dir = "benchmark/sac_multi_run_test"
    os.makedirs(base_dir, exist_ok=True)
    for i in range(num_runs):
        os.makedirs(os.path.join(base_dir, f"Run_{i+1}"), exist_ok=True)

    # Fresh agent whose actor receives the stored weights.
    sac_agent = SACAgent(input_channels=3, action_dim=4, maze_size=env_size)
    actor_weights = torch.load(model_path, map_location=device)
    sac_agent.actor.load_state_dict(actor_weights)
    print(f"Model loaded successfully from {model_path}")

    all_rewards = []  # One cumulative-reward curve per run

    for run_idx in range(num_runs):
        print(f"\nStarting run {run_idx + 1}/{num_runs}")

        # NOTE(review): this needs a CustomMazeEnv that provides reset() and
        # step(); confirm which definition of the class is active when this
        # cell runs.
        env = CustomMazeEnv(size=env_size, initial_obstacles=obstacles, max_obstacles=obstacles)
        env.reset()
        state = env._get_state()

        total_reward = 0
        curve = []
        for step in range(max_steps):
            state, reward, done, _ = env.step(sac_agent.act(state))
            total_reward += reward
            curve.append(total_reward)  # Running total after this step

            if done:
                print(f"Goal reached in {step + 1} steps with total reward {total_reward:.2f}")
                break

        all_rewards.append(curve)

    # Draw every run's curve into a single figure and save it.
    plt.figure(figsize=(10, 5))
    for i, rewards in enumerate(all_rewards):
        plt.plot(rewards, label=f"Run {i + 1}")
    plt.xlabel("Step")
    plt.ylabel("Cumulative Reward")
    plt.title("SAC Agent Performance Across Runs")
    plt.legend()
    plt.grid()
    plt.savefig(os.path.join(base_dir, "Performance_Plot.png"))
    plt.close()

    print("Results saved in", base_dir)




# Run three independent 800-step evaluations of "sac_model.pth" in 15x15 mazes
# configured with 50 obstacles.
test_sac_model_with_multiple_runs_and_save("sac_model.pth", env_size=15, obstacles=50, max_steps=800, num_runs=3)
