In [28]:
from __future__ import annotations
import math
import random
from itertools import count
from enum import Enum, auto
from dataclasses import dataclass, field
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [29]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


@dataclass
class HyperParams:
    """Tunable settings shared by the agent, the optimizer and the training loop."""

    batch_size: int = 64    # transitions sampled from replay memory per optimization step
    gamma: float = 0.95     # discount factor applied to next-state values
    eps_start: float = 1.0  # epsilon at step 0 of the epsilon-greedy schedule
    eps_end: float = 0.02   # value epsilon decays towards
    eps_decay: int = 15000  # time constant (in agent steps) of the exponential epsilon decay
    tau: float = 0.001      # mixing coefficient of the soft target-network update
    lr: float = 5e-4        # optimizer learning rate
    # Resolved per instance from the module-level `device` defined above.
    device: torch.device = field(default_factory=lambda: device)

In [30]:
class DQN(nn.Module):
    """Q-network: a small CNN over the 5x5 local view, joined with the agent position.

    The flat input is read as 25 grid values followed by 2 position values;
    the output holds one Q-value per action.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        # Layout of the flat observation vector.
        self.vision_size = 25  # flattened 5x5 window
        self.pos_size = 2      # (row, col) position features

        expected_obs = self.vision_size + self.pos_size
        if n_observations != expected_obs:
            raise ValueError(f"Expected {expected_obs} observations (25 vision + 2 position), got {n_observations}")

        # 3x3 kernels with padding=1 keep the 5x5 spatial size through both convolutions.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        # 64 channels x 5 x 5 flattened conv features, plus the two position features.
        flat_conv_features = 64 * 5 * 5
        self.fc1 = nn.Linear(flat_conv_features + self.pos_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, n_actions)

        # Applied after the first two fully connected layers only.
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Map a (batch, 27) observation tensor to (batch, n_actions) Q-values."""
        n = x.size(0)

        # Leading 25 values form the 1-channel 5x5 image; the rest is the position.
        window = x[:, :self.vision_size].view(n, 1, 5, 5)
        position = x[:, self.vision_size:]

        feats = F.relu(self.bn1(self.conv1(window)))
        feats = F.relu(self.bn2(self.conv2(feats)))

        # Flatten the conv features and append the position before the dense head.
        hidden = torch.cat([feats.view(n, -1), position], dim=1)

        hidden = self.dropout(F.relu(self.fc1(hidden)))
        hidden = self.dropout(F.relu(self.fc2(hidden)))
        hidden = F.relu(self.fc3(hidden))
        return self.fc4(hidden)

In [31]:
class StepResult(Enum):
    """Base reward, read through ``.value``, for each outcome of one environment step.

    Keep the values distinct: Enum members that share a value become aliases
    of the first one.
    """
    MOVED = 0.1           # Base reward for a successful move; compute_movement_reward adds shaping terms
    HIT_WALL = -0.2       # Penalty for walking into a wall cell or off the grid
    GOAL_REACHED = 50.0   # Reward for stepping onto the goal cell
    INVALID = -0.1        # Penalty for unknown/unsupported actions; also returned when the step limit is hit

class ExplorationRewards:
    """Shaping terms that compute_movement_reward adds to the base MOVED reward."""
    NEW_AREA_BONUS = 1.0    # Added when the destination cell has not been visited yet
    REVISIT_PENALTY = -0.05  # Added instead when the destination cell was already visited
    DISTANCE_BONUS = 0.1     # Added when the distance to the goal shrinks, subtracted when it grows

In [32]:
def compute_movement_reward(is_new_area: bool, old_distance: int = None, new_distance: int = None) -> float:
    """Return the shaped reward for a successful move.

    The result is the base MOVED reward, plus the new-area bonus or the
    revisit penalty, plus a distance term: +DISTANCE_BONUS when
    ``new_distance`` is smaller than ``old_distance``, -DISTANCE_BONUS when it
    is larger, and 0.0 when it is unchanged or either distance is ``None``.
    """
    exploration_term = (
        ExplorationRewards.NEW_AREA_BONUS if is_new_area else ExplorationRewards.REVISIT_PENALTY
    )

    if old_distance is None or new_distance is None:
        # Distance shaping is optional; without both distances it contributes nothing.
        distance_term = 0.0
    elif new_distance < old_distance:
        distance_term = ExplorationRewards.DISTANCE_BONUS
    elif new_distance > old_distance:
        distance_term = -ExplorationRewards.DISTANCE_BONUS
    else:
        distance_term = 0.0

    # Summed left to right in this order so the floating-point result is stable.
    return StepResult.MOVED.value + exploration_term + distance_term

In [33]:
class Actions(Enum):
    """Enum for actions in the dungeon environment.

    DungeonEnv only exposes the values below ATTACK (the four moves) in its
    action_space. Its step() also handles QUIT (ends the episode) and answers
    every other member with the INVALID reward.
    """
    MOVE_UP = 0      # row - 1
    MOVE_DOWN = 1    # row + 1
    MOVE_LEFT = 2    # col - 1
    MOVE_RIGHT = 3   # col + 1
    ATTACK = 4       # first non-movement action; used as the cut-off for action_space
    PICKUP_ITEM = 5
    USE_ITEM = 6
    OPEN_INVENTORY = 7
    QUIT = 8

In [34]:
class DQNAgent:
    """Epsilon-greedy action selector wrapped around a policy network."""

    def __init__(self, policy_net, hp):
        self.policy_net = policy_net
        self.hp = hp
        self.steps_done = 0  # number of select_action calls so far; drives the epsilon decay

    def select_action(self, state, action_space):
        """Pick an action for ``state`` and return it as a (1, 1) long tensor.

        Epsilon decays exponentially from ``hp.eps_start`` towards
        ``hp.eps_end`` with time constant ``hp.eps_decay``. With probability
        epsilon a random member of ``action_space`` is returned; otherwise the
        index of the largest Q-value predicted by the policy network.
        """
        hp = self.hp
        draw = random.random()
        decay = math.exp(-1. * self.steps_done / hp.eps_decay)
        epsilon = hp.eps_end + (hp.eps_start - hp.eps_end) * decay
        self.steps_done += 1

        if draw > epsilon:
            # Greedy branch: evaluate in eval mode, then restore training mode.
            self.policy_net.eval()
            with torch.no_grad():
                greedy = self.policy_net(state).max(1).indices.view(1, 1)
            self.policy_net.train()
            return greedy

        # Exploration branch: uniform choice among the allowed actions.
        return torch.tensor([[random.choice(action_space)]], device=hp.device, dtype=torch.long)


In [35]:
class GridCellValues(Enum):
    """Markers stored in the cells of DungeonEnv.grid."""
    EMPTY = 0    # Walkable cell
    WALL = 1     # Blocks movement
    VISITED = 2  # Breadcrumb system; not written to the grid in the visible code (visits live in visited_grid)
    GOAL = 3     # Reaching this cell ends the episode
    PATH = 4     # Not referenced in the visible code
    START = 5    # Cell where the agent is placed on reset

In [36]:
class DungeonEnv:
    """Random grid maze observed through a 5x5 window centred on the agent.

    Every reset draws a new wall layout and places start and goal on the two
    open cells that are farthest apart (Manhattan distance) *inside the same
    4-connected open region*, so the goal can always be reached from the start.

    Observation: a float32 tensor of 27 values - the 25 encoded cells of the
    local window (row-major) followed by the agent's normalized (row, col).
    Actions: the members of ``Actions`` whose value is below ``Actions.ATTACK``.
    """

    _MAX_LAYOUT_RETRIES = 10    # random layouts tried before using the fallback layout
    _BORDER_WALL_PROB = 0.8     # chance that a border cell is forced to be a wall
    _FALLBACK_WALL_PROB = 0.3   # interior wall density of the fallback layout

    def __init__(self, rows=8, cols=8, wall_prob=0.25, seed=None, device=torch.device("cpu")):
        """Build the environment and generate the first maze.

        Args:
            rows, cols: grid size; the grid must hold at least two cells.
            wall_prob: probability that a cell is drawn as a wall.
            seed: seed for the environment's private NumPy RandomState.
            device: device on which observation tensors are created.

        Raises:
            ValueError: if the grid cannot hold both a start and a goal cell.
        """
        if rows < 1 or cols < 1 or rows * cols < 2:
            raise ValueError(f"Grid must contain at least two cells, got {rows}x{cols}")

        self.action_space = [a.value for a in Actions if a.value < Actions.ATTACK.value]

        # Local observation: 5x5 window around agent + agent position
        self.vision_range = 2  # Agent can see 2 cells in each direction
        self.obs_size = (2 * self.vision_range + 1) ** 2  # 5x5 = 25 cells
        self.observation_space = self.obs_size + 2  # +2 for agent's row,col position

        self.rows = rows
        self.cols = cols
        self.wall_prob = wall_prob
        self.rng = np.random.RandomState(seed)
        self.grid = None
        self.visited_grid = None  # Boolean mask of cells the agent has stood on
        self.start_pos = None
        self.goal_pos = None
        self.agent_pos = None
        self.device = device
        self.max_steps = rows * cols * 3  # Step budget per episode
        self.steps_taken = 0
        self.reset()

    def _encode_cell(self, row, col):
        """Return the observation code of one cell; out-of-bounds reads as a wall."""
        if not (0 <= row < self.rows and 0 <= col < self.cols):
            return 1.0
        cell_value = self.grid[row, col]
        if cell_value == GridCellValues.EMPTY:
            # Visited and unvisited empty cells get different codes (breadcrumbs).
            return 0.3 if self.visited_grid[row, col] else 0.0
        if cell_value == GridCellValues.WALL:
            return 1.0
        if cell_value == GridCellValues.GOAL:
            return 0.9
        if cell_value == GridCellValues.START:
            return 0.1
        return 0.0

    def get_local_observation(self):
        """Get local 5x5 observation around agent plus position info."""
        agent_row, agent_col = self.agent_pos
        span = range(-self.vision_range, self.vision_range + 1)
        local_obs = [
            self._encode_cell(agent_row + dr, agent_col + dc)
            for dr in span
            for dc in span
        ]

        # Normalize the position to [0, 1]; a single-row/column axis has no
        # extent, so it maps to 0.0 instead of dividing by zero.
        norm_row = agent_row / (self.rows - 1) if self.rows > 1 else 0.0
        norm_col = agent_col / (self.cols - 1) if self.cols > 1 else 0.0

        return torch.tensor(local_obs + [norm_row, norm_col], device=self.device, dtype=torch.float32)

    def _random_layout(self):
        """Draw a random EMPTY/WALL grid and wall off most of its border."""
        grid = self.rng.choice(
            [GridCellValues.EMPTY, GridCellValues.WALL],
            (self.rows, self.cols),
            p=[1 - self.wall_prob, self.wall_prob]
        )
        for i in range(self.rows):
            for j in range(self.cols):
                on_border = i == 0 or i == self.rows - 1 or j == 0 or j == self.cols - 1
                if on_border and self.rng.random() < self._BORDER_WALL_PROB:
                    grid[i, j] = GridCellValues.WALL
        return grid

    def _fallback_layout(self):
        """Clear the current grid and add walls to interior cells only.

        The border stays fully open, which forms one connected ring of open
        cells, so a reachable start/goal pair always exists afterwards.
        """
        grid = self.grid
        grid.fill(GridCellValues.EMPTY)
        for i in range(1, self.rows - 1):
            for j in range(1, self.cols - 1):
                if self.rng.random() < self._FALLBACK_WALL_PROB:
                    grid[i, j] = GridCellValues.WALL
        return grid

    def _open_components(self):
        """Return the 4-connected regions of EMPTY cells as row-major sorted lists."""
        open_mask = np.asarray(self.grid == GridCellValues.EMPTY, dtype=bool)
        seen = np.zeros((self.rows, self.cols), dtype=bool)
        components = []
        for r in range(self.rows):
            for c in range(self.cols):
                if not open_mask[r, c] or seen[r, c]:
                    continue
                # Breadth-first flood fill from the first unseen open cell.
                seen[r, c] = True
                queue = deque([(r, c)])
                cells = []
                while queue:
                    cr, cc = queue.popleft()
                    cells.append((cr, cc))
                    for nr, nc in ((cr - 1, cc), (cr + 1, cc), (cr, cc - 1), (cr, cc + 1)):
                        if (0 <= nr < self.rows and 0 <= nc < self.cols
                                and open_mask[nr, nc] and not seen[nr, nc]):
                            seen[nr, nc] = True
                            queue.append((nr, nc))
                components.append(sorted(cells))
        return components

    def _farthest_connected_pair(self):
        """Return the (start, goal) pair with the largest Manhattan distance
        among open cells that share a connected region, or None if no region
        holds two cells."""
        best_pair = None
        best_dist = 0
        for cells in self._open_components():
            for idx, start_cell in enumerate(cells):
                for goal_cell in cells[idx + 1:]:
                    dist = abs(start_cell[0] - goal_cell[0]) + abs(start_cell[1] - goal_cell[1])
                    if dist > best_dist:
                        best_dist = dist
                        best_pair = (start_cell, goal_cell)
        return best_pair

    def reset(self):
        """Reset environment and return initial state."""
        self.steps_taken = 0
        self.visited_grid = np.zeros((self.rows, self.cols), dtype=bool)

        # Retry random layouts until one offers a reachable start/goal pair.
        placement = None
        for _ in range(self._MAX_LAYOUT_RETRIES):
            self.grid = self._random_layout()
            placement = self._farthest_connected_pair()
            if placement is not None:
                break

        if placement is None:
            # Last resort: open border ring with random interior walls.
            self.grid = self._fallback_layout()
            placement = self._farthest_connected_pair()
        if placement is None:
            raise RuntimeError("Could not place a reachable start/goal pair")

        self.start_pos, self.goal_pos = placement

        # Set positions in grid
        self.grid[self.goal_pos] = GridCellValues.GOAL
        self.grid[self.start_pos] = GridCellValues.START
        self.agent_pos = self.start_pos

        # Mark starting position as visited
        self.visited_grid[self.start_pos] = True

        return self.get_local_observation()

    def step(self, action: int):
        """Apply action and return (next_state, reward, done, info).

        Unknown or non-movement actions leave the agent in place, cost the
        INVALID reward and do not count towards the step budget; QUIT ends the
        episode with reward 0. A movement action first consumes one step of
        the budget: once ``max_steps`` is reached the episode ends with the
        INVALID reward and ``info["max_steps_reached"]`` set, without moving.
        """
        try:
            action_enum = Actions(action)
        except ValueError:
            return self.get_local_observation(), StepResult.INVALID.value, False, {"invalid_action": action}

        if action_enum == Actions.QUIT:
            return self.get_local_observation(), 0.0, True, {"quit": True}

        move_deltas = {
            Actions.MOVE_UP: (-1, 0),
            Actions.MOVE_DOWN: (1, 0),
            Actions.MOVE_LEFT: (0, -1),
            Actions.MOVE_RIGHT: (0, 1),
        }
        delta = move_deltas.get(action_enum)
        if delta is None:
            return (self.get_local_observation(), StepResult.INVALID.value, False,
                    {"unsupported_action": action_enum.name})

        # Increment and check max steps
        self.steps_taken += 1
        if self.steps_taken >= self.max_steps:
            return self.get_local_observation(), StepResult.INVALID.value, True, {"max_steps_reached": True}

        old_row, old_col = self.agent_pos
        new_row, new_col = old_row + delta[0], old_col + delta[1]

        # Leaving the grid counts as a wall hit
        if not (0 <= new_row < self.rows and 0 <= new_col < self.cols):
            return self.get_local_observation(), StepResult.HIT_WALL.value, False, {"hit_boundary": True}

        cell_value = self.grid[new_row, new_col]

        if cell_value == GridCellValues.WALL:
            # Hit a wall, don't move
            return self.get_local_observation(), StepResult.HIT_WALL.value, False, {}

        self.agent_pos = (new_row, new_col)

        if cell_value == GridCellValues.GOAL:
            # Found the goal!
            self.visited_grid[new_row, new_col] = True
            return self.get_local_observation(), StepResult.GOAL_REACHED.value, True, {}

        # Valid move to a walkable, non-goal cell: shape the reward by novelty
        # and by the change in Manhattan distance to the goal.
        is_new_area = not self.visited_grid[new_row, new_col]
        self.visited_grid[new_row, new_col] = True
        old_distance = abs(old_row - self.goal_pos[0]) + abs(old_col - self.goal_pos[1])
        new_distance = abs(new_row - self.goal_pos[0]) + abs(new_col - self.goal_pos[1])
        reward = compute_movement_reward(is_new_area, old_distance, new_distance)

        return self.get_local_observation(), reward, False, {}


In [37]:

# One stored experience: (state, action, next_state, reward).
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory:
    """Fixed-capacity FIFO buffer of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque drops its oldest entry once `capacity` items are stored.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a Transition from the positional arguments and append it."""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        """Return `batch_size` stored transitions drawn at random without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [38]:
def optimize_model(hp: HyperParams, policy_net, target_net, memory, optimizer):
    """Run one DQN gradient step on a minibatch sampled from replay memory.

    Does nothing until ``memory`` holds at least ``hp.batch_size`` transitions.
    The TD target is ``reward + hp.gamma * max_a Q_target(next_state, a)``,
    with the next-state value fixed at 0 for transitions whose ``next_state``
    is ``None``. The loss is the Huber (smooth L1) loss; gradients are clipped
    element-wise to [-100, 100] before the optimizer step.
    """
    if len(memory) < hp.batch_size:
        return

    # Transpose a list of Transitions into one Transition of tuples.
    batch = Transition(*zip(*memory.sample(hp.batch_size)))

    states = torch.cat(batch.state)
    actions = torch.cat(batch.action)
    rewards = torch.cat(batch.reward)

    # Transitions stored with next_state=None have no successor to bootstrap from.
    has_next = torch.tensor(
        [s is not None for s in batch.next_state],
        device=hp.device,
        dtype=torch.bool
    )
    next_states = [s for s in batch.next_state if s is not None]

    # Q(s_t, a_t) for the actions that were actually taken.
    q_taken = policy_net(states).gather(1, actions)

    # V(s_{t+1}) from the target network; stays 0 where there is no next state.
    next_values = torch.zeros(hp.batch_size, device=hp.device)
    if next_states:
        with torch.no_grad():
            next_values[has_next] = target_net(torch.cat(next_states)).max(1).values

    targets = (next_values * hp.gamma) + rewards

    # Huber loss between predicted and target Q-values.
    loss = F.smooth_l1_loss(q_taken, targets.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()


In [39]:
def print_reward_structure():
    """Print the reward constants and the shaped reward of four example moves."""
    # Collect the report first, then emit it in one go; the text is unchanged.
    report = [
        "=== Reward Structure ===",
        "Base Action Rewards:",
    ]
    report.extend(f"  {result.name}: {result.value}" for result in StepResult)

    report.append("\nExploration Rewards:")
    report.append(f"  NEW_AREA_BONUS: {ExplorationRewards.NEW_AREA_BONUS}")
    report.append(f"  REVISIT_PENALTY: {ExplorationRewards.REVISIT_PENALTY}")
    report.append(f"  DISTANCE_BONUS: {ExplorationRewards.DISTANCE_BONUS}")

    report.append("\nEffective Movement Rewards:")
    report.append(f"  Move to new area (closer): {compute_movement_reward(True, 10, 9)}")
    report.append(f"  Move to new area (farther): {compute_movement_reward(True, 5, 6)}")
    report.append(f"  Move to visited area (closer): {compute_movement_reward(False, 10, 9)}")
    report.append(f"  Move to visited area (farther): {compute_movement_reward(False, 5, 6)}")
    report.append(f"  Hit wall: {StepResult.HIT_WALL.value}")
    report.append(f"  Find goal: {StepResult.GOAL_REACHED.value}")

    # Trailing empty entry reproduces the blank line that closes the report.
    report.append("")
    print("\n".join(report))

In [40]:
def test_environment():
    """Smoke-test the environment: run up to ten random moves and draw the grid."""
    print("=== Environment Test ===")
    env = DungeonEnv(rows=6, cols=6, wall_prob=0.15, seed=42, device=device)  # Match training env

    print(f"Environment: {env.rows}x{env.cols} grid, max_steps = {env.max_steps}")
    print(f"Observation space: {env.observation_space} (local 5x5 view + position)")
    print(f"Vision range: {env.vision_range}")

    # One short episode driven by the environment's own RNG.
    print(f"\n--- Test Episode ---")
    state = env.reset()
    print(f"Start pos: {env.start_pos}, Goal pos: {env.goal_pos}")
    print(f"Distance to goal: {abs(env.start_pos[0] - env.goal_pos[0]) + abs(env.start_pos[1] - env.goal_pos[1])}")
    print(f"Initial observation shape: {state.shape}")

    for step in range(10):
        action = env.rng.choice(env.action_space)
        _observation, reward, done, info = env.step(action)

        print(f"  Step {step}: action={action}, reward={reward:.3f}, done={done}")
        print(f"    agent_pos={env.agent_pos}")
        if info:
            print(f"    info={info}")

        if done:
            print(f"    Episode ended: {info}")
            break
    else:
        # Reached only when all ten steps ran without the episode ending.
        print(f"    Episode continued...")

    def glyph(i, j):
        """Two-character symbol for cell (i, j); the agent and goal take priority."""
        if (i, j) == env.agent_pos:
            return "A "
        if (i, j) == env.goal_pos:
            return "G "
        if env.grid[i, j] == GridCellValues.WALL:
            return "# "
        if env.visited_grid[i, j]:
            return ". "
        return "  "

    # Show final grid
    print(f"  Grid visualization:")
    for i in range(env.rows):
        print("    " + "".join(glyph(i, j) for j in range(env.cols)))
    print()

In [41]:
def _soft_update(target_net, policy_net, tau):
    """Blend the policy network into the target network: t <- tau * p + (1 - tau) * t."""
    policy_state = policy_net.state_dict()
    target_state = target_net.state_dict()
    for key, policy_value in policy_state.items():
        if policy_value.is_floating_point():
            target_state[key] = policy_value * tau + target_state[key] * (1 - tau)
        else:
            # Integer buffers (e.g. BatchNorm's num_batches_tracked) cannot be
            # interpolated meaningfully; mirror the policy network instead.
            target_state[key] = policy_value.clone()
    target_net.load_state_dict(target_state)


def train_dqn(num_episodes: int = 2000, seed: int = 42, log_every: int = 50):
    """Train a DQN agent on a 6x6 DungeonEnv and return the results.

    Args:
        num_episodes: number of training episodes.
        seed: seed applied to ``random``, ``torch`` and the environment's RNG.
        log_every: print a progress summary every this many episodes
            (0 disables the periodic summary).

    Returns:
        (policy_net, episode_rewards): the trained policy network and the
        total reward collected in each episode.
    """
    # Print reward structure for clarity
    print_reward_structure()

    # Run environment test first
    test_environment()

    # Set seeds for reproducibility
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)

    # The environment owns a private NumPy RNG, so it needs the seed as well;
    # otherwise every run would train on a different sequence of mazes.
    env = DungeonEnv(rows=6, cols=6, wall_prob=0.15, seed=seed, device=device)
    hp = HyperParams()

    n_actions = len(env.action_space)
    n_observations = env.observation_space

    print(f"=== Training Setup ===")
    print(f"Environment: {env.rows}x{env.cols} grid, max_steps = {env.max_steps}")
    print(f"Action space: {env.action_space}")
    print(f"Observation space: {n_observations}")
    print(f"Learning rate: {hp.lr}, Epsilon decay: {hp.eps_decay}")

    # The target network starts as an exact copy of the policy network.
    policy_net = DQN(n_observations, n_actions).to(hp.device)
    target_net = DQN(n_observations, n_actions).to(hp.device)
    target_net.load_state_dict(policy_net.state_dict())

    policy_net.train()  # Batch norm and dropout active while learning
    target_net.eval()   # Target network is only ever evaluated

    dqn_agent = DQNAgent(policy_net=policy_net, hp=hp)
    optimizer = optim.Adam(policy_net.parameters(), lr=hp.lr)
    memory = ReplayMemory(50000)

    episode_rewards = []
    episode_lengths = []
    exploration_stats = []
    goals_reached = 0

    for i_episode in range(num_episodes):
        state = env.reset().unsqueeze(0)  # Add batch dimension
        total_reward = 0
        episode_length = 0

        while True:
            action = dqn_agent.select_action(state, env.action_space)
            observation, reward, done, info = env.step(action.item())
            total_reward += reward
            episode_length += 1

            # Running out of steps is a time-limit truncation, not a terminal
            # state of the maze: keep the next state so the TD target still
            # bootstraps from it. Only true terminations store next_state=None.
            timed_out = bool(info.get("max_steps_reached", False))
            terminal = done and not timed_out
            next_state = None if terminal else observation.unsqueeze(0)
            reward_tensor = torch.tensor([reward], device=hp.device, dtype=torch.float32)

            memory.push(state, action, next_state, reward_tensor)
            state = next_state

            # Start learning after 4 batches worth of experience. Until the
            # first optimization step both networks are identical, so the soft
            # update is only needed once learning has begun.
            if len(memory) > hp.batch_size * 4:
                optimize_model(hp, policy_net, target_net, memory, optimizer)
                _soft_update(target_net, policy_net, hp.tau)

            if done:
                break

        # The agent can only stand on the goal cell by reaching it, so this
        # does not depend on the magnitude of any reward.
        if env.agent_pos == env.goal_pos:
            goals_reached += 1

        episode_rewards.append(total_reward)
        episode_lengths.append(episode_length)
        exploration_stats.append(int(np.sum(env.visited_grid)))

        if log_every and i_episode % log_every == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            avg_length = np.mean(episode_lengths[-100:])
            avg_exploration = np.mean(exploration_stats[-100:])
            goal_rate = goals_reached / (i_episode + 1) * 100
            current_eps = hp.eps_end + (hp.eps_start - hp.eps_end) * math.exp(-1. * dqn_agent.steps_done / hp.eps_decay)

            print(f"Episode {i_episode}")
            print(f"  Average Reward: {avg_reward:.2f}")
            print(f"  Average Length: {avg_length:.1f}")
            print(f"  Average Exploration: {avg_exploration:.1f}/{env.rows * env.cols} cells")
            print(f"  Goals Reached: {goals_reached}/{i_episode + 1} ({goal_rate:.1f}%)")
            print(f"  Epsilon: {current_eps:.3f}")
            if i_episode > 0:
                print(f"  Last Episode Info: {info}")
            print()

    return policy_net, episode_rewards

In [42]:
trained_net, rewards = train_dqn()

# Two stacked panels: per-episode reward on top, success rate below.
fig, (reward_ax, success_ax) = plt.subplots(2, 1, figsize=(12, 8))

# Smoothing window: at most 50 episodes, and never more than a quarter of the run.
window_size = min(50, len(rewards) // 4)
can_smooth = len(rewards) >= window_size and window_size > 1

reward_ax.plot(rewards, alpha=0.3, label='Episode Reward')
if can_smooth:
    moving_avg = np.convolve(rewards, np.ones(window_size)/window_size, mode='valid')
    reward_ax.plot(range(window_size-1, len(rewards)), moving_avg, label=f'Moving Average ({window_size} episodes)')

reward_ax.set_title('Training Progress - Rewards')
reward_ax.set_xlabel('Episode')
reward_ax.set_ylabel('Total Reward')
reward_ax.legend()
reward_ax.grid(True, alpha=0.3)

# Rough success estimate: an episode counts as a success when its total
# reward exceeds the threshold below.
success_threshold = 30
successes = [int(r > success_threshold) for r in rewards]
if can_smooth:
    success_rate = np.convolve(successes, np.ones(window_size)/window_size, mode='valid')
    success_ax.plot(range(window_size-1, len(rewards)), success_rate * 100)
    success_ax.set_title('Estimated Success Rate')
    success_ax.set_ylabel('Success Rate (%)')
else:
    # Too few episodes to smooth: show the raw 0/1 outcomes instead.
    success_ax.plot(successes)
    success_ax.set_title('Success/Failure per Episode')
    success_ax.set_ylabel('Success (1) / Failure (0)')

success_ax.set_xlabel('Episode')
success_ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()

print(f"\nTraining completed!")
print(f"Final average reward (last 100 episodes): {np.mean(rewards[-100:]):.2f}")
print(f"Best episode reward: {max(rewards):.2f}")
print(f"Episodes with reward > {success_threshold}: {sum(successes)}/{len(successes)} ({sum(successes)/len(successes)*100:.1f}%)")

=== Reward Structure ===
Base Action Rewards:
  MOVED: 0.1
  HIT_WALL: -0.2
  GOAL_REACHED: 50.0
  INVALID: -0.1

Exploration Rewards:
  NEW_AREA_BONUS: 1.0
  REVISIT_PENALTY: -0.05
  DISTANCE_BONUS: 0.1

Effective Movement Rewards:
  Move to new area (closer): 1.2000000000000002
  Move to new area (farther): 1.0
  Move to visited area (closer): 0.15000000000000002
  Move to visited area (farther): -0.05
  Hit wall: -0.2
  Find goal: 50.0

=== Environment Test ===
Environment: 6x6 grid, max_steps = 108
Observation space: 27 (local 5x5 view + position)
Vision range: 2

--- Test Episode ---
Start pos: (np.int64(1), np.int64(1)), Goal pos: (np.int64(4), np.int64(4))
Distance to goal: 6
Initial observation shape: torch.Size([27])
  Step 0: action=1, reward=-0.200, done=False
    agent_pos=(np.int64(1), np.int64(1))
  Step 1: action=1, reward=-0.200, done=False
    agent_pos=(np.int64(1), np.int64(1))
  Step 2: action=0, reward=-0.200, done=False
    agent_pos=(np.int64(1), np.int64(1))
  S

KeyboardInterrupt: 