## Environment With MultiAgent UAV

In [14]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces

class MultiUAVPartialObsEnv(gym.Env):
    def __init__(self, grid_size=20, max_steps=300, adversarial=False, sensor_dropout=True, num_obstacles=5, num_uavs=3):
        super().__init__()
        self.grid_size = grid_size
        self.max_steps = max_steps
        self.adversarial = adversarial
        self.sensor_dropout = sensor_dropout
        self.num_obstacles = num_obstacles
        self.num_uavs = num_uavs

        self.dt = 1.0
        self.max_vel = 3.0
        self.max_thrust = 1.0
        self.drag_coeff = 0.1

        # Each UAV has 2D thrust action (x,y)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(self.num_uavs, 2), dtype=np.float32)

        # Observation per UAV: GPS(2) + Lidar(3) + Battery(1) + Signal(1) + relative obstacle positions (2*num_obstacles)
        obs_dim = 2 + 3 + 1 + 1 + 2 * self.num_obstacles
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.num_uavs, obs_dim), dtype=np.float32)

        self.reset()

    def reset(self, seed=None, options=None):
        # Initialize UAV states
        self.uav_pos = np.random.uniform(0, self.grid_size/2, size=(self.num_uavs, 2))
        self.goal = np.array([self.grid_size - 1.0, self.grid_size - 1.0])
        self.uav_vel = np.zeros((self.num_uavs, 2))
        self.battery = np.full(self.num_uavs, 100.0)
        self.steps = 0

        self.obstacles = np.random.uniform(2, self.grid_size - 2, size=(self.num_obstacles, 2))
        self.obstacle_vels = np.random.uniform(-0.5, 0.5, size=(self.num_obstacles, 2))

        return self._get_obs(), {}

    def _update_obstacles(self):
        for i in range(self.num_obstacles):
            self.obstacles[i] += self.obstacle_vels[i]
            for j in range(2):
                if self.obstacles[i, j] < 0 or self.obstacles[i, j] > self.grid_size:
                    self.obstacle_vels[i, j] *= -1
                    self.obstacles[i, j] = np.clip(self.obstacles[i, j], 0, self.grid_size)

    def _get_obs(self):
        all_obs = []
        for i in range(self.num_uavs):
            # GPS with spoofing
            gps_noise = np.random.normal(0, 1.0, 2) if self.adversarial else np.zeros(2)
            gps = self.uav_pos[i] + gps_noise

            if self.sensor_dropout and np.random.rand() < 0.2:
                gps = np.array([0.0, 0.0])

            # Lidar readings (idealized) - front, left, right distances to grid boundaries
            lidar_f = self.grid_size - self.uav_pos[i, 1]
            lidar_l = self.uav_pos[i, 0]
            lidar_r = self.grid_size - self.uav_pos[i, 0]

            if self.adversarial:
                jam = np.random.rand()
                if jam < 0.2:
                    lidar_f += np.random.normal(0, 5)
                elif jam < 0.4:
                    lidar_l += np.random.normal(0, 5)
                elif jam < 0.6:
                    lidar_r += np.random.normal(0, 5)

            if self.sensor_dropout:
                if np.random.rand() < 0.1:
                    lidar_f = 0.0
                if np.random.rand() < 0.1:
                    lidar_l = 0.0
                if np.random.rand() < 0.1:
                    lidar_r = 0.0

            # Relative obstacle positions (obstacles relative to UAV)
            obs_rel = self.obstacles - self.uav_pos[i]
            obs_flat = obs_rel.flatten()

            signal_strength = max(0.0, 1.0 - np.linalg.norm(self.uav_pos[i]) / self.grid_size)

            obs = np.concatenate([
                gps, [lidar_f, lidar_l, lidar_r],
                [self.battery[i]], [signal_strength], obs_flat
            ])
            all_obs.append(obs)
        return np.array(all_obs, dtype=np.float32)

    def step(self, actions):
        self._update_obstacles()
        rewards = np.zeros(self.num_uavs)
        terminated = np.zeros(self.num_uavs, dtype=bool)
        truncated = np.zeros(self.num_uavs, dtype=bool)

        for i in range(self.num_uavs):
            thrust = np.clip(actions[i], -self.max_thrust, self.max_thrust)
            self.uav_vel[i] += thrust * self.dt
            self.uav_vel[i] *= (1 - self.drag_coeff)
            self.uav_vel[i] = np.clip(self.uav_vel[i], -self.max_vel, self.max_vel)
            self.uav_pos[i] += self.uav_vel[i] * self.dt

            self.battery[i] -= 0.1 + np.linalg.norm(thrust) * 0.5

            crash = False
            if np.any(self.uav_pos[i] < 0) or np.any(self.uav_pos[i] >= self.grid_size) or self.battery[i] <= 0:
                crash = True

            for obs in self.obstacles:
                if np.linalg.norm(self.uav_pos[i] - obs) < 1.0:
                    crash = True
                    break

            for j in range(self.num_uavs):
                if i != j and np.linalg.norm(self.uav_pos[i] - self.uav_pos[j]) < 1.0:
                    crash = True
                    break

            reached = np.linalg.norm(self.uav_pos[i] - self.goal) < 1.0

            reward = -1  # base penalty
            if reached:
                reward += 100
                terminated[i] = True
            elif crash:
                reward -= 50
                terminated[i] = True
            else:
                dist = np.linalg.norm(self.uav_pos[i] - self.goal)
                reward -= 0.05 * dist
                reward -= (100 - self.battery[i]) * 0.01

            rewards[i] = float(reward)

        self.steps += 1
        if self.steps >= self.max_steps:
            truncated[:] = True  # All truncated due to time limit

        return self._get_obs(), rewards, terminated, truncated, {}


    def render(self):
        for i in range(self.num_uavs):
            print(f"UAV {i}: Pos {self.uav_pos[i]}, Vel {self.uav_vel[i]}, Battery {self.battery[i]:.1f}")
        print(f"Obstacles: {self.obstacles}")


## Training 

In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
# --- Training code starts here ---

if __name__ == "__main__":
    env = MultiUAVPartialObsEnv()
    
    

    model = PPO("MlpPolicy", env, verbose=1, batch_size=64, n_steps=2048, learning_rate=3e-4, gamma=0.99)

    # Train the agent for 100k steps (adjust as you want)
    model.learn(total_timesteps=100_000)

    # Save the trained agent
    model.save("ppo_uav")

    # Test the trained agent
    obs, _ = env.reset()
    done = False
    while not done:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, _, info = env.step(action)
        env.render()