## Drone following a ball: my custom environment

In [37]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame
import random

class DroneBallEnvx(gym.Env):
    """Gymnasium environment in which a drone chases a drifting balloon.

    The world is a ``width`` x ``height`` canvas.  Each step the agent picks one
    of four discrete actions that nudge the drone's velocity by one unit along
    an axis; the balloon drifts with a constant random velocity and both
    objects are clamped to the canvas.

    Observation (float32, shape ``(6,)``):
        ``[drone_x, drone_y, drone_vx, drone_vy, balloon_x, balloon_y]``

    Actions (``Discrete(4)``): 0 = up, 1 = down, 2 = left, 3 = right.

    Reward: negative Euclidean distance to the balloon, plus 10 on any step
    where the drone is within ``catch_radius`` of it (the balloon is then
    respawned at a random position and the score is incremented).

    Episode end: ``terminated`` when the drone is within ``terminate_radius``
    of the balloon, ``truncated`` when ``max_steps`` steps have elapsed.
    """

    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, render_mode=None):
        """Create the environment.

        Args:
            render_mode: ``'human'`` opens a pygame window; any other value
                (including ``None``) disables rendering.
        """
        super().__init__()

        # Canvas size and the matching float32 position bounds used for clamping.
        self.width = 600
        self.height = 400
        self._pos_low = np.zeros(2, dtype=np.float32)
        self._pos_high = np.array([self.width, self.height], dtype=np.float32)

        # Score: number of times the balloon was caught in the current episode.
        self.score = 0

        # Distance thresholds (in pixels) for catching the balloon and for
        # ending the episode.
        self.catch_radius = 40
        self.terminate_radius = 10

        # Drone parameters
        self.max_speed = 5
        self.drone_pos = np.array([self.width / 2, self.height / 2], dtype=np.float32)
        self.drone_vel = np.zeros(2, dtype=np.float32)

        # Balloon parameters
        self.balloon_pos = self._random_balloon_pos()
        self.balloon_vel = self._random_balloon_vel()

        # Step counter
        self.current_step = 0
        self.max_steps = 500  # Maximum steps per episode

        # Action space: 0: Up, 1: Down, 2: Left, 3: Right
        self.action_space = spaces.Discrete(4)

        # Observation space: drone position (x, y), drone velocity (vx, vy),
        # balloon position (x, y)
        low_obs = np.array([0, 0, -self.max_speed, -self.max_speed, 0, 0], dtype=np.float32)
        high_obs = np.array(
            [self.width, self.height, self.max_speed, self.max_speed, self.width, self.height],
            dtype=np.float32,
        )
        self.observation_space = spaces.Box(low=low_obs, high=high_obs, dtype=np.float32)

        # Rendering resources; they stay None unless a window is open.
        self.render_mode = render_mode
        self.screen = None
        self.clock = None
        self.font = None
        if self.render_mode == 'human':
            pygame.init()
            self.screen = pygame.display.set_mode((self.width, self.height))
            pygame.display.set_caption("Drone and Balloon Environment")
            self.clock = pygame.time.Clock()
            # Built once here instead of on every frame.
            self.font = pygame.font.Font(None, 24)  # Default font, size 24

    def _random_balloon_pos(self):
        """Return a uniformly random float32 position inside the canvas."""
        # self.np_random is the env-owned generator, so reset(seed=...) makes
        # balloon placement reproducible.
        return self.np_random.uniform(self._pos_low, self._pos_high).astype(np.float32)

    def _random_balloon_vel(self):
        """Return a random float32 drift velocity with components in [-1, 1)."""
        return self.np_random.uniform(-1.0, 1.0, size=2).astype(np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: Optional seed for the environment's random generator.
            options: Unused.

        Returns:
            ``(observation, info)`` for the initial state.
        """
        super().reset(seed=seed)
        self.score = 0
        self.current_step = 0  # Reset step counter

        # Drone starts at rest in the centre of the canvas.
        self.drone_pos = np.array([self.width / 2, self.height / 2], dtype=np.float32)
        self.drone_vel = np.zeros(2, dtype=np.float32)

        # Balloon gets a fresh random position and drift.
        self.balloon_pos = self._random_balloon_pos()
        self.balloon_vel = self._random_balloon_vel()

        info = {"score": self.score}
        return self.get_obs(), info

    def get_obs(self):
        """Return the current observation as a float32 vector of length 6."""
        return np.concatenate((self.drone_pos, self.drone_vel, self.balloon_pos)).astype(np.float32)

    def step(self, action):
        """Advance the simulation by one step.

        Args:
            action: 0 (up), 1 (down), 2 (left) or 3 (right).  Any other value
                leaves the drone's velocity unchanged.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        # Increment step counter
        self.current_step += 1

        # Update drone velocity based on discrete action
        # (screen coordinates: y grows downwards, so "up" decreases y).
        if action == 0:  # Up
            self.drone_vel[1] -= 1
        elif action == 1:  # Down
            self.drone_vel[1] += 1
        elif action == 2:  # Left
            self.drone_vel[0] -= 1
        elif action == 3:  # Right
            self.drone_vel[0] += 1

        # Clip the velocity to max speed
        self.drone_vel = np.clip(self.drone_vel, -self.max_speed, self.max_speed)

        # Move both objects and keep them inside the canvas.  Clipping against
        # float32 bounds keeps the state arrays float32.
        self.drone_pos = np.clip(self.drone_pos + self.drone_vel, self._pos_low, self._pos_high)
        self.balloon_pos = np.clip(self.balloon_pos + self.balloon_vel, self._pos_low, self._pos_high)

        # Reward: negative distance to the balloon (as a plain Python float).
        distance_to_balloon = float(np.linalg.norm(self.drone_pos - self.balloon_pos))
        reward = -distance_to_balloon

        # Reaching the balloon ends the episode; running out of steps is a
        # time-limit truncation, not a terminal state.
        terminated = distance_to_balloon < self.terminate_radius
        truncated = self.current_step >= self.max_steps

        # Catching the balloon: score, bonus reward, and respawn.
        if distance_to_balloon < self.catch_radius:
            self.score += 1
            reward += 10  # Reward for catching the balloon
            self.balloon_pos = self._random_balloon_pos()

        # The observation is taken after a possible respawn.
        observation = self.get_obs()
        info = {"score": self.score}

        return observation, reward, terminated, truncated, info

    def render(self):
        """Draw the current state to the pygame window.

        Does nothing unless ``render_mode`` is ``'human'`` and the window is
        still open.  If the user closes the window, the environment's
        rendering resources are released and later calls become no-ops.
        """
        if self.render_mode != 'human' or self.screen is None:
            return

        # Handle Pygame events to prevent freezing
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # The display is gone after close(); drawing on it would raise.
                self.close()
                return

        black = (0, 0, 0)

        # Clear the screen
        self.screen.fill((255, 255, 255))  # Fill screen with white

        # Draw the drone (blue) and the balloon (red)
        pygame.draw.circle(self.screen, (0, 0, 255), self.drone_pos.astype(int), 10)
        pygame.draw.circle(self.screen, (255, 0, 0), self.balloon_pos.astype(int), 10)

        # Position labels near objects
        balloon_label = self.font.render("Balloon", True, black)
        drone_label = self.font.render("Drone", True, black)
        self.screen.blit(balloon_label, (self.balloon_pos[0] - 20, self.balloon_pos[1] - 20))
        self.screen.blit(drone_label, (self.drone_pos[0] - 20, self.drone_pos[1] - 20))

        # Display the score and steps
        score_text = self.font.render(f"Score: {self.score}", True, black)
        step_text = self.font.render(f"Step: {self.current_step}/{self.max_steps}", True, black)
        self.screen.blit(score_text, (10, 10))
        self.screen.blit(step_text, (10, 40))

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def close(self):
        """Shut down pygame if a window was opened; safe to call repeatedly."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None
            self.clock = None
            self.font = None


### To test our environment without keyboard control

In [None]:
if __name__ == "__main__":
    # Smoke test: drive the environment with random actions for 500 steps
    # while rendering, restarting whenever an episode ends.
    env = DroneBallEnvx(render_mode='human')
    try:
        obs, info = env.reset()

        for _ in range(500):  # Run for 500 steps
            action = env.action_space.sample()  # Take random actions
            obs, reward, terminated, truncated, info = env.step(action)

            # Render the environment
            env.render()

            # An episode can end by termination or by time-limit truncation.
            if terminated or truncated:
                obs, info = env.reset()
    finally:
        # Release the window even if a step or render call raised.
        env.close()


### To test our environment with keyboard control

In [None]:
import pygame

# DroneBallEnvx is the class defined in the environment cell of this notebook.
# If you keep the environment in a separate module instead, import it here,
# e.g.  from ppo_drone_ball_env import DroneBallEnv as DroneBallEnvx

# Arrow key -> discrete action understood by the environment.
KEY_TO_ACTION = {
    pygame.K_UP: 0,     # Up
    pygame.K_DOWN: 1,   # Down
    pygame.K_LEFT: 2,   # Left
    pygame.K_RIGHT: 3,  # Right
}

# Initialize environment
env = DroneBallEnvx(render_mode='human')
try:
    obs, info = env.reset()
    # Draw the initial state so the window is not blank before the first key press.
    env.render()

    running = True
    # Also stop if the display was shut down elsewhere (e.g. by env.render()
    # reacting to a window-close event), since polling events would then fail.
    while running and pygame.display.get_init():
        # Process Pygame events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
                break
            if event.type != pygame.KEYDOWN:  # Only handle key presses
                continue

            action = KEY_TO_ACTION.get(event.key)
            if action is None:  # Not an arrow key
                continue

            # The environment only advances when an arrow key is pressed.
            obs, reward, terminated, truncated, info = env.step(action)

            # Render the environment
            env.render()

            # Check if the episode is terminated or truncated
            if terminated or truncated:
                print(f"Episode ended. Score: {env.score}")
                obs, info = env.reset()

        # Add a small delay for smoother rendering
        pygame.time.delay(50)
finally:
    # Close the environment
    env.close()


### To train our agent and model

In [None]:
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.monitor import Monitor
import matplotlib.pyplot as plt
import os

# Paths for saving models and logs
models_dir = r"D:\Reinforcement_Learning\ppo\model"
logdir = r"D:\Reinforcement_Learning\ppo\logs"

# Create directories if they don't exist
os.makedirs(models_dir, exist_ok=True)
os.makedirs(logdir, exist_ok=True)

# Create 4 copies of the environment behind a single vectorized interface.
# No render_mode is passed, so no pygame window is opened during training.
env = make_vec_env(DroneBallEnvx, n_envs=4)

try:
    # Define the PPO model with TensorBoard logging
    model = PPO("MlpPolicy", env, verbose=1, tensorboard_log=logdir)

    # Train the agent with periodic model saving
    TIMESTEPS = 100000
    num_iterations = 10

    for i in range(num_iterations):
        print(f"Starting iteration {i + 1}/{num_iterations}")
        # reset_num_timesteps=False continues the same run (and the same
        # TensorBoard curve) instead of starting the step count from zero.
        model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name="PPO-ball-drone-deepseek")
        # os.path.join uses the platform's separator, matching how the
        # evaluation cells build the path they load from.
        model.save(os.path.join(models_dir, f"ppo_drone_chase_{TIMESTEPS * (i + 1)}"))
finally:
    # Close the environment, even if training was interrupted
    env.close()

### To test and validate our model continuously (without stopping at episode end)

In [None]:
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import os

import pygame  # used below to detect that the render window was closed

# Paths for saving models and logs
models_dir = r"D:\Reinforcement_Learning\ppo\model"
logdir = r"D:\Reinforcement_Learning\ppo\logs"

# os.path.join avoids embedding a backslash in the f-string, where "\p" is an
# invalid escape sequence.
model_path = os.path.join(models_dir, "ppo_drone_chase_1000000")

# Create the environment for evaluation
env = DroneBallEnvx(render_mode='human')

try:
    # Load the trained model
    model = PPO.load(model_path)

    # Run the trained model in the environment
    observation, info = env.reset()

    # Episode-end flags are deliberately ignored here: the agent keeps playing
    # in the same episode until the window is closed or the cell is interrupted.
    while pygame.display.get_init():
        # Use the trained model to predict actions
        action, _ = model.predict(observation, deterministic=True)
        observation, reward, terminated, truncated, info = env.step(action)

        # Render the environment
        env.render()
finally:
    # Reached on window close, interrupt, or error.
    env.close()

### To test and validate our model over multiple episodes

In [None]:
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import os

import pygame  # used below to detect that the render window was closed

# Paths for saving models and logs
models_dir = r"D:\Reinforcement_Learning\ppo\model"
logdir = r"D:\Reinforcement_Learning\ppo\logs"

# os.path.join avoids embedding a backslash in the f-string, where "\p" is an
# invalid escape sequence.
model_path = os.path.join(models_dir, "ppo_drone_chase_100000")

# Create the environment for evaluation
env = DroneBallEnvx(render_mode='human')

try:
    # Load the trained model
    model = PPO.load(model_path)

    # Run multiple episodes
    num_episodes = 5  # Number of episodes to run
    for episode in range(num_episodes):
        # Stop early if the user closed the window.
        if not pygame.display.get_init():
            break

        print(f"Starting episode {episode + 1}/{num_episodes}")
        observation, info = env.reset()
        terminated = False
        truncated = False
        total_reward = 0.0  # Sum of per-step rewards for this episode

        while not (terminated or truncated) and pygame.display.get_init():
            # Use the trained model to predict actions
            action, _ = model.predict(observation, deterministic=True)
            observation, reward, terminated, truncated, info = env.step(action)
            total_reward += float(reward)

            # Render the environment
            env.render()

        print(f"Episode {episode + 1} ended. Total reward: {total_reward}")
finally:
    # Close the environment when all episodes are done (or on error)
    env.close()
