In [2]:
# -*- coding: utf-8 -*-
# Paso 3: Construcción del OBT_TRIPS limpio y consolidado
# - Toma ANALYTICS.STG_TRIPS_ENRICHED
# - Sanea tiempos, deduplica, crea features y carga en ANALYTICS.OBT_TRIPS (esquema rico)
# - Endurecido: descarta duraciones negativas/>48h, distancias imposibles y velocidades outlier
# - Idempotente vía TRIP_ID (HASH determinístico)
# - Opción A: sincronización total por mes/servicio (UPSERT + DELETE anti-join)
# -----------------------------------------------------------------------------

import os, time
from snowflake.snowpark import Session

# --- Conexión Snowflake ---
cfg = {
    "account":   os.getenv("SNOWFLAKE_ACCOUNT"),
    "user":      os.getenv("SNOWFLAKE_USER"),
    "password":  os.getenv("SNOWFLAKE_PASSWORD"),
    "role":      os.getenv("SNOWFLAKE_ROLE", "SYSADMIN"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database":  os.getenv("SNOWFLAKE_DATABASE"),
}
session = Session.builder.configs(cfg).create()
print(f"✅ Conectado: {cfg['database']} (WH={cfg['warehouse']}, ROLE={cfg.get('role')})")

DB = cfg["database"]
AN_SCHEMA  = "ANALYTICS"
SOURCE_TABLE = f"{DB}.{AN_SCHEMA}.STG_TRIPS_ENRICHED"
OBT_TABLE    = f"{DB}.{AN_SCHEMA}.OBT_TRIPS"

# Asegura contexto
session.sql(f"USE DATABASE {DB}").collect()
session.sql(f"USE SCHEMA {AN_SCHEMA}").collect()
print("📍 Contexto:", session.sql("SELECT CURRENT_DATABASE() db, CURRENT_SCHEMA() sch").collect())

# --- Crea OBT si no existe ---
session.sql(f"""
CREATE TABLE IF NOT EXISTS {OBT_TABLE} (
  TRIP_ID               STRING,
  PICKUP_DATETIME       TIMESTAMP_NTZ,
  DROPOFF_DATETIME      TIMESTAMP_NTZ,
  PICKUP_DATE           DATE,
  PICKUP_HOUR           NUMBER(2,0),
  DROPOFF_DATE          DATE,
  DROPOFF_HOUR          NUMBER(2,0),
  DAY_OF_WEEK           NUMBER(1,0),
  YEAR                  NUMBER(4,0),
  MONTH                 NUMBER(2,0),
  SERVICE               STRING,
  PULOCATIONID          NUMBER(38,0),
  PU_BOROUGH            STRING,
  PU_ZONE               STRING,
  PU_SERVICE_ZONE       STRING,
  DOLOCATIONID          NUMBER(38,0),
  DO_BOROUGH            STRING,
  DO_ZONE               STRING,
  DO_SERVICE_ZONE       STRING,
  VENDORID              NUMBER(38,0),
  VENDOR_NAME           STRING,
  RATECODEID            NUMBER(38,0),
  RATE_DESC             STRING,
  PAYMENT_TYPE          NUMBER(38,0),
  PAYMENT_DESC          STRING,
  TRIP_TYPE             NUMBER(38,0),
  STORE_AND_FWD_FLAG    STRING,
  PASSENGER_COUNT       NUMBER(38,0),
  TRIP_DISTANCE         FLOAT,
  TRIP_DURATION_MIN     FLOAT,
  AVG_SPEED_MPH         FLOAT,
  FARE_AMOUNT           FLOAT,
  EXTRA                 FLOAT,
  MTA_TAX               FLOAT,
  TIP_AMOUNT            FLOAT,
  TOLLS_AMOUNT          FLOAT,
  IMPROVEMENT_SURCHARGE FLOAT,
  CONGESTION_SURCHARGE  FLOAT,
  AIRPORT_FEE           FLOAT,
  CBD_CONGESTION_FEE    FLOAT,
  TOTAL_AMOUNT          FLOAT,
  TIP_PCT               FLOAT,
  RUN_ID                STRING,
  BUILT_AT_UTC          TIMESTAMP_NTZ,
  INGESTED_AT_UTC       TIMESTAMP_NTZ,
  SOURCE_SERVICE        STRING,
  SOURCE_YEAR           NUMBER(4,0),
  SOURCE_MONTH          NUMBER(2,0)
)
DATA_RETENTION_TIME_IN_DAYS = 1;
""").collect()
print("✅ OBT_TRIPS listo (o ya existía)")

# --- Alcance de ejecución ---
RUN_ALL = True  # ⇦ pon True para histórico completo
if RUN_ALL:
    services = ['yellow', 'green']
    years = list(range(2015, 2026))
    months = list(range(1, 13))
else:
    services = ['yellow', 'green']
    years = [2015]
    months = [1]

# --- Función de construcción mensual (endurecida y sincronizada) ---
def build_month(service: str, year: int, month: int) -> int:
    print(f"\n🧱 Armando OBT para {service} {year}-{str(month).zfill(2)}")

    # 1) Vista temporal con saneo, deduplicación y features
    sql_tmp = f"""
    CREATE OR REPLACE TEMP VIEW tmp_obt_src AS
    WITH base AS (
      SELECT *
      FROM {SOURCE_TABLE}
      WHERE SERVICE = '{service}' AND YEAR = {year} AND MONTH = {month}
    ),
    time_sanitized AS (
      SELECT *,
             CASE
               WHEN PICKUP_DATETIME IS NULL OR DROPOFF_DATETIME IS NULL THEN NULL
               WHEN DATEDIFF('hour', PICKUP_DATETIME, DROPOFF_DATETIME) < 0 THEN NULL
               WHEN DATEDIFF('hour', PICKUP_DATETIME, DROPOFF_DATETIME) > 48 THEN NULL
               ELSE DROPOFF_DATETIME
             END AS DO_CLEAN
      FROM base
    ),
    dedup AS (
      SELECT *,
             ROW_NUMBER() OVER (
               PARTITION BY SERVICE, YEAR, MONTH,
                            PICKUP_DATETIME, DO_CLEAN,
                            PULOCATIONID, DOLOCATIONID,
                            VENDORID, TOTAL_AMOUNT
               ORDER BY _ENRICHED_AT DESC
             ) AS rn
      FROM time_sanitized
    ),
    shaped AS (
      SELECT
        TO_VARCHAR(HASH(
          SERVICE, YEAR, MONTH,
          PICKUP_DATETIME, DO_CLEAN,
          PULOCATIONID, DOLOCATIONID,
          VENDORID, TOTAL_AMOUNT, RATECODEID
        ))                                              AS TRIP_ID,
        PICKUP_DATETIME,
        DO_CLEAN                                        AS DROPOFF_DATETIME,
        TO_DATE(PICKUP_DATETIME)                        AS PICKUP_DATE,
        EXTRACT(HOUR FROM PICKUP_DATETIME)::NUMBER(2,0) AS PICKUP_HOUR,
        TO_DATE(DO_CLEAN)                               AS DROPOFF_DATE,
        EXTRACT(HOUR FROM DO_CLEAN)::NUMBER(2,0)        AS DROPOFF_HOUR,
        EXTRACT(DOW  FROM PICKUP_DATETIME)::NUMBER(1,0) AS DAY_OF_WEEK,
        YEAR::NUMBER(4,0)                               AS YEAR,
        MONTH::NUMBER(2,0)                              AS MONTH,
        SERVICE,
        PULOCATIONID, PU_BOROUGH, PU_ZONE, PU_SERVICE_ZONE,
        DOLOCATIONID, DO_BOROUGH, DO_ZONE, DO_SERVICE_ZONE,
        VENDORID, VENDOR_NAME,
        RATECODEID, RATE_DESC,
        PAYMENT_TYPE, PAYMENT_DESC,
        TRIP_TYPE, STORE_AND_FWD_FLAG,
        PASSENGER_COUNT, TRIP_DISTANCE,
        CASE WHEN DO_CLEAN IS NOT NULL AND PICKUP_DATETIME IS NOT NULL
             THEN DATEDIFF('minute', PICKUP_DATETIME, DO_CLEAN)::FLOAT
             ELSE NULL END AS TRIP_DURATION_MIN,
        CASE
          WHEN DO_CLEAN IS NOT NULL AND PICKUP_DATETIME IS NOT NULL
               AND DATEDIFF('minute', PICKUP_DATETIME, DO_CLEAN) > 0
            THEN TRIP_DISTANCE / (DATEDIFF('minute', PICKUP_DATETIME, DO_CLEAN) / 60.0)
          ELSE NULL
        END AS AVG_SPEED_MPH,
        FARE_AMOUNT, EXTRA, MTA_TAX, TIP_AMOUNT, TOLLS_AMOUNT,
        IMPROVEMENT_SURCHARGE, CONGESTION_SURCHARGE, AIRPORT_FEE, CBD_CONGESTION_FEE,
        TOTAL_AMOUNT,
        CASE WHEN FARE_AMOUNT IS NOT NULL AND FARE_AMOUNT > 0
             THEN TIP_AMOUNT / FARE_AMOUNT ELSE NULL END AS TIP_PCT,
        _RUN_ID AS RUN_ID,
        CURRENT_TIMESTAMP() AS BUILT_AT_UTC,
        _ENRICHED_AT AS INGESTED_AT_UTC,
        SERVICE AS SOURCE_SERVICE,
        YEAR::NUMBER(4,0) AS SOURCE_YEAR,
        MONTH::NUMBER(2,0) AS SOURCE_MONTH
      FROM dedup
      WHERE rn = 1 AND DO_CLEAN IS NOT NULL
    ),
    filtered AS (
      SELECT *
      FROM shaped
      WHERE TRIP_DURATION_MIN BETWEEN 0 AND 48*60
        AND TRIP_DISTANCE > 0 AND TRIP_DISTANCE <= 150
        AND (AVG_SPEED_MPH IS NULL OR AVG_SPEED_MPH <= 100)
    )
    SELECT * FROM filtered;
    """
    session.sql(sql_tmp).collect()

    # 2) MERGE idempotente (UPSERT)
    merge_sql = f"""
    MERGE INTO {OBT_TABLE} AS tgt
    USING tmp_obt_src AS src
      ON tgt.TRIP_ID = src.TRIP_ID
    WHEN MATCHED THEN UPDATE SET
      tgt.PICKUP_DATETIME       = src.PICKUP_DATETIME,
      tgt.DROPOFF_DATETIME      = src.DROPOFF_DATETIME,
      tgt.PICKUP_DATE           = src.PICKUP_DATE,
      tgt.PICKUP_HOUR           = src.PICKUP_HOUR,
      tgt.DROPOFF_DATE          = src.DROPOFF_DATE,
      tgt.DROPOFF_HOUR          = src.DROPOFF_HOUR,
      tgt.DAY_OF_WEEK           = src.DAY_OF_WEEK,
      tgt.YEAR                  = src.YEAR,
      tgt.MONTH                 = src.MONTH,
      tgt.SERVICE               = src.SERVICE,
      tgt.PULOCATIONID          = src.PULOCATIONID,
      tgt.PU_BOROUGH            = src.PU_BOROUGH,
      tgt.PU_ZONE               = src.PU_ZONE,
      tgt.PU_SERVICE_ZONE       = src.PU_SERVICE_ZONE,
      tgt.DOLOCATIONID          = src.DOLOCATIONID,
      tgt.DO_BOROUGH            = src.DO_BOROUGH,
      tgt.DO_ZONE               = src.DO_ZONE,
      tgt.DO_SERVICE_ZONE       = src.DO_SERVICE_ZONE,
      tgt.VENDORID              = src.VENDORID,
      tgt.VENDOR_NAME           = src.VENDOR_NAME,
      tgt.RATECODEID            = src.RATECODEID,
      tgt.RATE_DESC             = src.RATE_DESC,
      tgt.PAYMENT_TYPE          = src.PAYMENT_TYPE,
      tgt.PAYMENT_DESC          = src.PAYMENT_DESC,
      tgt.TRIP_TYPE             = src.TRIP_TYPE,
      tgt.STORE_AND_FWD_FLAG    = src.STORE_AND_FWD_FLAG,
      tgt.PASSENGER_COUNT       = src.PASSENGER_COUNT,
      tgt.TRIP_DISTANCE         = src.TRIP_DISTANCE,
      tgt.TRIP_DURATION_MIN     = src.TRIP_DURATION_MIN,
      tgt.AVG_SPEED_MPH         = src.AVG_SPEED_MPH,
      tgt.FARE_AMOUNT           = src.FARE_AMOUNT,
      tgt.EXTRA                 = src.EXTRA,
      tgt.MTA_TAX               = src.MTA_TAX,
      tgt.TIP_AMOUNT            = src.TIP_AMOUNT,
      tgt.TOLLS_AMOUNT          = src.TOLLS_AMOUNT,
      tgt.IMPROVEMENT_SURCHARGE = src.IMPROVEMENT_SURCHARGE,
      tgt.CONGESTION_SURCHARGE  = src.CONGESTION_SURCHARGE,
      tgt.AIRPORT_FEE           = src.AIRPORT_FEE,
      tgt.CBD_CONGESTION_FEE    = src.CBD_CONGESTION_FEE,
      tgt.TOTAL_AMOUNT          = src.TOTAL_AMOUNT,
      tgt.TIP_PCT               = src.TIP_PCT,
      tgt.RUN_ID                = src.RUN_ID,
      tgt.BUILT_AT_UTC          = src.BUILT_AT_UTC,
      tgt.INGESTED_AT_UTC       = src.INGESTED_AT_UTC,
      tgt.SOURCE_SERVICE        = src.SOURCE_SERVICE,
      tgt.SOURCE_YEAR           = src.SOURCE_YEAR,
      tgt.SOURCE_MONTH          = src.SOURCE_MONTH
    WHEN NOT MATCHED THEN INSERT (
      TRIP_ID, PICKUP_DATETIME, DROPOFF_DATETIME,
      PICKUP_DATE, PICKUP_HOUR, DROPOFF_DATE, DROPOFF_HOUR,
      DAY_OF_WEEK, YEAR, MONTH, SERVICE,
      PULOCATIONID, PU_BOROUGH, PU_ZONE, PU_SERVICE_ZONE,
      DOLOCATIONID, DO_BOROUGH, DO_ZONE, DO_SERVICE_ZONE,
      VENDORID, VENDOR_NAME, RATECODEID, RATE_DESC,
      PAYMENT_TYPE, PAYMENT_DESC, TRIP_TYPE, STORE_AND_FWD_FLAG,
      PASSENGER_COUNT, TRIP_DISTANCE, TRIP_DURATION_MIN, AVG_SPEED_MPH,
      FARE_AMOUNT, EXTRA, MTA_TAX, TIP_AMOUNT, TOLLS_AMOUNT,
      IMPROVEMENT_SURCHARGE, CONGESTION_SURCHARGE, AIRPORT_FEE, CBD_CONGESTION_FEE,
      TOTAL_AMOUNT, TIP_PCT,
      RUN_ID, BUILT_AT_UTC, INGESTED_AT_UTC,
      SOURCE_SERVICE, SOURCE_YEAR, SOURCE_MONTH
    ) VALUES (
      src.TRIP_ID, src.PICKUP_DATETIME, src.DROPOFF_DATETIME,
      src.PICKUP_DATE, src.PICKUP_HOUR, src.DROPOFF_DATE, src.DROPOFF_HOUR,
      src.DAY_OF_WEEK, src.YEAR, src.MONTH, src.SERVICE,
      src.PULOCATIONID, src.PU_BOROUGH, src.PU_ZONE, src.PU_SERVICE_ZONE,
      src.DOLOCATIONID, src.DO_BOROUGH, src.DO_ZONE, src.DO_SERVICE_ZONE,
      src.VENDORID, src.VENDOR_NAME, src.RATECODEID, src.RATE_DESC,
      src.PAYMENT_TYPE, src.PAYMENT_DESC, src.TRIP_TYPE, src.STORE_AND_FWD_FLAG,
      src.PASSENGER_COUNT, src.TRIP_DISTANCE, src.TRIP_DURATION_MIN, src.AVG_SPEED_MPH,
      src.FARE_AMOUNT, src.EXTRA, src.MTA_TAX, src.TIP_AMOUNT, src.TOLLS_AMOUNT,
      src.IMPROVEMENT_SURCHARGE, src.CONGESTION_SURCHARGE, src.AIRPORT_FEE, src.CBD_CONGESTION_FEE,
      src.TOTAL_AMOUNT, src.TIP_PCT,
      src.RUN_ID, src.BUILT_AT_UTC, src.INGESTED_AT_UTC,
      src.SOURCE_SERVICE, src.SOURCE_YEAR, src.SOURCE_MONTH
    );
    """
    session.sql(merge_sql).collect()

    # 3) DELETE anti-join: elimina del OBT lo que no esté en la vista del mes/servicio
    delete_sql = f"""
    DELETE FROM {OBT_TABLE}
    WHERE SERVICE = '{service}' AND YEAR = {year} AND MONTH = {month}
      AND TRIP_ID NOT IN (SELECT TRIP_ID FROM tmp_obt_src);
    """
    session.sql(delete_sql).collect()

    # 4) Conteo final
    cnt = session.sql(f"""
      SELECT COUNT(*) AS C
      FROM {OBT_TABLE}
      WHERE SERVICE='{service}' AND YEAR={year} AND MONTH={month}
    """).collect()[0]['C']
    print(f"✅ OBT filas {service} {year}-{str(month).zfill(2)}: {cnt:,}")
    return int(cnt)

# --- Ejecución controlada ---
summary = []
t0 = time.time()
for s in services:
    for y in years:
        for m in months:
            summary.append((s, y, m, build_month(s, y, m)))

print(f"\n🏁 Paso 3 listo en {time.time()-t0:,.1f}s")
summary


✅ Conectado: NYC_TAXI_DM (WH=COMPUTE_WH, ROLE=ACCOUNTADMIN)
📍 Contexto: [Row(DB='NYC_TAXI_DM', SCH='ANALYTICS')]
✅ OBT_TRIPS listo (o ya existía)

🧱 Armando OBT para yellow 2015-01
✅ OBT filas yellow 2015-01: 12,665,073

🧱 Armando OBT para yellow 2015-02
✅ OBT filas yellow 2015-02: 12,368,347

🧱 Armando OBT para yellow 2015-03
✅ OBT filas yellow 2015-03: 13,260,358

🧱 Armando OBT para yellow 2015-04
✅ OBT filas yellow 2015-04: 12,984,370

🧱 Armando OBT para yellow 2015-05
✅ OBT filas yellow 2015-05: 13,073,844

🧱 Armando OBT para yellow 2015-06
✅ OBT filas yellow 2015-06: 12,246,555

🧱 Armando OBT para yellow 2015-07
✅ OBT filas yellow 2015-07: 11,486,184

🧱 Armando OBT para yellow 2015-08
✅ OBT filas yellow 2015-08: 11,052,998

🧱 Armando OBT para yellow 2015-09
✅ OBT filas yellow 2015-09: 11,145,256

🧱 Armando OBT para yellow 2015-10
✅ OBT filas yellow 2015-10: 12,230,107

🧱 Armando OBT para yellow 2015-11
✅ OBT filas yellow 2015-11: 11,233,020

🧱 Armando OBT para yellow 2015-12
✅ OBT

[('yellow', 2015, 1, 12665073),
 ('yellow', 2015, 2, 12368347),
 ('yellow', 2015, 3, 13260358),
 ('yellow', 2015, 4, 12984370),
 ('yellow', 2015, 5, 13073844),
 ('yellow', 2015, 6, 12246555),
 ('yellow', 2015, 7, 11486184),
 ('yellow', 2015, 8, 11052998),
 ('yellow', 2015, 9, 11145256),
 ('yellow', 2015, 10, 12230107),
 ('yellow', 2015, 11, 11233020),
 ('yellow', 2015, 12, 11381903),
 ('yellow', 2016, 1, 10839922),
 ('yellow', 2016, 2, 11309845),
 ('yellow', 2016, 3, 12134694),
 ('yellow', 2016, 4, 11852134),
 ('yellow', 2016, 5, 11761002),
 ('yellow', 2016, 6, 11062584),
 ('yellow', 2016, 7, 10229093),
 ('yellow', 2016, 8, 9880010),
 ('yellow', 2016, 9, 10051282),
 ('yellow', 2016, 10, 10786656),
 ('yellow', 2016, 11, 10036731),
 ('yellow', 2016, 12, 10379864),
 ('yellow', 2017, 1, 9649991),
 ('yellow', 2017, 2, 9110089),
 ('yellow', 2017, 3, 10226585),
 ('yellow', 2017, 4, 9977078),
 ('yellow', 2017, 5, 10033630),
 ('yellow', 2017, 6, 9590198),
 ('yellow', 2017, 7, 8527130),
 ('yello