In [21]:
import pygame
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import gymnasium as gym
import numpy as np


class DroneRecoveryEnv(gym.Env):
    metadata = {'render_modes': ['human']}
    
    def __init__(self, render_mode=None):
        super(DroneRecoveryEnv, self).__init__()

        # Canvas size
        self.width = 600
        self.height = 400

        # Drone parameters
        self.drone_pos = np.array([self.width / 2, self.height / 2], dtype=np.float32)
        self.drone_vel = np.array([0.0, 0.0], dtype=np.float32)
        self.max_speed = 5.0

        # Balloon parameters
        self.balloon_pos = np.array([np.random.uniform(0, self.width), 0], dtype=np.float32)  # Starts at top
        self.balloon_vel = np.array([0, np.random.uniform(2, 5)], dtype=np.float32)  # Moves downward

        # Recovery parameters
        self.is_recovering = False
        self.recovery_steps = 0
        self.max_recovery_steps = 30  # Steps needed to stabilize

        # Action space: Thrust for left and right
        self.action_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)

        # Observation space: [Drone position, Drone velocity, Balloon position]
        low_obs = np.array([0, 0, -self.max_speed, -self.max_speed, 0, 0], dtype=np.float32)
        high_obs = np.array([self.width, self.height, self.max_speed, self.max_speed, self.width, self.height], dtype=np.float32)
        self.observation_space = spaces.Box(low=low_obs, high=high_obs, dtype=np.float32)

        # Rendering
        self.render_mode = render_mode
        if self.render_mode == 'human':
            pygame.init()
            self.screen = pygame.display.set_mode((self.width, self.height))
            self.clock = pygame.time.Clock()

        # Episode parameters
        self.max_steps = 500
        self.current_step = 0
        self.score = 0

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        # Reset drone position and velocity
        self.drone_pos = np.array([self.width / 2, self.height / 2], dtype=np.float32)
        self.drone_vel = np.array([0.0, 0.0], dtype=np.float32)

        # Reset balloon position
        self.balloon_pos = np.array([np.random.uniform(0, self.width), 0], dtype=np.float32)
        self.balloon_vel = np.array([0, np.random.uniform(2, 5)], dtype=np.float32)

        self.is_recovering = False
        self.recovery_steps = 0
        self.current_step = 0
        self.score = 0

        observation = self._get_obs()
        return observation, {}

    def step(self, action):
        if not self.is_recovering:
            # Ensure action is a 1D array
            action = np.array(action).flatten()
            
            # Apply horizontal thrust to the drone
            left, right = action[0], action[1]
            thrust = right - left  # Horizontal movement
            self.drone_vel[0] += thrust

            # Add random horizontal drift to simulate natural movement
            random_drift = np.random.uniform(-0.5, 0.5)
            self.drone_vel[0] += random_drift

            # Limit horizontal speed
            if abs(self.drone_vel[0]) > self.max_speed:
                self.drone_vel[0] = np.sign(self.drone_vel[0]) * self.max_speed

            # Update horizontal position only (Y position remains fixed)
            self.drone_pos[0] += self.drone_vel[0]
            self.drone_pos[0] = np.clip(self.drone_pos[0], 0, self.width)
        else:
            # Recovery logic: Gradually stabilize the drone
            self.recovery_steps += 1
            self.drone_vel = self.drone_vel * 0.9  # Dampen velocity
            if self.recovery_steps >= self.max_recovery_steps:
                self.is_recovering = False
                self.recovery_steps = 0
                self.drone_vel = np.array([0.0, 0.0], dtype=np.float32)

        # Update balloon position
        self.balloon_pos += self.balloon_vel
        if self.balloon_pos[1] > self.height:
            self.balloon_pos = np.array([np.random.uniform(0, self.width), 0], dtype=np.float32)
            self.balloon_vel = np.array([0, np.random.uniform(2, 5)], dtype=np.float32)

        # Check for collision
        distance = np.linalg.norm(self.drone_pos - self.balloon_pos)
        if distance < 10.0:  # Collision detected
            self.is_recovering = True
            self.score -= 1  # Penalty for being hit

        # Reward logic
        reward = 1.0 if not self.is_recovering else -1.0  # Reward for stability, penalty for recovery

        # Increment step count
        self.current_step += 1
        done = self.current_step >= self.max_steps

        observation = self._get_obs()
        info = {"score": self.score}

        if self.render_mode == 'human':
            self.render()

        return observation, reward, done, False, info

    def render(self):
        if self.render_mode != 'human':
            return

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()

        self.screen.fill((255, 255, 255))  # White background

        # Draw balloon
        pygame.draw.circle(self.screen, (255, 0, 0), self.balloon_pos.astype(int), 10)  # Red balloon

        # Draw drone
        drone_color = (0, 0, 255) if not self.is_recovering else (255, 165, 0)  # Blue if stable, orange if recovering
        pygame.draw.rect(self.screen, drone_color, (*self.drone_pos - 10, 20, 20))  # Drone as a square

        # Display score
        font = pygame.font.Font(None, 24)
        score_label = font.render(f"Score: {self.score}", True, (0, 0, 0))
        self.screen.blit(score_label, (10, 10))

        pygame.display.flip()
        self.clock.tick(30)

    def close(self):
        if self.render_mode == 'human':
            pygame.quit()

    def _get_obs(self):
        # Observation includes drone position, velocity, and balloon position
        return np.concatenate((self.drone_pos, self.drone_vel, self.balloon_pos)).astype(np.float32)


In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

# Create vectorized environment for parallel training
env = make_vec_env(lambda: DroneRecoveryEnv(), n_envs=4)

# Define the PPO model
model = PPO("MlpPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=800000)

# Save the model
model.save("ppo_drone_stabilize")

In [24]:
# Load the trained model
model = PPO.load("ppo_drone_stabilize")

# Create the environment with rendering enabled
env = DroneRecoveryEnv(render_mode='human')

# Reset the environment
observation, info = env.reset()

# Simulate for 30 seconds (30 FPS)
num_steps = 30 * 30  # 30 seconds * 30 FPS = 900 steps
total_reward = 0.0

for _ in range(num_steps):
    # Get action from the model
    action, _ = model.predict(observation, deterministic=True)
    
    # Step the environment
    observation, reward, done, _, info = env.step(action)
    total_reward += reward

env.close()

print(f"Total Reward: {total_reward}")

  pygame.draw.rect(self.screen, drone_color, (*self.drone_pos - 10, 20, 20))  # Drone as a square


Total Reward: 900.0


In [23]:
import pygame
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

# Load the trained model
model = PPO.load("ppo_drone_stabilize")

# Create the environment with rendering enabled
env = DroneRecoveryEnv(render_mode='human')

# Reset the environment
observation, info = env.reset()

#num_steps = 300  # Simulate for 300 steps (10 seconds at 30 FPS)
num_steps = 30 * 30  # 30 seconds * 30 FPS = 900 steps

done = False
total_reward = 0.0

#while not done:
for _ in range(num_steps):

    # Get action from the model
    action, _ = model.predict(observation, deterministic=True)
    
    # Step the environment
    observation, reward, done, truncated, info = env.step(action)
    total_reward += reward

# Close the environment
env.close()

print(f"Total Reward: {total_reward}")

  pygame.draw.rect(self.screen, drone_color, (*self.drone_pos - 10, 20, 20))  # Drone as a square


Total Reward: 900.0


In [None]:
env = DroneHoverEnv(render_mode="human")
obs, _ = env.reset()

for step in range(500):  # Run for 500 steps
    action = np.random.uniform(-1, 1, size=(2,))  # Random left and right thrust actions
    obs, reward, done, _, info = env.step(action)
    print(f"Step: {step}, Reward: {reward}, Score: {info['score']}")
    if done:
        break

env.close()
