In [1]:
import os
import copy
import joblib
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
from traffic.core import Flight
from traffic.core import Traffic
from joblib import Parallel, delayed

os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
pd.set_option("display.max_columns", 100)
data_folder = os.path.join(os.getcwd(), "data")
flights_folder = os.path.join(os.getcwd(), "flightDfs")
cha_df = pd.read_csv(os.path.join(data_folder, "challenge_set.csv"))
sub_df = pd.read_csv(os.path.join(data_folder, "submission_set.csv"))
final_sub_df = pd.read_csv(os.path.join(data_folder, "final_submission_set.csv"))

parquet_paths = sorted([
    os.path.join(data_folder, file) for file in os.listdir(data_folder) if ".parquet" in file
])

usable_flight_ids = list(set(cha_df.flight_id.unique()).union(
    set(sub_df.flight_id.unique()).union(
        set(final_sub_df.flight_id.unique())
    )
))
del cha_df, sub_df, final_sub_df

if not os.path.isdir(flights_folder):
    os.mkdir(flights_folder)

complete_flights=[
    file for file in os.listdir(flights_folder) if "." not in file
]


In [2]:
def normalise_time_df(df, time_freq="1s", interpol_method="akima", time_column="timestamp", cols_to_interpol=[]):
    group_df = df.copy()
    group_df[time_column] = pd.to_datetime(group_df[time_column])  # Ensure time_column is a datetime column
    group_df = group_df.sort_values(by="timestamp")

    static_columns = group_df.columns.tolist()
    static_columns.remove(time_column)
    [static_columns.remove(interpol_column) for interpol_column in cols_to_interpol]
    
    # Generate a new DataFrame with a continuous 1-second interval for timestamps
    start_time = group_df[time_column].min()
    end_time = group_df[time_column].max()
    
    # Create a new range of timestamps at 1 second intervals
    new_timestamps = pd.date_range(start=start_time, end=end_time, freq=time_freq)
    
    # Reindex the dataframe with the new timestamps
    group_df.set_index(time_column, inplace=True)  # Set time_column as index for reindexing
    group_df = group_df.reindex(new_timestamps)  # Reindex to include every second
    
    # Interpolating columns
    group_df[cols_to_interpol] = group_df[cols_to_interpol].interpolate(method=interpol_method)

    # Forward fill constant values
    group_df[static_columns] = group_df[static_columns].ffill()
    
    # Reset the index to make time_column a column again
    group_df.reset_index(inplace=True)
    group_df = group_df.rename(columns={'index': time_column})

    return group_df

def haversine_np(lon1, lat1, lon2, lat2):
    """
    Compute the great-circle distance between two points on the Earth's surface using NumPy, in meters.
    """
    # Convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])
    
    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    
    # Radius of earth in meters (6371 km converted to meters)
    r = 6371000
    return c * r

def calculate_distances(df):
    # Shift the lat/lon columns to create the "previous" row
    lat1 = df['latitude'][:-1].values
    lon1 = df['longitude'][:-1].values
    lat2 = df['latitude'][1:].values
    lon2 = df['longitude'][1:].values
    
    # Compute the distance between consecutive points
    distances = haversine_np(lon1, lat1, lon2, lat2)
    
    # Append a NaN for the first distance (as there's no previous point for the first row)
    distances = np.insert(distances, 0, np.nan)
    
    # Add distances to DataFrame in meters
    return distances

def calculate_bearing(lat1, lon1, lat2, lon2):
    """
    Calculate the bearing (track angle) between two points using latitude and longitude.
    """
    # Convert latitude and longitude from degrees to radians
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    lat2 = np.radians(lat2)
    lon2 = np.radians(lon2)
    
    # Calculate the change in longitude
    dlon = lon2 - lon1
    
    # Calculate bearing using the formula
    x = np.sin(dlon) * np.cos(lat2)
    y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
    
    initial_bearing = np.arctan2(x, y)
    
    # Convert from radians to degrees
    initial_bearing = np.degrees(initial_bearing)
    
    # Normalize the bearing to 0 - 360 degrees
    bearing = (initial_bearing + 360) % 360
    
    return bearing

def calculate_bearings(df):
    # Shift the lat/lon columns to create the "previous" row
    lat1 = df['latitude'][:-1].values
    lon1 = df['longitude'][:-1].values
    lat2 = df['latitude'][1:].values
    lon2 = df['longitude'][1:].values
    
    # Compute the bearing between consecutive points
    bearings = calculate_bearing(lat1, lon1, lat2, lon2)
    
    # Append a NaN for the first row as there's no previous point for it
    bearings = np.insert(bearings, 0, np.nan)
    
    return bearings

def generate_phase_segments(df):
    df = df[["phase"]].copy()
    # Create a shifted phase column to detect phase changes
    df['shifted_phase'] = df['phase'].shift(1)

    # Mark where phase changes (start of a new segment)
    df['segment'] = (df['phase'] != df['shifted_phase']).cumsum()

    # Initialize the dictionary to store the segments
    phase_segments = {}

    # Use groupby to group by phase and segment and collect indices
    for (phase, segment), group in df.groupby(['phase', 'segment']):
        if phase not in phase_segments:
            phase_segments[phase] = []
        phase_segments[phase].append(np.array(list(group.index)))

    return phase_segments

def preprocess_flight(flight, output_folder="flights_folder"):
    flight.data = flight.data.drop(columns=["icao24"]).rename(
        columns={
            "u_component_of_wind": "wind_u",
            "v_component_of_wind": "wind_v"
        }
    ).reset_index(drop=True)
    
    flight = flight.filter(
        filter="aggressive",
        strategy=None
    )
    flight.data["distance"] = calculate_distances(flight.data) # meters
    flight.data["groundspeed"] = flight.data["distance"] / 0.514444 # conversion to knot
    bad_loc = (
        (flight.data.altitude > 100) &
        (flight.data.groundspeed==0)
    ).values.astype(bool)
    flight.data = flight.data.loc[~bad_loc].dropna().reset_index(drop=True)
    
    flight.data = normalise_time_df(
        flight.data, 
        time_freq="1s", 
        time_column="timestamp",
        interpol_method="akima",
        cols_to_interpol=flight.data.columns[2:]
    )
    
    flight.data["distance"] = calculate_distances(flight.data) # meters
    flight.data["groundspeed"] = flight.data["distance"] / 0.514444 # conversion to knot
    flight.data["track"] = calculate_bearings(flight.data)
    flight.data.loc[flight.data["track"]==0, "track"] = np.nan
    flight.data["track"] = flight.data["track"].ffill().bfill()
    flight.data["vertical_rate"] = flight.data["altitude"].diff().bfill() * 60 # per minute
    
    flight.data["track_x"], flight.data["track_y"] = np.sin(np.deg2rad(flight.data["track"])), np.cos(np.deg2rad(flight.data["track"]))
    flight.data['wind_mag'] = np.sqrt(flight.data['wind_u']**2 + flight.data['wind_v']**2)
    flight.data['wind_x'] = (flight.data['wind_u'] / flight.data['wind_mag']).fillna(0)
    flight.data['wind_y'] = (flight.data['wind_v'] / flight.data['wind_mag']).fillna(0)
    flight.data["track_wind_dot"] = (flight.data['track_x'] * flight.data['wind_x']) + (flight.data['track_y'] * flight.data['wind_y'])
    
    flight = flight.compute_TAS()
    flight.data["TdG_speed"] = (flight.data["TAS"] - flight.data["groundspeed"])
    flight.data["ToG_speed"] = (flight.data["TAS"] / flight.data["groundspeed"]).replace({np.inf: 0, -np.inf: 0})
    flight.data["heading_x"], flight.data["heading_y"] = np.sin(np.deg2rad(flight.data["heading"])), np.cos(np.deg2rad(flight.data["heading"]))
    flight.data["track_heading_dot"] = (flight.data['track_x'] * flight.data['heading_x']) + (flight.data['track_y'] * flight.data['heading_y'])
    flight.data["heading_wind_dot"] = (flight.data['heading_x'] * flight.data['wind_x']) + (flight.data['heading_y'] * flight.data['wind_y'])
    
    flight = flight.phases()
    flight.data = flight.data.dropna().reset_index(drop=True)
    
    # Generate phase segments and fixing LEVEL bugs
    phase_segments = generate_phase_segments(flight.data)
    if "LEVEL" in phase_segments:
        for level_indexs in phase_segments["LEVEL"]:
            first_lvl_idx = level_indexs[0]
            last_lvl_idx = level_indexs[-1]

            if first_lvl_idx > 0:
                previous_phase = flight.data.loc[first_lvl_idx-1, "phase"]
                upcoming_phase = flight.data.loc[last_lvl_idx+1, "phase"]

                # this if statment deals with LEVEL segments which occur
                # between 2 different segments, and assumes that
                # a LEVEL segment should only occur between two identical segments
                # e.g., CRUISE - LEVEL - CRUISE   is OK
                #       CRUISE - LEVEL - DESCENT  is NOT OK
                # when NOT OK, LEVEL is converted into the previous phase
                if previous_phase != upcoming_phase:
                    flight.data.loc[level_indexs, "phase"] = previous_phase

                else:
                    previous_segment_index = np.argmax([
                        (first_lvl_idx-1) in phase_segment 
                        for phase_segment in phase_segments[previous_phase]
                    ])
                    current_segment_length = len(level_indexs)
                    previous_segment_length = len(phase_segments[previous_phase][previous_segment_index])
                    upcoming_segment_length = len(phase_segments[previous_phase][previous_segment_index+1])
                    # if the twho "before and after" segments are identical,
                    # check if the LEVEL length in question is GREATER
                    # than both segments, assumming that LEVEL segments 
                    # should be smaller segments within phases
                    # if LEVEL segment is too large, then consider it a bug and convert it into the phase it is contained in
                    if (current_segment_length >= previous_segment_length) and (current_segment_length >= upcoming_segment_length):
                        flight.data.loc[level_indexs, "phase"] = previous_phase
    
    if "GROUND" in flight.data.phase.unique():
        last_ground_index = flight.data.loc[flight.data.phase=="GROUND"].index[-1]
        # this if statement makes sure that we don't exclude the entire flight
        # in the case where GROUND only appears at the beggining of the data
        # since then we would discard all of the acctual flight
        if last_ground_index >= int(0.25 * len(flight.data)): 
            flight.data = flight.data.loc[:last_ground_index].reset_index(drop=True)
    
    joblib.dump(
        flight.data,
        os.path.join(output_folder, str(int(flight.flight_id)))
    )

def process_parquet_parallel(parquet_path="path", complete_flights="list", usable_flight_ids="list", min_flight_duration_minutes=1):
    traffics = Traffic.from_file(parquet_path)
    for traffic in tqdm(traffics, leave=False, desc=parquet_path.split("\\")[-1].split(".")[0]):
        if (traffic.flight_id in usable_flight_ids):
            flight = Flight(traffic.data)
            try:
                flight.data["timestamp"] = pd.to_datetime(flight.data["timestamp"])
                flight_duration_minutes = (flight.data.timestamp.max() - flight.data.timestamp.min()).total_seconds() // 60
                if (flight_duration_minutes >= min_flight_duration_minutes) and (str(int(flight.flight_id)) not in complete_flights):
                    preprocess_flight(flight, output_folder=flights_folder)
            except Exception as e:
                print(e, parquet_path)


In [3]:
Parallel(n_jobs=12)( # 12 is max on my 64Gb RAM laptop without
    delayed(         # making it impossible to use the laptop
        process_parquet_parallel
    )(
        parquet_path=parquet_path, 
        min_flight_duration_minutes=1,
        complete_flights=complete_flights,
        usable_flight_ids=usable_flight_ids
    )
    for parquet_path in tqdm(parquet_paths)
);

  0%|          | 0/365 [00:00<?, ?it/s]