In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import mpl_toolkits.mplot3d.axes3d as p3
from scipy.spatial.distance import euclidean
from scipy.interpolate import interp1d
from typing import List, Tuple, Dict, Optional, Union
import json
import datetime
import os

In [2]:
class Waypoint:
    """Represents a single waypoint in 3D space with an optional timestamp."""

    def __init__(self, x: float, y: float, z: float = 0, timestamp: Optional[float] = None):
        self.x = x
        self.y = y
        self.z = z
        self.timestamp = timestamp

    def __repr__(self) -> str:
        if self.timestamp is not None:
            return f"Waypoint(x={self.x}, y={self.y}, z={self.z}, t={self.timestamp})"
        return f"Waypoint(x={self.x}, y={self.y}, z={self.z})"

    def coordinates(self) -> np.ndarray:
        """Returns the spatial coordinates as a numpy array."""
        return np.array([self.x, self.y, self.z])

    def distance_to(self, other: 'Waypoint') -> float:
        """Calculate Euclidean distance to another waypoint."""
        return euclidean(self.coordinates(), other.coordinates())

In [3]:
class DroneTrajectory:
    """Represents a drone's trajectory defined by waypoints and timing information."""

    def __init__(self,
                 drone_id: str,
                 waypoints: List[Waypoint],
                 start_time: float,
                 end_time: float,
                 color: str = None):
        self.drone_id = drone_id
        self.waypoints = waypoints
        self.start_time = start_time
        self.end_time = end_time
        self.color = color or self._generate_random_color()

        # Generate timestamps if not provided
        if all(wp.timestamp is None for wp in waypoints):
            self._generate_timestamps()

        # Create interpolation functions for position at any time
        self._create_interpolators()

    def _generate_random_color(self) -> str:
        """Generate a random color for visualization."""
        r, g, b = np.random.random(3)
        return f'#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}'

    def _generate_timestamps(self) -> None:
        """Generate evenly distributed timestamps for waypoints."""
        total_distance = 0
        distances = [0]

        # Calculate cumulative distances
        for i in range(1, len(self.waypoints)):
            dist = self.waypoints[i].distance_to(self.waypoints[i-1])
            total_distance += dist
            distances.append(total_distance)

        # Normalize distances to time range
        time_range = self.end_time - self.start_time
        for i, wp in enumerate(self.waypoints):
            if i == 0:
                wp.timestamp = self.start_time
            elif i == len(self.waypoints) - 1:
                wp.timestamp = self.end_time
            else:
                # Proportional time based on distance
                wp.timestamp = self.start_time + (distances[i] / total_distance) * time_range

    def _create_interpolators(self) -> None:
        """Create interpolation functions for x, y, z coordinates over time."""
        times = np.array([wp.timestamp for wp in self.waypoints])
        x_values = np.array([wp.x for wp in self.waypoints])
        y_values = np.array([wp.y for wp in self.waypoints])
        z_values = np.array([wp.z for wp in self.waypoints])

        # Create interpolation functions (cubic for smooth trajectories)
        self.x_interp = interp1d(times, x_values, kind='linear', bounds_error=False, fill_value=(x_values[0], x_values[-1]))
        self.y_interp = interp1d(times, y_values, kind='linear', bounds_error=False, fill_value=(y_values[0], y_values[-1]))
        self.z_interp = interp1d(times, z_values, kind='linear', bounds_error=False, fill_value=(z_values[0], z_values[-1]))

    def position_at_time(self, time: float) -> np.ndarray:
        """Get the interpolated position of the drone at a specific time."""
        # Clamp time to valid range
        t = max(self.start_time, min(time, self.end_time))

        x = float(self.x_interp(t))
        y = float(self.y_interp(t))
        z = float(self.z_interp(t))

        return np.array([x, y, z])

    def is_active_at_time(self, time: float) -> bool:
        """Check if the drone is active/flying at the specified time."""
        return self.start_time <= time <= self.end_time

    def extract_trajectory_samples(self, num_samples: int = 100) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Extract sampled points along the trajectory for visualization."""
        times = np.linspace(self.start_time, self.end_time, num_samples)
        x_vals = self.x_interp(times)
        y_vals = self.y_interp(times)
        z_vals = self.z_interp(times)

        return times, x_vals, y_vals, z_vals

In [5]:
class DeconflictionSystem:
    """
    Main system for strategic deconfliction of drone missions.
    Checks if a planned drone mission conflicts with existing flight schedules.
    """

    def __init__(self, safety_buffer: float = 10.0):
        """
        Initialize the deconfliction system.

        Args:
            safety_buffer: Minimum required separation distance between drones (meters)
        """
        self.safety_buffer = safety_buffer
        self.registered_flights: List[DroneTrajectory] = []

    def register_flight(self, flight: DroneTrajectory) -> None:
        """Register an existing flight in the system."""
        self.registered_flights.append(flight)

    def register_flights(self, flights: List[DroneTrajectory]) -> None:
        """Register multiple flights at once."""
        self.registered_flights.extend(flights)

    def check_mission_safety(self,
                            mission: DroneTrajectory,
                            time_resolution: float = 1.0,
                            visualize: bool = False) -> Dict:
        """
        Check if a planned mission is safe to execute without conflicts.

        Args:
            mission: The planned drone trajectory to check
            time_resolution: Time step for checking conflicts (seconds)
            visualize: Whether to create and show a visualization

        Returns:
            Dictionary containing safety status and conflict details if any
        """
        conflicts = []

        # Generate time points to check
        check_times = np.arange(mission.start_time, mission.end_time + time_resolution, time_resolution)

        for t in check_times:
            if not mission.is_active_at_time(t):
                continue

            mission_pos = mission.position_at_time(t)

            for other_flight in self.registered_flights:
                if other_flight.drone_id == mission.drone_id:
                    continue  # Skip checking against itself

                if not other_flight.is_active_at_time(t):
                    continue

                other_pos = other_flight.position_at_time(t)
                distance = euclidean(mission_pos, other_pos)

                if distance < self.safety_buffer:
                    conflict = {
                        "time": t,
                        "position": mission_pos.tolist(),
                        "conflicting_drone": other_flight.drone_id,
                        "distance": distance,
                        "required_buffer": self.safety_buffer
                    }
                    conflicts.append(conflict)

        result = {
            "status": "clear" if not conflicts else "conflict detected",
            "conflicts": conflicts,
            "mission_id": mission.drone_id,
            "total_conflicts": len(conflicts)
        }

        if visualize and conflicts:
            self.visualize_conflicts(mission, conflicts)

        return result

    def visualize_conflicts(self, mission: DroneTrajectory, conflicts: List[Dict]) -> None:
        """Visualize detected conflicts for analysis."""
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111, projection='3d')

        # Plot mission trajectory
        times, x, y, z = mission.extract_trajectory_samples()
        ax.plot(x, y, z, 'b-', linewidth=2, label=f'Mission: {mission.drone_id}')

        # Plot other drone trajectories
        for flight in self.registered_flights:
            times, x, y, z = flight.extract_trajectory_samples()
            ax.plot(x, y, z, color=flight.color, linestyle='-', alpha=0.7,
                   label=f'Drone: {flight.drone_id}')

        # Mark conflict points
        if conflicts:
            conflict_x = [c["position"][0] for c in conflicts]
            conflict_y = [c["position"][1] for c in conflicts]
            conflict_z = [c["position"][2] for c in conflicts]

            ax.scatter(conflict_x, conflict_y, conflict_z, color='red', s=100, marker='x',
                      label=f'Conflicts ({len(conflicts)})')

        ax.set_xlabel('X position (m)')
        ax.set_ylabel('Y position (m)')
        ax.set_zlabel('Z position (m)')
        ax.set_title('Drone Mission Conflicts in 3D Airspace')
        ax.legend()

        plt.tight_layout()
        plt.show()

    def create_4d_animation(self, mission: DroneTrajectory,
                           output_file: str = 'drone_animation.mp4',
                           fps: int = 30) -> None:
        """
        Create a 4D animation (3D space + time) of the mission and other flights.

        Args:
            mission: The main mission trajectory to highlight
            output_file: Path to save the animation video
            fps: Frames per second for the animation
        """
        import matplotlib.animation as animation

        # Find global time range
        min_time = min([mission.start_time] + [f.start_time for f in self.registered_flights])
        max_time = max([mission.end_time] + [f.end_time for f in self.registered_flights])

        # Create figure and 3D axis
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(111, projection='3d')

        # Find axis limits for better visualization
        all_flights = self.registered_flights + [mission]
        x_vals, y_vals, z_vals = [], [], []

        for flight in all_flights:
            _, x, y, z = flight.extract_trajectory_samples()
            x_vals.extend(x)
            y_vals.extend(y)
            z_vals.extend(z)

        x_min, x_max = min(x_vals), max(x_vals)
        y_min, y_max = min(y_vals), max(y_vals)
        z_min, z_max = min(z_vals), max(z_vals)

        # Add some padding
        padding = max(self.safety_buffer * 2, (x_max - x_min) * 0.1)
        x_min -= padding
        x_max += padding
        y_min -= padding
        y_max += padding
        z_min -= padding
        z_max += padding

        # Set fixed plot limits
        ax.set_xlim([x_min, x_max])
        ax.set_ylim([y_min, y_max])
        ax.set_zlim([z_min, z_max])

        # Prepare plot elements
        mission_path, = ax.plot([], [], [], 'b-', linewidth=3, label=f'Mission: {mission.drone_id}')
        mission_head = ax.scatter([], [], [], color='blue', s=100, marker='o')

        # Initialize other drone paths and heads
        other_paths = []
        other_heads = []

        for flight in self.registered_flights:
            path, = ax.plot([], [], [], color=flight.color, linestyle='-', alpha=0.7,
                           label=f'Drone: {flight.drone_id}')
            head = ax.scatter([], [], [], color=flight.color, s=80, marker='o')
            other_paths.append(path)
            other_heads.append(head)

        # Add safety buffer visualization for main drone
        buffer_sphere = None  # Will be updated in animation

        # Time display text
        time_text = ax.text2D(0.05, 0.95, "", transform=ax.transAxes)
        conflict_text = ax.text2D(0.05, 0.90, "", transform=ax.transAxes, color='red')

        ax.set_xlabel('X position (m)')
        ax.set_ylabel('Y position (m)')
        ax.set_zlabel('Z position (m)')
        ax.set_title('4D Drone Deconfliction Simulation (3D Space + Time)')
        ax.legend()

        def init():
            mission_path.set_data([], [])
            mission_path.set_3d_properties([])
            mission_head._offsets3d = ([], [], [])

            for path in other_paths:
                path.set_data([], [])
                path.set_3d_properties([])

            for head in other_heads:
                head._offsets3d = ([], [], [])

            time_text.set_text('')
            conflict_text.set_text('')

            return [mission_path, mission_head, time_text, conflict_text] + other_paths + other_heads

        def animate(frame):
            # Calculate current time based on frame number
            t = min_time + (max_time - min_time) * (frame / (fps * 10))  # 10-second animation

            # Format time display
            time_text.set_text(f'Time: {t:.1f}s')

            # Check for conflicts at this time
            conflicts_now = False
            conflict_drones = []

            if mission.is_active_at_time(t):
                mission_pos = mission.position_at_time(t)

                # Update mission path up to current time
                times = np.linspace(mission.start_time, min(t, mission.end_time), 100)
                x_vals = mission.x_interp(times)
                y_vals = mission.y_interp(times)
                z_vals = mission.z_interp(times)

                mission_path.set_data(x_vals, y_vals)
                mission_path.set_3d_properties(z_vals)

                # Update mission current position
                mission_head._offsets3d = ([mission_pos[0]], [mission_pos[1]], [mission_pos[2]])

                # Check for conflicts with other drones
                for i, other_flight in enumerate(self.registered_flights):
                    if other_flight.is_active_at_time(t):
                        other_pos = other_flight.position_at_time(t)
                        distance = euclidean(mission_pos, other_pos)

                        if distance < self.safety_buffer:
                            conflicts_now = True
                            conflict_drones.append(other_flight.drone_id)
            else:
                # Drone not active yet or already finished
                mission_path.set_data([], [])
                mission_path.set_3d_properties([])
                mission_head._offsets3d = ([], [], [])

            # Update other drone paths
            for i, (other_flight, path, head) in enumerate(zip(self.registered_flights, other_paths, other_heads)):
                if other_flight.is_active_at_time(t):
                    # Update path
                    times = np.linspace(other_flight.start_time, min(t, other_flight.end_time), 100)
                    x_vals = other_flight.x_interp(times)
                    y_vals = other_flight.y_interp(times)
                    z_vals = other_flight.z_interp(times)

                    path.set_data(x_vals, y_vals)
                    path.set_3d_properties(z_vals)

                    # Update current position
                    other_pos = other_flight.position_at_time(t)
                    head._offsets3d = ([other_pos[0]], [other_pos[1]], [other_pos[2]])
                else:
                    # Drone not active
                    path.set_data([], [])
                    path.set_3d_properties([])
                    head._offsets3d = ([], [], [])

            # Update conflict notification
            if conflicts_now:
                conflict_text.set_text(f"CONFLICT DETECTED with: {', '.join(conflict_drones)}")
            else:
                conflict_text.set_text("")

            return [mission_path, mission_head, time_text, conflict_text] + other_paths + other_heads

        # Create animation
        ani = animation.FuncAnimation(fig, animate, frames=fps*10, init_func=init,
                                     interval=1000/fps, blit=True)

        # Save animation
        writer = animation.FFMpegWriter(fps=fps, metadata=dict(artist='DeconflictionSystem'))
        ani.save(output_file, writer=writer)

        plt.close(fig)
        print(f"Animation saved to {output_file}")



In [6]:
def create_sample_scenario() -> Tuple[DroneTrajectory, List[DroneTrajectory]]:
    """Create a sample scenario with a primary mission and other drone flights."""
    # Primary mission (delivery drone)
    primary_waypoints = [
        Waypoint(0, 0, 10),      # Start at origin, 10m altitude
        Waypoint(50, 50, 20),    # Climb while moving
        Waypoint(100, 100, 15),  # Descend slightly
        Waypoint(150, 150, 5)    # End point, low altitude
    ]

    primary_mission = DroneTrajectory(
        drone_id="delivery_drone_1",
        waypoints=primary_waypoints,
        start_time=0,
        end_time=120,  # 2-minute mission
        color='blue'
    )

    # Other drones in the airspace
    other_flights = []

    # Drone 2: Crossing path
    drone2_waypoints = [
        Waypoint(0, 100, 15),
        Waypoint(100, 0, 15)
    ]
    drone2 = DroneTrajectory(
        drone_id="inspection_drone_2",
        waypoints=drone2_waypoints,
        start_time=30,  # Starts 30 seconds after primary
        end_time=90,    # 1-minute mission
        color='green'
    )
    other_flights.append(drone2)

    # Drone 3: Close to primary's path but at different altitude
    drone3_waypoints = [
        Waypoint(20, 20, 30),
        Waypoint(70, 70, 30),
        Waypoint(120, 120, 30)
    ]
    drone3 = DroneTrajectory(
        drone_id="survey_drone_3",
        waypoints=drone3_waypoints,
        start_time=10,
        end_time=100,
        color='red'
    )
    other_flights.append(drone3)

    # Drone 4: Potentially conflicting path
    drone4_waypoints = [
        Waypoint(150, 0, 15),
        Waypoint(100, 50, 15),
        Waypoint(50, 100, 15),
        Waypoint(0, 150, 15)
    ]
    drone4 = DroneTrajectory(
        drone_id="photography_drone_4",
        waypoints=drone4_waypoints,
        start_time=50,
        end_time=170,
        color='purple'
    )
    other_flights.append(drone4)

    return primary_mission, other_flights

def create_conflict_free_scenario() -> Tuple[DroneTrajectory, List[DroneTrajectory]]:
    """Create a scenario with no conflicts between drones."""
    # Primary mission
    primary_waypoints = [
        Waypoint(0, 0, 20),
        Waypoint(100, -50, 20),
        Waypoint(200, 0, 20),
        Waypoint(250, 50, 20)
    ]

    primary_mission = DroneTrajectory(
        drone_id="mapping_drone_1",
        waypoints=primary_waypoints,
        start_time=0,
        end_time=180,  # 3-minute mission
        color='blue'
    )

    other_flights = []

    # Other drones flying in different areas
    drone2_waypoints = [
        Waypoint(-100, -100, 30),
        Waypoint(-50, -150, 30),
        Waypoint(0, -100, 30),
        Waypoint(-50, -50, 30)
    ]
    drone2 = DroneTrajectory(
        drone_id="survey_drone_2",
        waypoints=drone2_waypoints,
        start_time=20,
        end_time=140,
        color='green'
    )
    other_flights.append(drone2)

    # Higher altitude drone
    drone3_waypoints = [
        Waypoint(50, 50, 60),
        Waypoint(100, 100, 60),
        Waypoint(150, 150, 60),
        Waypoint(200, 200, 60)
    ]
    drone3 = DroneTrajectory(
        drone_id="weather_drone_3",
        waypoints=drone3_waypoints,
        start_time=0,
        end_time=200,
        color='orange'
    )
    other_flights.append(drone3)

    # Drone that flies after primary mission completes
    drone4_waypoints = [
        Waypoint(0, 0, 25),
        Waypoint(100, 50, 25),
        Waypoint(200, 0, 25)
    ]
    drone4 = DroneTrajectory(
        drone_id="delivery_drone_4",
        waypoints=drone4_waypoints,
        start_time=200,  # Starts after primary mission ends
        end_time=300,
        color='red'
    )
    other_flights.append(drone4)

    return primary_mission, other_flights



In [10]:
def run_demonstration():
    """Run a full demonstration of the deconfliction system."""
    print("UAV Strategic Deconfliction Demo")
    print("--------------------------------")

    # Create deconfliction system
    system = DeconflictionSystem(safety_buffer=15.0)

    # Example 1: Conflict scenario
    print("\n### SCENARIO 1: POTENTIAL CONFLICTS ###")
    primary_mission, other_flights = create_sample_scenario()

    # Register other flights in the system
    system.register_flights(other_flights)

    # Check mission safety
    results = system.check_mission_safety(primary_mission, time_resolution=1.0)

    # Print results
    print(f"Mission ID: {results['mission_id']}")
    print(f"Status: {results['status']}")
    print(f"Total conflicts detected: {results['total_conflicts']}")

    if results['conflicts']:
        print("\nConflict Details:")
        for i, conflict in enumerate(results['conflicts'][:5], 1):  # Show first 5 conflicts
            print(f"  Conflict {i}:")
            print(f"    Time: {conflict['time']:.2f}s")
            print(f"    Position: ({conflict['position'][0]:.1f}, {conflict['position'][1]:.1f}, {conflict['position'][2]:.1f})")
            print(f"    Conflicting drone: {conflict['conflicting_drone']}")
            print(f"    Distance: {conflict['distance']:.2f}m (buffer: {conflict['required_buffer']}m)")

        if len(results['conflicts']) > 5:
            print(f"    ... and {len(results['conflicts']) - 5} more conflicts")

    # Create visualization for this scenario
    print("\nCreating 4D animation for conflict scenario...")
    #os.makedirs('output', exist_ok=True)
    system.create_4d_animation(primary_mission, output_file='/content/drive/My Drive/Flytbase Assignment/conflict_scenario.mp4')

    # Example 2: Conflict-free scenario
    print("\n### SCENARIO 2: CONFLICT-FREE ###")

    # Reset system
    system = DeconflictionSystem(safety_buffer=15.0)

    # Create new scenario
    primary_mission, other_flights = create_conflict_free_scenario()

    # Register other flights
    system.register_flights(other_flights)

    # Check mission safety
    results = system.check_mission_safety(primary_mission, time_resolution=1.0)

    # Print results
    print(f"Mission ID: {results['mission_id']}")
    print(f"Status: {results['status']}")
    print(f"Total conflicts detected: {results['total_conflicts']}")

    # Create visualization for this scenario
    print("\nCreating 4D animation for conflict-free scenario...")
    system.create_4d_animation(primary_mission, output_file='/content/drive/My Drive/Flytbase Assignment/conflict_free_scenario.mp4')

    print("\nDemonstration complete. Check the output directory for animations.")


In [9]:
# prompt: mount my google drive here

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
if __name__ == "__main__":
    run_demonstration()

UAV Strategic Deconfliction Demo
--------------------------------

### SCENARIO 1: POTENTIAL CONFLICTS ###
Mission ID: delivery_drone_1
Status: conflict detected
Total conflicts detected: 50

Conflict Details:
  Conflict 1:
    Time: 28.00s
    Position: (34.9, 34.9, 17.0)
    Conflicting drone: survey_drone_3
    Distance: 14.87m (buffer: 15.0m)
  Conflict 2:
    Time: 29.00s
    Position: (36.2, 36.2, 17.2)
    Conflicting drone: survey_drone_3
    Distance: 14.56m (buffer: 15.0m)
  Conflict 3:
    Time: 30.00s
    Position: (37.4, 37.4, 17.5)
    Conflicting drone: survey_drone_3
    Distance: 14.25m (buffer: 15.0m)
  Conflict 4:
    Time: 31.00s
    Position: (38.7, 38.7, 17.7)
    Conflicting drone: survey_drone_3
    Distance: 13.94m (buffer: 15.0m)
  Conflict 5:
    Time: 32.00s
    Position: (39.9, 39.9, 18.0)
    Conflicting drone: survey_drone_3
    Distance: 13.63m (buffer: 15.0m)
    ... and 45 more conflicts

Creating 4D animation for conflict scenario...
Animation saved t