In [2]:
import sqlite3
import pandas as pd

db_file = "rutgers_buses.db"

with sqlite3.connect(db_file) as con:
    tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", con)
    print(tables)
    
    df = pd.read_sql_query("SELECT * FROM Bus_Logs", con)
    names = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", con)['name'].tolist()
    dfs = {name: pd.read_sql_query(f"SELECT * FROM \"{name}\";", con) for name in names}
print(df.shape)

              name
0          Systems
1           Routes
2            Buses
3            Stops
4      Route_Stops
5         ETA_Logs
6         Bus_Logs
7  sqlite_sequence
(1696450, 8)


In [None]:
with sqlite3.connect(db_file) as con:
    try:
        q = """
        SELECT b.*
        FROM Bus_Logs b
        JOIN Routes r ON (b.routeId = r.routeId OR b.route_id = r.route_id)
        WHERE COALESCE(r.shortName, r.short_name, r.shortname) = 'LX'
        """
        bus_lx = pd.read_sql_query(q, con)
    except Exception:
        bus = pd.read_sql_query("SELECT * FROM Bus_Logs", con)
        routes = pd.read_sql_query("SELECT * FROM Routes", con)
        short_col = next((c for c in routes.columns if c.lower() in ("shortname","short_name","shortname","shortname")), None)
        route_id_col = next((c for c in routes.columns if "route" in c.lower() and "id" in c.lower()), None)
        bus_route_col = next((c for c in bus.columns if "route" in c.lower() and "id" in c.lower()), None)
        if short_col is None or route_id_col is None or bus_route_col is None:
            raise RuntimeError("Couldn't find expected route/short-name columns in DB tables.")
        lx_routes = routes[routes[short_col].astype(str).str.upper() == "LX"]
        bus_lx = bus[bus[bus_route_col].isin(lx_routes[route_id_col].unique())].copy()

print(f"Filtered Bus_Logs -> {len(bus_lx)} rows")

Filtered Bus_Logs -> 224425 rows


In [7]:
bus_lx.head()

Unnamed: 0,log_id,timestamp,bus_id,route_myid,latitude,longitude,pax_load,arrived_stop_id
19442,19463,1762772798,13208,54545,40.523091,-74.450614,0.0,
19445,19466,1762772808,13208,54545,40.520478,-74.451069,0.0,
19448,19469,1762772818,13208,54545,40.518852,-74.452266,0.0,
19451,19472,1762772828,13208,54545,40.516903,-74.454934,0.0,
19454,19475,1762772838,13208,54545,40.515706,-74.456754,0.0,


In [5]:
# Python - run in your notebook cell
import sqlite3
import pandas as pd
import numpy as np
from collections import deque
import math

# helpers to guess column names
def find_col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    # case-insensitive fallback
    cols = {col.lower(): col for col in df.columns}
    for c in candidates:
        if c.lower() in cols:
            return cols[c.lower()]
    return None

def haversine_meters(lat1, lon1, lat2, lon2):
    R = 6371000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    return R * 2 * math.asin(math.sqrt(a))

with sqlite3.connect(db_file) as con:
    bus = pd.read_sql_query("SELECT * FROM Bus_Logs", con)
    eta = pd.read_sql_query("SELECT * FROM ETA_Logs", con)

# find columns
bus_id_col = find_col(bus, ["bus_id","busId","busid","bus"])
stop_col = find_col(bus, ["stop_id","stopId","stop"])
time_col = find_col(bus, ["created","time","timestamp","ts","created_at"])
lat_col = find_col(bus, ["lat","latitude","busLat","bus_lat"])
lon_col = find_col(bus, ["lon","longitude","busLon","bus_lon"])
arrived_col = find_col(bus, ["arrived","arrived_at","is_arrived","arrived_bool"])

# ETA columns
eta_log_logid = find_col(eta, ["log_id","logid","logId"])
eta_log_stop = find_col(eta, ["stop_id","stopId","stop"])
eta_seconds_col = find_col(eta, ["eta_seconds","eta_seconds","eta","eta_secs","eta_seconds"])
eta_sort_col = find_col(eta, ["sort_order","sortorder","order","rank"])
eta_pax_col = find_col(eta, ["pax_load","paxLoad","paxLoadS","pax_load_s","pax"])

# basic sanity
if bus_id_col is None or time_col is None:
    raise RuntimeError("Couldn't find bus id or timestamp column in Bus_Logs.")

# normalize timestamps
bus = bus.copy()
bus["__ts"] = pd.to_datetime(bus[time_col], errors="coerce")
eta = eta.copy()
# ETA_Logs likely don't have timestamps; they relate to Bus_Logs via log_id (eta_log_logid)
if eta_log_logid is None:
    raise RuntimeError("ETA_Logs missing log_id column")

# prepare quick lookup: for each log_id, pick next-stop prediction (sort_order==0 preferred)
def pick_next_stop_for_log(g):
    # prefer sort_order == 0
    if eta_sort_col and 0 in g.get(eta_sort_col).values:
        row = g[g[eta_sort_col] == 0].iloc[0]
    else:
        # fallback: smallest eta_seconds
        if eta_seconds_col in g.columns:
            row = g.loc[g[eta_seconds_col].astype(float).idxmin()]
        else:
            row = g.iloc[0]
    return pd.Series({
        "pred_stop": row.get(eta_log_stop),
        "pred_eta_s": (row.get(eta_seconds_col) if eta_seconds_col in g.columns else None),
        "pred_pax": (row.get(eta_pax_col) if eta_pax_col in g.columns else None)
    })

eta_next = eta.groupby(eta_log_logid).apply(pick_next_stop_for_log).reset_index().rename(columns={eta_log_logid:"log_id"})

# merge predictions onto bus logs (left join by log_id)
log_id_col_in_bus = find_col(bus, ["log_id","logid","logId"])
if log_id_col_in_bus is None:
    # some designs don't store log_id in Bus_Logs; try to match by primary key column name 'log_id'
    log_id_col_in_bus = "log_id" if "log_id" in bus.columns else None

if log_id_col_in_bus and log_id_col_in_bus in bus.columns:
    df = bus.merge(eta_next, how="left", left_on=log_id_col_in_bus, right_on="log_id", suffixes=("","_eta"))
else:
    # fall back: no log_id in bus, try to match by index alignment (assumes logs were produced together)
    df = bus.copy()
    df = df.reset_index().merge(eta_next.reset_index(), left_on="index", right_on="index", how="left")
    # pred columns are 'pred_stop','pred_eta_s','pred_pax'

# compute actual arrival: for each row, find first future Bus_Logs for same bus and same stop within 1.5 hours
MAX_DELAY_S = 90 * 60  # 1.5 hours

# prepare searchable dataframe per bus
bus_sorted = bus.sort_values([bus_id_col, "__ts"]).reset_index(drop=True)
# for quick lookup, group by bus id and keep indices (ensure index not included)
groups = {bid: grp.reset_index(drop=True) for bid, grp in bus_sorted.groupby(bus_id_col)}

# function to find actual arrival ts for a given row (robust to missing stop/arrived cols)
def find_actual_arrival(row):
    # quick guards
    if bus_id_col is None or time_col is None:
        return pd.NaT
    bid = row.get(bus_id_col)
    if pd.isna(row.get("__ts")) or bid is None:
        return pd.NaT

    pred_stop = row.get("pred_stop")
    if stop_col is None or pred_stop is None or pd.isna(pred_stop):
        # cannot match without a stop id -> treat as no arrival found
        return pd.NaT

    start_ts = row["__ts"]
    grp = groups.get(bid)
    if grp is None or grp.empty:
        return pd.NaT

    # future rows for same bus
    later = grp[grp["__ts"] > start_ts]
    if later.empty:
        return pd.NaT

    # Try using arrived flag if available and safe to reference
    if arrived_col and arrived_col in later.columns:
        try:
            cand = later[(later[stop_col] == pred_stop) & (later[arrived_col].astype(bool))]
        except Exception:
            cand = later[later[stop_col] == pred_stop]
        if not cand.empty:
            t = cand.iloc[0]["__ts"]
            if (t - start_ts).total_seconds() <= MAX_DELAY_S:
                return t

    # Fallback: first later row with same stop
    try:
        cand = later[later[stop_col] == pred_stop]
    except Exception:
        cand = pd.DataFrame()
    if not cand.empty:
        t = cand.iloc[0]["__ts"]
        if (t - start_ts).total_seconds() <= MAX_DELAY_S:
            return t

    return pd.NaT

# vectorized-ish: iterate rows (can be optimized later)
df = df.reset_index(drop=True)
actual_arrival_list = []
for _, row in df.iterrows():
    actual_arrival_list.append(find_actual_arrival(row))
df["actual_arrival_ts"] = pd.to_datetime(actual_arrival_list)

# compute y = actual_time_from_log - predicted_eta_seconds
df["pred_eta_s"] = pd.to_numeric(df["pred_eta_s"], errors="coerce")
df["actual_travel_s"] = (df["actual_arrival_ts"] - df["__ts"]).dt.total_seconds()
df["eta_error_s"] = np.where(df["actual_travel_s"].notna() & df["pred_eta_s"].notna(),
                             df["actual_travel_s"] - df["pred_eta_s"],
                             np.nan)

# FEATURE: pax_load (use pred_pax if present, else try bus-level pax columns)
pax_cols = [c for c in (["pax_load","paxLoad","paxLoadS","pax"] + bus.columns.tolist()) if c in df.columns]
# prefer pred_pax
df["pax_load_raw"] = df.get("pred_pax")
# fallback to bus columns
if df["pax_load_raw"].isna().all():
    for c in ["paxLoadS","pax_load","paxLoad","pax"]:
        if c in df.columns:
            df["pax_load_raw"] = df[c]
            break

# parse pax numeric
import re
def parse_pax(v):
    if pd.isna(v):
        return np.nan
    if isinstance(v, (int,float)):
        return float(v)
    s = str(v).strip()
    m = re.search(r'(-?\d+(?:\.\d+)?)', s)
    return float(m.group(1)) if m else np.nan

df["pax_load"] = df["pax_load_raw"].map(parse_pax)

# TIME-OF-DAY
df["hour"] = df["__ts"].dt.hour
df["minute"] = df["__ts"].dt.minute
df["time_of_day_s"] = df["hour"] * 3600 + df["minute"] * 60 + df["__ts"].dt.second

# SPEED FEATURES: immediate, 1min avg, 10min avg
if lat_col is None or lon_col is None:
    df["speed_prev_mps"] = np.nan
    df["speed_1min_mps"] = np.nan
    df["speed_10min_mps"] = np.nan
else:
    # build per-bus segment speeds (for each log row compute previous segment speed and timestamp)
    bus_sorted = bus_sorted.reset_index().rename(columns={"index":"orig_index"})
    # ensure lat/lon numeric
    bus_sorted[lat_col] = pd.to_numeric(bus_sorted[lat_col], errors="coerce")
    bus_sorted[lon_col] = pd.to_numeric(bus_sorted[lon_col], errors="coerce")
    bus_sorted["__ts"] = pd.to_datetime(bus_sorted["__ts"])
    seg_records = {}  # map orig_index -> dict(ts, seg_speed)
    for bid, grp in bus_sorted.groupby(bus_id_col):
        grp = grp.sort_values("__ts").reset_index(drop=True)
        prev = None
        for i, r in grp.iterrows():
            if prev is None:
                seg_records[r["orig_index"]] = {"seg_t": r["__ts"], "seg_speed": np.nan}
            else:
                dt = (r["__ts"] - prev["__ts"]).total_seconds()
                if dt <= 0:
                    seg_speed = np.nan
                else:
                    dist = haversine_meters(prev[lat_col], prev[lon_col], r[lat_col], r[lon_col])
                    seg_speed = dist / dt  # meters per second
                seg_records[r["orig_index"]] = {"seg_t": r["__ts"], "seg_speed": seg_speed}
            prev = r

    # map seg speed back to df rows by matching indices - need to find mapping between df rows and bus_sorted.orig_index
    # Create a key to match: bus id + timestamp
    key_bus_ts_to_seg = {}
    for orig_idx, rec in seg_records.items():
        key_bus_ts_to_seg[(bus_sorted.loc[orig_idx, bus_id_col], bus_sorted.loc[orig_idx, "__ts"])] = rec

    def get_seg_for_row(row):
        key = (row[bus_id_col], row["__ts"])
        return key_bus_ts_to_seg.get(key, {"seg_speed": np.nan, "seg_t": row["__ts"]})

    # immediate speed: previous segment speed but only if previous dt <= 30s
    speed_prev = []
    for _, row in df.iterrows():
        bid = row[bus_id_col]
        ts = row["__ts"]
        # find previous record for this bus
        grp = groups.get(bid)
        if grp is None or ts is pd.NaT:
            speed_prev.append(np.nan); continue
        prev_rows = grp[grp["__ts"] < ts]
        if prev_rows.empty:
            speed_prev.append(np.nan); continue
        prev_row = prev_rows.iloc[-1]
        dt = (ts - prev_row["__ts"]).total_seconds()
        if dt > 30 or dt <= 0:
            speed_prev.append(np.nan)
        else:
            dist = haversine_meters(prev_row[lat_col], prev_row[lon_col], row[lat_col], row[lon_col])
            speed_prev.append(dist / dt)
    df["speed_prev_mps"] = speed_prev

    # 1min and 10min average of segment speeds: do per-bus deque scan
    df["speed_1min_mps"] = np.nan
    df["speed_10min_mps"] = np.nan
    # precompute per-bus lists of (ts, lat, lon) sorted
    for bid, grp in bus_sorted.groupby(bus_id_col):
        seq = grp.sort_values("__ts").reset_index(drop=True)
        # build segments list of (end_ts, seg_speed)
        segs = []
        prev = None
        for i, r in seq.iterrows():
            if prev is not None:
                dt = (r["__ts"] - prev["__ts"]).total_seconds()
                if dt > 0:
                    dist = haversine_meters(prev[lat_col], prev[lon_col], r[lat_col], r[lon_col])
                    seg_speed = dist / dt
                else:
                    seg_speed = np.nan
                segs.append((r["__ts"], seg_speed))
            prev = r
        # iterate original rows in df for this bus in time order and compute rolling means using deque
        rows_idx = df[(df[bus_id_col] == bid)].sort_values("__ts").index
        dq_1 = deque()
        dq_10 = deque()
        for rid in rows_idx:
            ts = df.at[rid, "__ts"]
            # we want segments with end_ts < = ts (segments that end at or before current log)
            # push new segments from segs whose end_ts <= ts
            # since segs are sorted by end_ts, we can advance pointer — simple approach: filter each time (ok for moderate sizes)
            valid_segs_1 = [s for s in segs if s[0] <= ts and (ts - s[0]).total_seconds() <= 60]
            valid_segs_10 = [s for s in segs if s[0] <= ts and (ts - s[0]).total_seconds() <= 600]
            speeds1 = [s[1] for s in valid_segs_1 if not pd.isna(s[1])]
            speeds10 = [s[1] for s in valid_segs_10 if not pd.isna(s[1])]
            df.at[rid, "speed_1min_mps"] = np.nanmean(speeds1) if speeds1 else np.nan
            df.at[rid, "speed_10min_mps"] = np.nanmean(speeds10) if speeds10 else np.nan

# final features + target
features = df[[
    log_id_col_in_bus if log_id_col_in_bus in df.columns else None,
    bus_id_col, stop_col, "__ts", "pred_stop", "pred_eta_s", "actual_arrival_ts", "actual_travel_s", "eta_error_s",
    "pax_load", "hour", "time_of_day_s", "speed_prev_mps", "speed_1min_mps", "speed_10min_mps"
]]
# drop possible None column
features = features.loc[:, features.columns.notnull()]

print("Features prepared:", features.shape)
features.head()

  eta_next = eta.groupby(eta_log_logid).apply(pick_next_stop_for_log).reset_index().rename(columns={eta_log_logid:"log_id"})


KeyboardInterrupt: 

In [16]:
def haversine_meters(lat1, lon1, lat2, lon2):
    R = 6371000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    return R * 2 * math.asin(math.sqrt(a))

In [17]:
import sqlite3
import pandas as pd
import numpy as np
import math
from collections import deque
import re

def find_col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    cols = {col.lower(): col for col in df.columns}
    for c in candidates:
        if c.lower() in cols:
            return cols[c.lower()]
    return None

def parse_pax(v):
    if pd.isna(v): return np.nan
    if isinstance(v,(int,float)): return float(v)
    s = str(v).strip()
    m = re.search(r'(-?\d+(?:\.\d+)?)', s)
    return float(m.group(1)) if m else np.nan

# read DB
with sqlite3.connect(db_file) as con:
    bus = pd.read_sql_query("SELECT * FROM Bus_Logs", con)
    eta = pd.read_sql_query("SELECT * FROM ETA_Logs", con)

# detect columns
bus_id_col = find_col(bus, ["bus_id"])
time_col = find_col(bus, ["timestamp"])
lat_col = find_col(bus, ["latitude"])
lon_col = find_col(bus, ["longitude"])
# ETA
eta_log_logid = find_col(eta, ["log_id"])
eta_log_stop = find_col(eta, ["stop_id"])
eta_seconds_col = find_col(eta, ["eta_seconds"])
eta_sort_col = find_col(eta, ["sort_order"])
eta_pax_col = find_col(eta, ["pax_load"])

if bus_id_col is None or time_col is None:
    raise RuntimeError("Missing bus id or timestamp column")

# normalize timestamps (auto-detect unit)
bus = bus.copy()
ts_vals = pd.to_numeric(bus[time_col], errors="coerce").dropna()
bus["__ts"] = pd.to_datetime(bus[time_col].astype(float), unit='s', errors="coerce")

# coerce lat/lon
if lat_col: bus[lat_col] = pd.to_numeric(bus[lat_col], errors="coerce")
if lon_col: bus[lon_col] = pd.to_numeric(bus[lon_col], errors="coerce")

# prepare eta_next -> merge
if eta_log_logid is None:
    raise RuntimeError("ETA_Logs missing log_id column")

def pick_next_stop_for_log(g):
    if eta_sort_col and 0 in g.get(eta_sort_col).values:
        row = g[g[eta_sort_col] == 0].iloc[0]
    else:
        if eta_seconds_col in g.columns:
            row = g.loc[g[eta_seconds_col].astype(float).idxmin()]
        else:
            row = g.iloc[0]
    return pd.Series({
        "pred_stop": row.get(eta_log_stop),
        "pred_eta_s": (row.get(eta_seconds_col) if eta_seconds_col in g.columns else None),
        "pred_pax": (row.get(eta_pax_col) if eta_pax_col in g.columns else None)
    })
print('A')
eta_next = eta.groupby(eta_log_logid).apply(pick_next_stop_for_log).reset_index().rename(columns={eta_log_logid:"log_id"})
print('B')
log_id_col_in_bus = find_col(bus, ["log_id","logid","logId"]) or ("log_id" if "log_id" in bus.columns else None)
if log_id_col_in_bus and log_id_col_in_bus in bus.columns:
    df = bus.merge(eta_next, how="left", left_on=log_id_col_in_bus, right_on="log_id", suffixes=("","_eta"))
    print('C')
else:
    df = bus.reset_index().merge(eta_next.reset_index(), left_on="index", right_on="index", how="left")
    print('D')

# groups for lookups and arrival finder
bus_sorted = bus.sort_values([bus_id_col, "__ts"]).reset_index(drop=True)
groups = {bid: grp.reset_index(drop=True) for bid, grp in bus_sorted.groupby(bus_id_col)}
print('E')
MAX_DELAY_S = 90*60

A


  eta_next = eta.groupby(eta_log_logid).apply(pick_next_stop_for_log).reset_index().rename(columns={eta_log_logid:"log_id"})


B
C
E


In [15]:
def find_actual_arrival(row):
    bid = row.get(bus_id_col)
    if pd.isna(row.get("__ts")) or bid is None:
        return pd.NaT
    pred_stop = row.get("pred_stop")
    if pred_stop is None or pd.isna(pred_stop):
        return pd.NaT
    start_ts = row["__ts"]
    grp = groups.get(bid)
    if grp is None or grp.empty: return pd.NaT
    later = grp[grp["__ts"] > start_ts]
    if later.empty: return pd.NaT
    # try arrived flag then fallback by stop match
    arrived_col = find_col(grp, ["arrived","arrived_at","is_arrived","arrived_bool"])
    if arrived_col and arrived_col in later.columns:
        try:
            cand = later[(later.get(pred_stop) == later[pred_stop]) if False else (later[stop_col] == pred_stop) & (later[arrived_col].astype(bool))]
        except Exception:
            cand = later[later[stop_col] == pred_stop]
        if not cand.empty:
            t = cand.iloc[0]["__ts"]
            if (t - start_ts).total_seconds() <= MAX_DELAY_S: return t
    cand = later[later[stop_col] == pred_stop] if stop_col in later.columns else pd.DataFrame()
    if not cand.empty:
        t = cand.iloc[0]["__ts"]
        if (t - start_ts).total_seconds() <= MAX_DELAY_S:
            return t
    return pd.NaT

In [10]:
import numpy as np, pandas as pd, math

# show samples
print("time sample:", bus[time_col].head().tolist())
print("lat sample:", bus[lat_col].head().tolist() if lat_col in bus.columns else "no lat_col")
print("lon sample:", bus[lon_col].head().tolist() if lon_col in bus.columns else "no lon_col")

# detect numeric timestamp scale if possible
ts_vals = pd.to_numeric(bus[time_col], errors="coerce").dropna()
detected_unit = None
if not ts_vals.empty:
    mx = ts_vals.max()
    if mx > 1e15:
        detected_unit = "ns"
    elif mx > 1e11:
        detected_unit = "ms"
    elif mx > 1e9:
        detected_unit = "s"
print("detected timestamp unit:", detected_unit)

# reparse timestamps using detected unit if any (fallback: let pandas infer)
if detected_unit:
    bus["__ts"] = pd.to_datetime(bus[time_col].astype(float), unit=detected_unit, errors="coerce")
else:
    bus["__ts"] = pd.to_datetime(bus[time_col], errors="coerce")

# coerce lat/lon to numeric
bus[lat_col] = pd.to_numeric(bus[lat_col], errors="coerce")
bus[lon_col] = pd.to_numeric(bus[lon_col], errors="coerce")

# report nulls / duplicates / monotonicity for a sample bus
sample_bid = bus[bus_id_col].dropna().iloc[0]
grp = bus[bus[bus_id_col] == sample_bid].sort_values("__ts")
print(f"sample bus {sample_bid}: rows={len(grp)}, null lat={grp[lat_col].isna().sum()}, null lon={grp[lon_col].isna().sum()}")
print("first times:", grp["__ts"].head().tolist())
print("time diffs (seconds):", grp["__ts"].diff().dt.total_seconds().head().tolist())

# rebuild segs_map (same logic as before) after fixes
bus_sorted = bus.sort_values([bus_id_col, "__ts"]).reset_index(drop=True)
segs_map = {}
for bid, grp in bus_sorted.groupby(bus_id_col):
    seq = grp.sort_values("__ts").reset_index(drop=True)
    segs = []
    prev = None
    for i, r in seq.iterrows():
        if prev is not None:
            dt = (r["__ts"] - prev["__ts"]).total_seconds() if pd.notna(r["__ts"]) and pd.notna(prev["__ts"]) else np.nan
            if pd.notna(dt) and dt > 0 and pd.notna(prev[lat_col]) and pd.notna(prev[lon_col]) and pd.notna(r[lat_col]) and pd.notna(r[lon_col]):
                # haversine
                phi1, phi2 = math.radians(prev[lat_col]), math.radians(r[lat_col])
                dphi = math.radians(r[lat_col] - prev[lat_col])
                dlambda = math.radians(r[lon_col] - prev[lon_col])
                a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
                dist = 6371000.0 * 2 * math.asin(math.sqrt(a))
                seg_speed = dist / dt
            else:
                seg_speed = np.nan
            segs.append((r["__ts"], seg_speed))
        prev = r
    segs_map[bid] = segs

# peek at one bus
some = next(iter(segs_map.keys()))
print("example segs for", some, "->", segs_map[some][:10])

time sample: [1762715184, 1762715184, 1762715184, 1762715184, 1762715185]
lat sample: [40.4919366, 40.523222, 40.4924533, 40.5178944, 40.517338783]
lon sample: [-74.442982, -74.4418223, -74.4429633, -74.4614939, -74.43026205]
detected timestamp unit: s
sample bus 13211: rows=39112, null lat=0, null lon=0
first times: [Timestamp('2025-11-09 19:06:24'), Timestamp('2025-11-09 19:06:34'), Timestamp('2025-11-09 19:06:35'), Timestamp('2025-11-09 19:06:36'), Timestamp('2025-11-09 19:06:37')]
time diffs (seconds): [nan, 10.0, 1.0, 1.0, 1.0]
example segs for 4850 -> [(Timestamp('2025-11-10 12:18:06'), 0.04214881446563141), (Timestamp('2025-11-10 12:18:16'), 0.0716281724044955), (Timestamp('2025-11-10 12:18:26'), 0.053700502751985325), (Timestamp('2025-11-10 12:18:36'), 2.2323890262890087), (Timestamp('2025-11-10 12:18:46'), 2.211762756866596), (Timestamp('2025-11-10 12:18:56'), 0.9947906977109016), (Timestamp('2025-11-10 12:19:17'), 0.5184393712872131), (Timestamp('2025-11-10 12:19:23'), 14.748

In [11]:
segs_map

{4850: [(Timestamp('2025-11-10 12:18:06'), 0.04214881446563141),
  (Timestamp('2025-11-10 12:18:16'), 0.0716281724044955),
  (Timestamp('2025-11-10 12:18:26'), 0.053700502751985325),
  (Timestamp('2025-11-10 12:18:36'), 2.2323890262890087),
  (Timestamp('2025-11-10 12:18:46'), 2.211762756866596),
  (Timestamp('2025-11-10 12:18:56'), 0.9947906977109016),
  (Timestamp('2025-11-10 12:19:17'), 0.5184393712872131),
  (Timestamp('2025-11-10 12:19:23'), 14.748015757264993),
  (Timestamp('2025-11-10 12:19:34'), 5.094129993562484),
  (Timestamp('2025-11-10 12:19:43'), 7.535762525108335),
  (Timestamp('2025-11-10 12:19:54'), 9.191277880768162),
  (Timestamp('2025-11-10 12:20:03'), 2.3801758938937834),
  (Timestamp('2025-11-10 12:20:13'), 5.2839923143399785),
  (Timestamp('2025-11-10 12:20:24'), 1.1983890234611179),
  (Timestamp('2025-11-10 12:20:34'), 2.104232549542811),
  (Timestamp('2025-11-10 12:20:43'), 10.836728559929329),
  (Timestamp('2025-11-10 12:20:54'), 5.482704635656892),
  (Timestam

In [None]:
out_path = "features_stream.csv"
import os
first_write = not os.path.exists(out_path)

total = len(df)
written = 0
for i, row in df.iterrows():
    # progress
    if (i % 100) == 0:
        print(f"Processing row {i+1}/{total}", end="\r", flush=True)

    # actual arrival (uses find_actual_arrival defined earlier)
    actual_ts = find_actual_arrival(row)

    # predictions / target
    pred_eta_s = pd.to_numeric(row.get("pred_eta_s"), errors="coerce")
    actual_travel_s = np.nan
    if pd.notna(actual_ts) and pd.notna(row.get("__ts")):
        actual_travel_s = (pd.to_datetime(actual_ts) - pd.to_datetime(row["__ts"])).total_seconds()
    eta_error_s = actual_travel_s - pred_eta_s if (not np.isnan(actual_travel_s) and not np.isnan(pred_eta_s)) else np.nan

    # pax_load: prefer pred_pax then bus-level fallbacks
    raw_pax = row.get("pred_pax")
    if pd.isna(raw_pax):
        for c in ["paxLoadS", "pax_load", "paxLoad", "pax"]:
            if c in df.columns and not df[c].isna().all():
                raw_pax = row.get(c)
                break
    pax = parse_pax(raw_pax)

    # time-of-day
    ts = row.get("__ts")
    hour = np.nan
    minute = np.nan
    time_of_day_s = np.nan
    if pd.notna(ts):
        ts = pd.to_datetime(ts)
        hour = ts.hour
        minute = ts.minute
        time_of_day_s = hour * 3600 + minute * 60 + ts.second

    # speed_prev: previous segment speed if dt <= 30s
    speed_prev = np.nan
    bid = row.get(bus_id_col)
    if bid in groups and pd.notna(ts) and lat_col and lon_col:
        grp = groups[bid]
        prev_rows = grp[grp["__ts"] < row["__ts"]]
        if not prev_rows.empty:
            prev_row = prev_rows.iloc[-1]
            dt = (row["__ts"] - prev_row["__ts"]).total_seconds()
            if 0 < dt <= 30:
                dist = haversine_meters(prev_row[lat_col], prev_row[lon_col], row[lat_col], row[lon_col])
                speed_prev = dist / dt

    # rolling speeds from segs_map (1min / 10min)
    speed_1min = np.nan
    speed_10min = np.nan
    if bid in segs_map and pd.notna(ts):
        segs = segs_map[bid]
        valid1 = [s[1] for s in segs if s[0] <= row["__ts"] and (row["__ts"] - s[0]).total_seconds() <= 60 and not pd.isna(s[1])]
        valid10 = [s[1] for s in segs if s[0] <= row["__ts"] and (row["__ts"] - s[0]).total_seconds() <= 600 and not pd.isna(s[1])]
        if valid1:
            speed_1min = float(np.nanmean(valid1))
        if valid10:
            speed_10min = float(np.nanmean(valid10))

    # compose record matching 'features' columns
    rec = {}
    if (log_id_col_in_bus in df.columns):
        rec[log_id_col_in_bus] = row.get(log_id_col_in_bus)
    rec[bus_id_col] = bid
    rec[stop_col] = row.get(stop_col)
    rec["__ts"] = row.get("__ts")
    rec["pred_stop"] = row.get("pred_stop")
    rec["pred_eta_s"] = pred_eta_s
    rec["actual_arrival_ts"] = pd.to_datetime(actual_ts) if pd.notna(actual_ts) else pd.NaT
    rec["actual_travel_s"] = actual_travel_s
    rec["eta_error_s"] = eta_error_s
    rec["pax_load"] = pax
    rec["hour"] = hour
    rec["time_of_day_s"] = time_of_day_s
    rec["speed_prev_mps"] = speed_prev
    rec["speed_1min_mps"] = speed_1min
    rec["speed_10min_mps"] = speed_10min

    # append to CSV immediately
    pd.DataFrame([rec]).to_csv(out_path, mode="a", header=first_write, index=False)
    first_write = False
    written += 1

print()  # end progress line
print(f"Wrote {written} rows to {out_path}")

In [9]:
import sqlite3
import pandas as pd
import numpy as np
import math
from typing import Dict

# Fixed schema column names (per provided schema)
DB_FILE = globals().get("db_file", "rutgers_buses.db")
BUS_TBL = "Bus_Logs"
ETA_TBL = "ETA_Logs"

# column names from schema
BUS_LOG_ID = "log_id"
BUS_TS = "timestamp"
BUS_BUS_ID = "bus_id"
BUS_LAT = "latitude"
BUS_LON = "longitude"
BUS_PAX = "pax_load"
BUS_ARRIVED_STOP = "arrived_stop_id"

ETA_LOG_ID = "log_id"
ETA_STOP = "stop_id"
ETA_SECONDS = "eta_seconds"
ETA_SORT = "sort_order"
# config
MAX_DELAY_S = 60 * 60  # 1 hour
ETA_CHUNK = 200_000

def haversine_meters_vec(lat1, lon1, lat2, lon2):
    R = 6371000.0
    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dlambda = np.radians(lon2 - lon1)
    a = np.sin(dphi/2.0)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda/2.0)**2
    return R * 2 * np.arcsin(np.sqrt(a))

with sqlite3.connect(DB_FILE) as con:
    bus = pd.read_sql_query(f"SELECT * FROM {BUS_TBL}", con)
    # inspect ETA columns via pragma to confirm presence (will select explicitly later)
    eta_cols = pd.read_sql_query(f"PRAGMA table_info('{ETA_TBL}')", con)['name'].tolist()

# enforce expected columns exist
required_bus_cols = {BUS_LOG_ID, BUS_TS, BUS_BUS_ID, BUS_LAT, BUS_LON, BUS_PAX, BUS_ARRIVED_STOP}
missing = required_bus_cols - set(bus.columns)
if missing:
    raise RuntimeError(f"Bus_Logs missing expected columns: {missing}")

# normalize/parse types
bus = bus.copy()
# timestamp numeric unit detection (same heuristic as before)
_ts_vals = pd.to_numeric(bus[BUS_TS], errors="coerce").dropna()
detected_unit = None
if len(_ts_vals):
    mx = _ts_vals.max()
    if mx > 1e15:
        detected_unit = "ns"
    elif mx > 1e11:
        detected_unit = "ms"
    elif mx > 1e9:
        detected_unit = "s"
if detected_unit:
    bus["__ts"] = pd.to_datetime(bus[BUS_TS].astype(float), unit=detected_unit, errors="coerce")
else:
    bus["__ts"] = pd.to_datetime(bus[BUS_TS], errors="coerce")

# coerce lat/lon and pax
bus[BUS_LAT] = pd.to_numeric(bus[BUS_LAT], errors="coerce")
bus[BUS_LON] = pd.to_numeric(bus[BUS_LON], errors="coerce")
bus[BUS_PAX] = pd.to_numeric(bus[BUS_PAX], errors="coerce")

# quick lookup by log_id
if BUS_LOG_ID not in bus.columns:
    raise RuntimeError("Bus_Logs does not contain log_id; schema mismatch.")
bus_by_log = bus.set_index(BUS_LOG_ID, drop=False)

# prepare bus stop events from arrived_stop_id
bus_stop_events = bus.loc[bus[BUS_ARRIVED_STOP].notna(),
                          [BUS_BUS_ID, "__ts", BUS_ARRIVED_STOP, BUS_LAT, BUS_LON]].copy()
bus_stop_events = bus_stop_events.rename(columns={BUS_ARRIVED_STOP: "event_stop_id", "__ts": "event_ts"})

# build per-bus segs_map for speeds
bus_sorted = bus.sort_values([BUS_BUS_ID, "__ts"]).reset_index(drop=True)
segs_map: Dict[str, Dict[str, np.ndarray]] = {}
for bid, grp in bus_sorted.groupby(BUS_BUS_ID):
    seq = grp.sort_values("__ts")
    ts = seq["__ts"].values.astype("datetime64[ns]")
    lat = seq[BUS_LAT].values if BUS_LAT in seq.columns else None
    lon = seq[BUS_LON].values if BUS_LON in seq.columns else None
    if len(ts) < 2 or lat is None or lon is None:
        segs_map[bid] = {"ts": np.array([], dtype="datetime64[ns]"),
                         "speed": np.array([], dtype=float),
                         "dt": np.array([], dtype=float)}
        continue
    dt = (ts[1:] - ts[:-1]) / np.timedelta64(1, "s")
    dist = haversine_meters_vec(lat[:-1].astype(float), lon[:-1].astype(float),
                                lat[1:].astype(float), lon[1:].astype(float))
    speed = dist / dt
    segs_map[bid] = {"ts": ts[1:], "speed": speed, "dt": dt}

def compute_speed_features_for_row(bid, start_ts):
    if bid not in segs_map:
        return np.nan, np.nan
    rec = segs_map[bid]
    seg_ts = rec["ts"]
    if seg_ts.size == 0:
        return np.nan, np.nan
    st = np.datetime64(pd.to_datetime(start_ts).to_datetime64())
    idx = np.searchsorted(seg_ts, st) - 1
    speed_prev = np.nan
    if idx >= 0:
        if rec["dt"][idx] > 0 and rec["dt"][idx] <= 30:
            speed_prev = float(rec["speed"][idx])
    lower = st - np.timedelta64(60, "s")
    mask = (seg_ts > lower) & (seg_ts <= st)
    if mask.any():
        speeds = rec["speed"][mask]
        speeds = speeds[np.isfinite(speeds)]
        speed_1min = float(np.nanmean(speeds)) if speeds.size else np.nan
    else:
        speed_1min = np.nan
    return speed_prev, speed_1min

# process ETA_Logs in chunks (explicit column names from schema)
features_parts = []
with sqlite3.connect(DB_FILE) as con:
    select_cols = [f"{ETA_LOG_ID} AS log_id", f"{ETA_STOP} AS stop_id",
                   f"{ETA_SECONDS} AS eta_seconds", f"{ETA_SORT} AS sort_order"]
    sql = f"SELECT {', '.join(select_cols)} FROM {ETA_TBL}"
    total_eta = pd.read_sql_query("SELECT COUNT(*) AS c FROM ETA_Logs", con).iloc[0, 0]
    total_chunks = max(1, math.ceil(total_eta / ETA_CHUNK))
    chunk_iter = pd.read_sql_query(sql, con, chunksize=ETA_CHUNK)

    base_idx = 0
    processed = 0
    chunk_idx = 0
    for chunk in chunk_iter:
        chunk_idx += 1
        chunk = chunk.reset_index(drop=True)
        chunk["eta_row_id"] = np.arange(len(chunk)) + base_idx
        base_idx += len(chunk)

        # print progress (a / b)
        print(f"[ETA] chunk {chunk_idx}/{total_chunks} — rows {processed + 1} / {total_eta}", flush=True)

        # join on log_id (foreign key)
        bus_by_log_view = bus_by_log.reset_index(drop=True)[[BUS_LOG_ID, BUS_BUS_ID, "__ts", BUS_LAT, BUS_LON, BUS_PAX, BUS_ARRIVED_STOP]]
        merged = chunk.merge(
            bus_by_log_view,
            left_on="log_id",
            right_on=BUS_LOG_ID,
            how="left",
            suffixes=("_eta", "_bus")
        ).rename(columns={BUS_BUS_ID: "bus_id", "__ts": "start_ts", BUS_PAX: "bus_pax"})

        # drop rows without originating bus log
        merged_valid = merged[merged["start_ts"].notna()].copy()
        if BUS_ARRIVED_STOP in merged_valid.columns:
            merged_valid = merged_valid[ merged_valid[BUS_ARRIVED_STOP].isna() | (merged_valid[BUS_ARRIVED_STOP] != merged_valid["stop_id"]) ].copy()
        if merged_valid.empty:
            processed += len(chunk)
            continue

        # actual arrival: restrict bus_stop_events to the buses in this chunk
        bus_events_map = {}
        for bid, g in bus_stop_events.groupby(BUS_BUS_ID):
            g2 = g.sort_values("event_ts")[["event_ts", "event_stop_id"]].dropna(subset=["event_ts"]).reset_index(drop=True)
            if g2.empty: 
                continue
            bus_events_map[bid] = {
                "ts": g2["event_ts"].values.astype("datetime64[ns]"),
                "stop": g2["event_stop_id"].astype(object).values
            }

        def lookup_arrival_ts(bid, stop_id, start_ts):
            if bid not in bus_events_map or pd.isna(stop_id) or pd.isna(start_ts):
                return pd.NaT
            arr = bus_events_map[bid]
            try:
                st64 = np.datetime64(pd.to_datetime(start_ts).to_datetime64())
            except Exception:
                return pd.NaT
            idx = np.searchsorted(arr["ts"], st64)
            # scan forward until matching stop found or beyond MAX_DELAY_S
            while idx < arr["ts"].size:
                if arr["stop"][idx] == stop_id:
                    delta_s = (arr["ts"][idx] - st64) / np.timedelta64(1, "s")
                    if 0 < delta_s <= MAX_DELAY_S:
                        return pd.to_datetime(arr["ts"][idx])
                    return pd.NaT
                idx += 1
            return pd.NaT
        # run lookup per-row (chunk-sized, avoids huge intermediate frames)
        merged_valid["actual_arrival_ts"] = merged_valid.apply(
            lambda r: lookup_arrival_ts(r["bus_id"], r["stop_id"], r["start_ts"]),
            axis=1
        )

        # compute eta_error / target
        merged_valid["pred_eta_s"] = pd.to_numeric(merged_valid["eta_seconds"], errors="coerce")
        merged_valid["actual_travel_s"] = (pd.to_datetime(merged_valid["actual_arrival_ts"]) - pd.to_datetime(merged_valid["start_ts"])).dt.total_seconds()
        merged_valid["eta_error_s"] = np.where(merged_valid["actual_travel_s"].notna() & merged_valid["pred_eta_s"].notna(),
                                               merged_valid["actual_travel_s"] - merged_valid["pred_eta_s"],
                                               np.nan)

        # pax_load: prefer ETA (none in schema) else bus.pax_load (bus_pax)
        merged_valid["pax_load"] = pd.to_numeric(merged_valid.get("eta_pax"), errors="coerce")
        # fallback to bus_pax (from Bus_Logs)
        merged_valid.loc[merged_valid["pax_load"].isna(), "pax_load"] = merged_valid.loc[merged_valid["pax_load"].isna(), "bus_pax"]

        # time-of-day
        merged_valid["start_ts"] = pd.to_datetime(merged_valid["start_ts"])
        merged_valid["hour"] = merged_valid["start_ts"].dt.hour
        merged_valid["time_of_day_s"] = merged_valid["hour"] * 3600 + merged_valid["start_ts"].dt.minute * 60 + merged_valid["start_ts"].dt.second

        # speed features via segs_map lookups
        sp_prev_list = []
        sp_1min_list = []
        for _, row in merged_valid.iterrows():
            bid = row["bus_id"]
            st = row["start_ts"]
            sp_prev, sp_1min = compute_speed_features_for_row(bid, st)
            sp_prev_list.append(sp_prev)
            sp_1min_list.append(sp_1min)
        merged_valid["speed_prev_mps"] = sp_prev_list
        merged_valid["speed_1min_mps"] = sp_1min_list

        out = merged_valid[[
            "eta_row_id", "log_id", "bus_id", "stop_id", "sort_order", "eta_seconds", "pred_eta_s",
            "actual_arrival_ts", "actual_travel_s", "eta_error_s",
            "pax_load", "hour", "time_of_day_s", "speed_prev_mps", "speed_1min_mps"
        ]].copy()
        features_parts.append(out)
        processed += len(chunk)
        print(f"[ETA] completed {processed} / {total_eta} rows ({chunk_idx}/{total_chunks} chunks)", flush=True)

if features_parts:
    features = pd.concat(features_parts, ignore_index=True)
else:
    features = pd.DataFrame(columns=["eta_row_id","log_id","bus_id","stop_id","sort_order","eta_seconds","pred_eta_s",
                                     "actual_arrival_ts","actual_travel_s","eta_error_s","pax_load","hour",
                                     "time_of_day_s","speed_prev_mps","speed_1min_mps"])

features = features.sort_values(["bus_id","eta_row_id"]).reset_index(drop=True)
print("Prepared features rows:", len(features))

  speed = dist / dt
  speed = dist / dt


[ETA] chunk 1/56 — rows 1 / 11153406
[ETA] completed 200000 / 11153406 rows (1/56 chunks)
[ETA] chunk 2/56 — rows 200001 / 11153406
[ETA] completed 400000 / 11153406 rows (2/56 chunks)
[ETA] chunk 3/56 — rows 400001 / 11153406
[ETA] completed 600000 / 11153406 rows (3/56 chunks)
[ETA] chunk 4/56 — rows 600001 / 11153406
[ETA] completed 800000 / 11153406 rows (4/56 chunks)
[ETA] chunk 5/56 — rows 800001 / 11153406
[ETA] completed 1000000 / 11153406 rows (5/56 chunks)
[ETA] chunk 6/56 — rows 1000001 / 11153406
[ETA] completed 1200000 / 11153406 rows (6/56 chunks)
[ETA] chunk 7/56 — rows 1200001 / 11153406
[ETA] completed 1400000 / 11153406 rows (7/56 chunks)
[ETA] chunk 8/56 — rows 1400001 / 11153406
[ETA] completed 1600000 / 11153406 rows (8/56 chunks)
[ETA] chunk 9/56 — rows 1600001 / 11153406
[ETA] completed 1800000 / 11153406 rows (9/56 chunks)
[ETA] chunk 10/56 — rows 1800001 / 11153406
[ETA] completed 2000000 / 11153406 rows (10/56 chunks)
[ETA] chunk 11/56 — rows 2000001 / 1115340

In [1]:
# ...existing code...
import sqlite3
import pandas as pd
import numpy as np
import math
from typing import Dict

# Fixed schema column names (per provided schema)
DB_FILE = globals().get("db_file", "rutgers_buses.db")
BUS_TBL = "Bus_Logs"
ETA_TBL = "ETA_Logs"

# column names from schema
BUS_LOG_ID = "log_id"
BUS_TS = "timestamp"
BUS_BUS_ID = "bus_id"
BUS_LAT = "latitude"
BUS_LON = "longitude"
BUS_PAX = "pax_load"
BUS_ARRIVED_STOP = "arrived_stop_id"

ETA_LOG_ID = "log_id"
ETA_STOP = "stop_id"
ETA_SECONDS = "eta_seconds"
ETA_SORT = "sort_order"
# config
MAX_DELAY_S = 60 * 60  # 1 hour
ETA_CHUNK = 200000

def haversine_meters_vec(lat1, lon1, lat2, lon2):
    R = 6371000.0
    phi1 = np.radians(lat1)
    phi2 = np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dlambda = np.radians(lon2 - lon1)
    a = np.sin(dphi/2.0)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda/2.0)**2
    return R * 2 * np.arcsin(np.sqrt(a))

with sqlite3.connect(DB_FILE) as con:
    bus = pd.read_sql_query(f"SELECT * FROM {BUS_TBL}", con)
    # inspect ETA columns via pragma to confirm presence (will select explicitly later)
    eta_cols = pd.read_sql_query(f"PRAGMA table_info('{ETA_TBL}')", con)['name'].tolist()

# enforce expected columns exist
required_bus_cols = {BUS_LOG_ID, BUS_TS, BUS_BUS_ID, BUS_LAT, BUS_LON, BUS_PAX, BUS_ARRIVED_STOP}
missing = required_bus_cols - set(bus.columns)
if missing:
    raise RuntimeError(f"Bus_Logs missing expected columns: {missing}")

# detect route-id column in Bus_Logs (if present) so we can map to Route_Stops
route_col_in_bus = next((c for c in bus.columns if "route" in c.lower() and "id" in c.lower()), None)

# normalize/parse types
bus = bus.copy()
# timestamp numeric unit detection (same heuristic as before)
_ts_vals = pd.to_numeric(bus[BUS_TS], errors="coerce").dropna()
detected_unit = None
if len(_ts_vals):
    mx = _ts_vals.max()
    if mx > 1e15:
        detected_unit = "ns"
    elif mx > 1e11:
        detected_unit = "ms"
    elif mx > 1e9:
        detected_unit = "s"
if detected_unit:
    bus["__ts"] = pd.to_datetime(bus[BUS_TS].astype(float), unit=detected_unit, errors="coerce")
else:
    bus["__ts"] = pd.to_datetime(bus[BUS_TS], errors="coerce")

# coerce lat/lon and pax
bus[BUS_LAT] = pd.to_numeric(bus[BUS_LAT], errors="coerce")
bus[BUS_LON] = pd.to_numeric(bus[BUS_LON], errors="coerce")
bus[BUS_PAX] = pd.to_numeric(bus[BUS_PAX], errors="coerce")

# quick lookup by log_id
if BUS_LOG_ID not in bus.columns:
    raise RuntimeError("Bus_Logs does not contain log_id; schema mismatch.")
bus_by_log = bus.set_index(BUS_LOG_ID, drop=False)

# prepare bus stop events from arrived_stop_id
bus_stop_events = bus.loc[bus[BUS_ARRIVED_STOP].notna(),
                          [BUS_BUS_ID, "__ts", BUS_ARRIVED_STOP, BUS_LAT, BUS_LON]].copy()
bus_stop_events = bus_stop_events.rename(columns={BUS_ARRIVED_STOP: "event_stop_id", "__ts": "event_ts"})

# build per-bus segs_map for speeds
bus_sorted = bus.sort_values([BUS_BUS_ID, "__ts"]).reset_index(drop=True)
segs_map: Dict[str, Dict[str, np.ndarray]] = {}
for bid, grp in bus_sorted.groupby(BUS_BUS_ID):
    seq = grp.sort_values("__ts")
    ts = seq["__ts"].values.astype("datetime64[ns]")
    lat = seq[BUS_LAT].values if BUS_LAT in seq.columns else None
    lon = seq[BUS_LON].values if BUS_LON in seq.columns else None
    if len(ts) < 2 or lat is None or lon is None:
        segs_map[bid] = {"ts": np.array([], dtype="datetime64[ns]"),
                         "speed": np.array([], dtype=float),
                         "dt": np.array([], dtype=float)}
        continue
    dt = (ts[1:] - ts[:-1]) / np.timedelta64(1, "s")
    dist = haversine_meters_vec(lat[:-1].astype(float), lon[:-1].astype(float),
                                lat[1:].astype(float), lon[1:].astype(float))
    speed = dist / dt
    segs_map[bid] = {"ts": ts[1:], "speed": speed, "dt": dt}

def compute_speed_features_for_row(bid, start_ts):
    if bid not in segs_map:
        return np.nan, np.nan
    rec = segs_map[bid]
    seg_ts = rec["ts"]
    if seg_ts.size == 0:
        return np.nan, np.nan
    st = np.datetime64(pd.to_datetime(start_ts).to_datetime64())
    idx = np.searchsorted(seg_ts, st) - 1
    speed_prev = np.nan
    if idx >= 0:
        if rec["dt"][idx] > 0 and rec["dt"][idx] <= 30:
            speed_prev = float(rec["speed"][idx])
    lower = st - np.timedelta64(60, "s")
    mask = (seg_ts > lower) & (seg_ts <= st)
    if mask.any():
        speeds = rec["speed"][mask]
        speeds = speeds[np.isfinite(speeds)]
        speed_1min = float(np.nanmean(speeds)) if speeds.size else np.nan
    else:
        speed_1min = np.nan
    return speed_prev, speed_1min

# load Route_Stops mapping once for next-stop detection (if table exists)
route_next_map = {}
try:
    with sqlite3.connect(DB_FILE) as _con:
        rs = pd.read_sql_query("SELECT * FROM Route_Stops", _con)
except Exception:
    try:
        with sqlite3.connect(DB_FILE) as _con:
            rs = pd.read_sql_query("SELECT * FROM route_stops", _con)
    except Exception:
        rs = pd.DataFrame(columns=["route_id", "stop_id", "position_on_route"])

# guess column names in route_stops (prioritize exact names from schema)
rs_route_col = next((c for c in rs.columns if c.lower() in ("route_id_from_stop","route_myid","route_id","route")), None)
if rs_route_col is None:
    rs_route_col = next((c for c in rs.columns if "route" in c.lower() and "id" in c.lower()), None)

rs_stop_col = next((c for c in rs.columns if c.lower() in ("stop_id","stop_myid","stop")), None)
if rs_stop_col is None:
    rs_stop_col = next((c for c in rs.columns if "stop" in c.lower() and "id" in c.lower()), None)

rs_pos_col  = next((c for c in rs.columns if c.lower() in ("position_on_route","position","order","stop_sequence")), None)
if rs_pos_col is None:
    rs_pos_col = next((c for c in rs.columns if "position" in c.lower() or "order" in c.lower() or "seq" in c.lower()), None)

# build mapping with consistent string keys to avoid type mismatches
if not rs.empty and rs_route_col and rs_stop_col and rs_pos_col:
    # coerce to strings to ensure comparisons succeed across tables
    rs[rs_route_col] = rs[rs_route_col].astype(str)
    rs[rs_stop_col] = rs[rs_stop_col].astype(str)
    rs = rs.sort_values([rs_route_col, rs_pos_col]).reset_index(drop=True)
    for rid, g in rs.groupby(rs_route_col):
        g2 = g.sort_values(rs_pos_col).reset_index(drop=True)
        stops = g2[rs_stop_col].astype(str).tolist()
        if not stops:
            continue
        nxt = {}
        L = len(stops)
        if L == 1:
            nxt[stops[0]] = stops[0]
        else:
            for i in range(L):
                if i == L - 1:
                    nxt[stops[i]] = stops[1]  # last -> second element
                else:
                   nxt[stops[i]] = stops[i + 1]            # store route id as string key
        route_next_map[str(rid)] = {"stops": stops, "next": nxt, "max_pos": len(stops) - 1}
if len(route_next_map):
    # show a few samples
    import itertools
    for k,v in route_next_map.items():
        print("route:", k, "sample next mappings:", list(v["next"].items()))
def find_next_stop_id(route_id, stop_id):
    """
    Return expected next stop_id on route (wrap-around). If route_id or stop_id not found return None.
    """
    if route_id is None or pd.isna(route_id) or stop_id is None or pd.isna(stop_id):
        return None
    entry = route_next_map.get(str(route_id))
    if not entry:
        return None
    return entry["next"].get(str(stop_id))

# process ETA_Logs in chunks (explicit column names from schema)
features_parts = []
with sqlite3.connect(DB_FILE) as con:
    select_cols = [f"{ETA_LOG_ID} AS log_id", f"{ETA_STOP} AS stop_id",
                   f"{ETA_SECONDS} AS eta_seconds", f"{ETA_SORT} AS sort_order"]
    sql = f"SELECT {', '.join(select_cols)} FROM {ETA_TBL}"
    total_eta = pd.read_sql_query(f"SELECT COUNT(*) AS c FROM {ETA_TBL}", con).iloc[0, 0]
    total_chunks = max(1, math.ceil(total_eta / ETA_CHUNK))
    chunk_iter = pd.read_sql_query(sql, con, chunksize=ETA_CHUNK)

    base_idx = 0
    processed = 0
    chunk_idx = 0
    for chunk in chunk_iter:
        chunk_idx += 1
        chunk = chunk.reset_index(drop=True)
        chunk["eta_row_id"] = np.arange(len(chunk)) + base_idx
        base_idx += len(chunk)

        # print progress (a / b)
        print(f"[ETA] chunk {chunk_idx}/{total_chunks} — rows {processed + 1} / {total_eta}", flush=True)

        # join on log_id (foreign key) - include route column if available
        cols = [BUS_LOG_ID, BUS_BUS_ID, "__ts", BUS_LAT, BUS_LON, BUS_PAX, BUS_ARRIVED_STOP]
        # avoid reset_index() here — bus_by_log already contains the log_id column (set_index(..., drop=False))
        if route_col_in_bus and route_col_in_bus in bus_by_log.columns:
            cols.append(route_col_in_bus)
        # safe reset_index(drop=True) to produce a plain DF view for selection
        bus_by_log_view = bus_by_log.reset_index(drop=True)[cols]

        merged = chunk.merge(
            bus_by_log_view,
            left_on="log_id",
            right_on=BUS_LOG_ID,
            how="left",
            suffixes=("_eta", "_bus")
        ).rename(columns={BUS_BUS_ID: "bus_id", "__ts": "start_ts", BUS_PAX: "bus_pax"})

        # normalize route column name in merged to 'route_id' if present
        if route_col_in_bus and route_col_in_bus in merged.columns:
            merged = merged.rename(columns={route_col_in_bus: "route_id"})

        # drop rows without originating bus log
        merged_valid = merged[merged["start_ts"].notna()].copy()
        if BUS_ARRIVED_STOP in merged_valid.columns:
            merged_valid = merged_valid[ merged_valid[BUS_ARRIVED_STOP].isna() | (merged_valid[BUS_ARRIVED_STOP] != merged_valid["stop_id"]) ].copy()
        if merged_valid.empty:
            processed += len(chunk)
            continue

        # actual arrival: restrict bus_stop_events to the buses in this chunk
        bus_events_map = {}
        for bid, g in bus_stop_events.groupby(BUS_BUS_ID):
            g2 = g.sort_values("event_ts")[["event_ts", "event_stop_id"]].dropna(subset=["event_ts"]).reset_index(drop=True)
            if g2.empty:
                continue
            bus_events_map[bid] = {
                "ts": g2["event_ts"].values.astype("datetime64[ns]"),
                "stop": g2["event_stop_id"].astype(object).values
            }

        def lookup_arrival_ts(bid, stop_id, start_ts, route_id=None):
            """
            Find first arrival event for bid at stop_id after start_ts.
            If the expected next stop (per route_stops) is observed before the target stop -> return pd.NaT.
            """
            if bid not in bus_events_map or pd.isna(stop_id) or pd.isna(start_ts):
                return pd.NaT
            arr = bus_events_map[bid]
            try:
                st64 = np.datetime64(pd.to_datetime(start_ts).to_datetime64())
            except Exception:
                return pd.NaT
            idx = np.searchsorted(arr["ts"], st64)
            # compare using strings
            next_stop = find_next_stop_id(route_id, stop_id) if route_id is not None else None
            target_stop_str = str(stop_id)
            # scan forward until matching stop found, next-stop encountered, or beyond MAX_DELAY_S
            while idx < arr["ts"].size:
                event_stop = str(int(arr["stop"][idx]))
                #DEBUGGING print statement #print("event_stop:", event_stop, "target_stop:", target_stop_str, "next_stop:", next_stop)
                delta_s = (arr["ts"][idx] - st64) / np.timedelta64(1, "s")
                # if we encounter next_stop before target -> dataset inconsistency -> treat as not found
                if next_stop is not None and event_stop == str(next_stop):
                    return pd.NaT
                if event_stop == target_stop_str:
                    if 0 < delta_s <= MAX_DELAY_S:
                        return pd.to_datetime(arr["ts"][idx])
                    return pd.NaT
                if delta_s > MAX_DELAY_S:
                    return pd.NaT
                idx += 1
            return pd.NaT

        # run lookup per-row (chunk-sized, avoids huge intermediate frames)
        merged_valid["actual_arrival_ts"] = merged_valid.apply(
            lambda r: lookup_arrival_ts(r["bus_id"], r["stop_id"], r["start_ts"], route_id=r.get("route_id")),
            axis=1
        )

        # compute eta_error / target
        merged_valid["pred_eta_s"] = pd.to_numeric(merged_valid["eta_seconds"], errors="coerce")
        merged_valid["actual_travel_s"] = (pd.to_datetime(merged_valid["actual_arrival_ts"]) - pd.to_datetime(merged_valid["start_ts"])).dt.total_seconds()
        merged_valid["eta_error_s"] = np.where(merged_valid["actual_travel_s"].notna() & merged_valid["pred_eta_s"].notna(),
                                               merged_valid["actual_travel_s"] - merged_valid["pred_eta_s"],
                                               np.nan)

        # pax_load: prefer ETA (none in schema) else bus.pax_load (bus_pax)
        merged_valid["pax_load"] = pd.to_numeric(merged_valid.get("eta_pax"), errors="coerce")
        # fallback to bus_pax (from Bus_Logs)
        merged_valid.loc[merged_valid["pax_load"].isna(), "pax_load"] = merged_valid.loc[merged_valid["pax_load"].isna(), "bus_pax"]

        # time-of-day
        merged_valid["start_ts"] = pd.to_datetime(merged_valid["start_ts"])
        merged_valid["hour"] = merged_valid["start_ts"].dt.hour
        merged_valid["time_of_day_s"] = merged_valid["hour"] * 3600 + merged_valid["start_ts"].dt.minute * 60 + merged_valid["start_ts"].dt.second

        # speed features via segs_map lookups
        sp_prev_list = []
        sp_1min_list = []
        for _, row in merged_valid.iterrows():
            bid = row["bus_id"]
            st = row["start_ts"]
            sp_prev, sp_1min = compute_speed_features_for_row(bid, st)
            sp_prev_list.append(sp_prev)
            sp_1min_list.append(sp_1min)
        merged_valid["speed_prev_mps"] = sp_prev_list
        merged_valid["speed_1min_mps"] = sp_1min_list

        out = merged_valid[[
            "eta_row_id", "log_id", "bus_id", "stop_id", "sort_order", "eta_seconds", "pred_eta_s",
            "actual_arrival_ts", "actual_travel_s", "eta_error_s",
            "pax_load", "hour", "time_of_day_s", "speed_prev_mps", "speed_1min_mps"
        ]].copy()
        features_parts.append(out)
        processed += len(chunk)
        print(f"[ETA] completed {processed} / {total_eta} rows ({chunk_idx}/{total_chunks} chunks)", flush=True)

if features_parts:
    features = pd.concat(features_parts, ignore_index=True)
else:
    features = pd.DataFrame(columns=["eta_row_id","log_id","bus_id","stop_id","sort_order","eta_seconds","pred_eta_s",
                                     "actual_arrival_ts","actual_travel_s","eta_error_s","pax_load","hour",
                                     "time_of_day_s","speed_prev_mps","speed_1min_mps"])

features = features.sort_values(["bus_id","eta_row_id"]).reset_index(drop=True)
print("Prepared features rows:", len(features))
# ...existing code...

  speed = dist / dt
  speed = dist / dt


route: 26280 sample next mappings: [('10035', '27767'), ('27767', '10038'), ('10038', '10035'), ('10034', '10041'), ('10041', '10052'), ('10052', '10071'), ('10071', '10029'), ('10029', '10065'), ('10065', '12913'), ('12913', '10037'), ('10037', '10059'), ('10059', '10042'), ('10042', '10061'), ('10061', '10036'), ('10036', '62662'), ('62662', '10038')]
route: 26281 sample next mappings: [('10035', '27767'), ('27767', '10075'), ('10075', '10037'), ('10037', '10059'), ('10059', '10042'), ('10042', '10026'), ('10026', '10061'), ('10061', '10036'), ('10036', '10038'), ('10038', '10071'), ('10071', '10029'), ('10029', '10065'), ('10065', '10052'), ('10052', '10039'), ('10039', '21050'), ('21050', '27767')]
route: 26435 sample next mappings: [('10035', '27767'), ('27767', '10038'), ('10038', '10035'), ('10034', '10041'), ('10041', '10052'), ('10052', '10071'), ('10071', '10029'), ('10029', '12913'), ('12913', '10037'), ('10037', '10059'), ('10059', '10042'), ('10042', '10026'), ('10026', '1

In [2]:
features_clean = features.dropna(how="any").reset_index(drop=True)
print(f"features: {len(features)} rows -> features_clean: {len(features_clean)} rows")
print(features_clean.head())
features_clean.to_csv("features_clean_updated_2.csv", index=False)

features: 11017166 rows -> features_clean: 7178015 rows
   eta_row_id  log_id  bus_id  stop_id  sort_order  eta_seconds  pred_eta_s  \
0      293333   23284    4850    10039           0           92          92   
1      293335   23284    4850    10060           2          497         497   
2      293336   23284    4850    10035           3          812         812   
3      293337   23284    4850    27767           4         1000        1000   
4      293338   23284    4850    10038           5         1159        1159   

    actual_arrival_ts  actual_travel_s  eta_error_s  pax_load  hour  \
0 2025-11-10 12:21:23            167.0         75.0       4.0    12   
1 2025-11-10 12:25:14            398.0        -99.0       4.0    12   
2 2025-11-10 12:30:45            729.0        -83.0       4.0    12   
3 2025-11-10 12:35:59           1043.0         43.0       4.0    12   
4 2025-11-10 12:38:20           1184.0         25.0       4.0    12   

   time_of_day_s  speed_prev_mps  speed_1m

In [19]:
print("route_next_map size:", len(route_next_map))
if len(route_next_map):
    # show a few samples
    import itertools
    for k,v in itertools.islice(route_next_map.items(), 5):
        print("route:", k, "sample next mappings:", list(v["next"].items())[:5])

route_next_map size: 22
route: 4056 sample next mappings: [(4056, 4056)]
route: 4063 sample next mappings: [(4063, 4063)]
route: 4088 sample next mappings: [(4088, 4088)]
route: 4098 sample next mappings: [(4098, 4098)]
route: 26280 sample next mappings: [(26280, 26280)]


In [3]:
import sqlite3
with sqlite3.connect(DB_FILE) as con:
    routes = pd.read_sql_query("SELECT route_myid, short_name FROM Routes", con)
    lx_route_myids = routes[routes["short_name"].astype(str).str.upper() == "LX"]["route_myid"].unique()
    bus_route_map = pd.read_sql_query("SELECT log_id, route_myid FROM Bus_Logs", con)
lx_log_ids = bus_route_map[bus_route_map["route_myid"].isin(lx_route_myids)]["log_id"].unique()
lx_features = features_clean[features_clean["log_id"].isin(lx_log_ids)].copy()
lx_features.to_csv("lx_features_updated_2.csv", index=False)

We clean the LX dataset so that it drops every row that contains at least one NA feature

In [4]:
import sqlite3
with sqlite3.connect(DB_FILE) as con:
    routes = pd.read_sql_query("SELECT route_myid, short_name FROM Routes", con)
    b_route_myids = routes[routes["short_name"].astype(str).str.upper() == "B"]["route_myid"].unique()
    bus_route_map = pd.read_sql_query("SELECT log_id, route_myid FROM Bus_Logs", con)
b_log_ids = bus_route_map[bus_route_map["route_myid"].isin(b_route_myids)]["log_id"].unique()
b_features = features_clean[features_clean["log_id"].isin(b_log_ids)].copy()
b_features.to_csv("b_features_2.csv", index=False)