In [None]:
from pathlib import Path
import logging
import pandas as pd
import numpy as np
from scipy.spatial import cKDTree

# ───────────────────────── config ─────────────────────────
SRC_DIR   = Path("data")            # raw Parquets
DST_DIR   = Path("data_denoised")   # denoised output
SKIP      = {"slovenia_towers.parquet"}

R         = 6_371_000.0             # Earth radius (m)
COUNT_THRESH = 150000               # coord-repeat threshold
TIME_THRESH_S = 5*60                # long-stop threshold (seconds)
TOWER_RADIUS_M = 10.0               # proximity for celltower_denoise

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Load cell‐tower list once
df_towers = pd.read_parquet("data/slovenia_towers.parquet")
logging.info("Loaded %d towers", len(df_towers))


# ───────────── helpers: denoise ─────────────────

def celltower_denoise(df: pd.DataFrame,
                      towers: pd.DataFrame,
                      radius_m: float = TOWER_RADIUS_M,
                      earth_r: float = R) -> pd.DataFrame:
    """Drop any ping within `radius_m` of its nearest tower."""
    tower_rad = np.radians(towers[["LAT","LON"]].astype("float64").values)
    tree      = cKDTree(tower_rad)
    pts_rad   = np.radians(df[["lat","lon"]].astype("float64").values)
    dist_rad, _ = tree.query(pts_rad, k=1)
    dist_m      = dist_rad * earth_r

    mask   = dist_m <= radius_m
    df = df[~mask].reset_index(drop=True)
    return df


def remove_repeated_coords(df: pd.DataFrame,
                           count_thresh: int = COUNT_THRESH,
                           squash: bool = False) -> pd.DataFrame:
    """
    Remove or squash (lat, lon) coordinates that occur >= count_thresh times globally.
    """
    before = len(df)

    # Count global frequency of (lat, lon) pairs
    coord_counts = df.groupby(["lat", "lon"]).size()
    frequent_coords = coord_counts[coord_counts >= count_thresh].index

    # Mark rows to drop
    df["is_fallback"] = df.set_index(["lat", "lon"]).index.isin(frequent_coords)

    if squash:
        df = df.sort_values(["deviceid", "datetime"])
        df = df[~df.duplicated(["lat", "lon"], keep="first")].copy()
    else:
        df = df[~df["is_fallback"]].copy()

    df.drop(columns=["is_fallback"], inplace=True)
    return df

def remove_long_stops(df: pd.DataFrame,
                      time_thresh_s: float = TIME_THRESH_S,
                      squash: bool = False) -> pd.DataFrame:
    """
    Drop or squash runs of identical coords PER DEVICE whose total dwell ≥ time_thresh_s.
    Assumes df is sorted by deviceid/datetime and has df['dt'] (seconds).
    """
    before = len(df)
    df = df.sort_values(['deviceid','datetime']).reset_index(drop=True)

    same = (
        (df['deviceid'] == df['deviceid'].shift(1)) &
        (df['lat']       == df['lat'].shift(1)) &
        (df['lon']       == df['lon'].shift(1))
    )
    df['is_repeat'] = same
    df['run_id']    = (~df['is_repeat']).cumsum()

    run_durs = (
        df.groupby(['deviceid','run_id'])['dt']
          .sum()
          .reset_index(name='run_dur_s')
    )
    df = df.merge(run_durs, on=['deviceid','run_id'], how='left')

    if squash:
        mask = ~((df['is_repeat']) & (df['run_dur_s'] >= time_thresh_s))
        df = df[mask]
        df = df.drop_duplicates(['deviceid','run_id'], keep='first')
    else:
        df = df[~((df['is_repeat']) & (df['run_dur_s'] >= time_thresh_s))]

    df.drop(columns=['is_repeat','run_id','run_dur_s'], inplace=True)
    dropped = before - len(df)
    logging.info("remove_long_stops: dropped %d of %d pings (%.2f%%) in runs ≥%ds",
                 dropped, before, 100*dropped/before if before else 0.0, time_thresh_s)
    return df


# ───────────── helpers: existing pipeline ─────────────────

def sequential_deltas(df: pd.DataFrame) -> pd.DataFrame:
    """Add device_change, dist_m, dt, speed_m_s (in-place)."""
    df["date"]     = df["date"].astype(str)
    df["time"]     = df["time"].astype(str)
    df["datetime"] = pd.to_datetime(df["date"] + " " + df["time"],
                                    dayfirst=True)

    df["deviceid"] = df["deviceid"].astype("category")
    df.sort_values(["deviceid", "datetime"], inplace=True, ignore_index=True)

    dc = (df["deviceid"] != df["deviceid"].shift()).to_numpy()
    df["device_change"] = dc

    lat = np.radians(df["lat"].to_numpy())
    lon = np.radians(df["lon"].to_numpy())
    t   = (df["datetime"].astype("int64").to_numpy() //
           1_000_000_000)

    lat_prev = np.roll(lat, 1); lon_prev = np.roll(lon, 1); t_prev = np.roll(t, 1)
    lat_prev[dc] = lat[dc]; lon_prev[dc] = lon[dc]; t_prev[dc] = t[dc]

    dlat = lat - lat_prev
    dlon = lon - lon_prev
    a    = np.sin(dlat/2)**2 + np.cos(lat)*np.cos(lat_prev)*np.sin(dlon/2)**2
    dist = R * (2*np.arctan2(np.sqrt(a), np.sqrt(1 - a)))
    dt   = (t - t_prev).clip(min=1)

    dist[dc] = 0.0; dt[dc] = 0.0
    speed    = np.divide(dist, dt, out=np.zeros_like(dist), where=dt > 0)

    df["dist_m"]    = dist
    df["dt"]        = dt
    df["speed_m_s"] = speed
    df["lat_rad"]   = lat          # for angle later
    df["lon_rad"]   = lon
    return df


def _bearing(lat1, lon1, lat2, lon2):
    dλ = lon2 - lon1
    x  = np.sin(dλ)*np.cos(lat2)
    y  = np.cos(lat1)*np.sin(lat2) - np.sin(lat1)*np.cos(lat2)*np.cos(dλ)
    return (np.degrees(np.arctan2(x,y)) + 360.0) % 360.0


def zhang_denoise(df, speed_th=30, angle_th=30, time_th=10):
    lat = df['lat_rad'].to_numpy(); lon = df['lon_rad'].to_numpy()
    t   = (df['datetime'].astype('int64')//1_000_000_000).astype(float)
    spd = df['speed_m_s'].to_numpy(); dc = df['device_change'].to_numpy()

    lat_prev = np.roll(lat,1); lon_prev = np.roll(lon,1); t_prev = np.roll(t,1)
    lat_next = np.roll(lat,-1); lon_next = np.roll(lon,-1)

    lat_prev[dc] = lon_prev[dc] = t_prev[dc] = np.nan
    last = np.roll(dc,-1); last[-1]=True
    lat_next[last] = lon_next[last] = np.nan

    b1  = _bearing(lat_prev, lon_prev, lat, lon)
    b2  = _bearing(lat, lon, lat_next, lon_next)
    ang = np.abs(b2 - b1); ang = np.where(ang>180,360-ang,ang)

    dt  = t - t_prev
    keep = (spd < speed_th) & ((ang > angle_th)|(dt > time_th))
    keep &= ~np.isnan(lat_prev) & ~np.isnan(lat_next)
    return df[keep].reset_index(drop=True)


def sliding_window_denoise(df, window=5, speed_th=40, min_pts=3):
    spd = df['speed_m_s'].to_numpy(); dc = df['device_change'].to_numpy()
    keep = np.ones(len(df), dtype=bool)
    start = 0
    for i in range(1,len(df)+1):
        if i==len(df) or dc[i]:
            med = pd.Series(spd[start:i]).rolling(window,center=True,min_periods=1).median().to_numpy()
            keep[start:i] &= med < speed_th
            start = i
    valid = df.loc[keep,'deviceid'].value_counts().loc[lambda s:s>min_pts].index
    keep &= df['deviceid'].isin(valid)
    return df[keep].reset_index(drop=True)


# ─────────────────────── main loop ───────────────────────

def main():
    DST_DIR.mkdir(exist_ok=True, parents=True)

    for fp in SRC_DIR.glob("*.parquet"):
        if fp.name in SKIP:
            logging.info(f"Skipping {fp.name}")
            continue

        logging.info(f"=== Processing {fp.name} ===")
        df = pd.read_parquet(fp)
        orig = len(df)
        prev_len = orig
        logging.info(f"Original rows: {orig:,}")

        df[['lat','lon']] = df[['lat','lon']].astype('float32')

        # 1) Tower-proximity denoise
        df = celltower_denoise(df, df_towers, radius_m=TOWER_RADIUS_M)
        step_dropped = prev_len - len(df)
        logging.info(f"After tower-proximity denoise: {step_dropped:,} dropped ({100 * step_dropped / orig:.2f}%)")
        prev_len = len(df)

        # 2) Remove repeated coordinates
        df = remove_repeated_coords(df, count_thresh=COUNT_THRESH, squash=False)
        step_dropped = prev_len - len(df)
        logging.info(f"After repeated-coords removal: {step_dropped:,} dropped ({100 * step_dropped / orig:.2f}%)")
        prev_len = len(df)

        # 3) Encode deviceid and compute deltas
        codes, _ = pd.factorize(df['deviceid'], sort=False)
        df['deviceid'] = codes.astype('int32')

        logging.info("Computing deltas")
        df = sequential_deltas(df)

        # 4) Zhang denoise
        logging.info("Zhang denoise")
        df = zhang_denoise(df)
        step_dropped = prev_len - len(df)
        logging.info(f"After Zhang denoise: {step_dropped:,} dropped ({100 * step_dropped / orig:.2f}%)")
        prev_len = len(df)

        # 5) Sliding-window denoise
        logging.info("Sliding-window denoise")
        df = sliding_window_denoise(df)
        step_dropped = prev_len - len(df)
        logging.info(f"After sliding-window denoise: {step_dropped:,} dropped ({100 * step_dropped / orig:.2f}%)")
        prev_len = len(df)

        # Cleanup and save
        df.drop(columns=["lat_rad", "lon_rad"], inplace=True)
        out_path = DST_DIR / fp.name
        df.to_parquet(out_path, index=False, compression="snappy")
        logging.info(f"Wrote {out_path}")

        # Final summary
        final_rows = len(df)
        total_dropped = orig - final_rows
        logging.info(f"Total dropped: {total_dropped:,} of {orig:,} ({100 * total_dropped / orig:.2f}% removed)")

if __name__ == "__main__":
    main()

  from pandas.core import (
2025-05-26 19:16:32 INFO: Loaded 54056 towers
2025-05-26 19:16:32 INFO: === Processing 20230331.parquet ===
2025-05-26 19:17:06 INFO: Original rows: 134,586,862
2025-05-26 19:18:19 INFO: After tower-proximity denoise: 2,988,806 dropped (2.22%)
2025-05-26 19:18:33 INFO: After repeated-coords removal: 12,091,544 dropped (8.98%)
2025-05-26 19:18:34 INFO: Computing deltas
2025-05-26 19:20:10 INFO: Zhang denoise
2025-05-26 19:20:54 INFO: After Zhang denoise: 24,502,000 dropped (18.21%)
2025-05-26 19:20:54 INFO: Sliding-window denoise
2025-05-26 19:21:45 INFO: After sliding-window denoise: 231,508 dropped (0.17%)
2025-05-26 19:22:58 INFO: Wrote data_denoised/20230331.parquet
2025-05-26 19:22:58 INFO: Total dropped: 39,813,858 of 134,586,862 (29.58% removed)
2025-05-26 19:22:58 INFO: === Processing 20230328.parquet ===
2025-05-26 19:23:40 INFO: Original rows: 130,654,297
2025-05-26 19:24:47 INFO: After tower-proximity denoise: 2,844,364 dropped (2.18%)
2025-05-26 1