In [None]:
!pip install gym pybullet stable-baselines3 matplotlib
!pip install pyvirtualdisplay
!pip install 'shimmy>=2.0'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import gym
from gym import spaces
import pybullet as p
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
import matplotlib.pyplot as plt

In [4]:
class DroneEnv(gym.Env):
    def __init__(self):
        super().__init__()
        self.client = p.connect(p.DIRECT)
        p.setGravity(0, 0, -9.8, physicsClientId=self.client)

        self.action_space = spaces.Box(low=-0.5, high=0.5, shape=(3,), dtype=np.float32)
        self.observation_space = spaces.Box(
            low=np.array([-20, -20, 0, -2, -2, -2, 0], dtype=np.float32),
            high=np.array([20, 20, 10, 2, 2, 2, 50], dtype=np.float32),
            dtype=np.float32
        )

        self.max_steps = 1000
        self.num_obstacles = 5
        self.reset()

    def reset(self):
        self.steps = 0
        self.drone_pos = np.array([
            np.random.uniform(-5, 5),
            np.random.uniform(-5, 5),
            1 + np.random.uniform(-0.1, 0.1)
        ], dtype=np.float32)
        self.drone_velocity = np.zeros(3, dtype=np.float32)
        self.target_pos = np.array([5, 5, 2], dtype=np.float32)

        self.obstacles = []
        for _ in range(self.num_obstacles):
            while True:
                obstacle_pos = np.array([
                    np.random.uniform(-10, 10),
                    np.random.uniform(-10, 10),
                    np.random.uniform(1, 5)
                ])
                if (np.linalg.norm(obstacle_pos - self.drone_pos) > 2.0 and
                    np.linalg.norm(obstacle_pos - self.target_pos) > 2.0):
                    self.obstacles.append({'position': obstacle_pos, 'radius': 0.5})
                    break
        return self._get_observation()

    def _get_observation(self):
        closest_obstacle_dist = min(np.linalg.norm(self.drone_pos - obs['position'])
                                  for obs in self.obstacles)
        closest_obstacle_dist = np.clip(closest_obstacle_dist, 0, 50)

        obs = np.concatenate([
            self.drone_pos,
            self.drone_velocity,
            [closest_obstacle_dist]
        ]).astype(np.float32)
        return np.clip(obs, self.observation_space.low, self.observation_space.high)

    def step(self, action):
        self.steps += 1
        action = np.clip(action, self.action_space.low, self.action_space.high)

        self.drone_velocity += action * 0.1
        self.drone_velocity *= 0.95
        self.drone_pos += self.drone_velocity * 0.1
        self.drone_pos = np.clip(self.drone_pos, [-20, -20, 0], [20, 20, 10])

        distance_to_target = np.linalg.norm(self.drone_pos - self.target_pos)
        velocity_magnitude = np.linalg.norm(self.drone_velocity)

        if distance_to_target < 0.2 and velocity_magnitude < 0.2:
            self.last_successful_pos = self.drone_pos.copy()

        done = False
        success = False

        if self.drone_pos[2] <= 0:
            reward = -1000
            done = True
        elif distance_to_target < 0.2 and velocity_magnitude < 0.2:
            reward = 200
            done = True
            success = True
            self.drone_pos = self.last_successful_pos.copy()
        elif self.steps >= self.max_steps:
            done = True
            reward = -distance_to_target * 0.1
        else:
            reward = -distance_to_target * 0.1
            if distance_to_target < 1.0:
                reward += (1.0 - distance_to_target) * 2.0
                reward += (1.0 - velocity_magnitude) * 1.0
            if velocity_magnitude > 1.0:
                reward -= (velocity_magnitude - 1.0) * 0.5

        return self._get_observation(), reward, done, {
            'distance': distance_to_target,
            'velocity': velocity_magnitude,
            'success': success
        }

    def get_drone_state(self):
        return {
            'drone_pos': self.drone_pos,
            'target_pos': self.target_pos,
            'obstacles': self.obstacles
        }

In [5]:
def visualize_trajectory(env, model, num_episodes=1):
    for episode in range(num_episodes):
        obs = env.reset()
        done = False
        positions = []
        target_reached_position = None

        while not done:
            action, _ = model.predict(obs, deterministic=True)
            current_pos = env.get_original_obs()[0][:3]
            positions.append(current_pos)

            obs, _, done, info = env.step(action)
            if info[0]['success']:
                target_reached_position = current_pos.copy()
                positions.append(target_reached_position)
                break

        positions = np.array(positions)
        final_position = target_reached_position if target_reached_position is not None else positions[-1]

        fig = plt.figure(figsize=(15, 6))
        ax = fig.add_subplot(111, projection='3d')

        points = ax.scatter(positions[:, 0], positions[:, 1], positions[:, 2],
                          c=np.arange(len(positions)), cmap='viridis')
        plt.colorbar(points, label='Time Step')

        unwrapped_env = env.env_method('get_drone_state')[0]
        ax.scatter(*unwrapped_env['target_pos'], c='red', marker='*', s=300, label='Target')

        for obstacle in unwrapped_env['obstacles']:
            ax.scatter(*obstacle['position'], c='gray', s=100, alpha=0.5)

        ax.set_xlabel('X Position')
        ax.set_ylabel('Y Position')
        ax.set_zlabel('Z Position')
        ax.legend()
        plt.show()

In [6]:
def evaluate_model(env, model, n_eval_episodes=10):
    rewards = []
    successes = 0

    for _ in range(n_eval_episodes):
        obs = env.reset()
        episode_reward = 0
        done = False

        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, info = env.step(action)
            episode_reward += reward[0]
            if info[0]['success']:
                successes += 1

        rewards.append(episode_reward)

    print(f"\nEvaluation Results ({n_eval_episodes} episodes):")
    print(f"Success Rate: {(successes/n_eval_episodes)*100:.1f}%")
    print(f"Average Reward: {np.mean(rewards):.2f} ± {np.std(rewards):.2f}")
    return rewards

In [None]:
eval_env = DummyVecEnv([lambda: DroneEnv()])
eval_env = VecNormalize.load("/content/drive/My Drive/models/RL/vec_normalize.pkl", eval_env)
eval_env.training = False
eval_env.norm_reward = False

model = PPO.load("/content/drive/My Drive/models/RL/ppo_drone_navigation")

In [None]:
rewards = evaluate_model(eval_env, model, n_eval_episodes=10)
visualize_trajectory(eval_env, model, num_episodes=3)