In [84]:
import os, sys, importlib, inspect
from pathlib import Path
import numpy as np
import pandas as pd

# Had issues with the root bc of where i started the notebook from but used parents and grandparent levels to get to the actual repo root
repo_root = Path.cwd()
for up in [repo_root, repo_root.parent, repo_root.parent.parent]:
    if (up / "src").exists():
        repo_root = up
        break
sys.path.insert(0, str(repo_root))

#importing our geo for coordinate translation
import src.geo as geo 
importlib.invalidate_caches()
geo = importlib.reload(geo)

# clean name for the rest of the notebook
latlonalt_to_enu = geo.latlonalt_to_enu


In [85]:
# Used absolute path just because of my issues with the project root -- will fix them later
DATA_PATH = "/users/danielvillafuerte/weather-aware-trajectory-prediction/data/raw/06_12_17.csv"
df = pd.read_csv(DATA_PATH)

# time, callsign cleanup
df["time"] = pd.to_datetime(df["time"], unit="s", errors="coerce")
df["callsign"] = df.get("callsign", "").fillna("").str.strip()

# drop rows without position and time bc they aren't useful to us
df = df.dropna(subset=["lat","lon","time"]).copy()

# altitude column - preffered geoaltitude but will use baroalitude as fallback just in case, will take a 0 as an extra fallback just to avoid NaNs
if "geoaltitude" in df.columns:
    df["alt"] = df["geoaltitude"]
    if "baroaltitude" in df.columns:
        df["alt"] = df["alt"].fillna(df["baroaltitude"])
elif "baroaltitude" in df.columns:
    df["alt"] = df["baroaltitude"]
else:
    df["alt"] = 0.0
df["alt"] = df["alt"].fillna(0.0)

#now lets check what we have so far
df.head()

Unnamed: 0,time,icao24,lat,lon,velocity,heading,vertrate,callsign,onground,alert,spi,squawk,baroaltitude,geoaltitude,lastposupdate,lastcontact,alt
0,2017-06-12 00:00:00,4062d5,52.74187,-0.567287,172.11977,295.492795,-9.10336,EXS46B,False,False,False,3244.0,9212.58,9372.6,1497226000.0,1497226000.0,9372.6
1,2017-06-12 00:00:10,a9f4c6,38.733215,-90.131836,150.662723,83.53018,13.0048,3536,False,False,False,1734.0,1905.0,1981.2,1497226000.0,1497226000.0,1981.2
2,2017-06-12 00:00:10,a0b20e,35.164902,-104.443939,221.811287,268.40517,0.0,FDX556,False,False,False,7677.0,11574.78,12146.28,1497226000.0,1497226000.0,12146.28
5,2017-06-12 00:00:10,85c970,35.337385,138.482724,203.732815,264.203031,0.0,ANA791,False,False,False,2355.0,7924.8,8161.02,1497226000.0,1497226000.0,8161.02
6,2017-06-12 00:00:10,a0ce85,39.907278,-104.448242,130.236658,73.712637,8.128,,False,False,False,625.0,6515.1,3154.68,1497225000.0,1497226000.0,3154.68


In [72]:
# Sort + flight segmentation
df = df.sort_values(["icao24","time"]).reset_index(drop=True)
df["gap_s"] = df.groupby("icao24")["time"].diff().dt.total_seconds().fillna(0)
df["flight_id"] = ((df["icao24"] != df["icao24"].shift()) | (df["gap_s"] > 600)).cumsum()


In [86]:
def keep_consecutive_runs(df, min_len=60, max_gap_s=120):
    """
    Keep only continuous, valid segments within each flight:
      - valid = lat, lon, alt present
      - new run when invalid OR time gap > max_gap_s
      - keep runs with at least min_len consecutive valid samples
    """
    d = df.sort_values(["flight_id","time"]).copy()
    d["valid"] = d[["lat","lon","alt"]].notna().all(axis=1)
    d["dt"] = d.groupby("flight_id")["time"].diff().dt.total_seconds().fillna(0)

    break_flag = (~d["valid"]) | (d["dt"] > max_gap_s)
    d["run_id"] = break_flag.groupby(d["flight_id"]).cumsum()

    d_valid = d[d["valid"]].copy()
    run_sizes = d_valid.groupby(["flight_id","run_id"]).size()
    keeps = run_sizes[run_sizes >= min_len].reset_index()[["flight_id","run_id"]]
    out = d_valid.merge(keeps, on=["flight_id","run_id"], how="inner")
    return out.drop(columns=["valid"])


In [87]:
# Keep continuous runs and sort after lets check how many runs we actually kept
df_runs = keep_consecutive_runs(df, min_len=60, max_gap_s=120).copy()
df_runs = df_runs.sort_values(["flight_id","run_id","time"]).reset_index(drop=True)

print("Runs kept:", df_runs.groupby(["flight_id","run_id"]).size().shape[0],
      "| rows:", len(df_runs))


KeyError: 'flight_id'

In [88]:
# ENU per (flight_id, run_id), anchored at first sample of each run
runs = []
for (fid, rid), g in df_runs.groupby(["flight_id","run_id"], sort=False):
    g = g.sort_values("time").copy()
    lat0, lon0, alt0 = g.iloc[0][["lat","lon","alt"]]
    E, N, U = latlonalt_to_enu(
        g["lat"].to_numpy(),
        g["lon"].to_numpy(),
        g["alt"].to_numpy(),
        float(lat0), float(lon0), float(alt0)
    )
    g["E"], g["N"], g["U"] = E, N, U
    runs.append(g)

df_runs = pd.concat(runs, ignore_index=True)
del runs


In [89]:
# Kinematics on native cadence - lets use the ENU we alraedy got to get vE, vN, VU and turn rate
gb = df_runs.groupby(["flight_id","run_id"], sort=False)

df_runs["dt"] = gb["time"].diff().dt.total_seconds()
df_runs["dt"] = df_runs["dt"].replace(0, np.nan)

for c in ["E","N","U"]:
    df_runs[f"d{c}"] = gb[c].diff()

df_runs["vE"] = df_runs["dE"] / df_runs["dt"]
df_runs["vN"] = df_runs["dN"] / df_runs["dt"]
df_runs["vU"] = df_runs["dU"] / df_runs["dt"]
df_runs["speed"] = np.sqrt(df_runs["vE"]**2 + df_runs["vN"]**2 + df_runs["vU"]**2)

df_runs["heading_rad"] = np.arctan2(df_runs["vE"], df_runs["vN"])
df_runs["heading_unwrapped"] = gb["heading_rad"].transform(np.unwrap)
df_runs["turn_rate"] = gb["heading_unwrapped"].diff() / df_runs["dt"]

seg = gb.size().rename("len").reset_index()
print("Segments kept:", len(seg),
      "| median len:", int(seg["len"].median()),
      "| min/max:", int(seg["len"].min()), "/", int(seg["len"].max()))
print(df_runs[["E","N","U","vE","vN","vU","speed","turn_rate"]].describe().round(3))


Segments kept: 5492 | median len: 167 | min/max: 60 / 359
                 E            N            U           vE           vN  \
count  1038555.000  1038555.000  1038555.000  1033063.000  1033063.000   
mean      3623.369    15954.238    -6349.379        4.077       11.715   
std     243086.761   168533.944    13439.911      667.696      557.247   
min   -2126501.613 -1423972.650  -574410.003  -222303.167  -152195.385   
25%     -92555.121   -39988.059    -9364.080      -89.026      -26.400   
50%          0.000      311.114    -1518.899        0.000        0.000   
75%      98922.595    75155.142        3.388       91.582       65.925   
max    1486336.529  1576440.127    37703.760   221471.841   153061.609   

                vU        speed  turn_rate  
count  1033063.000  1033063.000        0.0  
mean        -6.171      167.402        NaN  
std        131.525      863.602        NaN  
min     -52703.771        0.000        NaN  
25%        -10.672        0.000        NaN  
50%  

In [91]:
# Quality summaries (same helpers you wrote)
def enu_counts(df: pd.DataFrame, eps: float = 1e-6):
    total = len(df)
    out = {"total_rows": total}
    for c in ("E","N","U"):
        mask = df[c].notna() & (df[c].abs() > eps)
        cnt = int(mask.sum())
        out[f"{c}_nonzero_nonNaN"] = cnt
        out[f"{c}_pct"] = (cnt/total*100.0) if total else 0.0
    return pd.Series(out)

def enu_row_summary(df: pd.DataFrame, eps: float = 1e-6, ignore_first_per_flight: bool = True):
    needed = {"E","N","U"}
    if not needed.issubset(df.columns):
        raise KeyError("DataFrame must have E, N, U columns.")
    valid = df[["E","N","U"]].notna().all(axis=1)
    if ignore_first_per_flight and "flight_id" in df.columns:
        first_idx = df.groupby("flight_id").head(1).index
        valid_ex = valid.copy(); valid_ex.loc[first_idx] = False
    else:
        valid_ex = valid
    any_nz = valid_ex & (df[["E","N","U"]].abs() > eps).any(axis=1)
    all_nz = valid_ex & (df[["E","N","U"]].abs() > eps).all(axis=1)
    total = len(df); tot_valid = int(valid.sum())
    return pd.Series({
        "total_rows": total,
        "valid_rows": tot_valid,
        "valid_pct": (tot_valid/total*100.0) if total else 0.0,
        "any_nonzero_rows": int(any_nz.sum()),
        "any_nonzero_pct_of_valid": (int(any_nz.sum())/max(tot_valid,1)*100.0),
        "all_nonzero_rows": int(all_nz.sum()),
        "all_nonzero_pct_of_valid": (int(all_nz.sum())/max(tot_valid,1)*100.0),
        "ignored_first_rows": int(valid.sum() - valid_ex.sum()) if ignore_first_per_flight and "flight_id" in df.columns else 0
    })

print(enu_counts(df_runs, eps=1e-6))
print(enu_row_summary(df_runs, eps=1e-6, ignore_first_per_flight=True))


total_rows          1.038555e+06
E_nonzero_nonNaN    9.981860e+05
E_pct               9.611296e+01
N_nonzero_nonNaN    9.980340e+05
N_pct               9.609833e+01
U_nonzero_nonNaN    1.008920e+06
U_pct               9.714652e+01
dtype: float64
total_rows                  1.038555e+06
valid_rows                  1.038555e+06
valid_pct                   1.000000e+02
any_nonzero_rows            1.016795e+06
any_nonzero_pct_of_valid    9.790478e+01
all_nonzero_rows            9.909490e+05
all_nonzero_pct_of_valid    9.541613e+01
ignored_first_rows          5.335000e+03
dtype: float64


In [96]:
from pathlib import Path
import pandas as pd
import numpy as np

out_path = Path("data/processed/flights_nativecadence_enu_kinematics.parquet")
out_path.parent.mkdir(parents=True, exist_ok=True)

# 0) Make sure there are no Pandas Period dtypes (rare, but safe to handle)
for c in df_runs.columns:
    if pd.api.types.is_period_dtype(df_runs[c]):
        df_runs[c] = df_runs[c].astype(str)

saved = False

# 1) Try pyarrow
try:
    import pyarrow as pa  # type: ignore
    try:
        pa.unregister_extension_type("pandas.period")
    except Exception:
        pass
    df_runs.to_parquet(out_path, index=False, engine="pyarrow", compression="snappy")
    print(f"Saved Parquet with pyarrow → {out_path.resolve()}")
    saved = True
except Exception as e:
    print("pyarrow failed:", repr(e))

# 2) Fallback using fastparquet (install if missing)
if not saved:
    try:
        import fastparquet  # type: ignore
    except ImportError:
        import sys, subprocess
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", "fastparquet", "--quiet"])
            import fastparquet  # noqa: F401
            print("Installed fastparquet.")
        except Exception as e:
            print("Couldn't install fastparquet:", repr(e))

    try:
        df_runs.to_parquet(out_path, index=False, engine="fastparquet", compression="snappy")
        print(f"Saved Parquet with fastparquet → {out_path.resolve()}")
        saved = True
    except Exception as e:
        print("fastparquet failed:", repr(e))


  if pd.api.types.is_period_dtype(df_runs[c]):


pyarrow failed: ArrowKeyError('A type extension with name pandas.interval already defined')
Saved Parquet with fastparquet → /Users/danielvillafuerte/weather-aware-trajectory-prediction/notebooks/data/processed/flights_nativecadence_enu_kinematics.parquet
