2. src/environment.py

In [None]:
"""
Grid Environment for Autonomous Delivery Agent
Handles terrain costs, obstacles, and dynamic elements
"""

import numpy as np
from typing import List, Tuple, Dict, Optional

class GridEnvironment:
    """2D grid world with terrain costs, static obstacles and time-dependent obstacles.

    Positions are passed around as ``(x, y)`` tuples, while the backing NumPy
    arrays are indexed ``[y, x]`` (row first).

    Attributes:
        width, height: Grid dimensions in cells.
        terrain_costs: ``(height, width)`` integer array of per-cell movement costs.
        static_obstacles: ``(height, width)`` boolean array; True means blocked.
        dynamic_obstacles: List of ``(x, y, time_step)``; the cell is blocked
            from ``time_step`` onwards.
        moving_obstacles: List of dicts with keys ``'path'``, ``'speed'`` and
            ``'start_time'`` describing obstacles that cycle along a path.
        packages: List of ``(x, y, destination_x, destination_y)`` tuples.
        agent_start: ``(x, y)`` starting cell of the agent.
        delivery_points: List of ``(x, y)`` goal cells.
    """

    def __init__(self, width: int, height: int):
        """Create an obstacle-free grid where every cell has cost 1."""
        self.width = width
        self.height = height
        self.terrain_costs = np.ones((height, width), dtype=int)  # Default cost 1
        self.static_obstacles = np.zeros((height, width), dtype=bool)
        self.dynamic_obstacles = []  # Format: [(x, y, time_step), ...]
        self.moving_obstacles = []   # Format: [{'path': [(x,y)], 'speed': 1, 'start_time': 0}]
        self.packages = []           # Format: [(x, y, destination_x, destination_y)]
        self.agent_start = (0, 0)
        self.delivery_points = []

    @staticmethod
    def _parse_int_row(line: str, expected: int) -> List[int]:
        """Parse one whitespace-separated row of integers, enforcing its width.

        Raises:
            ValueError: If a token is not an integer or the row does not hold
                exactly ``expected`` values.
        """
        values = [int(token) for token in line.split()]
        # Without this check NumPy would silently broadcast a single value
        # across the whole row when the row is assigned into the array.
        if len(values) != expected:
            raise ValueError(
                f"expected {expected} values per row, got {len(values)}: {line!r}"
            )
        return values

    def load_from_file(self, filename: str) -> bool:
        """Load environment configuration from map file.

        Blank lines and lines whose first non-blank character is ``#`` are
        ignored. The remaining lines are, in order: one ``width height`` line,
        ``height`` rows of terrain costs, ``height`` rows of static-obstacle
        flags (non-zero means blocked) and ``height`` rows of marker cells
        (``S`` = start, ``G`` = delivery point, ``D`` = dynamic obstacle
        present from time 0; any other token is ignored).

        The file is parsed completely before any attribute is touched, so a
        failed load leaves the environment exactly as it was. On success the
        dimensions, terrain, static obstacles, delivery points and dynamic
        obstacles are replaced; ``agent_start`` is replaced only when the file
        contains an ``S`` marker. ``moving_obstacles`` and ``packages`` are
        never modified.

        Returns:
            True on success, False if the file cannot be read or is malformed
            (the error is printed).
        """
        try:
            with open(filename, 'r') as f:
                stripped = (raw.strip() for raw in f)
                # Strip first so that indented comment lines are skipped too.
                lines = [line for line in stripped if line and not line.startswith('#')]

            # Read dimensions
            dims = list(map(int, lines[0].split()))
            width, height = dims[0], dims[1]

            terrain_costs = np.ones((height, width), dtype=int)
            static_obstacles = np.zeros((height, width), dtype=bool)

            line_idx = 1

            # Read terrain costs
            for i in range(height):
                terrain_costs[i] = self._parse_int_row(lines[line_idx], width)
                line_idx += 1

            # Read static obstacles (ints are cast to bool on assignment)
            for i in range(height):
                static_obstacles[i] = self._parse_int_row(lines[line_idx], width)
                line_idx += 1

            # Read special positions (S=start, G=goal, D=dynamic obstacle)
            agent_start = self.agent_start
            delivery_points = []
            dynamic_obstacles = []
            for i in range(height):
                cells = lines[line_idx].split()
                # Extra cells would produce coordinates outside the grid.
                if len(cells) > width:
                    raise ValueError(
                        f"marker row {i} has {len(cells)} cells, grid width is {width}"
                    )
                for j, cell in enumerate(cells):
                    if cell == 'S':
                        agent_start = (j, i)
                    elif cell == 'G':
                        delivery_points.append((j, i))
                    elif cell == 'D':
                        dynamic_obstacles.append((j, i, 0))  # Appears at time 0
                line_idx += 1

        except (OSError, ValueError, IndexError) as e:
            # OSError: unreadable file; ValueError: bad token / row width /
            # encoding; IndexError: file ends before all sections are read.
            print(f"Error loading map file: {e}")
            return False

        # Commit only after the whole file parsed cleanly.
        self.width, self.height = width, height
        self.terrain_costs = terrain_costs
        self.static_obstacles = static_obstacles
        self.agent_start = agent_start
        self.delivery_points = delivery_points
        self.dynamic_obstacles = dynamic_obstacles
        return True

    def is_valid_position(self, x: int, y: int, time_step: int = 0) -> bool:
        """Check if position is valid and not blocked at given time.

        A cell is invalid when it lies outside the grid, holds a static
        obstacle, holds a dynamic obstacle whose appearance time is at or
        before ``time_step``, or is the cell a moving obstacle occupies at
        ``time_step``.
        """
        if not (0 <= x < self.width and 0 <= y < self.height):
            return False

        if self.static_obstacles[y, x]:
            return False

        # Dynamic obstacles stay in place once they have appeared.
        if any(
            obs_x == x and obs_y == y and time_step >= obs_time
            for obs_x, obs_y, obs_time in self.dynamic_obstacles
        ):
            return False

        # Moving obstacles advance one path cell every `speed` time steps and
        # wrap around to the start of their path.
        for obstacle in self.moving_obstacles:
            path = obstacle['path']
            speed = obstacle['speed']
            start_time = obstacle['start_time']

            # An empty path occupies no cell (and would make the modulo below
            # divide by zero), so such an entry is skipped.
            if not path or time_step < start_time:
                continue

            path_index = ((time_step - start_time) // speed) % len(path)
            obs_x, obs_y = path[path_index]
            if obs_x == x and obs_y == y:
                return False

        return True

    def get_terrain_cost(self, x: int, y: int) -> int:
        """Get movement cost for a cell as a plain Python int.

        No bounds check is performed; callers are expected to pass a cell
        inside the grid.
        """
        return int(self.terrain_costs[y, x])

    def get_neighbors(self, x: int, y: int, time_step: int = 0) -> List[Tuple[int, int, int]]:
        """Get valid neighboring positions (4-connected) with costs.

        Returns:
            A list of ``(nx, ny, cost)`` for every adjacent cell that
            ``is_valid_position`` accepts at ``time_step``, where ``cost`` is
            the terrain cost of the neighbouring cell.
        """
        # Offsets as (dx, dy), tried in the order +y, +x, -y, -x.
        directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]

        neighbors = []
        for dx, dy in directions:
            nx, ny = x + dx, y + dy
            if self.is_valid_position(nx, ny, time_step):
                neighbors.append((nx, ny, self.get_terrain_cost(nx, ny)))

        return neighbors

    def add_moving_obstacle(self, path: List[Tuple[int, int]], speed: int = 1, start_time: int = 0):
        """Add a moving obstacle that follows a path.

        Args:
            path: Non-empty list of ``(x, y)`` cells the obstacle cycles through.
            speed: Number of time steps spent on each path cell; at least 1.
            start_time: First time step at which the obstacle is present.

        Raises:
            ValueError: If ``path`` is empty or ``speed`` is less than 1. Such
                an obstacle would otherwise cause a division by zero the first
                time a position is checked.
        """
        if not path:
            raise ValueError("moving obstacle path must contain at least one cell")
        if speed < 1:
            raise ValueError(f"moving obstacle speed must be >= 1, got {speed}")

        self.moving_obstacles.append({
            'path': path,
            'speed': speed,
            'start_time': start_time
        })

    def visualize(self, agent_position: Optional[Tuple[int, int]] = None,
                  path: Optional[List[Tuple[int, int]]] = None):
        """Print a text rendering of the environment at time step 0.

        Blocked cells are drawn as '█' and free cells show their terrain cost.
        Delivery points are drawn as 'G' and the agent as 'A'; path cells are
        drawn as '.' unless they hold the agent or a delivery point. Cells are
        right-aligned to the widest entry so multi-digit costs are printed in
        full and columns stay aligned.
        """
        # A list of Python strings is used on purpose: a NumPy array with
        # dtype=str is one character wide and would truncate costs >= 10.
        grid = [
            [
                '█' if not self.is_valid_position(x, y) else str(self.terrain_costs[y, x])
                for x in range(self.width)
            ]
            for y in range(self.height)
        ]

        # Mark delivery points
        for x, y in self.delivery_points:
            grid[y][x] = 'G'

        # Mark agent
        if agent_position:
            x, y = agent_position
            grid[y][x] = 'A'

        # Mark path without hiding the agent or the goals
        if path:
            for x, y in path:
                if grid[y][x] not in ('A', 'G'):
                    grid[y][x] = '.'

        # Print grid
        cell_width = max((len(cell) for row in grid for cell in row), default=1)
        for row in grid:
            print(' '.join(cell.rjust(cell_width) for cell in row))