In [None]:
# Duisek Bermagambet + Professor Ilyas Muhammad 
# HIL DDPG system

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import matplotlib.pyplot as plt
from typing import Tuple, Dict, List
import json
from datetime import datetime, timedelta

torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# I use RTX 4060 locally
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


Professor, I opted to start with using basic GPS data instead of visual, since I've been strugling to properly improve existin approaches in 3D spaces.
In this case I generate 2D scenarios for drones to operate in.

In [None]:
class DroneGPSDataGenerator:    
    def __init__(self, area_size: Tuple[float, float] = (1000, 1000), 
                 n_obstacles: int = 10, n_waypoints: int = 5):
        self.area_size = area_size
        self.n_obstacles = n_obstacles
        self.n_waypoints = n_waypoints
        self.obstacles = self._generate_obstacles()
        self.waypoints = self._generate_waypoints()
        
    def _generate_obstacles(self) -> List[Dict]:
        obstacles = []
        attempts = 0
        
        while len(obstacles) < self.n_obstacles and attempts < 100:
            new_obs = {
                'id': len(obstacles),
                'position': (np.random.uniform(100, self.area_size[0]-100),
                           np.random.uniform(100, self.area_size[1]-100)),
                'radius': np.random.uniform(20, 40),
                'type': np.random.choice(['building', 'no_fly_zone', 'weather_hazard'])
            }
            
            valid = True
            for obs in obstacles:
                dist = np.sqrt((new_obs['position'][0] - obs['position'][0])**2 + 
                             (new_obs['position'][1] - obs['position'][1])**2)
                if dist < (new_obs['radius'] + obs['radius'] + 50):  # 50m minimum separation
                    valid = False
                    break
            
            if valid:
                obstacles.append(new_obs)
            attempts += 1
            
        return obstacles
    
    def _generate_waypoints(self) -> List[Tuple[float, float]]:
        waypoints = []
        for _ in range(self.n_waypoints):
            valid = False
            while not valid:
                pos = (np.random.uniform(50, self.area_size[0]-50),
                      np.random.uniform(50, self.area_size[1]-50))
                valid = True
                for obs in self.obstacles:
                    dist = np.sqrt((pos[0] - obs['position'][0])**2 + 
                                 (pos[1] - obs['position'][1])**2)
                    if dist < obs['radius'] + 30:  # 30m safety margin
                        valid = False
                        break
            waypoints.append(pos)
        return waypoints
    
    def generate_trajectory_data(self, n_episodes: int = 100) -> pd.DataFrame:
        data = []
        
        for episode in range(n_episodes):
            start_pos = (np.random.uniform(0, self.area_size[0]),
                        np.random.uniform(0, self.area_size[1]))
            
            current_pos = list(start_pos)
            target_wp_idx = 0
            
            for t in range(200):
                if target_wp_idx >= len(self.waypoints):
                    break
                    
                target = self.waypoints[target_wp_idx]
                
                dx = target[0] - current_pos[0]
                dy = target[1] - current_pos[1]
                dist = np.sqrt(dx**2 + dy**2)
                
                if dist < 10:
                    target_wp_idx += 1
                    continue
                
                dx_norm = dx / dist + np.random.normal(0, 0.1)
                dy_norm = dy / dist + np.random.normal(0, 0.1)
                
                current_pos[0] += dx_norm * 5
                current_pos[1] += dy_norm * 5
                
                danger_level = 0
                min_obstacle_dist = float('inf')
                for obs in self.obstacles:
                    obs_dist = np.sqrt((current_pos[0] - obs['position'][0])**2 + 
                                     (current_pos[1] - obs['position'][1])**2)
                    min_obstacle_dist = min(min_obstacle_dist, obs_dist - obs['radius'])
                    
                    if obs_dist < obs['radius'] + 50:
                        danger_level = max(danger_level, 
                                         1 - (obs_dist - obs['radius']) / 50)
                
                wind_speed = np.random.uniform(0, 20)
                battery = 100 - (t / 200) * 50
                altitude = 50 + np.random.normal(0, 2)
                
                data.append({
                    'episode': episode,
                    'timestep': t,
                    'x': current_pos[0],
                    'y': current_pos[1],
                    'altitude': altitude,
                    'target_x': target[0],
                    'target_y': target[1],
                    'velocity_x': dx_norm * 5,
                    'velocity_y': dy_norm * 5,
                    'wind_speed': wind_speed,
                    'battery': battery,
                    'danger_level': danger_level,
                    'min_obstacle_distance': max(0, min_obstacle_dist),
                    'waypoint_index': target_wp_idx
                })
        
        return pd.DataFrame(data)

In [None]:
class Actor(nn.Module):
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 256):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        
    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x))
        return x

class Critic(nn.Module):
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 256):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)
        
    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class AuthorityNetwork(nn.Module):
    def __init__(self, state_dim: int, hidden_dim: int = 128):
        super(AuthorityNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)
        
    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))  # Output between 0 and 1
        return x

In [40]:
class ReplayBuffer:
    def __init__(self, capacity: int):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size: int):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done
    
    def __len__(self):
        return len(self.buffer)

In [None]:
class DroneNavigationEnv:    
    def __init__(self, gps_generator: DroneGPSDataGenerator):
        self.gps_generator = gps_generator
        self.state_dim = 12
        self.action_dim = 3 
        self.reset()
        
    def reset(self):
        self.current_waypoint_idx = 0
        self.timestep = 0
        self.position = np.array([
            np.random.uniform(0, self.gps_generator.area_size[0]),
            np.random.uniform(0, self.gps_generator.area_size[1]),
            50.0  # altitude
        ])
        self.velocity = np.zeros(3)
        self.battery = 100.0
        return self._get_state()
    
    def _get_state(self):
        if self.current_waypoint_idx < len(self.gps_generator.waypoints):
            target = self.gps_generator.waypoints[self.current_waypoint_idx]
            target_alt = 50.0
        else:
            target = (self.position[0], self.position[1])
            target_alt = self.position[2]
        
        danger_level = 0
        min_obs_dist = float('inf')
        for obs in self.gps_generator.obstacles:
            dist = np.sqrt((self.position[0] - obs['position'][0])**2 + 
                         (self.position[1] - obs['position'][1])**2)
            min_obs_dist = min(min_obs_dist, dist - obs['radius'])
            if dist < obs['radius'] + 50:
                danger_level = max(danger_level, 1 - (dist - obs['radius']) / 50)
        
        state = np.array([
            self.position[0] / 1000,  # Normalize
            self.position[1] / 1000,
            self.position[2] / 100,
            target[0] / 1000,
            target[1] / 1000,
            target_alt / 100,
            self.velocity[0] / 10,
            self.velocity[1] / 10,
            self.velocity[2] / 5,
            np.random.uniform(0, 20) / 20,  # Wind
            self.battery / 100,
            danger_level
        ])
        return state
    
    def step(self, action):
        self.velocity = np.clip(action * np.array([10, 10, 5]), -10, 10)
        
        self.position += self.velocity * 0.1  # 0.1s timestep
        
        self.position[0] = np.clip(self.position[0], 0, self.gps_generator.area_size[0])
        self.position[1] = np.clip(self.position[1], 0, self.gps_generator.area_size[1])
        self.position[2] = np.clip(self.position[2], 10, 150)
        
        self.battery -= 0.05 + np.linalg.norm(self.velocity) * 0.005
        
        if self.current_waypoint_idx < len(self.gps_generator.waypoints):
            target = self.gps_generator.waypoints[self.current_waypoint_idx]
            dist_to_target = np.sqrt((self.position[0] - target[0])**2 + 
                                   (self.position[1] - target[1])**2)
            
            if dist_to_target < 20:
                self.current_waypoint_idx += 1
                print(f"Waypoint {self.current_waypoint_idx}/{len(self.gps_generator.waypoints)} reached!")
        else:
            dist_to_target = 0
        
        reward = self._calculate_reward(dist_to_target)
        
        mission_complete = self.current_waypoint_idx >= len(self.gps_generator.waypoints)
        done = (self.battery <= 0 or mission_complete or self.timestep >= 2000)
        
        if mission_complete and not hasattr(self, 'mission_completed'):
            reward += 100  # Big bonus for completing mission
            self.mission_completed = True
        
        self.timestep += 1
        
        info = {
            'mission_complete': mission_complete,
            'waypoints_reached': self.current_waypoint_idx,
            'total_waypoints': len(self.gps_generator.waypoints)
        }
        
        return self._get_state(), reward, done, info
    
    def _calculate_reward(self, dist_to_target):
        if self.current_waypoint_idx < len(self.gps_generator.waypoints):
            dist_reward = -dist_to_target / 50
        else:
            dist_reward = 0
        
        wp_reward = 50 if dist_to_target < 20 else 0  # Increased reward
        
        progress_reward = self.current_waypoint_idx * 5
        
        safety_penalty = 0
        min_safe_distance = float('inf')
        for obs in self.gps_generator.obstacles:
            dist = np.sqrt((self.position[0] - obs['position'][0])**2 + 
                         (self.position[1] - obs['position'][1])**2)
            safe_dist = dist - obs['radius']
            min_safe_distance = min(min_safe_distance, safe_dist)
            
            if dist < obs['radius']:
                safety_penalty = -100
            elif dist < obs['radius'] + 20:
                safety_penalty = min(safety_penalty, -20 * (1 - (dist - obs['radius']) / 20))
        
        if min_safe_distance > 30:
            safety_bonus = 1.0
        else:
            safety_bonus = 0
        
        battery_penalty = -10 if self.battery < 10 else 0
        
        time_penalty = -0.01
        
        return dist_reward + wp_reward + progress_reward + safety_penalty + battery_penalty + safety_bonus + time_penalty

In [None]:
class DDPGAgent:    
    def __init__(self, state_dim: int, action_dim: int, lr: float = 1e-3):
        self.actor = Actor(state_dim, action_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)
        
        self.memory = ReplayBuffer(100000)
        self.batch_size = 64
        self.gamma = 0.99
        self.tau = 0.001
        
    def select_action(self, state, noise=0.1):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state).cpu().data.numpy()[0]
        if noise > 0:
            action += np.random.normal(0, noise, size=action.shape)
        return np.clip(action, -1, 1)
    
    def update(self):
        if len(self.memory) < self.batch_size:
            return
        
        state, action, reward, next_state, done = self.memory.sample(self.batch_size)
        
        state = torch.FloatTensor(state).to(device)
        action = torch.FloatTensor(action).to(device)
        reward = torch.FloatTensor(reward).unsqueeze(1).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        done = torch.FloatTensor(done).unsqueeze(1).to(device)
        
        next_action = self.actor_target(next_state)
        target_q = self.critic_target(next_state, next_action)
        target_q = reward + (1 - done) * self.gamma * target_q
        current_q = self.critic(state, action)
        
        critic_loss = F.mse_loss(current_q, target_q.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        actor_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        self._soft_update(self.actor_target, self.actor)
        self._soft_update(self.critic_target, self.critic)
    
    def _soft_update(self, target, source):
        for target_param, param in zip(target.parameters(), source.parameters()):
            target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)


Professor, for my HIL interpretation I let the model to manually leave the collision trajectory by checking proximity to collision targets.

In [None]:
class HumanInLoopDDPG(DDPGAgent):    
    def __init__(self, state_dim: int, action_dim: int, lr: float = 1e-3):
        super().__init__(state_dim, action_dim, lr)
        self.authority_net = AuthorityNetwork(state_dim).to(device)
        self.authority_optimizer = optim.Adam(self.authority_net.parameters(), lr=lr/10)
        self.human_simulator = self._create_human_simulator()
        self.authority_memory = ReplayBuffer(10000)
        
    def _create_human_simulator(self):
        def human_action(state, env_info):
            danger = state[11]
            min_obs_dist = state[11] * 50  
            
            if danger > 0.7 or (min_obs_dist < 30 and min_obs_dist > 0):
                current_x, current_y = state[0] * 1000, state[1] * 1000
                
                best_action = None
                best_safety = -float('inf')
                
                for angle in np.linspace(0, 2*np.pi, 8):
                    test_dx = np.cos(angle) * 0.8
                    test_dy = np.sin(angle) * 0.8
                    
                    new_x = current_x + test_dx * 10
                    new_y = current_y + test_dy * 10
                    
                    safety_score = 0
                    if hasattr(env_info, 'gps_generator'):
                        for obs in env_info.gps_generator.obstacles:
                            dist = np.sqrt((new_x - obs['position'][0])**2 + 
                                         (new_y - obs['position'][1])**2)
                            safety_score += max(0, dist - obs['radius'])
                    
                    if safety_score > best_safety:
                        best_safety = safety_score
                        best_action = np.array([test_dx, test_dy, 0.1])
                
                return best_action if best_action is not None else np.array([0, 0, 0.2])
            return None
        
        return human_action
    
    def select_action(self, state, env_info=None, noise=0.1):
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        
        ai_action = self.actor(state_tensor).cpu().data.numpy()[0]
        if noise > 0:
            ai_action += np.random.normal(0, noise * 0.5, size=ai_action.shape)
        ai_action = np.clip(ai_action, -1, 1)
        
        with torch.no_grad():
            authority = self.authority_net(state_tensor).cpu().data.numpy()[0, 0]
        
        danger = state[11]
        human_action = self.human_simulator(state, env_info)
        
        if human_action is not None and (danger > 0.6 or authority < 0.3):
            final_action = human_action
            control_mode = 'human'
            actual_authority = 0.0
        elif human_action is not None and danger > 0.3:
            blend_factor = 1 - danger 
            final_action = blend_factor * ai_action + (1 - blend_factor) * human_action
            control_mode = 'shared'
            actual_authority = blend_factor
        else:
            final_action = ai_action
            control_mode = 'ai'
            actual_authority = 1.0
        
        return final_action, actual_authority, control_mode
    
    def update_authority(self, state, authority, reward, next_state, done, collision=False):
        self.authority_memory.push(state, authority, reward, next_state, done)
        
        if len(self.authority_memory) < self.batch_size:
            return
            
        batch_state, batch_authority, batch_reward, batch_next_state, batch_done = \
            self.authority_memory.sample(self.batch_size)
        
        state_tensor = torch.FloatTensor(batch_state).to(device)
        authority_tensor = torch.FloatTensor(batch_authority).unsqueeze(1).to(device)
        reward_tensor = torch.FloatTensor(batch_reward).unsqueeze(1).to(device)
        next_state_tensor = torch.FloatTensor(batch_next_state).to(device)
        done_tensor = torch.FloatTensor(batch_done).unsqueeze(1).to(device)
        
        predicted_authority = self.authority_net(state_tensor)
        
        danger_current = state_tensor[:, 11].unsqueeze(1)
        danger_next = next_state_tensor[:, 11].unsqueeze(1)
        
        target_authority = torch.sigmoid((0.5 - danger_next) * 10)
        
        reward_signal = torch.sigmoid(reward_tensor / 10)
        target_authority = 0.7 * target_authority + 0.3 * reward_signal
        
        if collision:
            target_authority = torch.zeros_like(target_authority)
        
        authority_loss = F.mse_loss(predicted_authority, target_authority.detach())
        
        self.authority_optimizer.zero_grad()
        authority_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.authority_net.parameters(), 1.0)
        self.authority_optimizer.step()

    def pretrain_authority_network(self, env, episodes=20):
        print("Pre-training authority network...")
        
        for episode in range(episodes):
            state = env.reset()
            done = False
            
            while not done:
                danger = state[11]
                
                if danger > 0.7:
                    target_authority = 0.0 
                elif danger > 0.4:
                    target_authority = 0.3 
                elif danger > 0.2:
                    target_authority = 0.6
                else:
                    target_authority = 0.95 
                
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
                predicted = self.authority_net(state_tensor)
                target = torch.FloatTensor([[target_authority]]).to(device)
                
                loss = F.mse_loss(predicted, target)
                self.authority_optimizer.zero_grad()
                loss.backward()
                self.authority_optimizer.step()
                
                action = np.random.uniform(-1, 1, size=3)
                next_state, _, done, _ = env.step(action)
                state = next_state
        
        print(f"Pre-training complete!")


In [None]:
def train_ddpg(env, agent, episodes=100):
    rewards_history = []
    success_history = []
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        
        while not done:
            action = agent.select_action(state, noise=0.1)
            next_state, reward, done, info = env.step(action)
            
            agent.memory.push(state, action, reward, next_state, done)
            agent.update()
            
            state = next_state
            episode_reward += reward
        
        rewards_history.append(episode_reward)
        success_history.append(info.get('mission_complete', False))
        
        if episode % 10 == 0:
            recent_success_rate = np.mean(success_history[-10:]) if len(success_history) >= 10 else np.mean(success_history)
            print(f"Episode {episode}, Reward: {episode_reward:.2f}, "
                  f"Waypoints: {info['waypoints_reached']}/{info['total_waypoints']}, "
                  f"Success Rate: {recent_success_rate*100:.1f}%")
    
    return rewards_history

def train_human_in_loop_ddpg(env, agent, episodes=100):
    rewards_history = []
    authority_history = []
    control_mode_stats = {'ai': 0, 'shared': 0, 'human': 0}
    collision_history = []
    success_history = []
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        episode_authority = []
        episode_collisions = 0
        done = False
        
        while not done:
            action, authority, control_mode = agent.select_action(state, env, noise=0.1)
            next_state, reward, done, info = env.step(action)
            
            collision = False
            for obs in env.gps_generator.obstacles:
                dist = np.sqrt((env.position[0] - obs['position'][0])**2 + 
                             (env.position[1] - obs['position'][1])**2)
                if dist < obs['radius']:
                    collision = True
                    episode_collisions += 1
                    break
            
            agent.memory.push(state, action, reward, next_state, done)
            
            agent.update()
            agent.update_authority(state, authority, reward, next_state, done, collision)
            
            state = next_state
            episode_reward += reward
            episode_authority.append(authority)
            control_mode_stats[control_mode] += 1
        
        rewards_history.append(episode_reward)
        authority_history.append(np.mean(episode_authority))
        collision_history.append(episode_collisions)
        success_history.append(info.get('mission_complete', False))
        
        if episode % 10 == 0:
            recent_success_rate = np.mean(success_history[-10:]) if len(success_history) >= 10 else np.mean(success_history)
            print(f"Episode {episode}, Reward: {episode_reward:.2f}, "
                  f"Avg Authority: {np.mean(episode_authority):.2f}, "
                  f"Waypoints: {info['waypoints_reached']}/{info['total_waypoints']}, "
                  f"Success Rate: {recent_success_rate*100:.1f}%")
    
    print(f"\nTotal collisions over {episodes} episodes: {sum(collision_history)}")
    print(f"Final success rate: {np.mean(success_history)*100:.1f}%")
    
    return rewards_history, authority_history, control_mode_stats

In [None]:
def evaluate_safety_metrics(env, agent, n_episodes=20):
    metrics = {
        'collisions': 0,
        'near_misses': 0,
        'avg_min_obstacle_dist': [],
        'danger_time_ratio': [],
        'mission_success_rate': 0,
        'avg_battery_remaining': []
    }
    
    for episode in range(n_episodes):
        state = env.reset()
        done = False
        episode_min_distances = []
        episode_danger_time = 0
        total_timesteps = 0
        
        while not done:
            if isinstance(agent, HumanInLoopDDPG):
                action, _, _ = agent.select_action(state, env, noise=0)
            else:
                action = agent.select_action(state, noise=0)
            
            next_state, _, done, _ = env.step(action)
            
            min_dist = float('inf')
            for obs in env.gps_generator.obstacles:
                dist = np.sqrt((env.position[0] - obs['position'][0])**2 + 
                             (env.position[1] - obs['position'][1])**2)
                actual_dist = dist - obs['radius']
                min_dist = min(min_dist, actual_dist)
                
                if actual_dist < 0:
                    metrics['collisions'] += 1
                elif actual_dist < 20:
                    metrics['near_misses'] += 1
            
            episode_min_distances.append(min_dist)
            if state[11] > 0.5:  # Danger level
                episode_danger_time += 1
            total_timesteps += 1
            
            state = next_state
        
        metrics['avg_min_obstacle_dist'].append(np.mean(episode_min_distances))
        metrics['danger_time_ratio'].append(episode_danger_time / total_timesteps)
        metrics['avg_battery_remaining'].append(env.battery)
        
        if env.current_waypoint_idx >= len(env.gps_generator.waypoints):
            metrics['mission_success_rate'] += 1
    
    metrics['mission_success_rate'] /= n_episodes
    metrics['avg_min_obstacle_dist'] = np.mean(metrics['avg_min_obstacle_dist'])
    metrics['danger_time_ratio'] = np.mean(metrics['danger_time_ratio'])
    metrics['avg_battery_remaining'] = np.mean(metrics['avg_battery_remaining'])
    
    return metrics

def compare_models_performance(env, ddpg_agent, hil_agent):
    print("\n" + "="*50)
    print("Evaluating Model Safety and Performance...")
    print("="*50)
    
    ddpg_metrics = evaluate_safety_metrics(env, ddpg_agent)
    hil_metrics = evaluate_safety_metrics(env, hil_agent)
    
    print("\nSafety Metrics Comparison:")
    print(f"{'Metric':<30} {'Standard DDPG':<15} {'HIL-DDPG':<15} {'Improvement':<15}")
    print("-" * 75)
    
    metrics_to_compare = [
        ('Collisions', 'collisions', 'lower'),
        ('Near Misses', 'near_misses', 'lower'),
        ('Avg Min Obstacle Dist (m)', 'avg_min_obstacle_dist', 'higher'),
        ('Danger Time Ratio', 'danger_time_ratio', 'lower'),
        ('Mission Success Rate', 'mission_success_rate', 'higher'),
        ('Avg Battery Remaining (%)', 'avg_battery_remaining', 'higher')
    ]
    
    for name, key, better in metrics_to_compare:
        ddpg_val = ddpg_metrics[key]
        hil_val = hil_metrics[key]
        
        if better == 'lower':
            improvement = (ddpg_val - hil_val) / (ddpg_val + 1e-6) * 100
        else:
            improvement = (hil_val - ddpg_val) / (ddpg_val + 1e-6) * 100
        
        print(f"{name:<30} {ddpg_val:<15.2f} {hil_val:<15.2f} {improvement:>14.1f}%")
    
    return ddpg_metrics, hil_metrics

def analyze_authority_patterns(env, hil_agent, n_episodes=10):
    authority_data = []
    
    for episode in range(n_episodes):
        state = env.reset()
        done = False
        
        while not done:
            action, authority, control_mode = hil_agent.select_action(state, env, noise=0)
            next_state, _, done, _ = env.step(action)
            
            authority_data.append({
                'authority': authority,
                'danger_level': state[11],
                'obstacle_proximity': next_state[11] * 50,
                'battery': state[10] * 100,
                'distance_to_target': np.sqrt((state[3] - state[0])**2 + 
                                            (state[4] - state[1])**2) * 1000,
                'altitude': state[2] * 100,
                'wind_speed': state[9] * 20,
                'control_mode': control_mode
            })
            
            state = next_state
    
    df = pd.DataFrame(authority_data)
    
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    
    factors = ['danger_level', 'obstacle_proximity', 'battery', 
               'distance_to_target', 'altitude', 'wind_speed']
    
    for i, factor in enumerate(factors):
        axes[i].scatter(df[factor], df['authority'], alpha=0.5)
        axes[i].set_xlabel(factor.replace('_', ' ').title())
        axes[i].set_ylabel('AI Authority')
        axes[i].set_ylim(-0.1, 1.1)
        
        z = np.polyfit(df[factor], df['authority'], 1)
        p = np.poly1d(z)
        x_line = np.linspace(df[factor].min(), df[factor].max(), 100)
        axes[i].plot(x_line, p(x_line), "r--", alpha=0.8)
    
    plt.suptitle('Authority Allocation Patterns Analysis', fontsize=16)
    plt.tight_layout()
    plt.show()
    
    print("\nAuthority Correlation Analysis:")
    print("-" * 40)
    for factor in factors:
        corr = df['authority'].corr(df[factor])
        print(f"{factor.replace('_', ' ').title():<25}: {corr:>6.3f}")
    
    return df

def generate_research_plots(trajectory_data):
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    
    ax1 = axes[0, 0]
    sample_episode = trajectory_data[trajectory_data['episode'] == 0]
    scatter = ax1.scatter(sample_episode['x'], sample_episode['y'], 
                         c=sample_episode['danger_level'], cmap='YlOrRd', 
                         s=20, alpha=0.7)
    ax1.plot(sample_episode['x'], sample_episode['y'], 'b-', alpha=0.3)
    ax1.set_xlabel('X Position (m)')
    ax1.set_ylabel('Y Position (m)')
    ax1.set_title('Sample Drone Trajectory with Danger Levels')
    plt.colorbar(scatter, ax=ax1, label='Danger Level')
    
    ax2 = axes[0, 1]
    ax2.hist(trajectory_data['danger_level'], bins=50, edgecolor='black')
    ax2.set_xlabel('Danger Level')
    ax2.set_ylabel('Frequency')
    ax2.set_title('Distribution of Danger Levels in Training Data')
    
    ax3 = axes[1, 0]
    for ep in range(min(5, trajectory_data['episode'].max())):
        ep_data = trajectory_data[trajectory_data['episode'] == ep]
        ax3.plot(ep_data['timestep'], ep_data['battery'], 
                label=f'Episode {ep}', alpha=0.7)
    ax3.set_xlabel('Timestep')
    ax3.set_ylabel('Battery Level (%)')
    ax3.set_title('Battery Consumption Patterns')
    ax3.legend()
    
    ax4 = axes[1, 1]
    ax4.scatter(trajectory_data['min_obstacle_distance'], 
               trajectory_data['danger_level'], alpha=0.3, s=10)
    ax4.set_xlabel('Minimum Obstacle Distance (m)')
    ax4.set_ylabel('Danger Level')
    ax4.set_title('Danger Level vs Obstacle Proximity')
    ax4.set_xlim(0, 100)
    
    plt.tight_layout()
    plt.savefig('research_analysis_plots.png', dpi=300, bbox_inches='tight')
    plt.show()

def perform_statistical_analysis(ddpg_rewards, hil_rewards):
    from scipy import stats
    
    print("\n" + "="*50)
    print("Statistical Analysis for Research Paper")
    print("="*50)
    
    print("\nDescriptive Statistics:")
    print(f"{'Metric':<20} {'DDPG':<20} {'HIL-DDPG':<20}")
    print("-" * 60)
    
    metrics = [
        ('Mean Reward', np.mean, '.2f'),
        ('Std Deviation', np.std, '.2f'),
        ('Max Reward', np.max, '.2f'),
        ('Min Reward', np.min, '.2f'),
        ('Median Reward', np.median, '.2f')
    ]
    
    for name, func, fmt in metrics:
        ddpg_val = func(ddpg_rewards[-20:])  # Last 20 episodes
        hil_val = func(hil_rewards[-20:])
        print(f"{name:<20} {ddpg_val:<20{fmt}} {hil_val:<20{fmt}}")
    
    print("\n\nStatistical Tests:")
    print("-" * 40)
    
    t_stat, p_value = stats.ttest_ind(ddpg_rewards[-20:], hil_rewards[-20:])
    print(f"Independent t-test: t={t_stat:.3f}, p={p_value:.4f}")
    
    u_stat, u_p_value = stats.mannwhitneyu(ddpg_rewards[-20:], hil_rewards[-20:])
    print(f"Mann-Whitney U test: U={u_stat:.3f}, p={u_p_value:.4f}")
    
    pooled_std = np.sqrt((np.std(ddpg_rewards[-20:])**2 + np.std(hil_rewards[-20:])**2) / 2)
    cohens_d = (np.mean(hil_rewards[-20:]) - np.mean(ddpg_rewards[-20:])) / pooled_std
    print(f"Cohen's d effect size: {cohens_d:.3f}")
    
    print("\nInterpretation:")
    if p_value < 0.05:
        print("✓ Statistically significant difference between DDPG and HIL-DDPG (p < 0.05)")
    else:
        print("✗ No statistically significant difference found (p ≥ 0.05)")
    
    if abs(cohens_d) < 0.2:
        effect = "negligible"
    elif abs(cohens_d) < 0.5:
        effect = "small"
    elif abs(cohens_d) < 0.8:
        effect = "medium"
    else:
        effect = "large"
    
    print(f"✓ Effect size is {effect} (|d| = {abs(cohens_d):.3f})")

def generate_latex_tables(ddpg_metrics, hil_metrics):
    print("\n" + "="*50)
    print("LaTeX Tables for Paper")
    print("="*50)
    
    print("\n% Performance Comparison Table")
    print("\\begin{table}[h]")
    print("\\centering")
    print("\\caption{Safety and Performance Metrics Comparison}")
    print("\\label{tab:performance}")
    print("\\begin{tabular}{lrrr}")
    print("\\hline")
    print("Metric & Standard DDPG & HIL-DDPG & Improvement (\\%) \\\\")
    print("\\hline")
    
    metrics = [
        ('Collisions', 'collisions', 'lower'),
        ('Near Misses', 'near_misses', 'lower'),
        ('Avg. Min. Obstacle Dist. (m)', 'avg_min_obstacle_dist', 'higher'),
        ('Mission Success Rate', 'mission_success_rate', 'higher'),
    ]
    
    for name, key, better in metrics:
        ddpg_val = ddpg_metrics[key]
        hil_val = hil_metrics[key]
        
        if better == 'lower':
            improvement = (ddpg_val - hil_val) / (ddpg_val + 1e-6) * 100
        else:
            improvement = (hil_val - ddpg_val) / (ddpg_val + 1e-6) * 100
        
        print(f"{name} & {ddpg_val:.2f} & {hil_val:.2f} & {improvement:.1f} \\\\")
    
    print("\\hline")
    print("\\end{tabular}")
    print("\\end{table}")

In [None]:
print("Initializing Drone Navigation Environment...")
    
gps_gen = DroneGPSDataGenerator(area_size=(1000, 1000), 
                               n_obstacles=5, n_waypoints=3)

print("\nGenerating mock GPS trajectory data...")
trajectory_data = gps_gen.generate_trajectory_data(n_episodes=50)
trajectory_data.to_csv('drone_gps_data.csv', index=False)
print(f"Generated {len(trajectory_data)} GPS data points")
print(f"Data shape: {trajectory_data.shape}")
print(f"Columns: {list(trajectory_data.columns)}")

env = DroneNavigationEnv(gps_gen)

print("\nVisualizing environment...")
visualize_environment(env)

state_dim = env.state_dim
action_dim = env.action_dim

print(f"\nState dimension: {state_dim}")
print(f"Action dimension: {action_dim}")

print("\n" + "="*50)
print("Training Standard DDPG Agent...")
print("="*50)
ddpg_agent = DDPGAgent(state_dim, action_dim)
ddpg_rewards = train_ddpg(env, ddpg_agent, episodes=100)

print("\n" + "="*50)
print("Training Human-in-the-Loop DDPG Agent...")
print("="*50)
hil_agent = HumanInLoopDDPG(state_dim, action_dim)

hil_agent.pretrain_authority_network(env, episodes=20)

hil_rewards, hil_authority, control_stats = train_human_in_loop_ddpg(
    env, hil_agent, episodes=100)

print("\n" + "="*50)
print("Training Complete! Final Statistics:")
print("="*50)
print(f"Standard DDPG - Final avg reward: {np.mean(ddpg_rewards[-10:]):.2f}")
print(f"HIL-DDPG - Final avg reward: {np.mean(hil_rewards[-10:]):.2f}")
print(f"HIL-DDPG - Final avg authority: {np.mean(hil_authority[-10:]):.2f}")
print(f"Control mode distribution: {control_stats}")

print("\nPlotting training results...")
plot_training_results(ddpg_rewards, hil_rewards, hil_authority)

print("\nSaving trained models...")
torch.save({
    'actor_state_dict': ddpg_agent.actor.state_dict(),
    'critic_state_dict': ddpg_agent.critic.state_dict(),
}, 'ddpg_model.pth')

torch.save({
    'actor_state_dict': hil_agent.actor.state_dict(),
    'critic_state_dict': hil_agent.critic.state_dict(),
    'authority_state_dict': hil_agent.authority_net.state_dict(),
}, 'hil_ddpg_model.pth')

print("\nModels saved successfully!")
print("\nExperiment complete. You can now analyze the results for your paper.")

In [None]:
print("\n\nPerforming detailed evaluation for research paper...")
    
# Compare models
ddpg_metrics, hil_metrics = compare_models_performance(env, ddpg_agent, hil_agent)

# Analyze authority patterns
print("\n\nAnalyzing Authority Allocation Patterns...")
authority_df = analyze_authority_patterns(env, hil_agent)

# Statistical analysis
perform_statistical_analysis(ddpg_rewards, hil_rewards)

# Generate research plots
print("\n\nGenerating research quality plots...")
generate_research_plots(trajectory_data)

# Generate LaTeX tables
generate_latex_tables(ddpg_metrics, hil_metrics)

print("\n\nAll analyses complete! Ready for paper submission.")