In [1]:
import polars as pl
from pathlib import Path
import math
import time

# ============================
# CONFIG
# ============================

# --- Inputs: everything lives under one data root ---
DATA_ROOT = Path("/Users/jakobschneider/Machine Learning/Data_LCC")

ENRICHED_PATH = DATA_ROOT.joinpath("AIS_2024_enriched_optimized.parquet")  # AIS points incl. ship metadata
MRV_PATH = DATA_ROOT.joinpath("MRV_2024.xlsx")                             # MRV Excel workbook

# --- Outputs: pipeline folder plus a sub-folder for the per-bucket aggregates ---
OUT_DIR = DATA_ROOT.joinpath("feature_pipeline_2024")
OUT_BUCKET_DIR = OUT_DIR.joinpath("buckets_agg")

# Create both output folders up front; re-running the cell is harmless (exist_ok=True).
OUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_BUCKET_DIR.mkdir(parents=True, exist_ok=True)

# Year tag written into every aggregated row
YEAR = 2024

# Number of hash buckets the IMOs are split into.
# More buckets = smaller per-bucket workload (safer for memory).
N_BUCKETS = 512  # if the kernel still dies, raise this further (e.g. 1024)

# Segment filters / thresholds (documented assumptions)
MAX_DT_SECONDS = 2 * 60 * 60   # drop segments whose time gap is 2 h or more
MAX_SPEED_KN = 35              # drop "teleport" segments with implied speed at/above this

# Per-ship quality thresholds (used for the quality_ok flag)
MIN_DISTANCE_NM = 500
MIN_POINTS = 10_000
MAX_MEDIAN_DT = 10 * 60        # seconds, i.e. 10 minutes

# Movement thresholds on speed over ground (knots)
MOVING_SOG_KN = 1.0            # SOG above this counts as moving
IDLE_SOG_KN = 0.5              # SOG below this counts as idle


# ============================
# HELPERS
# ============================

def haversine_nm(lat1: pl.Expr, lon1: pl.Expr, lat2: pl.Expr, lon2: pl.Expr) -> pl.Expr:
    """
    Great-circle (haversine) distance between two points, as a Polars expression.

    Parameters
    ----------
    lat1, lon1 : pl.Expr
        Latitude / longitude of the first point, in degrees.
    lat2, lon2 : pl.Expr
        Latitude / longitude of the second point, in degrees.

    Returns
    -------
    pl.Expr
        Distance in nautical miles, computed on a sphere of radius 6371 km
        and converted with 1 nm = 1.852 km.
    """
    earth_radius_km = 6371.0
    to_radians = math.pi / 180.0

    # Convert all four angles from degrees to radians.
    phi1 = lat1 * to_radians
    lam1 = lon1 * to_radians
    phi2 = lat2 * to_radians
    lam2 = lon2 * to_radians

    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2

    # Haversine term: sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2)
    hav = half_dphi.sin() ** 2 + phi1.cos() * phi2.cos() * half_dlam.sin() ** 2

    # Central angle in radians.
    central_angle = hav.sqrt().arcsin() * 2.0

    # Arc length in km, then converted to nautical miles.
    return central_angle * earth_radius_km / 1.852


def dt_seconds_expr(dt_col: pl.Expr) -> pl.Expr:
    """
    Version-tolerant "duration -> seconds" expression.

    Uses `.dt.total_seconds()` when the installed Polars exposes it and
    falls back to `.dt.seconds()` otherwise.

    The method is chosen by feature detection rather than a broad
    `try/except Exception`, so an unrelated error raised while building the
    expression is no longer silently swallowed and retried on the fallback.

    Parameters
    ----------
    dt_col : pl.Expr
        Expression that evaluates to a duration (e.g. a datetime `.diff()`).

    Returns
    -------
    pl.Expr
        The duration expressed in seconds.

    Notes
    -----
    NOTE(review): the original comment says `.dt.seconds()` "may lose days"
    on some Polars versions. If that is true for the installed version, a
    multi-day gap could look like a short one and slip through a later
    "dt < 2h" filter - verify before relying on the fallback.
    """
    dt_namespace = dt_col.dt

    # Newer Polars
    total_seconds = getattr(dt_namespace, "total_seconds", None)
    if total_seconds is not None:
        return total_seconds()

    # Older Polars fallback
    return dt_namespace.seconds()


# ============================
# STEP 0: Define the minimal column set
# (Assuming enriched file already contains Length/Width/Draft/VesselType etc.)
# ============================

# Core AIS columns - adjust these names if your file uses different ones
COL_IMO = "IMO"            # ship identifier (grouping key)
COL_T = "BaseDateTime"     # timestamp of the AIS point
COL_LAT = "LAT"
COL_LON = "LON"
COL_SOG = "SOG"            # speed over ground

# Optional ship metadata columns already present in the enriched file.
# Keep only what you actually want in the model table.
META_COLS = ["VesselType", "Length", "Width", "Draft"]

# Full candidate list: core columns first, then metadata.
# Column names can differ between files, so this list is filtered later
# down to the columns that actually exist.
BASE_COLS = [COL_IMO, COL_T, COL_LAT, COL_LON, COL_SOG, *META_COLS]


# ============================
# STEP 1: Per-bucket aggregation
# ============================
#
# The enriched AIS file is processed in N_BUCKETS independent slices (hash of
# IMO modulo N_BUCKETS). Each slice is reduced to one row per IMO and written
# to its own parquet file, so an interrupted run can be resumed: buckets whose
# output file already exists are skipped.
#
# NOTE: bucket files written by an earlier version of this cell are skipped
# too. Delete OUT_BUCKET_DIR/agg_bucket_*.parquet to recompute them.

# --- Resolve the schema once (it does not change between buckets) ---
source_lf = pl.scan_parquet(ENRICHED_PATH)
if hasattr(source_lf, "collect_schema"):
    # Newer Polars: explicit schema resolution (the `LazyFrame.schema`
    # property is flagged as potentially expensive there).
    available = set(source_lf.collect_schema().names())
else:
    # Older Polars fallback
    available = set(source_lf.schema.keys())

# Fail early with a readable message instead of a late error inside collect().
missing_required = [
    c for c in (COL_IMO, COL_T, COL_LAT, COL_LON, COL_SOG) if c not in available
]
if missing_required:
    raise KeyError(
        f"Required columns missing from {ENRICHED_PATH.name}: {missing_required}"
    )

# --- Keep only columns that exist (prevents errors if Draft/VesselType are missing) ---
cols = [c for c in BASE_COLS if c in available]

# --- Aggregations to one row per IMO (loop-invariant, so built once) ---
aggs = [
    pl.sum("segment_distance_nm").alias("ais_distance_nm_total"),
    (pl.sum("dt_seconds") / 3600.0).alias("ais_time_hours_total"),
    pl.col(COL_IMO).count().alias("ais_points"),

    pl.mean(COL_SOG).alias("sog_mean_kn"),
    pl.quantile(COL_SOG, 0.50).alias("sog_p50_kn"),
    pl.quantile(COL_SOG, 0.95).alias("sog_p95_kn"),

    # Time-weighted movement: seconds of each segment counted only when flagged.
    ((pl.col("dt_seconds") * pl.col("moving_flag")).sum() / 3600.0).alias("moving_hours"),
    ((pl.col("dt_seconds") * pl.col("idle_flag")).sum() / 3600.0).alias("idle_hours"),

    pl.median("dt_seconds").alias("median_dt_seconds"),
]

# Metadata: choose a robust aggregation per IMO (median/first)
if "VesselType" in available:
    aggs.append(pl.first("VesselType").alias("VesselType"))
if "Length" in available:
    aggs.append(pl.median("Length").alias("Length"))
if "Width" in available:
    aggs.append(pl.median("Width").alias("Width"))
if "Draft" in available:
    aggs.append(pl.median("Draft").alias("draft_m_median"))

for b in range(N_BUCKETS):
    t0 = time.time()
    out_path = OUT_BUCKET_DIR / f"agg_bucket_{b:03d}.parquet"
    if out_path.exists():
        print(f"[bucket {b:03d}] exists -> skip")
        continue

    # --- Reduce early: needed columns, valid points, this bucket only ---
    lf = (
        source_lf
        .select(cols)
        .filter(
            pl.col(COL_IMO).is_not_null() &
            pl.col(COL_T).is_not_null() &
            pl.col(COL_LAT).is_not_null() &
            pl.col(COL_LON).is_not_null() &
            (pl.col(COL_SOG) >= 0)
        )
        # --- Hash bucket on IMO to process a manageable subset ---
        .with_columns(
            bucket=(pl.col(COL_IMO).hash() % N_BUCKETS).cast(pl.Int16)
        )
        .filter(pl.col("bucket") == b)
        .drop("bucket")
        # --- Sort within bucket (still heavy, but now bounded) ---
        .sort([COL_IMO, COL_T])
    )

    # --- Pair every point with its TRUE predecessor before filtering ---
    # dt and the previous position must be taken from the same, unfiltered
    # sequence. If rows are dropped by the dt filter first, shift(1) would pick
    # the previous *surviving* row: the first segment of every ship would be
    # lost and segments after a dropped row would be measured from the wrong
    # point.
    lf = lf.with_columns(
        dt_seconds=dt_seconds_expr(pl.col(COL_T).diff().over(COL_IMO)),
        _prev_lat=pl.col(COL_LAT).shift(1).over(COL_IMO),
        _prev_lon=pl.col(COL_LON).shift(1).over(COL_IMO),
    ).filter(
        # The first point of each ship has no predecessor -> null dt -> dropped.
        pl.col("dt_seconds").is_not_null() &
        (pl.col("dt_seconds") > 0) &
        (pl.col("dt_seconds") < MAX_DT_SECONDS)
    )

    # --- Segment distance + implied speed teleport filter ---
    lf = lf.with_columns(
        segment_distance_nm=haversine_nm(
            pl.col("_prev_lat"),
            pl.col("_prev_lon"),
            pl.col(COL_LAT),
            pl.col(COL_LON),
        )
    ).with_columns(
        implied_speed_kn=pl.col("segment_distance_nm") / (pl.col("dt_seconds") / 3600.0)
    ).filter(
        pl.col("segment_distance_nm").is_not_null() &
        (pl.col("segment_distance_nm") >= 0) &
        (pl.col("implied_speed_kn") < MAX_SPEED_KN)
    )

    # --- Movement flags (0/1) based on reported speed over ground ---
    lf = lf.with_columns([
        (pl.col(COL_SOG) > MOVING_SOG_KN).cast(pl.Int8).alias("moving_flag"),
        (pl.col(COL_SOG) < IDLE_SOG_KN).cast(pl.Int8).alias("idle_flag"),
    ])

    # --- Aggregate to IMO (one row per ship) and derive per-ship columns ---
    lf_agg = (
        lf
        .group_by(COL_IMO)
        .agg(aggs)
        .with_columns([
            pl.lit(YEAR).alias("year"),
            (pl.col("moving_hours") / pl.col("ais_time_hours_total")).alias("moving_share"),
            (
                (pl.col("ais_distance_nm_total") > MIN_DISTANCE_NM) &
                (pl.col("ais_points") > MIN_POINTS) &
                (pl.col("median_dt_seconds") < MAX_MEDIAN_DT)
            ).alias("quality_ok")
        ])
    )

    # --- Collect only the aggregated result (small!) and write to disk ---
    df_bucket = lf_agg.collect()

    # Write to a temporary name and rename afterwards: the existence of
    # out_path is the "bucket done" marker, so it must never point at a
    # half-written file if the kernel dies during the write.
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    df_bucket.write_parquet(tmp_path)
    tmp_path.replace(out_path)

    dt = time.time() - t0
    print(f"[bucket {b:03d}] rows={df_bucket.shape[0]} written -> {out_path.name} ({dt:.1f}s)")

print("Bucket aggregation finished.")

  available = set(lf.schema.keys())


: 

In [None]:
import polars as pl
from pathlib import Path

# Paths are redefined here so this cell can run without the aggregation cell.
DATA_ROOT = Path("/Users/jakobschneider/Machine Learning/Data_LCC")
OUT_DIR = DATA_ROOT / "feature_pipeline_2024"
OUT_BUCKET_DIR = OUT_DIR / "buckets_agg"

# Collect the per-bucket aggregates in a stable (sorted) order.
agg_files = sorted(OUT_BUCKET_DIR.glob("agg_bucket_*.parquet"))
print("Agg files:", len(agg_files))

# pl.concat fails with an unhelpful message on an empty list; say what is wrong instead.
if not agg_files:
    raise FileNotFoundError(
        f"No agg_bucket_*.parquet files found in {OUT_BUCKET_DIR}; "
        "run the per-bucket aggregation first."
    )

# Stack all bucket tables into one features table (one row per IMO).
df_features = pl.concat([pl.read_parquet(p) for p in agg_files], how="vertical")

# Optional: remove duplicates just in case (should not happen with hashing, but safe).
# maintain_order=True makes keep="first" and the output row order reproducible.
df_features = df_features.unique(subset=["IMO", "year"], keep="first", maintain_order=True)

out_features_path = OUT_DIR / "features_imo_2024.parquet"
df_features.write_parquet(out_features_path)

print(df_features.shape)
print("Saved:", out_features_path)

In [None]:
import polars as pl
from pathlib import Path

# Paths are redefined here so this cell can run without the earlier cells.
DATA_ROOT = Path("/Users/jakobschneider/Machine Learning/Data_LCC")
OUT_DIR = DATA_ROOT / "feature_pipeline_2024"

MRV_PATH = DATA_ROOT / "MRV_2024.xlsx"
FEATURES_PATH = OUT_DIR / "features_imo_2024.parquet"

YEAR = 2024

# Per-IMO AIS features produced by the previous cell.
df_features = pl.read_parquet(FEATURES_PATH)

# The join below needs identical key dtypes on both sides. The features table
# carries its own "year" dtype (it was written from an integer literal), while
# the Excel reader infers the MRV column type independently, so the MRV side is
# cast to whatever the features table uses.
year_dtype = df_features.schema["year"]

# MRV target table: keep only the join keys and the target, renamed to match.
# NOTE(review): the "IMO" key dtype must also match the features table
# (e.g. integer vs. string IMO) - confirm against the enriched AIS file.
df_mrv = (
    pl.read_excel(MRV_PATH)
    .select([
        pl.col("IMO Number").alias("IMO"),
        pl.col("Reporting Period").alias("year"),
        pl.col("CO₂ emissions per distance [kg CO₂ / n mile]").alias("y_co2_per_nm_kg"),
    ])
    .with_columns(pl.col("year").cast(year_dtype))
    .filter(pl.col("year") == YEAR)
)

# Inner join: keep only ships that have both AIS features and an MRV record.
df_model = (
    df_features
    .join(df_mrv, on=["IMO", "year"], how="inner")
)

out_model_path = OUT_DIR / "model_table_imo_2024.parquet"
df_model.write_parquet(out_model_path)

print(df_model.shape)
print("Saved:", out_model_path)