In [1]:
import gym
import numpy as np
import pybullet as p
import pybullet_data
from gym import spaces
from stable_baselines3 import PPO

class DroneJammingEnv(gym.Env):
    def __init__(self):
        super(DroneJammingEnv, self).__init__()

        # Connection to PyBullet
        try:
            p.disconnect()
        except:
            pass
            
        self.client = p.connect(p.GUI)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())

        # Define action and observation spaces
        # [dx, dy, dz]
        self.action_space = spaces.Discrete(6)
        # [x, y, z, signal]
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(4,), dtype=np.float32)

        # Load drone model
        self.drone = p.loadURDF("./quadrotor.urdf", [0, 0, 1])

        # Set simulation parameters
        self.current_step = 0
        self.time_step = 1.0
        self.max_steps = 1000
        self.signal_strength = 0

        self.reset()

    def step(self, action):
        # Apply force
        done = False
        thrust_x, thrust_y, thrust_z = self._map_discrete_action(action)
        p.applyExternalForce(self.drone, -1, forceObj=np.array([thrust_x, thrust_y, thrust_z]) * 100, posObj=[0, 0, 0], flags=p.LINK_FRAME)
        p.stepSimulation()

        pos, ori = p.getBasePositionAndOrientation(self.drone)
        self.drone_position = pos

        self.adjust_camera_angle()


        # Signal reward
        distance = np.linalg.norm(pos - np.array([5, 5, 1]))
        signal = 1e5/(distance ** 2) 
    
        reward = signal
        self.state =  np.concatenate([np.array(pos), np.array([signal])])
        
        self.current_step += 1
        
        if self.current_step > self.max_steps: 
            done = True

        if pos[-1] <= 0.1 or pos[-1] >= 5:
            reward -= 1e5
            done = True
                        
        return self.state, reward, done, {}

    def _map_discrete_action(self, action):
        discrete_thrust_levels = [
            [1, 0, 0],
            [-1, 0, 0],
            [0, 1, 0],
            [0, -1, 0],
            [0, 0, 1],
            [0, 0, -1]
        ]
        return discrete_thrust_levels[action]

    def adjust_camera_angle(self):
        # Example: Move the camera closer to the scene
        camera_position = [self.drone_position[0], self.drone_position[1], 5]
        target_position = self.drone_position
        up_vector = [0, 0, 1] 
    
        # Compute view matrix directly from camera parameters
        view_matrix = p.computeViewMatrix(cameraEyePosition=camera_position,
                                           cameraTargetPosition=target_position,
                                           cameraUpVector=up_vector)
    
        # Set the new view matrix for the camera
        p.resetDebugVisualizerCamera(cameraDistance=5, cameraYaw=0, cameraPitch=-30,
                                      cameraTargetPosition=target_position)

    def reset(self):
        p.resetSimulation()
        p.setGravity(0, 0, -9.8)
        p.loadURDF("plane.urdf", [0, 0, 0])
        self.drone = p.loadURDF("quadrotor.urdf", [0, 0, 1])
        self.drone_position = np.array([0, 0, 1])
        
        # Set initial state
        self.state = np.zeros(4)
        self.current_step = 0 
        
        return self.state
        
    def render(self, mode='human'):
        p.getCameraImage(width=640, height=480)
        
    def close(self):
        p.disconnect()

pybullet build time: Apr 24 2024 11:19:34


In [None]:
env = DroneJammingEnv()

# Initialize the PPO agent
model = PPO("MlpPolicy", env, verbose=1, n_steps=64)

# Train the model
total_timesteps = 1e3
model.learn(total_timesteps=total_timesteps, progress_bar=True)

print("im done")
# Save the model
model.save("ppo_drone")

# Load the model
model = PPO.load("ppo_drone")

# Evaluate the model
num_episodes = 10
for episode in range(num_episodes):
    obs = env.reset()
    done = False
    ep_reward = 0
    ep_length = 0
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        print(action)
        obs, reward, done, info = env.step(action)
        ep_reward += reward
        ep_length += 1
        env.render()
    
    print(f"Episode {episode + 1}: Reward = {ep_reward}, Length = {ep_length}")

env.close()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




Output()

---------------------------
| time/              |    |
|    fps             | 19 |
|    iterations      | 1  |
|    time_elapsed    | 3  |
|    total_timesteps | 64 |
---------------------------
-------------------------------------------
| rollout/                |               |
|    ep_len_mean          | 66            |
|    ep_rew_mean          | 4.58e+04      |
| time/                   |               |
|    fps                  | 20            |
|    iterations           | 2             |
|    time_elapsed         | 6             |
|    total_timesteps      | 128           |
| train/                  |               |
|    approx_kl            | 2.0489097e-08 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2           |
|    entropy_loss         | -1.79         |
|    explained_variance   | -1.43e-06     |
|    learning_rate        | 0.0003        |
|    loss                 | 4.47e+08      |
|    n_updates            | 10            |
|    policy_