In [2]:
import gymnasium as gym
import numpy as np

In [33]:


class ComplexDroneDeliveryEnv(gym.Env):
    def __init__(self):
        super().__init__()
        self.grid_size = 6
        self.no_fly_zones = [(2,2), (2,3), (3,2), (3,3)]
        self.action_space = gym.spaces.Discrete(6)  # Up, Down, Left, Right, Pickup/Dropoff, No-op
        self.observation_space = gym.spaces.Tuple((
            gym.spaces.Box(0, 5, (2,), dtype=int),  # Drone position
            gym.spaces.Discrete(4),  # Carrying capacity (0-3 packages)
            gym.spaces.MultiBinary(3),  # Delivery status for each customer
            gym.spaces.MultiBinary(3)   # Return status for each customer
        ))
        self.reset()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.drone_pos = np.array([0, 0], dtype=int)
        self.carrying = 0
        self.num_customers = 3
        self.delivery_locations = self._generate_valid_locations(self.num_customers)
        self.return_locations = self._generate_valid_locations(self.num_customers)
        self.deliveries_completed = [False] * self.num_customers
        self.returns_completed = [False] * self.num_customers
        self.steps = 0
        return self._get_obs(), {}

    def _generate_valid_locations(self, num_locations):
        valid_locations = []
        while len(valid_locations) < num_locations:
            loc = (np.random.randint(0, self.grid_size), np.random.randint(0, self.grid_size))
            if loc not in self.no_fly_zones and loc != (0, 0) and loc not in valid_locations:
                valid_locations.append(loc)
        return valid_locations

    def step(self, action):
        self.steps += 1
        reward = -1  # Default step penalty
        terminated = False

        if action < 4:  # Movement
            new_pos = self.drone_pos.copy()
            if action == 0: new_pos[1] = min(5, new_pos[1]+1)  # Up
            elif action == 1: new_pos[1] = max(0, new_pos[1]-1)  # Down
            elif action == 2: new_pos[0] = max(0, new_pos[0]-1)  # Left
            elif action == 3: new_pos[0] = min(5, new_pos[0]+1)  # Right

            if self._is_valid(new_pos):
                self.drone_pos = new_pos
            else:
                reward = -100  # No-fly zone penalty

        elif action == 4:  # Pickup/Dropoff
            if np.array_equal(self.drone_pos, [0, 0]):  # At warehouse
                if self.carrying > 0:  # Dropoff returns
                    reward = 50 * self.carrying
                    self.carrying = 0
                elif self.carrying < 3:  # Pickup deliveries
                    packages_to_pickup = min(3 - self.carrying, sum(~np.array(self.deliveries_completed)))
                    self.carrying += packages_to_pickup
                    reward = 25 * packages_to_pickup
            else:
                for i, loc in enumerate(self.delivery_locations):
                    if np.array_equal(self.drone_pos, loc) and self.carrying > 0 and not self.deliveries_completed[i]:
                        self.deliveries_completed[i] = True
                        self.carrying -= 1
                        reward = 100
                        break
                for i, loc in enumerate(self.return_locations):
                    if np.array_equal(self.drone_pos, loc) and self.carrying < 3 and not self.returns_completed[i]:
                        self.returns_completed[i] = True
                        self.carrying += 1
                        reward = 50
                        break

        # Check if all tasks are completed
        if all(self.deliveries_completed) and all(self.returns_completed):
            terminated = True
            reward += 500  # Bonus for completing all tasks

        return self._get_obs(), reward, terminated, False, {}

    def _is_valid(self, pos):
        return (tuple(pos) not in self.no_fly_zones 
                and 0 <= pos[0] < self.grid_size 
                and 0 <= pos[1] < self.grid_size)

    def _get_obs(self):
        return (self.drone_pos.copy(), 
                self.carrying, 
                np.array(self.deliveries_completed), 
                np.array(self.returns_completed))

    def render(self):
        grid = np.full((6,6), '⬜')
        for x,y in self.no_fly_zones:
            grid[y,x] = '🟥'
        
        for i, (x, y) in enumerate(self.delivery_locations):
            if not self.deliveries_completed[i]:
                grid[y,x] = f'📦{i+1}'
        
        for i, (x, y) in enumerate(self.return_locations):
            if not self.returns_completed[i]:
                grid[y,x] = f'🔄{i+1}'
        
        grid[self.drone_pos[1], self.drone_pos[0]] = '🚁' + str(self.carrying)
        
        print('\n'.join([' '.join(row) for row in grid]))
        print(f"Carrying: {self.carrying} packages")
        print(f"Deliveries completed: {sum(self.deliveries_completed)}/{self.num_customers}")
        print(f"Returns collected: {sum(self.returns_completed)}/{self.num_customers}")




In [34]:
# Example usage
env = ComplexDroneDeliveryEnv()
obs, _ = env.reset()

print("Initial State:")
env.render()

for t in range(10):
    action = env.action_space.sample()
    next_obs, reward, terminated, truncated, _ = env.step(action)
    
    print(f"\nTimestep {t+1}")
    print(f"State: {obs}")
    print(f"Action: {['Up', 'Down', 'Left', 'Right', 'Pickup/Dropoff', 'No-op'][action]}")
    print(f"Reward: {reward}")
    env.render()
    
    obs = next_obs
    if terminated:
        print("Episode finished early.")
        break

print("Random agent simulation completed.")

Initial State:
🚁 🔄 ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 🔄 ⬜ ⬜ ⬜
⬜ ⬜ 🟥 🟥 📦 ⬜
⬜ 📦 🟥 🟥 ⬜ ⬜
⬜ ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 📦 ⬜ 🔄 ⬜
Carrying: 0 packages
Deliveries completed: 0/3
Returns collected: 0/3

Timestep 1
State: (array([0, 0]), 0, array([False, False, False]), array([False, False, False]))
Action: Left
Reward: -1
🚁 🔄 ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 🔄 ⬜ ⬜ ⬜
⬜ ⬜ 🟥 🟥 📦 ⬜
⬜ 📦 🟥 🟥 ⬜ ⬜
⬜ ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 📦 ⬜ 🔄 ⬜
Carrying: 0 packages
Deliveries completed: 0/3
Returns collected: 0/3

Timestep 2
State: (array([0, 0]), 0, array([False, False, False]), array([False, False, False]))
Action: Up
Reward: -1
⬜ 🔄 ⬜ ⬜ ⬜ ⬜
🚁 ⬜ 🔄 ⬜ ⬜ ⬜
⬜ ⬜ 🟥 🟥 📦 ⬜
⬜ 📦 🟥 🟥 ⬜ ⬜
⬜ ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 📦 ⬜ 🔄 ⬜
Carrying: 0 packages
Deliveries completed: 0/3
Returns collected: 0/3

Timestep 3
State: (array([0, 1]), 0, array([False, False, False]), array([False, False, False]))
Action: Left
Reward: -1
⬜ 🔄 ⬜ ⬜ ⬜ ⬜
🚁 ⬜ 🔄 ⬜ ⬜ ⬜
⬜ ⬜ 🟥 🟥 📦 ⬜
⬜ 📦 🟥 🟥 ⬜ ⬜
⬜ ⬜ ⬜ ⬜ ⬜ ⬜
⬜ ⬜ 📦 ⬜ 🔄 ⬜
Carrying: 0 packages
Deliveries completed: 0/3
Returns collected: 0/3

Timestep 4
State: (array([0, 1]), 0, array([False, F

In [35]:


class StochasticComplexDroneDeliveryEnv(gym.Env):
    def __init__(self):
        super().__init__()
        self.grid_size = 6
        self.static_no_fly_zones = [(2,2), (2,3), (3,2), (3,3)]
        self.dynamic_no_fly_zones = []
        self.action_space = gym.spaces.Discrete(6)  # Up, Down, Left, Right, Pickup/Dropoff, No-op
        self.observation_space = gym.spaces.Tuple((
            gym.spaces.Box(0, 5, (2,), dtype=int),  # Drone position
            gym.spaces.Discrete(4),  # Carrying capacity (0-3 packages)
            gym.spaces.MultiBinary(3),  # Delivery status for each customer
            gym.spaces.MultiBinary(3),   # Return status for each customer
            gym.spaces.MultiBinary(36)  # Dynamic no-fly zones
        ))
        self.reset()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.drone_pos = np.array([0, 0], dtype=int)
        self.carrying = 0
        self.num_customers = 3
        self.delivery_locations = self._generate_valid_locations(self.num_customers)
        self.return_locations = self._generate_valid_locations(self.num_customers)
        self.deliveries_completed = [False] * self.num_customers
        self.returns_completed = [False] * self.num_customers
        self.steps = 0
        self._update_dynamic_no_fly_zones()
        return self._get_obs(), {}

    def _generate_valid_locations(self, num_locations):
        valid_locations = []
        while len(valid_locations) < num_locations:
            loc = (np.random.randint(0, self.grid_size), np.random.randint(0, self.grid_size))
            if loc not in self.static_no_fly_zones and loc != (0, 0) and loc not in valid_locations:
                valid_locations.append(loc)
        return valid_locations

    def _update_dynamic_no_fly_zones(self):
        self.dynamic_no_fly_zones = []
        for _ in range(2):  # Add 2 dynamic no-fly zones
            while True:
                loc = (np.random.randint(0, self.grid_size), np.random.randint(0, self.grid_size))
                if loc not in self.static_no_fly_zones and loc not in self.dynamic_no_fly_zones:
                    self.dynamic_no_fly_zones.append(loc)
                    break

    def step(self, action):
        self.steps += 1
        reward = -1  # Default step penalty
        terminated = False

        if action < 4:  # Movement
            if np.random.random() < 0.8:  # 80% chance of successful movement
                new_pos = self.drone_pos.copy()
                if action == 0: new_pos[1] = min(5, new_pos[1]+1)  # Up
                elif action == 1: new_pos[1] = max(0, new_pos[1]-1)  # Down
                elif action == 2: new_pos[0] = max(0, new_pos[0]-1)  # Left
                elif action == 3: new_pos[0] = min(5, new_pos[0]+1)  # Right

                if self._is_valid(new_pos):
                    self.drone_pos = new_pos
                else:
                    reward = -100  # No-fly zone penalty
            else:
                reward = -5  # Failed movement penalty

        elif action == 4:  # Pickup/Dropoff
            if np.array_equal(self.drone_pos, [0, 0]):  # At warehouse
                if self.carrying > 0:  # Dropoff returns
                    reward = 50 * self.carrying
                    self.carrying = 0
                elif self.carrying < 3:  # Pickup deliveries
                    packages_to_pickup = min(3 - self.carrying, sum(~np.array(self.deliveries_completed)))
                    self.carrying += packages_to_pickup
                    reward = 25 * packages_to_pickup
            else:
                for i, loc in enumerate(self.delivery_locations):
                    if np.array_equal(self.drone_pos, loc) and self.carrying > 0 and not self.deliveries_completed[i]:
                        self.deliveries_completed[i] = True
                        self.carrying -= 1
                        reward = 100
                        break
                for i, loc in enumerate(self.return_locations):
                    if np.array_equal(self.drone_pos, loc) and self.carrying < 3 and not self.returns_completed[i]:
                        self.returns_completed[i] = True
                        self.carrying += 1
                        reward = 50
                        break

        # Check if all tasks are completed
        if all(self.deliveries_completed) and all(self.returns_completed):
            terminated = True
            reward += 500  # Bonus for completing all tasks

        # Update dynamic no-fly zones
        if self.steps % 10 == 0:
            self._update_dynamic_no_fly_zones()

        return self._get_obs(), reward, terminated, False, {}

    def _is_valid(self, pos):
        return (tuple(pos) not in self.static_no_fly_zones 
                and tuple(pos) not in self.dynamic_no_fly_zones
                and 0 <= pos[0] < self.grid_size 
                and 0 <= pos[1] < self.grid_size)

    def _get_obs(self):
        dynamic_no_fly_zones = np.zeros(36, dtype=bool)
        for x, y in self.dynamic_no_fly_zones:
            dynamic_no_fly_zones[y * 6 + x] = True
        return (self.drone_pos.copy(), 
                self.carrying, 
                np.array(self.deliveries_completed), 
                np.array(self.returns_completed),
                dynamic_no_fly_zones)

    def render(self):
        grid = np.full((6,6), '⬜')
        for x,y in self.static_no_fly_zones + self.dynamic_no_fly_zones:
            grid[y,x] = '🟥'
        
        for i, (x, y) in enumerate(self.delivery_locations):
            if not self.deliveries_completed[i]:
                grid[y,x] = f'📦{i+1}'
        
        for i, (x, y) in enumerate(self.return_locations):
            if not self.returns_completed[i]:
                grid[y,x] = f'🔄{i+1}'
        
        grid[self.drone_pos[1], self.drone_pos[0]] = '🚁' + str(self.carrying)
        
        print('\n'.join([' '.join(row) for row in grid]))
        print(f"Carrying: {self.carrying} packages")
        print(f"Deliveries completed: {sum(self.deliveries_completed)}/{self.num_customers}")
        print(f"Returns collected: {sum(self.returns_completed)}/{self.num_customers}")
        print(f"Dynamic no-fly zones: {self.dynamic_no_fly_zones}")




In [36]:
# Example usage
env = StochasticComplexDroneDeliveryEnv()
obs, _ = env.reset()

print("Initial State:")
env.render()

for t in range(10):
    action = env.action_space.sample()
    next_obs, reward, terminated, truncated, _ = env.step(action)
    
    print(f"\nTimestep {t+1}")
    print(f"State: {obs}")
    print(f"Action: {['Up', 'Down', 'Left', 'Right', 'Pickup/Dropoff', 'No-op'][action]}")
    print(f"Reward: {reward}")
    env.render()
    
    obs = next_obs
    if terminated:
        print("Episode finished early.")
        break

print("Random agent simulation completed.")

Initial State:
🚁 🔄 ⬜ ⬜ ⬜ ⬜
⬜ 🔄 ⬜ ⬜ 📦 ⬜
🟥 ⬜ 🟥 🟥 ⬜ 🔄
⬜ ⬜ 🟥 🟥 ⬜ ⬜
⬜ ⬜ 🟥 ⬜ 📦 ⬜
⬜ ⬜ 📦 ⬜ ⬜ ⬜
Carrying: 0 packages
Deliveries completed: 0/3
Returns collected: 0/3
Dynamic no-fly zones: [(0, 2), (2, 4)]

Timestep 1
State: (array([0, 0]), 0, array([False, False, False]), array([False, False, False]), array([False, False, False, False, False, False, False, False, False,
       False, False, False,  True, False, False, False, False, False,
       False, False, False, False, False, False, False, False,  True,
       False, False, False, False, False, False, False, False, False]))
Action: Right
Reward: -5
🚁 🔄 ⬜ ⬜ ⬜ ⬜
⬜ 🔄 ⬜ ⬜ 📦 ⬜
🟥 ⬜ 🟥 🟥 ⬜ 🔄
⬜ ⬜ 🟥 🟥 ⬜ ⬜
⬜ ⬜ 🟥 ⬜ 📦 ⬜
⬜ ⬜ 📦 ⬜ ⬜ ⬜
Carrying: 0 packages
Deliveries completed: 0/3
Returns collected: 0/3
Dynamic no-fly zones: [(0, 2), (2, 4)]

Timestep 2
State: (array([0, 0]), 0, array([False, False, False]), array([False, False, False]), array([False, False, False, False, False, False, False, False, False,
       False, False, False,  True, False, False, F