# Sound-Based Navigation System in Google Colab

This notebook demonstrates how to run the sound-based navigation system in Google Colab.

In [None]:
# Install required packages
!pip install numpy torch matplotlib tqdm

## Create project structure and files

In [None]:
# Create the package directories that the %%writefile cells below write into
import os

for package_dir in ('core', 'rl', 'utils', 'interface'):
    # exist_ok=True keeps the cell safe to re-run
    os.makedirs(package_dir, exist_ok=True)

In [None]:
%%writefile core/__init__.py


In [None]:
%%writefile core/grid_world.py
import numpy as np
import matplotlib.pyplot as plt
from core.sound_source import Wall, SoundSource, propagate_sound
from utils.audio_processing import get_audio_observation_features


class GridWorld:
    """
    A grid world (25x25 by default) for the sound-based navigation task.

    Cell types: 0 - empty, 1 - wall, 2 - agent, 3 - sound source.
    Only walls are written into ``self.grid``; the agent position and the
    sound sources are tracked separately and painted onto a copy in render().

    Coordinate convention: every method indexes the grid as ``grid[x][y]``
    and the grid has shape ``(height, width)``, so ``x`` selects the row
    (0 <= x < height) and ``y`` selects the column (0 <= y < width).
    """

    def __init__(self, width: int = 25, height: int = 25):
        """
        Initialize an empty grid world.

        Args:
            width: Number of columns in the grid (default 25)
            height: Number of rows in the grid (default 25)
        """
        self.width = width
        self.height = height
        self.grid = np.zeros((height, width), dtype=np.int8)

        # Agent and sound sources live outside the grid array
        self.agent_pos = None
        self.sound_sources = []
        self.wall_objects = []  # Wall objects (position + permeability), not just coordinates

    def reset(self) -> None:
        """Clear the grid and forget the agent position, sound sources and walls."""
        self.grid = np.zeros((self.height, self.width), dtype=np.int8)
        self.agent_pos = None
        self.sound_sources = []
        self.wall_objects = []

    def get_state(self) -> np.ndarray:
        """
        Return the current state of the grid.

        Returns:
            Copy of the grid array (walls only), safe for the caller to modify
        """
        return self.grid.copy()

    def render(self) -> None:
        """Visualize the grid with the agent and sound sources overlaid."""
        plt.figure(figsize=(8, 8))

        # Draw on a copy so the overlay never leaks into the real grid
        vis_grid = self.grid.copy()

        if self.agent_pos is not None:
            x, y = self.agent_pos
            vis_grid[x][y] = 2

        # Sources are painted last, so a source sharing the agent's cell wins
        for source in self.sound_sources:
            vis_grid[source.x][source.y] = 3

        plt.imshow(vis_grid, cmap='viridis', interpolation='nearest')
        plt.colorbar(label='Cell Type (0: Empty, 1: Wall, 2: Agent, 3: Sound Source)')
        plt.title('Grid World Visualization')
        plt.show()

    def is_valid_position(self, x: int, y: int) -> bool:
        """
        Check whether ``grid[x][y]`` is a cell inside the grid.

        The bounds follow the array shape rather than (width, height): the
        grid is stored as (height, width) and indexed ``grid[x][y]``, so ``x``
        must be a valid row and ``y`` a valid column. For square grids this is
        the same as before; for non-square grids it prevents out-of-range
        indexing and stops valid cells from being rejected.

        Args:
            x: Row index
            y: Column index

        Returns:
            True if position is inside the grid, False otherwise
        """
        n_rows, n_cols = self.grid.shape
        return 0 <= x < n_rows and 0 <= y < n_cols

    def place_wall(self, x: int, y: int, permeability: float = 0.5) -> None:
        """
        Place a wall at the given position with permeability.

        Positions outside the grid are ignored silently.

        Args:
            x: Row index
            y: Column index
            permeability: Permeability of the wall (default 0.5)
        """
        if self.is_valid_position(x, y):
            self.grid[x][y] = 1
            self.wall_objects.append(Wall(x, y, permeability))

    def place_sound_source(self, sound_source: 'SoundSource') -> None:
        """
        Register a sound source with the grid world.

        Sources positioned outside the grid are ignored silently.

        Args:
            sound_source: SoundSource object to place
        """
        if self.is_valid_position(sound_source.x, sound_source.y):
            self.sound_sources.append(sound_source)

    def place_agent(self, x: int, y: int) -> bool:
        """
        Place the agent at the given position.

        Args:
            x: Row index
            y: Column index

        Returns:
            True if the cell is inside the grid and empty (agent placed),
            False otherwise (agent position left unchanged)
        """
        if self.is_valid_position(x, y) and self.grid[x][y] == 0:
            self.agent_pos = (x, y)
            return True
        return False

    def compute_sound_map(self) -> np.ndarray:
        """
        Compute the sound propagation map based on sources and walls.

        Returns:
            2D numpy array representing sound intensity at each cell
        """
        return propagate_sound(self.grid, self.sound_sources, self.wall_objects)


class Agent:
    """
    Basic agent that can move in the grid world.

    Actions: up, down, left, right, stay. Positions are ``(x, y)`` pairs used
    as ``grid[x][y]``, so 'up'/'down' change ``x`` and 'left'/'right' change ``y``.
    """

    def __init__(self, start_x: int = 0, start_y: int = 0):
        """
        Initialize the agent.

        Args:
            start_x: Starting X coordinate (default 0)
            start_y: Starting Y coordinate (default 0)
        """
        self.x = start_x
        self.y = start_y
        self.position = (start_x, start_y)

        # Action index -> action name
        self.actions = {
            0: 'up',
            1: 'down',
            2: 'left',
            3: 'right',
            4: 'stay'
        }

        # Action name -> (dx, dy) offset applied to (x, y)
        self.action_vectors = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1),
            'stay': (0, 0)
        }

    def move(self, action: int, grid_world: 'GridWorld') -> tuple:
        """
        Move the agent according to the action in the given grid world.

        The move is only applied when the target cell is inside the grid and
        is not a wall; otherwise the agent stays where it is. On a successful
        move ``grid_world.agent_pos`` is updated as well.

        Args:
            action: Action index (0-4) or action name ('up', 'down', etc.)
            grid_world: GridWorld instance to move in

        Returns:
            Position of the agent after the move as (x, y) tuple

        Raises:
            ValueError: If the action is neither a known index nor a known name
        """
        # numbers.Integral covers numpy integers as well as Python integers
        import numbers
        if isinstance(action, numbers.Integral):
            # Unknown indices are passed through unchanged so they are reported
            # by the ValueError below instead of surfacing as a bare KeyError
            action = self.actions.get(action, action)

        if action not in self.action_vectors:
            raise ValueError(f"Invalid action: {action}")

        dx, dy = self.action_vectors[action]
        new_x = self.x + dx
        new_y = self.y + dy

        # Check the live grid (not a copy) for boundaries and walls
        if (grid_world.is_valid_position(new_x, new_y) and
                grid_world.grid[new_x][new_y] != 1):  # Not a wall
            self.x = new_x
            self.y = new_y
            self.position = (self.x, self.y)

            # Keep the grid world's view of the agent in sync
            grid_world.agent_pos = (self.x, self.y)

        return self.position

    def get_position(self) -> tuple:
        """
        Get current position of the agent.

        Returns:
            Current position as (x, y) tuple
        """
        return self.position

    def observe(self, sound_map: np.ndarray = None, grid_world: 'GridWorld' = None) -> np.ndarray:
        """
        Build the audio observation at the agent's current position.

        Args:
            sound_map: Sound intensity map indexed as sound_map[x][y] (optional)
            grid_world: GridWorld instance used to pick the dominant source
                frequency (optional)

        Returns:
            Audio observation features as produced by
            get_audio_observation_features. Without a sound map the features
            are computed for intensity 0.0 and frequency 0.5.
        """
        if sound_map is not None:
            intensity = sound_map[self.x][self.y]

            # Frequency of whichever source is perceived as loudest from here
            frequency_content = self._get_frequency_content_at_position(grid_world)

            return get_audio_observation_features(intensity, frequency_content)
        else:
            # Default: no intensity, medium frequency
            return get_audio_observation_features(0.0, 0.5)

    def _get_frequency_content_at_position(self, grid_world: 'GridWorld') -> float:
        """
        Get the frequency of the sound source perceived as loudest by the agent.

        Perceived loudness is the source volume attenuated by Manhattan
        distance; walls are not taken into account here.

        Args:
            grid_world: GridWorld instance to analyze

        Returns:
            Frequency of the loudest source, or 0.5 when there is no grid
            world or it has no sound sources
        """
        if grid_world is None or not grid_world.sound_sources:
            return 0.5  # Default frequency content

        max_perceived_loudness = -1
        dominant_frequency = 0.5

        for source in grid_world.sound_sources:
            # Manhattan distance between agent and source
            distance = abs(self.x - source.x) + abs(self.y - source.y)

            # Same distance falloff shape as the sound propagation model
            perceived_loudness = source.volume / (1 + 0.5 * distance + 0.1 * distance**1.5)

            # Strict comparison: on ties the earlier source stays dominant
            if perceived_loudness > max_perceived_loudness:
                max_perceived_loudness = perceived_loudness
                dominant_frequency = source.frequency

        return dominant_frequency

In [None]:
%%writefile core/sound_source.py
import numpy as np


class Wall:
    """
    A wall cell in the grid world, carrying how much sound it lets through.
    """

    def __init__(self, x: int, y: int, permeability: float = 0.5):
        """
        Store the wall's location and permeability.

        Args:
            x: X coordinate of the wall
            y: Y coordinate of the wall
            permeability: Fraction of sound passing through the wall
                (intended range 0.0-1.0; the value is stored as given)
        """
        self.x, self.y = x, y
        self.permeability = permeability
        
        
class SoundSource:
    """
    A point emitter of sound located on one cell of the grid world.
    """

    def __init__(self, x: int, y: int, volume: float = 1.0, frequency: float = 0.5):
        """
        Store the source's location and acoustic properties.

        Args:
            x: X coordinate of the sound source
            y: Y coordinate of the sound source
            volume: Loudness of the source (intended range 0.0-1.0; stored as given)
            frequency: Frequency characteristic of the sound
                (intended range 0.0-1.0; stored as given)
        """
        self.x, self.y = x, y
        self.volume, self.frequency = volume, frequency
        
        
def propagate_sound(grid, sound_sources, walls, decay_factor=0.5, spread_distance=5):
    """
    Build a sound intensity map for the grid, attenuated by distance and walls.

    Each source deposits its full volume on its own cell and an attenuated
    share on every in-bounds cell within ``spread_distance`` (Manhattan
    distance). Contributions from all sources are summed and the map is then
    scaled so that its maximum is 1 (left untouched if nothing is audible).

    Args:
        grid: 2D numpy array representing the grid world (only its shape is used)
        sound_sources: List of SoundSource objects
        walls: List of Wall objects
        decay_factor: Factor determining how quickly sound decays with distance
        spread_distance: Maximum Manhattan distance sound can travel

    Returns:
        2D float32 numpy array representing sound intensity at each cell
    """
    n_rows, n_cols = grid.shape
    intensity = np.zeros((n_rows, n_cols), dtype=np.float32)

    # Per-cell permeability; 1.0 means the cell does not obstruct sound
    permeability = np.ones((n_rows, n_cols), dtype=np.float32)
    for wall in walls:
        permeability[wall.x, wall.y] = wall.permeability

    for source in sound_sources:
        # The source's own cell hears it at full volume
        intensity[source.x, source.y] += source.volume

        # Walk each ring of cells at exactly Manhattan distance `dist`
        for dist in range(1, spread_distance + 1):
            for dx in range(-dist, dist + 1):
                remaining = dist - abs(dx)
                # For a given dx only dy = -remaining and dy = +remaining are on the ring
                dy_candidates = (0,) if remaining == 0 else (-remaining, remaining)

                for dy in dy_candidates:
                    nx = source.x + dx
                    ny = source.y + dy

                    if not (0 <= nx < n_rows and 0 <= ny < n_cols):
                        continue

                    # Falloff with distance alone
                    attenuation = 1.0 / (1 + decay_factor * dist + 0.1 * dist**1.5)

                    # Extra loss from walls between the source and this cell
                    path_attenuation = calculate_path_attenuation(
                        source.x, source.y, nx, ny, permeability
                    )

                    effective_attenuation = attenuation * path_attenuation
                    intensity[nx, ny] += source.volume * effective_attenuation

    # Normalize so the loudest cell is 1
    peak = intensity.max()
    if peak > 0:
        intensity = intensity / peak

    return intensity


def calculate_path_attenuation(start_x, start_y, end_x, end_y, wall_map):
    """
    Estimate how much sound survives the walls between two cells.

    This is an approximation rather than a traced line: it sweeps the x-range
    and the y-range spanned by the two points, reads permeability along the
    start and end rows/columns (averaging the two when they differ), and
    returns the geometric mean of everything it sampled.

    Args:
        start_x, start_y: Starting coordinates
        end_x, end_y: Ending coordinates
        wall_map: 2D array with permeability values for each cell

    Returns:
        Combined permeability factor for the path (1.0 when start == end)
    """
    x_fixed = start_x == end_x
    y_fixed = start_y == end_y

    product = 1.0
    samples = 0

    # Sweep along x, sampling the start and end y-lines
    if not x_fixed:
        x_lo, x_hi = sorted((start_x, end_x))
        for x in range(x_lo, x_hi + 1):
            if y_fixed:
                sample = wall_map[x, start_y]
            else:
                sample = (wall_map[x, start_y] + wall_map[x, end_y]) / 2
            product *= sample
            samples += 1

    # Sweep along y, sampling the start and end x-lines
    if not y_fixed:
        y_lo, y_hi = sorted((start_y, end_y))
        for y in range(y_lo, y_hi + 1):
            if x_fixed:
                sample = wall_map[start_x, y]
            else:
                sample = (wall_map[start_x, y] + wall_map[end_x, y]) / 2
            product *= sample
            samples += 1

    # Start and end coincide: nothing in between, so no attenuation
    if samples == 0:
        return 1.0

    # Geometric mean keeps long paths from collapsing towards zero
    return product ** (1.0 / samples)

In [None]:
%%writefile utils/__init__.py


In [None]:
%%writefile utils/colab_visualization.py
"""
Visualization module for Google Colab that uses matplotlib instead of pygame.
"""

import matplotlib.pyplot as plt
import numpy as np


class ColabVisualizer:
    """
    Visualizer for the sound navigation environment using matplotlib.
    Designed specifically for Google Colab compatibility.
    """

    def __init__(self):
        """Initialize the visualizer; the figure is created lazily by update()."""
        self.fig = None
        self.ax = None
        # Colorbar drawn by the previous update(), kept so it can be removed
        # before the next one is added
        self._colorbar = None

    def update(self, env):
        """
        Update visualization with current environment state.

        Args:
            env: Environment object with grid, agent_pos, and sound_sources
                (an optional step_count attribute is shown in the title)
        """
        # Create the figure on first use, reuse it afterwards
        if self.fig is None:
            self.fig, self.ax = plt.subplots(figsize=(10, 8))

        # ax.clear() does not touch the separate colorbar axes, so without
        # this every update would stack another colorbar next to the plot
        if self._colorbar is not None:
            self._colorbar.remove()
            self._colorbar = None

        self.ax.clear()

        # Draw on a copy so the overlay never leaks into the real grid
        vis_grid = env.grid.copy()

        if env.agent_pos is not None:
            x, y = env.agent_pos
            vis_grid[x][y] = 2

        # Sources are painted last, so a source sharing the agent's cell wins
        for source in env.sound_sources:
            vis_grid[source.x][source.y] = 3

        im = self.ax.imshow(vis_grid, cmap='viridis', interpolation='nearest')
        self.ax.set_title(f'Grid World Visualization - Step: {getattr(env, "step_count", 0)}')

        self._colorbar = self.fig.colorbar(
            im, ax=self.ax, label='Cell Type (0: Empty, 1: Wall, 2: Agent, 3: Sound Source)'
        )

        # NOTE(review): with the inline notebook backend an already-shown
        # figure may not be displayed again by plt.show() - confirm in Colab
        plt.show()

    def close(self):
        """Close the figure and drop the handles so update() starts fresh."""
        if self.fig is not None:
            plt.close(self.fig)
            self.fig = None
            self.ax = None
            self._colorbar = None


def visualize_single_frame(env, title="Environment State"):
    """
    Draw one snapshot of the environment in a fresh matplotlib figure.

    Nothing is kept between calls; use ColabVisualizer when a figure should
    be reused across steps.

    Args:
        env: Environment object with grid, agent_pos, and sound_sources
        title: Title for the visualization
    """
    # Overlay goes on a copy; the environment's grid is left untouched
    frame = env.grid.copy()

    agent_pos = env.agent_pos
    if agent_pos is not None:
        agent_x, agent_y = agent_pos
        frame[agent_x][agent_y] = 2

    # Sources are painted after the agent, so they win on a shared cell
    for src in env.sound_sources:
        frame[src.x][src.y] = 3

    plt.figure(figsize=(8, 8))
    plt.imshow(frame, cmap='viridis', interpolation='nearest')
    plt.colorbar(label='Cell Type (0: Empty, 1: Wall, 2: Agent, 3: Sound Source)')
    plt.title(title)
    plt.show()


def plot_training_progress(losses, rewards):
    """
    Show training loss and episode reward curves side by side.

    Args:
        losses: List of loss values during training
        rewards: List of cumulative rewards during training
    """
    _, axes = plt.subplots(1, 2, figsize=(15, 5))

    # One row per panel: (data, title, x label, y label), left to right
    panels = (
        (losses, 'Training Loss Over Time', 'Steps', 'Loss'),
        (rewards, 'Cumulative Reward Over Episodes', 'Episodes', 'Reward'),
    )

    for axis, (series, panel_title, x_label, y_label) in zip(axes, panels):
        axis.plot(series)
        axis.set_title(panel_title)
        axis.set_xlabel(x_label)
        axis.set_ylabel(y_label)

    plt.tight_layout()
    plt.show()

In [None]:
%%writefile utils/audio_processing.py
import numpy as np


def get_audio_observation_features(sound_intensity, frequency_content):
    """
    Turn a scalar intensity and a scalar frequency into a 12-element feature vector.

    Layout: intensity, frequency, intensity^2, frequency^2,
    intensity*frequency, sqrt(max(0, intensity)), sin(2*pi*frequency),
    cos(2*pi*frequency), followed by four zeros reserved for directional
    features that are not computed yet.

    Args:
        sound_intensity: Intensity of sound at the agent's position
        frequency_content: Dominant frequency at the agent's position

    Returns:
        float32 numpy array of audio features
    """
    phase = 2 * np.pi * frequency_content

    base_features = (
        sound_intensity,
        frequency_content,
        sound_intensity ** 2,
        frequency_content ** 2,
        sound_intensity * frequency_content,
        np.sqrt(max(0, sound_intensity)),  # clamp so negative input cannot yield NaN
        np.sin(phase),
        np.cos(phase),
    )

    # Directional features would need a previous observation to compare
    # against; zeros hold their slots for now
    directional_placeholders = (0.0,) * 4

    return np.array(base_features + directional_placeholders, dtype=np.float32)


def extract_spectral_features(audio_signal, sample_rate=44100):
    """
    Extract spectral features from a 1-D audio signal.

    The signal is converted to float64 first, so plain lists are accepted and
    integer PCM samples cannot overflow when squared. Signals with no
    spectral content (empty, a single sample, or all zeros) yield a centroid
    and rolloff of 0.0 instead of NaN or an exception.

    Args:
        audio_signal: Sequence or array of real-valued samples
        sample_rate: Sample rate of the audio signal in Hz

    Returns:
        Dictionary with float values for:
            'spectral_centroid': magnitude-weighted mean frequency
            'spectral_rolloff': lowest frequency at which the cumulative
                magnitude reaches 85% of the total
            'power': mean of the squared samples
    """
    signal = np.asarray(audio_signal, dtype=np.float64)

    if signal.size == 0:
        return {'spectral_centroid': 0.0, 'spectral_rolloff': 0.0, 'power': 0.0}

    # Keep the lower half of the spectrum; for real input the upper half mirrors it
    spectrum = np.fft.fft(signal)
    magnitude_spectrum = np.abs(spectrum[:len(spectrum) // 2])

    # Frequency (Hz) of each retained bin
    frequencies = np.arange(len(magnitude_spectrum)) * (sample_rate / len(signal))

    total_magnitude = np.sum(magnitude_spectrum)

    if total_magnitude > 0:
        # Spectral centroid: centre of mass of the magnitude spectrum
        spectral_centroid = float(np.sum(frequencies * magnitude_spectrum) / total_magnitude)

        # Spectral rolloff: first bin where the running sum reaches 85% of the total
        threshold = 0.85 * total_magnitude
        rolloff_bin = np.argmax(np.cumsum(magnitude_spectrum) >= threshold)
        spectral_rolloff = float(frequencies[rolloff_bin])
    else:
        # Silence or too few samples: there is no spectrum to summarise
        spectral_centroid = 0.0
        spectral_rolloff = 0.0

    return {
        'spectral_centroid': spectral_centroid,
        'spectral_rolloff': spectral_rolloff,
        'power': float(np.mean(signal ** 2))
    }

In [None]:
%%writefile utils/environment_gen.py
import numpy as np
from core.grid_world import GridWorld, Agent
from core.sound_source import SoundSource, Wall
from core.tasks import create_task_environment


def generate_random_environment(task_type: int, width: int = 25, height: int = 25):
    """
    Generate a random environment for the specified task.

    Starts from the task's predefined layout and scatters randomly permeable
    walls over it, never on the agent's cell or on a sound source.

    Args:
        task_type: Type of task (1, 2, or 3)
        width: Width of the grid
        height: Height of the grid

    Returns:
        Environment object for the specified task

    Raises:
        ValueError: Propagated from create_task_environment for an unknown task type
    """
    env = create_task_environment(task_type, width, height)

    # The grid is indexed grid[x][y], so x ranges over rows and y over columns.
    # Using the array shape keeps this correct for non-square grids.
    n_rows, n_cols = env.grid.shape

    # Built once: cells that must stay free of walls
    source_cells = {(s.x, s.y) for s in env.sound_sources}

    # Make placement attempts for 15% of the cells; repeats and protected
    # cells mean the final wall count can be lower
    num_walls = int(0.15 * width * height)
    for _ in range(num_walls):
        x, y = np.random.randint(0, n_rows), np.random.randint(0, n_cols)
        if (x, y) != env.agent_pos and (x, y) not in source_cells:
            permeability = np.random.uniform(0.1, 0.9)  # Random permeability
            env.place_wall(x, y, permeability)

    # Fallback for layouts where the task's default agent cell was unusable
    if env.agent_pos is None:
        empty_cells = np.argwhere(env.grid == 0)  # (x, y) pairs in row-major order

        if len(empty_cells) > 0:
            chosen = empty_cells[np.random.choice(len(empty_cells))]
            agent_x, agent_y = int(chosen[0]), int(chosen[1])

            if env.place_agent(agent_x, agent_y):
                # Keep the Agent object (if any) on the same cell as
                # env.agent_pos; otherwise its first move would start from
                # the stale default coordinates
                agent = getattr(env, 'agent', None)
                if agent is not None:
                    agent.x, agent.y = agent_x, agent_y
                    agent.position = (agent_x, agent_y)

    return env


def manual_environment_setup(task_type: int):
    """
    Build a fixed, predefined 25x25 environment for the given task.

    For Colab compatibility this does not prompt for input; it adds a fixed
    set of walls (with random permeability) to the task's default layout.
    Wall positions that coincide with a sound source or with the agent are
    skipped: the agent cannot enter wall cells, so a wall on a source would
    make that source unreachable.

    Args:
        task_type: Type of task (1, 2, or 3)

    Returns:
        Environment object for the specified task

    Raises:
        ValueError: Propagated from create_task_environment for an unknown task type
    """
    print(f"Creating manual environment for Task {task_type}")

    # Create base environment for the task
    env = create_task_environment(task_type, 25, 25)

    # Predefined wall segments
    wall_positions = [
        (5, 5), (5, 6), (5, 7),
        (10, 15), (11, 15), (12, 15),
        (15, 10), (15, 11), (15, 12), (15, 13)
    ]

    # Cells that must stay walkable
    protected_cells = {(s.x, s.y) for s in env.sound_sources}
    if env.agent_pos is not None:
        protected_cells.add(tuple(env.agent_pos))

    for x, y in wall_positions:
        if (x, y) in protected_cells:
            continue
        env.place_wall(x, y, permeability=np.random.uniform(0.2, 0.8))

    # Ensure agent is placed if not already
    if env.agent_pos is None:
        env.place_agent(2, 2)

    return env

In [None]:
%%writefile core/tasks.py
import numpy as np
from core.grid_world import GridWorld, Agent
from core.sound_source import SoundSource


class BaseTaskEnvironment(GridWorld):
    """
    Common episode machinery shared by the task environments.

    Adds step counting, reward accumulation and a done flag on top of
    GridWorld. Subclasses supply the reward via compute_reward() and may
    extend check_done().
    """

    def __init__(self, width=25, height=25, max_steps=500):
        """
        Create an empty task environment.

        Args:
            width: Width of the grid
            height: Height of the grid
            max_steps: Step budget after which the episode ends
        """
        super().__init__(width, height)
        self.max_steps = max_steps
        self.agent = None  # Agent instance; assigned by whoever builds the environment
        self.step_count = 0
        self.total_reward = 0
        self.done = False

    def reset(self):
        """
        Reset the grid and the episode bookkeeping.

        The GridWorld reset clears the walls, sound sources and agent
        position, so those have to be placed again before stepping.
        ``self.agent`` is left as it is.
        """
        super().reset()
        self.step_count = 0
        self.total_reward = 0
        self.done = False

    def step(self, action):
        """
        Advance the episode by one agent action.

        Args:
            action: Action to take (passed straight to the agent's move())

        Returns:
            Tuple of (observation, reward, done)

        Raises:
            RuntimeError: If the episode has already finished
        """
        if self.done:
            raise RuntimeError("Environment is done. Please reset before continuing.")

        # Apply the action
        previous_pos = self.agent_pos
        current_pos = self.agent.move(action, self)

        # Score the transition and update the episode counters
        step_reward = self.compute_reward(previous_pos, current_pos, action)
        self.step_count += 1
        self.total_reward += step_reward

        # Termination is evaluated after the counters are updated
        self.done = self.check_done()

        # Observation reflects the state after the move
        current_sound_map = self.compute_sound_map()
        observation = self.agent.observe(sound_map=current_sound_map, grid_world=self)

        return observation, step_reward, self.done

    def compute_reward(self, old_pos, new_pos, action):
        """
        Score the transition from old_pos to new_pos.

        Abstract hook: every concrete task must override it.
        """
        raise NotImplementedError("Subclasses should implement compute_reward method")

    def check_done(self):
        """
        Report whether the step budget has been used up.
        """
        return self.step_count >= self.max_steps
    
    
class FindAllSourcesTask(BaseTaskEnvironment):
    """
    Task: visit every sound source in the environment.

    Reward: +1.0 the first time the agent stands on each source, with a
    -0.01 cost on every step.
    """

    def __init__(self, width=25, height=25, max_steps=500):
        super().__init__(width, height, max_steps)
        # Indices (into self.sound_sources) of sources already visited
        self.found_sources = set()

    def reset(self):
        super().reset()
        self.found_sources = set()

    def compute_reward(self, old_pos, new_pos, action):
        """
        Return the step cost plus a bonus for each source newly reached at new_pos.
        """
        # Per-step cost pushes the agent towards short routes
        reward = -0.01

        for index, source in enumerate(self.sound_sources):
            if index in self.found_sources:
                continue  # already rewarded
            if new_pos == (source.x, source.y):
                self.found_sources.add(index)
                reward += 1.0

        return reward

    def check_done(self):
        """
        The episode ends once every source has been found or the step budget is spent.
        """
        if len(self.found_sources) == len(self.sound_sources):
            return True
        return self.step_count >= self.max_steps


class FindQuietestPlaceTask(BaseTaskEnvironment):
    """
    Task: settle in the quietest spot of the environment.

    Reward: the negated sound intensity at the agent's cell, a bonus whenever
    a new personal-best (lowest) intensity is reached, and a small step cost.
    """

    def __init__(self, width=25, height=25, max_steps=500):
        super().__init__(width, height, max_steps)
        # Successive record-low intensities seen this episode
        self.sound_map_history = []

    def reset(self):
        super().reset()
        self.sound_map_history = []

    def compute_reward(self, old_pos, new_pos, action):
        """
        Score new_pos by how quiet it is, with a bonus for beating every earlier reading.
        """
        intensity_here = self.compute_sound_map()[new_pos[0]][new_pos[1]]

        # Quieter cells cost less
        reward = -intensity_here

        # The very first reading always counts as a record
        is_new_record = (
            not self.sound_map_history
            or intensity_here < min(self.sound_map_history)
        )
        if is_new_record:
            self.sound_map_history.append(intensity_here)
            reward += 0.5

        # Per-step cost
        reward -= 0.01

        return reward


class FollowMovingSourceTask(BaseTaskEnvironment):
    """
    Task: stay close to moving sound sources.

    On every step one source takes a random unit move (or stays put); the
    sources take turns, one per step, in list order.
    Reward: higher the closer the agent is to the nearest source, minus a
    small step cost.
    """

    def __init__(self, width=25, height=25, max_steps=500):
        super().__init__(width, height, max_steps)
        self.moving_source_idx = 0  # Running counter; modulo picks whose turn it is

    def reset(self):
        """
        Reset the episode and put the sound sources back where they started.

        The inherited reset empties ``self.sound_sources``, so the source
        objects are captured beforehand and reinstated afterwards. This only
        happens when ``_original_source_positions`` has been recorded (as
        create_task_environment does); otherwise the sources are cleared as
        in the base class. Walls and the agent position are still cleared.
        """
        sources = list(self.sound_sources)
        super().reset()

        original_positions = getattr(self, '_original_source_positions', None)
        if original_positions is not None:
            # zip stops at the shorter list, so extra sources keep their position
            for source, (orig_x, orig_y) in zip(sources, original_positions):
                source.x = orig_x
                source.y = orig_y
            self.sound_sources = sources

        # Start every episode with the same source moving first
        self.moving_source_idx = 0

    def step(self, action):
        """
        Execute one step in the environment, including moving a sound source.

        Args:
            action: Action to take (passed straight to the agent's move())

        Returns:
            Tuple of (observation, reward, done)

        Raises:
            RuntimeError: If the episode has already finished
        """
        if self.done:
            raise RuntimeError("Environment is done. Please reset before continuing.")

        # Move the agent
        old_pos = self.agent_pos
        new_pos = self.agent.move(action, self)

        # The source moves after the agent, so the reward below is measured
        # against the source's new position
        self._move_sound_source_randomly()

        # Compute reward
        reward = self.compute_reward(old_pos, new_pos, action)

        # Update step count
        self.step_count += 1
        self.total_reward += reward

        # Check termination conditions
        self.done = self.check_done()

        # Get new observation
        sound_map = self.compute_sound_map()
        observation = self.agent.observe(sound_map=sound_map, grid_world=self)

        return observation, reward, self.done

    def _move_sound_source_randomly(self):
        """Let the source whose turn it is take one random unit move (or stay)."""
        if self.sound_sources:
            source = self.sound_sources[self.moving_source_idx % len(self.sound_sources)]

            # Possible moves: up, down, left, right, stay
            moves = [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]
            move = moves[np.random.choice(len(moves))]

            new_x = source.x + move[0]
            new_y = source.y + move[1]

            # Rejected moves (out of bounds or into a wall) leave the source in place
            if (0 <= new_x < self.width and 0 <= new_y < self.height and
                    self.grid[new_x][new_y] != 1):  # Not a wall
                source.x = new_x
                source.y = new_y

            # Next step it is the next source's turn
            self.moving_source_idx += 1

    def compute_reward(self, old_pos, new_pos, action):
        """
        Reward based on proximity to the closest sound source.

        Returns 1.0 (minus the step cost) when standing on a source and
        1 / (distance + 0.1) otherwise, using Manhattan distance. With no
        sources the proximity term is 0.
        """
        min_distance = min(
            (abs(new_pos[0] - source.x) + abs(new_pos[1] - source.y)
             for source in self.sound_sources),
            default=float('inf'),
        )

        # Closer = higher reward
        if min_distance == 0:
            reward = 1.0
        else:
            reward = 1.0 / (min_distance + 0.1)

        # Small penalty for each step
        reward -= 0.01

        return reward


def create_task_environment(task_type, width=25, height=25):
    """
    Build a ready-to-use environment for one of the predefined tasks.

    Every task gets its own environment class, a fixed set of sound sources
    and a fixed agent start cell.

    Args:
        task_type: Task selector (1, 2, or 3)
        width: Width of the grid
        height: Height of the grid

    Returns:
        The configured environment, with its sources placed and ``env.agent`` set

    Raises:
        ValueError: If ``task_type`` is not 1, 2, or 3
    """
    # NOTE: source and agent coordinates below are fixed values; they do not
    # scale with the width/height arguments.
    snapshot_sources = False

    if task_type == 1:
        env = FindAllSourcesTask(width, height)
        # (x, y, volume) of each source to place
        source_specs = [(5, 5, 0.8), (20, 20, 0.9), (10, 15, 0.7)]
        start = (2, 2)
    elif task_type == 2:
        env = FindQuietestPlaceTask(width, height)
        # Four equally loud sources; the agent starts between them
        source_specs = [(3, 3, 1.0), (22, 3, 1.0), (3, 22, 1.0), (22, 22, 1.0)]
        start = (12, 12)
    elif task_type == 3:
        env = FollowMovingSourceTask(width, height)
        source_specs = [(5, 5, 0.9), (15, 15, 0.6), (20, 5, 0.7)]
        start = (2, 2)
        # This task keeps the initial source layout so it can be restored on reset
        snapshot_sources = True
    else:
        raise ValueError(f"Unknown task type: {task_type}")

    for src_x, src_y, src_volume in source_specs:
        env.place_sound_source(SoundSource(src_x, src_y, volume=src_volume))

    if snapshot_sources:
        env._original_source_positions = [(s.x, s.y) for s in env.sound_sources]

    # Mark the agent on the grid and attach the agent object at the same cell
    env.place_agent(*start)
    env.agent = Agent(*start)

    return env

In [None]:
%%writefile rl/__init__.py


In [None]:
%%writefile rl/dqn.py
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import os

# True when a GPU backend is available (CUDA or Apple MPS).
# NOTE: despite its name, this flag does not report whether torch itself is
# importable - torch is imported unconditionally above.
TORCH_AVAILABLE = torch.cuda.is_available() or torch.backends.mps.is_available()


class DQN(nn.Module):
    """
    Deep Q-Network: a fully connected network mapping observations to Q-values.

    The layers are stored in ``self.network`` as Linear+ReLU pairs followed by
    a final Linear output layer with one unit per action.
    """

    def __init__(self, input_size: int, output_size: int, hidden_sizes: list = None):
        """
        Initialize the DQN network.

        Args:
            input_size: Size of the input layer
            output_size: Size of the output layer (number of actions)
            hidden_sizes: List of sizes for hidden layers (default: [64, 64])
        """
        super().__init__()

        if hidden_sizes is None:
            hidden_sizes = [64, 64]

        # Build the network layers: each hidden layer is Linear followed by ReLU
        layers = []
        prev_size = input_size

        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            prev_size = hidden_size

        # Output layer (no activation: raw Q-values)
        layers.append(nn.Linear(prev_size, output_size))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """
        Forward pass through the network.

        Args:
            x: Input tensor or NumPy array; a 1-D input is treated as a single
               observation and gets a leading batch dimension

        Returns:
            Output tensor with Q-values for each action
        """
        if isinstance(x, np.ndarray):
            # Create the tensor on the same device as the weights; a plain
            # CPU tensor would fail once the module has been moved to a GPU.
            device = next(self.parameters()).device
            x = torch.tensor(x, dtype=torch.float32, device=device)

        # Ensure x has a batch dimension
        if x.dim() == 1:
            x = x.unsqueeze(0)

        return self.network(x)


class ReplayBuffer:
    """
    Fixed-capacity store of transitions with uniform random sampling.

    Once the buffer is full, each newly pushed transition evicts the oldest.
    """

    def __init__(self, capacity: int):
        """
        Create an empty buffer.

        Args:
            capacity: Maximum number of transitions kept at any time
        """
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """
        Record one transition.

        Args:
            state: Observation before the action
            action: Action that was taken
            reward: Reward that was received
            next_state: Observation after the action
            done: Whether the episode ended with this transition
        """
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size: int):
        """
        Draw ``batch_size`` distinct transitions uniformly at random.

        Args:
            batch_size: Number of transitions to draw

        Returns:
            Tuple of (states, actions, rewards, next_states, dones), each a
            NumPy array stacked along a new leading batch axis
        """
        picked = random.sample(self.buffer, batch_size)

        # Transpose the list of transitions into one column per field
        columns = zip(*picked)
        state, action, reward, next_state, done = [np.stack(col) for col in columns]
        return state, action, reward, next_state, done

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)


class DQNAgentWrapper:
    """
    Wrapper class for the DQN agent that handles training and inference.

    Owns an online Q-network, a target network that is re-synced every
    ``target_update_freq`` training steps, an Adam optimizer and a replay
    buffer, and selects actions with an epsilon-greedy policy.
    """

    def __init__(self, input_size: int, output_size: int, lr: float = 0.001,
                 gamma: float = 0.99, epsilon: float = 1.0, epsilon_decay: float = 0.995,
                 epsilon_min: float = 0.01, buffer_size: int = 10000,
                 target_update_freq: int = 100):
        """
        Initialize the DQN agent.

        Args:
            input_size: Size of the input observations
            output_size: Number of possible actions
            lr: Learning rate
            gamma: Discount factor
            epsilon: Initial exploration rate
            epsilon_decay: Multiplicative factor applied to epsilon on every
                training-mode call to ``act``
            epsilon_min: Minimum exploration rate
            buffer_size: Size of the replay buffer
            target_update_freq: Number of ``replay`` updates between target
                network syncs
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # torch.backends.mps does not exist on older torch builds, so probe it
        # defensively instead of assuming the attribute is there.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            self.device = torch.device("mps")

        self.input_size = input_size
        self.output_size = output_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.target_update_freq = target_update_freq

        # Networks
        self.q_network = DQN(input_size, output_size).to(self.device)
        self.target_network = DQN(input_size, output_size).to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)

        # Sync target network with main network
        self.sync_target_network()

        # Replay buffer
        self.memory = ReplayBuffer(buffer_size)

        # Number of gradient updates performed so far
        self.step_count = 0

    def sync_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def remember(self, state, action, reward, next_state, done):
        """
        Store experience in the replay buffer.

        Args:
            state: Current state
            action: Action taken
            reward: Reward received
            next_state: Next state after action
            done: Whether the episode is done
        """
        self.memory.push(state, action, reward, next_state, done)

    def act(self, state, training: bool = True):
        """
        Choose an action based on the current state.

        In training mode epsilon is decayed once per call (never below
        ``epsilon_min``) and a random action is taken with probability
        epsilon. Outside training mode the action is always greedy.

        Args:
            state: Current state
            training: Whether the agent is in training mode (affects epsilon-greedy policy)

        Returns:
            Index of the selected action
        """
        # Decay epsilon during training
        if training:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # Epsilon-greedy action selection
        if np.random.random() <= self.epsilon and training:
            return np.random.choice(self.output_size)

        # Pure inference: disable autograd so no graph is built for the
        # forward pass.
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.q_network(state_tensor)

        # Return the action with highest Q-value
        return np.argmax(q_values.cpu().numpy())

    def replay(self, batch_size: int = 32):
        """
        Train the agent on a batch of experiences from the replay buffer.

        Args:
            batch_size: Size of the batch to train on

        Returns:
            Loss value for this batch, or 0.0 when the buffer holds fewer
            than ``batch_size`` transitions and no update was performed
        """
        if len(self.memory) < batch_size:
            return 0.0  # Not enough samples to train

        # Sample experiences from the buffer
        states, actions, rewards, next_states, dones = self.memory.sample(batch_size)

        # Convert to tensors
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # Q-values of the actions actually taken. Squeeze only the gathered
        # column: a bare squeeze() would also drop the batch axis for a batch
        # of one and leave a 0-d tensor that no longer matches the target.
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Bootstrapped targets from the target network; terminal transitions
        # (done == True) contribute the reward only.
        with torch.no_grad():
            next_q_values = self.target_network(next_states).max(1)[0]
            target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        # Compute loss
        loss = nn.MSELoss()(current_q_values, target_q_values)

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update step count and periodically sync target network
        self.step_count += 1
        if self.step_count % self.target_update_freq == 0:
            self.sync_target_network()

        return loss.item()

    def save(self, filepath: str):
        """
        Save the trained model to a file.

        The checkpoint holds both networks' weights, the optimizer state,
        the current epsilon and the update counter.

        Args:
            filepath: Path to save the model
        """
        torch.save({
            'q_network_state_dict': self.q_network.state_dict(),
            'target_network_state_dict': self.target_network.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.epsilon,
            'step_count': self.step_count
        }, filepath)

        print(f"Model saved to {filepath}")

    def load(self, filepath: str):
        """
        Load a trained model from a file.

        If the file does not exist a message is printed and the agent is
        left unchanged.

        Args:
            filepath: Path to load the model from
        """
        if not os.path.exists(filepath):
            print(f"Model file does not exist: {filepath}")
            return

        checkpoint = torch.load(filepath, map_location=self.device)
        self.q_network.load_state_dict(checkpoint['q_network_state_dict'])
        self.target_network.load_state_dict(checkpoint['target_network_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.epsilon = checkpoint['epsilon']
        self.step_count = checkpoint['step_count']

        print(f"Model loaded from {filepath}")

In [None]:
%%writefile rl/training.py
import numpy as np
import torch
import os
from tqdm import tqdm
from rl.dqn import DQNAgentWrapper
from utils.environment_gen import generate_random_environment
from core.tasks import create_task_environment


def train_task(task_type: int, num_episodes: int = 1000, 
               model_path: str = None, lr: float = 0.001,
               epsilon_decay: float = 0.995, batch_size: int = 32):
    """
    Train a DQN agent for a specific task.
    
    Args:
        task_type: Type of task (1, 2, or 3)
        num_episodes: Number of episodes to train for
        model_path: Path to save the trained model to; nothing is saved when
            it is None or empty (no existing model is loaded)
        lr: Learning rate for the optimizer
        epsilon_decay: Rate at which exploration rate decreases
        batch_size: Size of the training batches
        
    Returns:
        Trained agent and list of per-episode mean losses, averaged over the
        training updates that actually ran in that episode
    """
    # Create environment for the task
    env = create_task_environment(task_type)
    
    # Determine observation size by getting a sample observation
    from utils.audio_processing import get_audio_observation_features
    sample_obs = get_audio_observation_features(0.5, 0.5)
    obs_size = len(sample_obs)
    action_size = 5  # up, down, left, right, stay
    
    # Create agent
    agent = DQNAgentWrapper(
        input_size=obs_size,
        output_size=action_size,
        lr=lr,
        epsilon_decay=epsilon_decay
    )
    
    # Lists to store training metrics
    losses = []
    episode_rewards = []
    
    print(f"Starting training for Task {task_type} with {num_episodes} episodes")
    
    # Training loop
    for episode in tqdm(range(num_episodes), desc=f"Training Task {task_type}"):
        # Reset environment
        env.reset()
        obs = env.agent.observe(sound_map=env.compute_sound_map(), grid_world=env)
        
        total_reward = 0
        episode_loss = 0
        update_count = 0  # steps on which a gradient update actually ran
        step_count = 0
        
        # Run episode until the task reports done or the step budget is used up
        while not env.done and step_count < env.max_steps:
            # Select action
            action = agent.act(obs, training=True)
            
            # Take action
            next_obs, reward, done = env.step(action)
            
            # Remember experience
            agent.remember(obs, action, reward, next_obs, done)
            
            # Train on batch; replay() yields 0.0 while the buffer is still
            # too small, so only positive losses count as real updates
            loss = agent.replay(batch_size)
            if loss > 0:
                episode_loss += loss
                update_count += 1
            
            # Update state
            obs = next_obs
            total_reward += reward
            step_count += 1
        
        # Average over the updates that ran, not over every step: dividing by
        # step_count would dilute the loss during replay-buffer warm-up
        losses.append(episode_loss / max(update_count, 1))
        episode_rewards.append(total_reward)
        
        # Print progress occasionally
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes}, Avg Reward: {avg_reward:.2f}, Epsilon: {agent.epsilon:.3f}")
    
    # Save model if path provided
    if model_path:
        agent.save(model_path)
    
    print(f"Training completed for Task {task_type}")
    return agent, losses


def evaluate_agent(agent, task_type, num_episodes=10):
    """
    Evaluate a trained agent on a specific task.
    
    The agent's epsilon is set to 0 for the duration of the evaluation and is
    restored afterwards, even if an episode raises.
    
    Args:
        agent: Trained DQNAgentWrapper
        task_type: Type of task (1, 2, or 3)
        num_episodes: Number of episodes to evaluate for
        
    Returns:
        Average reward across episodes
    """
    # Create environment for the task
    env = create_task_environment(task_type)
    
    # Set agent to evaluation mode (no exploration)
    original_epsilon = agent.epsilon
    agent.epsilon = 0  # No exploration during evaluation
    
    total_rewards = []
    steps_counts = []
    
    print(f"Evaluating agent for Task {task_type} with {num_episodes} episodes")
    
    try:
        for episode in range(num_episodes):
            # Reset environment
            env.reset()
            obs = env.agent.observe(sound_map=env.compute_sound_map(), grid_world=env)
            
            total_reward = 0
            step_count = 0
            
            # Run episode until the task reports done or the step budget is used up
            while not env.done and step_count < env.max_steps:
                # Select action (no exploration)
                action = agent.act(obs, training=False)
                
                # Take action
                obs, reward, done = env.step(action)
                
                total_reward += reward
                step_count += 1
            
            total_rewards.append(total_reward)
            steps_counts.append(step_count)
            
            print(f"Episode {episode + 1}: Total Reward = {total_reward:.2f}, Steps = {step_count}")
    finally:
        # Always restore the exploration rate so a failed evaluation does not
        # leave the agent with exploration switched off
        agent.epsilon = original_epsilon
    
    avg_reward = np.mean(total_rewards)
    avg_steps = np.mean(steps_counts)
    
    print(f"Evaluation completed for Task {task_type}")
    print(f"Average Reward: {avg_reward:.2f}, Average Steps: {avg_steps:.2f}")
    
    return avg_reward


def train_all_tasks(num_episodes=1000):
    """
    Train one agent per task (1, 2, then 3), saving each model under ``models/``.

    Args:
        num_episodes: Number of episodes to train each agent for

    Returns:
        Dictionary mapping task types to trained agents
    """
    model_dir = "models"
    trained = {}

    for task_type in (1, 2, 3):
        print(f"\n--- Training for Task {task_type} ---")

        # Make sure the output directory exists before training starts
        os.makedirs(model_dir, exist_ok=True)
        checkpoint_path = f"{model_dir}/dqn_task_{task_type}.pth"

        # Only the agent is kept; the per-episode losses are discarded
        trained[task_type], _ = train_task(
            task_type=task_type,
            num_episodes=num_episodes,
            model_path=checkpoint_path,
        )

        print(f"Task {task_type} training completed")

    print("\nAll tasks training completed!")
    return trained

## Run training

In [None]:
# Add the module directories to the import search path
import sys
sys.path.extend(['.', './core', './rl', './utils'])

In [None]:
# Run training for the first task
from rl.training import train_task, evaluate_agent
from utils.environment_gen import generate_random_environment

print("Начинаем обучение агента для задачи 1...")
agent, losses = train_task(
    task_type=1,
    num_episodes=50,  # Fewer episodes for a quick test run in Colab
    model_path=None
)

print("Оценка обученного агента...")
evaluate_agent(agent, task_type=1, num_episodes=3)