# Step 1: Data Preprocessing

## Import required libraries

In [52]:
import glob
from os.path import join as opj
from traffic.core import Traffic, Flight
from traffic.algorithms.filters import FilterMedian, FilterMean
import pandas as pd
from datetime import datetime, timedelta

from traffic.data import aixm_airspaces
# Declare the "geometry" column as the active geometry of the airspace table
# and store the result back on the shared aixm_airspaces object.
# NOTE(review): presumably required so that spatial operations on the
# airspaces (e.g. clipping) work — confirm against the traffic/geopandas docs.
aixm_airspaces.data = aixm_airspaces.data.set_geometry("geometry")

import warnings
# Silence all FutureWarning and UserWarning messages for the rest of the session.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

## Helper functions

In [53]:
# Airspace entry looked up under the key "LSASFRA" in the AIXM airspace data;
# it is the region passed to .clip() in the preprocessing loop.
lsasfra = aixm_airspaces["LSASFRA"]

# Removal of short flights
def remove_short(flight):
    """Keep a flight only if its length exceeds 72.

    Returns ``flight`` unchanged when ``len(flight) > 72`` and ``None``
    otherwise.
    NOTE(review): a ``None`` result presumably makes the lazy pipeline drop
    the flight — confirm against the traffic documentation.
    """
    min_length = 72
    return flight if len(flight) > min_length else None

# Removal of flights with no altitude
def remove_ground_only(flight):
    """Keep a flight only if it has at least one non-missing altitude value.

    Returns ``flight`` unchanged when the ``altitude`` column of
    ``flight.data`` contains any non-NaN entry, and ``None`` when every
    entry is missing (or the column is empty).
    """
    has_altitude = flight.data["altitude"].notna().any()
    if has_altitude:
        return flight
    return None

# Resampling the flight data
def resample_flight(flight):
    """Resample a flight onto a regular 1-second time grid.

    The original samples are used as support points: numeric columns are
    interpolated in time between them, identifier-like columns are
    forward/backward filled, and only the rows lying on the 1-second grid
    are returned. Grid points that coincide with an original timestamp are
    kept (with their original values), so the output grid has no holes.

    Args:
        flight: object whose ``data`` attribute is a DataFrame containing a
            ``timestamp`` column plus every column listed in
            ``interpolate_cols`` and ``fill_cols`` below. ``flight.data``
            is not modified.

    Returns:
        A new ``Flight`` built from the resampled DataFrame, restricted to
        the columns selected at the end of this function.
    """
    # set_index returns a new frame, so the caller's DataFrame is left as is.
    df = flight.data.set_index("timestamp")
    df = df[~df.index.duplicated(keep="first")]

    # Regular 1-second grid spanning the flight.
    grid = df.resample("1s").asfreq().index

    # Original timestamps + grid timestamps, sorted; rows that only exist on
    # the grid are all-NaN until interpolated/filled below.
    all_times = df.index.union(grid).rename("timestamp")
    df_combined = df.reindex(all_times)

    # interpolate and fill
    interpolate_cols = [
        "latitude",
        "longitude",
        "altitude",
        "vertical_rate",
        "geominusbaro",
        "geoaltitude",
        "groundspeed",
        "track",
    ]
    df_combined[interpolate_cols] = df_combined[interpolate_cols].interpolate(
        method="time"
    )
    fill_cols = [
        "icao24",
        "squawk",
        "nic",
        "onground",
        "callsign",
        "flight_id",
        "typecode",
    ]
    # Nullable dtypes so that the NaN rows introduced by the grid do not
    # turn these columns into float/object before they are filled.
    df_combined["nic"] = pd.to_numeric(df_combined["nic"], errors="coerce").astype(
        "Int64"
    )
    df_combined = df_combined.astype(
        {
            "icao24": "string",
            "squawk": "string",
            "onground": "boolean",
            "callsign": "string",
            "flight_id": "string",
            "typecode": "string",
        }
    )
    df_combined[fill_cols] = df_combined[fill_cols].ffill().bfill()

    # Keep every row on the 1-second grid, including grid points that
    # coincide with an original sample.
    df_combined = df_combined[df_combined.index.isin(grid)]
    df_combined = df_combined.reset_index()
    df_combined = df_combined[
        [
            "timestamp",
            "icao24",
            "latitude",
            "longitude",
            "onground",
            "altitude",
            "nic",
            "vertical_rate",
            "geominusbaro",
            "geoaltitude",
            "groundspeed",
            "track",
            "squawk",
            "callsign",
            "flight_id",
            "typecode",
        ]
    ]

    return Flight(df_combined)

## Preprocessing Loop

In [None]:
# Directory holding the raw per-day trajectory parquet files.
trajectory_data_path = "/store/Projects_CRM/CRM4_data/trajectory/raw/2024"

start = datetime.strptime("2024-07-01", "%Y-%m-%d")
end = datetime.strptime("2024-08-01", "%Y-%m-%d")

# One entry per calendar day in the half-open interval [start, end).
days = [start + timedelta(days=i) for i in range((end - start).days)]

for day in days:
    # Format the date once instead of slicing str(day) repeatedly.
    day_dashed = day.strftime("%Y-%m-%d")
    day_underscored = day.strftime("%Y_%m_%d")

    # Load trajectory data for day
    print(f"processing {day_dashed}")
    list_parquet = glob.glob(
        opj(
            trajectory_data_path,
            f"{day_dashed}*.parquet",
        )
    )
    if not list_parquet:
        # pd.concat raises "No objects to concatenate" on an empty list;
        # report the missing day explicitly and carry on with the next one.
        print(f"no parquet files found for {day_dashed}, skipping")
        continue

    t = Traffic(
        pd.concat(
            [pd.read_parquet(f) for f in list_parquet],
            axis=0,
        )
    )

    # Assign ids
    # NOTE(review): by="10s" presumably splits trajectories at gaps longer
    # than 10 seconds before ids are assigned — confirm in the traffic docs.
    t_id = (
        t.iterate_lazy(iterate_kw={"by": "10s"})
        .assign_id("{self.start:%y/%m/%d}_{self.callsign}_{idx:>03}")
        .eval(max_workers=1, desc="Assigning IDs")
    )

    t_enriched = t_id.aircraft_data()

    # Other processing steps: drop short / altitude-less flights, apply the
    # median then mean filters, clip to the LSASFRA airspace and resample.
    t_proc = (
        t_enriched.iterate_lazy()
        .pipe(remove_short)
        .pipe(remove_ground_only)
        .filter(
            FilterMedian(
                altitude=3, geoaltitude=3, vertical_rate=3, groundspeed=3, track=0
            )
        )
        .filter(
            FilterMean(
                altitude=3, geoaltitude=3, vertical_rate=3, groundspeed=3, track=0
            )
        )
        .clip(lsasfra)
        .pipe(resample_flight)
        .eval(max_workers=40, desc="Processing")
    )
    t_proc.to_parquet(
        f"/store/fusg/VT1/trajectory_data/{day_underscored}.parquet"
    )

processing 2024-07-01


Assigning IDs:   0%|          | 0/40655 [00:00<?, ?it/s]

Processing:   0%|          | 0/40655 [00:00<?, ?it/s]

processing 2024-07-02


Assigning IDs:   0%|          | 0/50805 [00:00<?, ?it/s]

Processing:   0%|          | 0/50805 [00:00<?, ?it/s]

processing 2024-07-03


Assigning IDs:   0%|          | 0/42730 [00:00<?, ?it/s]

Processing:   0%|          | 0/42730 [00:00<?, ?it/s]

processing 2024-07-04


Assigning IDs:   0%|          | 0/40588 [00:00<?, ?it/s]

Processing:   0%|          | 0/40588 [00:00<?, ?it/s]

processing 2024-07-05


Assigning IDs:   0%|          | 0/47710 [00:00<?, ?it/s]

Processing:   0%|          | 0/47710 [00:00<?, ?it/s]

processing 2024-07-06


Assigning IDs:   0%|          | 0/33699 [00:00<?, ?it/s]

Processing:   0%|          | 0/33699 [00:00<?, ?it/s]

processing 2024-07-07


Assigning IDs:   0%|          | 0/32256 [00:00<?, ?it/s]

Processing:   0%|          | 0/32256 [00:00<?, ?it/s]

processing 2024-07-08


Assigning IDs:   0%|          | 0/39226 [00:00<?, ?it/s]

Processing:   0%|          | 0/39226 [00:00<?, ?it/s]

processing 2024-07-09


Assigning IDs:   0%|          | 0/41435 [00:00<?, ?it/s]

Processing:   0%|          | 0/41435 [00:00<?, ?it/s]

processing 2024-07-10


Assigning IDs:   0%|          | 0/39633 [00:00<?, ?it/s]

Processing:   0%|          | 0/39633 [00:00<?, ?it/s]

processing 2024-07-11


Assigning IDs:   0%|          | 0/44121 [00:00<?, ?it/s]

Processing:   0%|          | 0/44121 [00:00<?, ?it/s]

processing 2024-07-12


Assigning IDs:   0%|          | 0/36328 [00:00<?, ?it/s]

Processing:   0%|          | 0/36328 [00:00<?, ?it/s]

processing 2024-07-13


Assigning IDs:   0%|          | 0/38746 [00:00<?, ?it/s]

Processing:   0%|          | 0/38746 [00:00<?, ?it/s]

processing 2024-07-14


Assigning IDs:   0%|          | 0/42353 [00:00<?, ?it/s]

Processing:   0%|          | 0/42353 [00:00<?, ?it/s]

processing 2024-07-15


Assigning IDs:   0%|          | 0/40604 [00:00<?, ?it/s]

Processing:   0%|          | 0/40604 [00:00<?, ?it/s]

processing 2024-07-16


Assigning IDs:   0%|          | 0/42544 [00:00<?, ?it/s]

Processing:   0%|          | 0/42544 [00:00<?, ?it/s]

processing 2024-07-17


Assigning IDs:   0%|          | 0/44187 [00:00<?, ?it/s]

Processing:   0%|          | 0/44187 [00:00<?, ?it/s]

processing 2024-07-18


Assigning IDs:   0%|          | 0/47471 [00:00<?, ?it/s]

Processing:   0%|          | 0/47471 [00:00<?, ?it/s]

processing 2024-07-19


Assigning IDs:   0%|          | 0/42954 [00:00<?, ?it/s]

Processing:   0%|          | 0/42954 [00:00<?, ?it/s]

processing 2024-07-20


Assigning IDs:   0%|          | 0/48259 [00:00<?, ?it/s]

Processing:   0%|          | 0/48259 [00:00<?, ?it/s]

processing 2024-07-21


Assigning IDs:   0%|          | 0/30764 [00:00<?, ?it/s]

Processing:   0%|          | 0/30764 [00:00<?, ?it/s]

processing 2024-07-22


Assigning IDs:   0%|          | 0/38997 [00:00<?, ?it/s]

Processing:   0%|          | 0/38997 [00:00<?, ?it/s]

processing 2024-07-23


Assigning IDs:   0%|          | 0/42917 [00:00<?, ?it/s]

Processing:   0%|          | 0/42917 [00:00<?, ?it/s]

processing 2024-07-24


Assigning IDs:   0%|          | 0/42925 [00:00<?, ?it/s]