In [3]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Traffic pipeline (bulletproofed)
- ROADS: metadata from Roads_GTA_new.shp
- TRAFFIC_NEW_15: latest 15-min consolidation per Link_id (higher congestion / lower speed), enriched with ROADS
- TRAFFIC_NEW_60: hourly means (historical)
- TRAFFIC_NEW_NOW: current-hour snapshot (replaced each hour)
"""

import os
import sys
import time
import logging
import traceback
from datetime import datetime, timedelta, timezone

import requests
import pandas as pd
import geopandas as gpd
import pymongo
from dateutil import tz
import schedule

# --------------------- CONFIG ---------------------
LOGFILE = "/home/envitwin/Desktop/venvs/EnviTwin/Traffic Operational/Trafficdata.log"
MONGO_URI = "mongodb://bag:ugabunga@localhost:27017/"
DB_NAME   = "EIRG_data"

# Paths
ROADS_SHP_PATH = os.path.expanduser("~/Desktop/venvs/databases/data/Traffic/Roads_GTA_clean.shp")

# Collections
COLL_ROADS          = "ROADS"
COLL_15             = "TRAFFIC_NEW_15"
COLL_60             = "TRAFFIC_NEW_60"
COLL_NOW            = "TRAFFIC_NEW_NOW"

# Feeds (IMET)
URL_CONGESTION = "http://feed.opendata.imetb.gr/fcd/congestions.json?offset=0&limit=9000"
URL_SPEED      = "http://feed.opendata.imetb.gr/fcd/speed.json?offset=0&limit=9000"

# Timezone handling
TZ_ATH = tz.gettz("Europe/Athens")

# Congestion numeric scale (for means & comparisons)
CONG_MAP = {
    "low": 1.0,
    "medium_low": 1.5,   # used for hourly binning output; not present in raw 15-min feed
    "medium": 2.0,
    "medium_high": 2.5,  # used for hourly binning output; not present in raw 15-min feed
    "high": 3.0,
}

# Hourly binning thresholds (same logic you had)
def bin_congestion(mean_val: float) -> str:
    if mean_val <= 1.26: return "Low"
    elif mean_val <= 1.61: return "Medium_Low"
    elif mean_val <= 2.26: return "Medium"
    elif mean_val <= 2.61: return "Medium_High"
    else: return "High"

# Fields expected in ROADS shapefile
REQ_ROAD_FIELDS = ["Link_id", "fclass", "oneway", "factor"]  # geometry is implicit
# --------------------------------------------------

# --------------- LOGGING --------------------------
logging.basicConfig(filename=LOGFILE, level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("traffic_pipeline")
# --------------------------------------------------

# --------------- DB SETUP -------------------------
mongo = pymongo.MongoClient(MONGO_URI)
db = mongo[DB_NAME]
roads_coll = db[COLL_ROADS]
coll_15    = db[COLL_15]
coll_60    = db[COLL_60]
coll_now   = db[COLL_NOW]
# --------------------------------------------------

# --------------- UTILITIES ------------------------
def parse_imet_local_ts(ts_str: str) -> datetime:
    """
    IMET timestamps look like '2025-10-08 09:30:00.000' (local, naive).
    Return (aware_local, aware_utc).
    """
    # strip fractional if needed
    ts_str = ts_str.split(".")[0]
    dt_naive = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
    dt_local = dt_naive.replace(tzinfo=TZ_ATH)
    dt_utc   = dt_local.astimezone(timezone.utc)
    return dt_local, dt_utc

def ensure_indexes():
    # ROADS: by Link_id unique
    roads_coll.create_index([("Link_id", pymongo.ASCENDING)], unique=True)

    # 15-min: unique per (Timestamp_local, Link_id)
    coll_15.create_index([("Timestamp_local", pymongo.ASCENDING), ("Link_id", pymongo.ASCENDING)], unique=True)
    coll_15.create_index([("Timestamp_utc", pymongo.ASCENDING)])

    # hourly
    coll_60.create_index([("Timestamp_local", pymongo.ASCENDING), ("Link_id", pymongo.ASCENDING)], unique=True)
    coll_60.create_index([("Timestamp_utc", pymongo.ASCENDING)])

    coll_now.create_index([("Timestamp_local", pymongo.ASCENDING)])
    coll_now.create_index([("Link_id", pymongo.ASCENDING)])

def load_roads_to_collection(shp_path: str):
    """
    Load Roads_GTA_new shapefile to ROADS collection (upsert by Link_id).
    Keeps: Link_id, fclass, oneway, factor, centroid lat/lon (for convenience).
    """
    gdf = gpd.read_file(shp_path)
    if gdf.crs is None:
        gdf.set_crs(epsg=4326, inplace=True)
    else:
        gdf = gdf.to_crs(epsg=4326)

    # sanity check
    for f in REQ_ROAD_FIELDS:
        if f not in gdf.columns:
            raise RuntimeError(f"ROADS shapefile missing required field: '{f}'")

    # compute centroids (WGS84)
    centroids = gdf.geometry.centroid
    gdf["Lat"] = centroids.y
    gdf["Lon"] = centroids.x

    # Upsert
    ops = []
    for _, r in gdf.iterrows():
        doc = {
            "Link_id": int(r["osm_id"]),
            "fclass": str(r["fclass"]).lower() if pd.notna(r["fclass"]) else None,
            "oneway": r.get("oneway"),
            "factor": float(r["factor"]) if pd.notna(r["factor"]) else None,
            "Lat": float(r["Lat"]) if pd.notna(r["Lat"]) else None,
            "Lon": float(r["Lon"]) if pd.notna(r["Lon"]) else None,
        }
        ops.append(pymongo.UpdateOne({"Link_id": doc["Link_id"]}, {"$set": doc}, upsert=True))

    if ops:
        res = roads_coll.bulk_write(ops, ordered=False)
        log.info(f"ROADS upserted: matched={res.matched_count}, upserted={len(res.upserted_ids)}, modified={res.modified_count}")
    else:
        log.warning("ROADS: nothing to upsert")

def fetch_feed(url: str) -> pd.DataFrame:
    r = requests.get(url, timeout=60)
    r.raise_for_status()
    df = pd.DataFrame(r.json())
    # normalize Link_id & Link_Direction
    if "Link_id" in df.columns:
        df["Link_id"] = pd.to_numeric(df["Link_id"], errors="coerce").astype("Int64")
    if "Link_Direction" in df.columns:
        df["Link_Direction"] = pd.to_numeric(df["Link_Direction"], errors="coerce").astype("Int64")
    return df

def consolidate_15min():
    """
    Pull congestion & speed, keep one record per Link_id:
      - Congestion: choose the **higher** (High > Medium > Low)
      - Speed: choose the **lower** (min)
    Only for Link_id present in ROADS.
    Join ROADS metadata. Upsert into TRAFFIC_NEW_15 (unique per (Timestamp_local, Link_id)).
    """
    # Fetch feeds
    cong = fetch_feed(URL_CONGESTION)
    spd  = fetch_feed(URL_SPEED)

    # Parse timestamps (assume one timestamp dominates; we will handle per-row)
    def parse_col(df, colname="Timestamp"):
        locs, utcs = [], []
        for ts in df[colname].astype(str).tolist():
            try:
                l, u = parse_imet_local_ts(ts)
            except Exception:
                l, u = None, None
            locs.append(l)
            utcs.append(u)
        df["Timestamp_local"] = locs
        df["Timestamp_utc"]   = utcs

    parse_col(cong)
    parse_col(spd)

    # Keep only ROADS Link_ids
    roads_ids = set(doc["Link_id"] for doc in roads_coll.find({}, {"Link_id":1}))
    cong = cong[cong["Link_id"].isin(roads_ids)].copy()
    spd  = spd[spd["Link_id"].isin(roads_ids)].copy()

    # Map congestion strings to numeric rank for comparison
    def cong_rank(s: str) -> float:
        if pd.isna(s): return float("nan")
        return CONG_MAP.get(str(s).strip().lower(), float("nan"))

    cong["Cong_rank"] = cong["Congestion"].astype(str).map(lambda x: cong_rank(x))

    # Reduce per (Timestamp_local, Link_id) by rule:
    # - pick row with max Cong_rank; if tie or missing, still keep one
    cong_sorted = cong.sort_values(["Timestamp_local", "Link_id", "Cong_rank"], ascending=[True, True, False])
    cong_best = cong_sorted.groupby(["Timestamp_local", "Link_id"], as_index=False).first()

    # Reduce speed per (Timestamp_local, Link_id) by min Speed
    spd["Speed"] = pd.to_numeric(spd.get("Speed"), errors="coerce")
    spd_sorted = spd.sort_values(["Timestamp_local", "Link_id", "Speed"], ascending=[True, True, True])
    spd_best = spd_sorted.groupby(["Timestamp_local", "Link_id"], as_index=False).first()

    # Outer-join the two reduced tables (same keys)
    merged = pd.merge(cong_best, spd_best[["Timestamp_local", "Link_id", "Speed", "Timestamp_utc"]],
                      on=["Timestamp_local", "Link_id"], how="outer", suffixes=("_cong", "_spd"))

    # Choose a UTC to store (prefer congestion utc if present)
    merged["Timestamp_utc"] = merged["Timestamp_utc_cong"].where(merged["Timestamp_utc_cong"].notna(), merged["Timestamp_utc_spd"])
    merged.drop(columns=["Timestamp_utc_cong", "Timestamp_utc_spd"], inplace=True)

    # Attach ROADS metadata
    roads_df = pd.DataFrame(list(roads_coll.find({}, {"_id":0, "Link_id":1, "fclass":1, "oneway":1, "factor":1, "Lat":1, "Lon":1})))
    enriched = pd.merge(merged, roads_df, on="Link_id", how="left")

    # Prepare upserts to TRAFFIC_NEW_15
    ops = []
    for _, r in enriched.iterrows():
        ts_loc = r["Timestamp_local"]
        ts_utc = r["Timestamp_utc"]
        if pd.isna(ts_loc) or pd.isna(ts_utc):  # skip broken timestamps
            continue
        doc = {
            "Timestamp_local": ts_loc.to_pydatetime(),
            "Timestamp_utc":   pd.to_datetime(ts_utc).to_pydatetime(),
            "Link_id":         int(r["Link_id"]),
            # Keep BOTH per-direction indicators if present from 'best' rows:
            "Congestion":      r.get("Congestion"),
            "Speed":           None if pd.isna(r.get("Speed")) else float(r.get("Speed")),
            "Link_Direction_congestion": r.get("Link_Direction_cong"),
            "Link_Direction_speed":      r.get("Link_Direction_spd"),
            # ROADS metadata
            "fclass":          r.get("fclass"),
            "oneway":          r.get("oneway"),
            "factor":          None if pd.isna(r.get("factor")) else float(r.get("factor")),
            "Lat":             None if pd.isna(r.get("Lat")) else float(r.get("Lat")),
            "Lon":             None if pd.isna(r.get("Lon")) else float(r.get("Lon")),
        }
        ops.append(pymongo.UpdateOne(
            {"Timestamp_local": doc["Timestamp_local"], "Link_id": doc["Link_id"]},
            {"$set": doc},
            upsert=True
        ))

    if ops:
        res = coll_15.bulk_write(ops, ordered=False)
        log.info(f"TRAFFIC_NEW_15 upserts: upserted={len(res.upserted_ids)}, modified={res.modified_count}, matched={res.matched_count}")
    else:
        log.warning("TRAFFIC_NEW_15: nothing to upsert")

def hourly_reduce():
    """
    For the last hour window, compute:
      - mean_congestion (numeric: Low=1, Medium=2, High=3; Medium_Low/High appear only as hourly bins)
      - mean_speed (simple arithmetic mean)
      - Congestion2 = mean_congestion * factor (factor from ROADS)
    Store to TRAFFIC_NEW_60 (historical) and TRAFFIC_NEW_NOW (fresh snapshot only).
    """
    # Define hour window in local tz
    now_local = datetime.now(TZ_ATH).replace(minute=0, second=0, microsecond=0)
    prev_hour = now_local - timedelta(hours=1)

    # Pull last-hour docs from 15-min collection using local timestamps
    docs = list(coll_15.find({
        "Timestamp_local": {"$gte": prev_hour, "$lt": now_local}
    }, {"_id":0}))

    if not docs:
        log.warning("Hourly reduce: no 15-min docs in last hour window.")
        # reset NOW collection anyway
        coll_now.delete_many({})
        return

    df = pd.DataFrame(docs)

    # Map congestion text to numeric
    def cong_to_num(s):
        if pd.isna(s): return None
        return CONG_MAP.get(str(s).strip().lower(), None)

    df["Cong_num"] = df["Congestion"].map(cong_to_num)
    df["Speed"] = pd.to_numeric(df["Speed"], errors="coerce")

    # Aggregate per Link_id
    agg = df.groupby("Link_id").agg(
        mean_congestion=("Cong_num", "mean"),
        mean_speed=("Speed", "mean"),
        fclass=("fclass", "first"),
        factor=("factor", "first"),
        oneway=("oneway", "first"),
        Lat=("Lat", "first"),
        Lon=("Lon", "first"),
    ).reset_index()

    # Compute bin label and Congestion2
    agg["Congestion"] = agg["mean_congestion"].apply(lambda x: bin_congestion(x) if pd.notna(x) else None)
    agg["Congestion2"] = agg["mean_congestion"] * agg["factor"]

    # Build the hourly timestamp (the closed hour)
    ts_local = prev_hour  # label the hour we aggregated
    ts_utc   = ts_local.astimezone(timezone.utc)

    # Prepare docs
    to_hist = []
    for _, r in agg.iterrows():
        doc = {
            "Timestamp_local": ts_local,
            "Timestamp_utc":   ts_utc,
            "Link_id":         int(r["Link_id"]),
            "Congestion":      r["Congestion"],
            "mean_congestion": None if pd.isna(r["mean_congestion"]) else float(r["mean_congestion"]),
            "mean_speed":      None if pd.isna(r["mean_speed"]) else float(r["mean_speed"]),
            "Congestion2":     None if pd.isna(r["Congestion2"]) else float(r["Congestion2"]),
            "fclass":          r.get("fclass"),
            "oneway":          r.get("oneway"),
            "factor":          None if pd.isna(r.get("factor")) else float(r.get("factor")),
            "Lat":             None if pd.isna(r.get("Lat")) else float(r.get("Lat")),
            "Lon":             None if pd.isna(r.get("Lon")) else float(r.get("Lon")),
        }
        to_hist.append(doc)

    if to_hist:
        # Historical upserts (unique per (Timestamp_local, Link_id))
        ops = [pymongo.UpdateOne(
            {"Timestamp_local": d["Timestamp_local"], "Link_id": d["Link_id"]},
            {"$set": d}, upsert=True) for d in to_hist]
        res = coll_60.bulk_write(ops, ordered=False)
        log.info(f"TRAFFIC_NEW_60 upserts: upserted={len(res.upserted_ids)}, modified={res.modified_count}, matched={res.matched_count}")

        # NOW snapshot: replace collection with current hour snapshot
        coll_now.delete_many({})
        if to_hist:
            coll_now.insert_many(to_hist)
            log.info(f"TRAFFIC_NEW_NOW replaced with {len(to_hist)} docs")

def main_once_load_roads():
    try:
        ensure_indexes()
        load_roads_to_collection(ROADS_SHP_PATH)
        log.info("ROADS loaded/refreshed successfully.")
    except Exception as e:
        log.error("Failed to load ROADS: " + traceback.format_exc())
        raise

def job_15():
    try:
        consolidate_15min()
    except Exception:
        log.error("15-min job failed: " + traceback.format_exc())

def job_hourly():
    try:
        hourly_reduce()
    except Exception:
        log.error("Hourly job failed: " + traceback.format_exc())

if __name__ == "__main__":
    # One-time load/refresh of ROADS
    main_once_load_roads()

    # Kick a first 15-min fetch immediately
    job_15()

    # Schedule
    schedule.every(15).minutes.do(job_15)

    # Run hourly reduce at HH:59:59 to use the previous closed hour
    for hh in range(24):
        schedule.every().day.at(f"{hh:02d}:59:59").do(job_hourly)

    while True:
        schedule.run_pending()
        time.sleep(1)


RuntimeError: ROADS shapefile missing required field: 'Link_id'