# Algorithm function

## RDP Line Simplification Algorithm

In [None]:
def distance(a, b):
    return  np.sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)


def point_line_distance(point, start, end):
    if (start == end).all():
        return distance(point, start)
    else:
        n = abs(
            (end[0] - start[0]) * (start[1] - point[1]) -
            (start[0] - point[0]) * (end[1] - start[1])
        )
        d = np.sqrt(
            (end[0] - start[0]) ** 2 + (end[1] - start[1]) ** 2
        )
        return n / d


def rdp(points, epsilon):
    """Reduces a series of points to a simplified version that loses detail, but
    maintains the general shape of the series.
    """
    dmax = 0.0
    index = 0
    for i in range(1, len(points) - 1):
        d = point_line_distance(points[i], points[0], points[-1])
        if d > dmax:
            index = i
            dmax = d

    if dmax >= epsilon:
        results = rdp(points[:index+1], epsilon)[:-1] + rdp(points[index:], epsilon)
    else:
        results = [points[0], points[-1]]

    return results

## Route Detection

In [None]:
import numpy as np
from utils.cal import Calculation

def detect_sid_star(simplified_trajectory, procedure_dict, waypoints_coord_dict):
    """
    Detect SID and STAR

    Parameters
    ----------
    simplified_trajectory : float[lat, long]
        Simplified trajectory
    procedure_dict : dict
        Procedure dictionary
    waypoints_coord_dict : dict
        SID/STAR waypoint coordinate dictionary

    Returns
    -------
    SID/STAR : string
        Identified SID/STAR
    """
    area_list = []
    ats_list = []
    wp_lat = np.array(list(waypoints_coord_dict.values()))[:,0]
    wp_long = np.array(list(waypoints_coord_dict.values()))[:,1]
    for ats, waypoints in procedure_dict.items():
        # All waypoints in one procedure
        wp_list = np.empty([0,2])
        for wp in waypoints:
            wp_list = np.vstack((wp_list, waypoints_coord_dict[wp]))
        # Calculate total area between each segment and its cloest waypoint
        total_area = 0
        trajectory_in_area = np.empty([0,2])
        for i in range(len(simplified_trajectory)-1):
            # If the segment is within the procedure region
            if (simplified_trajectory[i][0] >= np.min(wp_lat)) & (simplified_trajectory[i][0] <= np.max(wp_lat)) & (simplified_trajectory[i][1] >= np.min(wp_long)) & (simplified_trajectory[i][1] <= np.max(wp_long)):
                trajectory_in_area = np.vstack((trajectory_in_area, simplified_trajectory[i], simplified_trajectory[i+1]))
                cross_dist = Calculation.cal_cross_track_distance(simplified_trajectory[i][0], simplified_trajectory[i][1], simplified_trajectory[i+1][0], simplified_trajectory[i+1][1], wp_list[:,0], wp_list[:,1])
                area = np.abs(Calculation.cal_great_circle_distance(simplified_trajectory[i][0], simplified_trajectory[i][1], simplified_trajectory[i+1][0], simplified_trajectory[i+1][1]) * cross_dist / 2.0)
                total_area += np.min(area)

        area_list.append(total_area)
        ats_list.append(ats)
        print(ats, total_area)
    
    return ats_list[np.argmin(area_list)], trajectory_in_area

# File input

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cartopy.crs as ccrs

# Flight Radar
file = '20180601/KA809.csv'
file_path = os.path.join(os.getcwd(), '../data/replay/historic/', file)
df = pd.read_csv(file_path)
traj = df[['lat', 'long']].to_numpy()

# OpenSKY
# file_path = os.path.join(os.getcwd(), '../data/replay/historic/AAL125.csv')
# df = pd.read_csv(file_path)
# traj = df[['latitude', 'longitude']].to_numpy()

print(traj.shape)
simplified = np.array(rdp(traj, 0.005))
print(simplified.shape)

# Flight Plan Detection

In [None]:
from core.nav import Nav

min_lat = np.min(traj[:,0])
max_lat = np.max(traj[:,0])
min_long = np.min(traj[:,1])
max_long = np.max(traj[:,1])
print(min_lat, min_long, max_lat, max_long)

waypoints_in_area = Nav.get_fix_in_area(min_lat-0.5, min_long-0.5, max_lat+0.5, max_long+0.5)
print(waypoints_in_area.shape)

## SID/STAR Detection Algorithm

### Departure

In [None]:
from core.nav import Nav

departure_airport = Nav.find_closest_airport_runway(traj[0,0], traj[0,1])
departure_procedures = Nav.get_instrument_procedures(departure_airport[0], "SID")

# Get all departure route and related waypoints
departure_waypoints = []
departure_dict={}
for sid in departure_procedures:
    wp = Nav.get_procedure(departure_airport[0], departure_airport[1], sid)[0]
    wp = [ele for ele in wp if ele.strip()]
    departure_waypoints.extend(wp)
    departure_dict[sid] = wp
departure_waypoints = np.unique(departure_waypoints)

# Get coordinate of all arrival waypoints
departure_waypoints_coord_dict = {}
for wp in departure_waypoints:
    coord = Nav.get_fix_coordinate(wp, departure_airport[2], departure_airport[3])
    departure_waypoints_coord_dict[wp] = list(coord)

print(departure_airport)
print(departure_procedures)
# print(departure_dict)

In [None]:
# Detect departure
departure_result, departure_trajectory = detect_sid_star(simplified, departure_dict, departure_waypoints_coord_dict)

print("SID Detected:", departure_result)

### Arrival

In [None]:
from core.nav import Nav

arrival_airport = Nav.find_closest_airport_runway(traj[-1,0], traj[-1,1])
arrival_procedures = Nav.get_instrument_procedures(arrival_airport[0], "STAR")

# Get all arrival route and related waypoints
arrival_waypoints = []
arrivals_dict={}
for star in arrival_procedures:
    wp = Nav.get_procedure(arrival_airport[0], arrival_airport[1], star)[0]
    wp = [ele for ele in wp if ele.strip()]
    arrival_waypoints.extend(wp)
    arrivals_dict[star] = wp
arrival_waypoints = np.unique(arrival_waypoints)

# Get coordinate of all arrival waypoints
arrival_waypoints_coord_dict = {}
for wp in arrival_waypoints:
    coord = Nav.get_fix_coordinate(wp, arrival_airport[2], arrival_airport[3])
    arrival_waypoints_coord_dict[wp] = list(coord)

print(arrival_airport)
print(arrival_procedures)
# print(arrivals_dict)

In [None]:
# Detect arrival
arrival_result, arrival_trajectory  = detect_sid_star(simplified, arrivals_dict, arrival_waypoints_coord_dict)

print("STAR Detected:", arrival_result)

### Approach

In [None]:
from core.nav import Nav

approach_airport = Nav.find_closest_airport_runway(traj[-1,0], traj[-1,1])
approach_procedures = Nav.get_instrument_procedures(approach_airport[0], "APPCH")
ils = [str for str in approach_procedures if "I" in str]
ils_runway = [str.replace('I', '') for str in approach_procedures if "I" in str]
# Runway without ils
missed_procedure = [] 
for procedure in approach_procedures:
    hv_runway = [runway for runway in ils_runway if runway in procedure]
    if len(hv_runway) == 0:
        missed_procedure.append(procedure)
approach_procedures = ils + missed_procedure

# Get all arrival route and related waypoints
approach_waypoints = []
approach_dict={}
for approach in approach_procedures:
    wp = Nav.get_procedure(approach_airport[0], approach_airport[1], approach)[0]
    wp = [ele for ele in wp if ele.strip() and "RW" not in ele]
    approach_waypoints.extend(wp)
    approach_dict[approach] = wp
approach_waypoints = np.unique(approach_waypoints)

# Get coordinate of all arrival waypoints
approach_waypoints_coord_dict = {}
for wp in approach_waypoints:
    coord = Nav.get_fix_coordinate(wp, approach_airport[2], approach_airport[3])
    approach_waypoints_coord_dict[wp] = list(coord)

print(approach_airport)
print(approach_procedures)
print(approach_dict)

In [None]:
# Detect approach
approach_result, approach_trajectory = detect_sid_star(simplified, approach_dict, approach_waypoints_coord_dict)

print("Approach Detected:", approach_result)

## Enroute Detection

In [None]:
from core.nav import Nav

# print(Nav.airway)
airways = Nav.airway[Nav.airway[0].isin(waypoints_in_area[:,2]) & Nav.airway[3].isin(waypoints_in_area[:,2])]

# enroute_waypoints = waypoints_in_area[np.isin(waypoints_in_area[:,2], np.concatenate((arrival_waypoints, departure_waypoints, approach_waypoints)))]
trim_trajectory = np.vstack((departure_trajectory[:-3,], arrival_trajectory[3:,], approach_trajectory))
enroute_trajectory = simplified[np.isin(simplified, trim_trajectory, invert=True)].reshape(-1,2)[0:-1,:]
# enroute_trajectory = simplified
# enroute_trajectory = np.array(rdp(enroute_trajectory, 0.1))

waypoint_area = np.empty([0,len(waypoints_in_area[:,2])])
for i in range(len(enroute_trajectory) -1):
    projection = np.dot(waypoints_in_area[:,0:2]-enroute_trajectory[i], enroute_trajectory[i+1]-enroute_trajectory[i])/np.dot(enroute_trajectory[i+1]-enroute_trajectory[i],enroute_trajectory[i+1]-enroute_trajectory[i])
    cross_dist = Calculation.cal_cross_track_distance(enroute_trajectory[i][0], enroute_trajectory[i][1], enroute_trajectory[i+1][0], enroute_trajectory[i+1][1], waypoints_in_area[:,0].astype(float), waypoints_in_area[:,1].astype(float))
    area = np.where((0 <= projection) & (projection <= 1), np.abs(cross_dist), np.Inf)
    waypoint_area = np.vstack((waypoint_area, area))

segment_index = np.argmin(waypoint_area, axis=0)
waypoint_area = np.min(waypoint_area, axis=0)

    
waypoint_coord_dict = {}
waypoint_area_dict = {}
waypoint_segment_dict = {}
for key, area, lat, long, segment in zip(waypoints_in_area[:,2], waypoint_area, waypoints_in_area[:,0], waypoints_in_area[:,1], segment_index):
    waypoint_area_dict[key] = area
    waypoint_segment_dict[key] = segment
    waypoint_coord_dict[key] = [lat, long]


below_threshold = []
for index, airway in airways.iterrows():
    # If area of the waypoint is smaller then a threshold
    if (waypoint_area_dict[airway[0]] + waypoint_area_dict[airway[3]] < 1) and (waypoint_area_dict[airway[0]] < 0.5) and (waypoint_area_dict[airway[3]] < 0.5):
        traj_vector = enroute_trajectory[waypoint_segment_dict[airway[0]]+1,:] - enroute_trajectory[waypoint_segment_dict[airway[0]],:]
        airway_vector = np.array(waypoint_coord_dict[airway[3]]) - np.array(waypoint_coord_dict[airway[0]])
        if np.abs(np.cross(traj_vector, airway_vector)) < np.linalg.norm(traj_vector) * np.linalg.norm(airway_vector) * 2/9: 
            below_threshold.append(True)
        else:
            below_threshold.append(False)
    else:
        below_threshold.append(False)

airways_result = airways[below_threshold]

# below_threshold = []
# for index, airway in airways_result.iterrows():
#     start = np.array(waypoint_coord_dict[airway[0]])
#     end = np.array(waypoint_coord_dict[airway[3]])
#     lat1, long1 = waypoint_coord_dict[airway[0]]
#     lat2, long2 = waypoint_coord_dict[airway[3]]

#     projection = np.dot(enroute_trajectory[:,0:2]-start,end-start)/np.dot(end-start, end-start)
#     cross_dist = Calculation.cal_cross_track_distance(lat1, long1, lat2, long2, enroute_trajectory[:,0].astype(float), enroute_trajectory[:,1].astype(float))

#     dist = cross_dist[(0 <= projection) & (projection <= 1)]

#     if (len(dist) == 0) | (np.sum(dist)/len(dist) < 0.001):
#         below_threshold.append(True)
#     else:
#         below_threshold.append(False)

# airways_result = airways_result[below_threshold]


In [None]:
print(departure_result)
print(departure_dict[departure_result])
print(arrival_result)
print(arrivals_dict[arrival_result])
print(approach_result)
print(approach_dict[approach_result])
print(airways_result)

# Plot all result

In [None]:
%config InlineBackend.figure_format = 'svg'
fig, axs = plt.subplots(2, 2, subplot_kw={'projection': ccrs.PlateCarree()}, dpi=300, figsize=(15, 15))

# Overview
axs[0, 0].set_title("Overview")
axs[0, 0].coastlines(resolution='10m', color='black', linewidth=0.5)
axs[0, 0].plot(traj[:,1], traj[:,0], '--', c='tab:blue', lw=1, zorder=2)
axs[0, 0].scatter(x=simplified[:,1], y=simplified[:,0], c='tab:blue', s=5)
axs[0, 0].set_aspect('auto')

# Enroute
axs[0, 1].set_title("Enroute")
axs[0, 1].coastlines(resolution='10m', color='black', linewidth=0.5)
axs[0, 1].scatter(x=waypoints_in_area[:,1], y=waypoints_in_area[:,0], c = 'lightgrey', s=1)
axs[0, 1].plot(traj[:,1], traj[:,0], '--', c='tab:blue', lw=1, zorder=0)
axs[0, 1].scatter(x=simplified[:,1], y=simplified[:,0], c='tab:blue', s=5)
for index, airway in airways_result.iterrows():
    lat1, long1 = waypoint_coord_dict[airway[0]]
    lat2, long2 = waypoint_coord_dict[airway[3]]
    axs[0, 1].plot([long1, long2], [lat1, lat2], '-', c='tab:red', lw=1, zorder=2)
for index, airway in airways.iterrows():
    lat1, long1 = waypoint_coord_dict[airway[0]]
    lat2, long2 = waypoint_coord_dict[airway[3]]
    axs[0, 1].plot([long1, long2], [lat1, lat2], '-', c='lightgrey', lw=0.25, zorder=1)
axs[0, 1].set_aspect('auto')

# Departure
departure_path = np.empty([0,2])
_, idx = np.unique(departure_dict[departure_result], return_index=True) 
for wp in np.array(departure_dict[departure_result])[np.sort(idx)]:
    departure_path = np.vstack((departure_path, departure_waypoints_coord_dict[wp]))
departure_wp_long = np.array(list(departure_waypoints_coord_dict.values()))[:,1]
departure_wp_lat = np.array(list(departure_waypoints_coord_dict.values()))[:,0]
axs[1, 0].set_title("Departure: " + departure_result)
axs[1, 0].set_extent([np.min(departure_wp_long)-0.2, np.max(departure_wp_long)+0.2, np.min(departure_wp_lat)-0.2, np.max(departure_wp_lat)+0.2], crs=ccrs.PlateCarree())
axs[1, 0].coastlines(resolution='10m', color='black', linewidth=1)
axs[1, 0].scatter(x=waypoints_in_area[:,1], y=waypoints_in_area[:,0], c='lightgrey', marker='o', s=5, zorder=0)
axs[1, 0].scatter(x=departure_wp_long, y=departure_wp_lat, c='tab:pink', s=2.5)
axs[1, 0].plot(departure_path[:,1], departure_path[:,0], '-', c='tab:red', lw=1, zorder=1)
axs[1, 0].plot(traj[:,1], traj[:,0], '--', c='tab:blue', lw=1, zorder=2)
axs[1, 0].scatter(x=simplified[:,1], y=simplified[:,0], c='tab:blue', s=5)
axs[1, 0].set_aspect('auto')

# Arrival and Approach
# Arrival Path
arrival_path = np.empty([0,2])
_, idx = np.unique(arrivals_dict[arrival_result], return_index=True) 
for wp in np.array(arrivals_dict[arrival_result])[np.sort(idx)]:
    arrival_path = np.vstack((arrival_path, arrival_waypoints_coord_dict[wp]))
arrival_wp_long = np.array(list(arrival_waypoints_coord_dict.values()))[:,1]
arrival_wp_lat = np.array(list(arrival_waypoints_coord_dict.values()))[:,0]
# Approach path
approach_path = np.empty([0,2])
_, idx = np.unique(approach_dict[approach_result], return_index=True) 
for wp in np.array(approach_dict[approach_result])[np.sort(idx)]:
    approach_path = np.vstack((approach_path, approach_waypoints_coord_dict[wp]))
approach_wp_long = np.array(list(approach_waypoints_coord_dict.values()))[:,1]
approach_wp_lat = np.array(list(approach_waypoints_coord_dict.values()))[:,0]
axs[1, 1].set_title("Arrival: " + arrival_result + " & Approach:" + approach_result)
axs[1, 1].set_extent([np.min(arrival_wp_long)-0.2, np.max(arrival_wp_long)+0.2, np.min(arrival_wp_lat)-0.2, np.max(arrival_wp_lat)+0.2], crs=ccrs.PlateCarree())
axs[1, 1].coastlines(resolution='10m', color='black', linewidth=1)
axs[1, 1].scatter(x=waypoints_in_area[:,1], y=waypoints_in_area[:,0], c='lightgrey', marker='o', s=5, zorder=0)
axs[1, 1].scatter(x=arrival_wp_long, y=arrival_wp_lat, c='tab:pink', s=2.5)
axs[1, 1].plot(arrival_path[:,1], arrival_path[:,0], '-', c='tab:red', lw=1, zorder=1)
axs[1, 1].scatter(x=approach_wp_long, y=approach_wp_lat, c='tab:pink', s=2.5)
axs[1, 1].plot(approach_path[:,1], approach_path[:,0], '-', c='tab:red', lw=1, zorder=1)
axs[1, 1].plot(traj[:,1], traj[:,0], '--', c='tab:blue', lw=1, zorder=2)
axs[1, 1].scatter(x=simplified[:,1], y=simplified[:,0], c='tab:blue', s=5)
axs[1, 1].set_aspect('auto')


fig.tight_layout()
from matplotlib.lines import Line2D
fig.suptitle(file + "   " + departure_airport[0] + "/" + arrival_airport[0])
fig.legend(handles=[Line2D([0], [0], c='tab:blue', ls="--", lw=1, marker='o', ms=3, label='Trajectory'),
                    Line2D([0], [0], c='white', markerfacecolor='lightgrey', marker='o', ms=5, label='Fix & NavAids'),
                    Line2D([0], [0], c='lightgrey', ls="-",  lw=1, label='Airways'),
                    Line2D([0], [0], c='white', markerfacecolor='tab:pink', marker='o', ms=5, label='Waypoints considered'),
                    Line2D([0], [0], c='tab:red', ls="-", lw=1, label='Identified ATS route'),
                    ], loc='lower center', ncol=5, frameon=False)
fig.subplots_adjust(bottom=0.025, top = 0.96)
plt.show()

In [None]:
fig, axs = plt.subplots(1, 1, subplot_kw={'projection': ccrs.PlateCarree()}, dpi=300, figsize=(15, 15))
# Enroute
axs.set_title("All")
axs.coastlines(resolution='10m', color='black', linewidth=0.5)
axs.scatter(x=waypoints_in_area[:,1], y=waypoints_in_area[:,0], c = 'lightgrey', s=1)
axs.plot(traj[:,1], traj[:,0], '--', c='tab:blue', lw=1, zorder=2)
axs.scatter(x=enroute_trajectory[:,1], y=enroute_trajectory[:,0], c ='blue', s=5)
for index, airway in airways_result.iterrows():
    lat1, long1 = waypoint_coord_dict[airway[0]]
    lat2, long2 = waypoint_coord_dict[airway[3]]
    axs.plot([long1, long2], [lat1, lat2], '-', c='tab:red', lw=1, zorder=1)
for index, airway in airways.iterrows():
    lat1, long1 = waypoint_coord_dict[airway[0]]
    lat2, long2 = waypoint_coord_dict[airway[3]]
    axs.plot([long1, long2], [lat1, lat2], '-', c='lightgrey', lw=0.25, zorder=0)

# axs.set_title("All")
# axs.coastlines(resolution='10m', color='black', linewidth=0.5)
# axs.scatter(x=waypoints_in_area[:,1], y=waypoints_in_area[:,0], c = 'lightgrey', s=1)
# axs.plot(traj[:,1], traj[:,0], '--', c='tab:blue', lw=1, zorder=0)
# axs.scatter(x=simplified[:,1], y=simplified[:,0], c='tab:blue', s=5)
# for index, airway in airways_result.iterrows():
#     lat1, long1 = waypoint_coord_dict[airway[0]]
#     lat2, long2 = waypoint_coord_dict[airway[3]]
#     axs.plot([long1, long2], [lat1, lat2], '-', c='tab:red', lw=1, zorder=2)
# for index, airway in airways.iterrows():
#     lat1, long1 = waypoint_coord_dict[airway[0]]
#     lat2, long2 = waypoint_coord_dict[airway[3]]
#     axs.plot([long1, long2], [lat1, lat2], '-', c='lightgrey', lw=0.25, zorder=1)
axs.plot(departure_path[:,1], departure_path[:,0], '-', c='tab:green', lw=1, zorder=3)
axs.plot(arrival_path[:,1], arrival_path[:,0], '-', c='tab:green', lw=1, zorder=3)
axs.plot(approach_path[:,1], approach_path[:,0], '-', c='tab:green', lw=1, zorder=3)
axs.set_aspect('auto')
