In [8]:
#Install Dependencies
!pip install --quiet scipy scikit-learn shapely pulp ortools jupyter-dash plotly dash

#Imports for Core Functionality
import numpy as np
from scipy.spatial import KDTree
from sklearn.neighbors import KDTree as SKKDTree
from shapely.geometry import Point, LineString

#Optimization libs
from pulp import LpProblem, LpVariable, LpMinimize, lpSum
from ortools.linear_solver import pywraplp

#Dash for interactive viz
from jupyter_dash import JupyterDash
import dash
from dash import dcc, html

#Plotly graph objects
import plotly.graph_objs as go

print("✅ Dependencies installed and libraries imported successfully!")


✅ Dependencies installed and libraries imported successfully!


In [9]:
# Generate & Discretize 4D Waypoints

import numpy as np

def interpolate_waypoints(waypoints, t_start, t_end, num_samples=200):
    """
    Linearly interpolate a list of 2D/3D waypoints into timestamped 4D points.
    waypoints:   list of [x, y] or [x, y, z]
    t_start:     mission start time (float)
    t_end:       mission end time (float)
    num_samples: total number of (x,y,z,t) samples to generate
    """
    # total number of segments
    n_seg = len(waypoints) - 1
    # create uniform time samples
    times = np.linspace(t_start, t_end, num_samples)
    # map each time to a floating segment index
    u = (times - t_start) / (t_end - t_start) * n_seg
    seg_idx = np.floor(u).astype(int).clip(0, n_seg - 1)
    frac    = (u - seg_idx).reshape(-1,1)

    pts = []
    for i, ts in enumerate(times):
        p0 = np.array(waypoints[seg_idx[i]])
        p1 = np.array(waypoints[seg_idx[i] + 1])
        xyz = p0 + frac[i] * (p1 - p0)
        # ensure 4D: (x,y,z,t)
        if xyz.shape[0] == 2:
            xyz = np.append(xyz, 0.0)
        pts.append([*xyz.tolist(), float(ts)])
    return np.array(pts)

# -- Primary mission route --
primary_waypoints = [
    [0,  0, 0],
    [10, 5, 3],
    [20,15, 0]
]
primary = interpolate_waypoints(primary_waypoints, t_start=0.0, t_end=12.0, num_samples=250)

# -- Simulated “other drones” missions --
sim_pts, sim_ids = [], []
for drone_id in range(4):
    # random 3D waypoints in a 30×30×10 volume
    rnd_wps = (np.random.rand(5,3) * [30,30,10]).tolist()
    pts = interpolate_waypoints(rnd_wps, t_start=0.0, t_end=12.0, num_samples=180)
    sim_pts.append(pts)
    sim_ids.extend([drone_id]*len(pts))

sim_pts = np.vstack(sim_pts)
sim_ids = np.array(sim_ids, dtype=int)

print("Primary shape:", primary.shape)       # → (250, 4)
print("Simulated  shape:", sim_pts.shape)    # → (4*180, 4)
print("Sim IDs    shape:", sim_ids.shape)    # → (4*180,)


Primary shape: (250, 4)
Simulated  shape: (720, 4)
Sim IDs    shape: (720,)


In [10]:
from sklearn.neighbors import KDTree
import numpy as np

def check_conflict_kdtree(primary, sim_pts, sim_ids, spatial_radius=2.0, temporal_tolerance=0.5):
    """
    Checks for 4D conflicts using a KDTree (time-scaled into space).

    Args:
        primary (np.ndarray): Primary drone path (Nx4) → [x, y, z, t]
        sim_pts (np.ndarray): Simulated drone paths (Mx4)
        sim_ids (np.ndarray): Corresponding drone IDs for sim_pts (M,)
        spatial_radius (float): Min separation distance (in meters)
        temporal_tolerance (float): Time margin for conflict (in seconds)

    Returns:
        dict: {'status': 'clear'} if no conflict, or
              {'status': 'conflict', 'time': t, 'location': [x,y,z], 'drone_id': id}
    """
    # Scale time to match spatial radius
    time_scale = spatial_radius / temporal_tolerance

    # Prepare data
    sim_scaled = sim_pts.copy()
    sim_scaled[:, 3] *= time_scale
    tree = KDTree(sim_scaled)

    for idx, pt in enumerate(primary):
        query_pt = pt.copy()
        query_pt[3] *= time_scale

        neighbors = tree.query_radius([query_pt], r=spatial_radius)[0]
        if len(neighbors) > 0:
            si = neighbors[0]
            return {
                'status': 'conflict',
                'time': float(pt[3]),
                'location': pt[:3].tolist(),
                'drone_id': int(sim_ids[si]),
                'primary_idx': idx,
                'drone_point': sim_pts[si].tolist()
            }

    return {'status': 'clear'}


In [11]:
result = check_conflict_kdtree(primary, sim_pts, sim_ids)

if result['status'] == 'conflict':
    print(f"   Conflict at t = {result['time']:.2f}s")
    print(f"   Primary point #{result['primary_idx']} → {result['location']}")
    print(f"   Collides with Drone {result['drone_id']} at point {result['drone_point']}")
else:
    print("✅ Path is clear — no conflicts.")


   Conflict at t = 7.66s
   Primary point #159 → [12.771084337349398, 7.771084337349398, 2.16867469879518]
   Collides with Drone 0 at point [13.862254599169832, 7.746677959805442, 1.5897870746212588, 8.044692737430168]


In [12]:
# Conflict Check Wrapper Function
from sklearn.neighbors import KDTree

def check_conflict(primary, sim_pts, sim_ids, buffer=2.0, temporal_tolerance=0.5):
    """
    Returns:
    - {'status': 'clear'} if no conflict
    - {'status': 'conflict', 'time': t, 'location': [x,y,z], 'drone_id': int, 'primary_idx': int}
    """
    # Scale time so that time tolerance maps to spatial radius
    time_scale = buffer / temporal_tolerance
    sim_data = sim_pts.copy()
    sim_data[:, 3] *= time_scale
    tree = KDTree(sim_data)

    for i, pt in enumerate(primary):
        q_pt = pt.copy()
        q_pt[3] *= time_scale
        neighbors = tree.query_radius([q_pt], r=buffer)[0]
        if len(neighbors) > 0:
            j = neighbors[0]
            return {
                'status': 'conflict',
                'time': pt[3],
                'location': pt[:3].tolist(),
                'drone_id': int(sim_ids[j]),
                'primary_idx': i
            }

    return {'status': 'clear'}


In [13]:
# Conflict-Free Scenario Generation & Visualization

# 1. To create a conflict-free version of simulated drone trajectories
# Offset them far from the primary path (e.g., +50m in X)
sim_pts_safe = sim_pts.copy()
sim_pts_safe[:, 0] += 50  # offset X-coordinate
sim_ids_safe = sim_ids.copy()

# 2.To run conflict check using the same KD-tree method
result_safe = check_conflict(primary, sim_pts_safe, sim_ids_safe, buffer=2.0)

# 3.To visualize conflict-free trajectory (optional but recommended)
import plotly.graph_objects as go
import numpy as np

def visualize_conflict_free(primary, sim_pts, title="Conflict-Free Scenario"):
    frame_times = np.linspace(primary[:, 3].min(), primary[:, 3].max(), 60)
    frames = []

    for t in frame_times:
        p_pts = primary[primary[:, 3] <= t]
        s_pts = sim_pts[sim_pts[:, 3] <= t]

        data = [
            go.Scatter3d(
                x=p_pts[:, 0], y=p_pts[:, 1], z=p_pts[:, 2],
                mode='lines+markers', name='Primary',
                marker=dict(size=3), line=dict(width=2, color='blue')
            ),
            go.Scatter3d(
                x=s_pts[:, 0], y=s_pts[:, 1], z=s_pts[:, 2],
                mode='markers', name='Other Drones',
                marker=dict(size=4, opacity=0.6, color='orange')
            )
        ]

        layout = go.Layout(
            scene=dict(
                xaxis=dict(range=[0, 80]),
                yaxis=dict(range=[0, 30]),
                zaxis=dict(range=[0, 10])
            ),
            annotations=[dict(
                showarrow=False,
                x=0.05, y=0.95, xref="paper", yref="paper",
                text=f"🕒 t = {t:.1f}s",
                font=dict(size=16)
            )]
        )

        frames.append(go.Frame(data=data, name=f"{t:.2f}", layout=layout))

    # Initial plot
    init_t = frame_times[0]
    init_mask = primary[:, 3] <= init_t

    fig = go.Figure(
        data=[
            go.Scatter3d(
                x=primary[init_mask, 0], y=primary[init_mask, 1], z=primary[init_mask, 2],
                mode='lines+markers', marker=dict(size=3), line=dict(width=2, color='blue'),
                name='Primary'
            ),
            go.Scatter3d(
                x=sim_pts[sim_pts[:, 3] <= init_t, 0],
                y=sim_pts[sim_pts[:, 3] <= init_t, 1],
                z=sim_pts[sim_pts[:, 3] <= init_t, 2],
                mode='markers', marker=dict(size=4, opacity=0.6, color='orange'),
                name='Other Drones'
            )
        ],
        layout=go.Layout(
            title=title,
            scene=dict(
                xaxis_title='X', yaxis_title='Y', zaxis_title='Z',
                xaxis=dict(range=[0, 80]), yaxis=dict(range=[0, 30]), zaxis=dict(range=[0, 10])
            ),
            updatemenus=[dict(
                type="buttons", showactive=False,
                buttons=[dict(label="▶ Play",
                              method="animate",
                              args=[None, {"frame": {"duration": 100, "redraw": True},
                                           "fromcurrent": True}])
                         ])
            ]
        ),
        frames=frames
    )

    fig.show()

# 4. Show result
if result_safe['status'] == 'clear':
    print("✅ Conflict-free scenario confirmed.")
else:
    print("Unexpected conflict detected.")

# 5. Visualize
visualize_conflict_free(primary, sim_pts_safe)


✅ Conflict-free scenario confirmed.


In [14]:
import numpy as np
import plotly.graph_objects as go

# 1) Force a conflict at midpoint
t_conflict = (primary[:, 3].min() + primary[:, 3].max()) / 2
pc = primary[np.argmin(np.abs(primary[:, 3] - t_conflict))]
sim_pts[0, :] = pc  # Inject conflict
conflicts = [{'time': float(pc[3]), 'location': pc[:3].tolist(), 'drone_id': sim_ids[0]}]

# 2) Create animation frames
frame_times = np.linspace(primary[:, 3].min(), primary[:, 3].max(), 60)
frames = []

for t in frame_times:
    p_pts = primary[primary[:, 3] <= t]
    s_pts = sim_pts[sim_pts[:, 3] <= t]

    data = [
        go.Scatter3d(
            x=p_pts[:, 0], y=p_pts[:, 1], z=p_pts[:, 2],
            mode='lines+markers', name='Primary',
            marker=dict(size=3, color='blue'),
            line=dict(width=2, color='blue')
        ),
        go.Scatter3d(
            x=s_pts[:, 0], y=s_pts[:, 1], z=s_pts[:, 2],
            mode='markers', name='Other Drones',
            marker=dict(size=4, opacity=0.6, color='orange')
        )
    ]

    # Dynamic annotation content
    if t >= conflicts[0]['time']:
        cx, cy, cz = conflicts[0]['location']
        annotation_text = f"🕒 t = {t:.1f}s<br>📍 ({cx:.1f}, {cy:.1f}, {cz:.1f})"
    else:
        annotation_text = f"🕒 t = {t:.1f}s"

    annotations = [dict(
        showarrow=False,
        text=annotation_text,
        x=0.05, y=0.95,
        xref="paper", yref="paper",
        font=dict(size=16)
    )]

    layout = go.Layout(
        scene=dict(
            xaxis=dict(range=[0, 30]),
            yaxis=dict(range=[0, 30]),
            zaxis=dict(range=[0, 10])
        ),
        annotations=annotations
    )

    frames.append(go.Frame(data=data, name=f"{t:.2f}", layout=layout))

# 3) Initial static plot
init_t = frame_times[0]
init_mask = primary[:, 3] <= init_t
sim_mask = sim_pts[:, 3] <= init_t

fig = go.Figure(
    data=[
        go.Scatter3d(
            x=primary[init_mask, 0],
            y=primary[init_mask, 1],
            z=primary[init_mask, 2],
            mode='lines+markers',
            marker=dict(size=3, color='blue'),
            line=dict(width=2, color='blue'),
            name='Primary'
        ),
        go.Scatter3d(
            x=sim_pts[sim_mask, 0],
            y=sim_pts[sim_mask, 1],
            z=sim_pts[sim_mask, 2],
            mode='markers',
            marker=dict(size=4, opacity=0.6, color='orange'),
            name='Other Drones'
        )
    ],
    layout=go.Layout(
        title="4D Drone Deconfliction Animation",
        scene=dict(
            xaxis_title='X', yaxis_title='Y', zaxis_title='Z',
            xaxis=dict(range=[0, 30]),
            yaxis=dict(range=[0, 30]),
            zaxis=dict(range=[0, 10])
        ),
        updatemenus=[
            dict(
                type="buttons",
                showactive=False,
                buttons=[
                    dict(
                        label="▶ Play",
                        method="animate",
                        args=[None, {
                            "frame": {"duration": 100, "redraw": True},
                            "fromcurrent": True
                        }]
                    )
                ]
            )
        ]
    ),
    frames=frames
)

fig.show()
