# 🚖 Uber ETL + ML (Opción A y B)

Este notebook ejecuta paso a paso el proyecto Uber ETL con dos opciones de modelado:
- **Opción A**: Predicción de la demanda por hora.
- **Opción B**: Predicción de la tarifa o duración de un viaje.

Incluye extracción, transformación, entrenamiento y carga a PostgreSQL.

In [12]:
# 1) instalar kaggle (si ya lo tienes, no pasa nada)
!pip install -q kaggle




## 0. Requisitos

In [1]:
!pip install pandas numpy scikit-learn sqlalchemy psycopg2-binary python-dotenv kaggle

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl.metadata (4.8 kB)
Collecting kaggle
  Using cached kaggle-1.7.4.5-py3-none-any.whl.metadata (16 kB)
Downloading psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl (2.6 MB)
   ---------------------------------------- 0.0/2.6 MB ? eta -:--:--
   ---------------------------------------- 2.6/2.6 MB 22.6 MB/s eta 0:00:00
Using cached kaggle-1.7.4.5-py3-none-any.whl (181 kB)
Installing collected packages: psycopg2-binary, kaggle

   -------------------- ------------------- 1/2 [kaggle]
   -------------------- ------------------- 1/2 [kaggle]
   -------------------- ------------------- 1/2 [kaggle]
   ---------------------------------------- 2/2 [kaggle]

Successfully installed kaggle-1.7.4.5 psycopg2-binary-2.9.10


## 1. Configuración

In [3]:

import os

PG_URI = os.getenv("PG_URI", "postgresql+psycopg2://postgres:P%40ss2022@localhost:5432/postgres")
DATA_DIR = "D:/etl_project - uber/data/uber"
os.makedirs(DATA_DIR, exist_ok=True)

print("PG_URI:", PG_URI)
print("DATA_DIR:", DATA_DIR)


PG_URI: postgresql+psycopg2://postgres:P%40ss2022@localhost:5432/postgres
DATA_DIR: D:/etl_project - uber/data/uber


In [13]:
import os
os.environ["KAGGLE_USERNAME"] = "tatianajejen"
os.environ["KAGGLE_KEY"] = "68ff2a5012294e9834d098b8b7e437a7"

print("KAGGLE_USERNAME set?", "KAGGLE_USERNAME" in os.environ)



KAGGLE_USERNAME set? True


In [23]:
import os, zipfile, glob, subprocess, pathlib

DATA_DIR = "D:/etl_project - uber/data/uber"
pathlib.Path(DATA_DIR).mkdir(parents=True, exist_ok=True)

# si ya hay CSVs, no baja de nuevo
csvs = glob.glob(f"{DATA_DIR}/**/*.csv", recursive=True)
if csvs:
    print("✅ CSVs ya presentes:", len(csvs))
else:
    print("⬇️ Descargando dataset Kaggle…")
    # si usas variables de entorno, ya están en os.environ por la celda anterior
    cmd = ["kaggle", "datasets", "download", "-d", "yashdevladdha/uber-ride-analytics-dashboard", "-p", DATA_DIR]
    print("CMD:", " ".join(cmd))
    subprocess.run(cmd, check=True)

    # descomprimir todos los .zip que queden en data/uber
    zips = glob.glob(f"{DATA_DIR}/*.zip")
    for z in zips:
        print("📦 Descomprimiendo:", os.path.basename(z))
        with zipfile.ZipFile(z, "r") as zz:
            zz.extractall(DATA_DIR)
    print("✅ Descarga y descompresión completas.")
print (DATA_DIR)


⬇️ Descargando dataset Kaggle…
CMD: kaggle datasets download -d yashdevladdha/uber-ride-analytics-dashboard -p D:/etl_project - uber/data/uber
📦 Descomprimiendo: uber-ride-analytics-dashboard.zip
✅ Descarga y descompresión completas.
D:/etl_project - uber/data/uber


In [24]:
import os
for root, dirs, files in os.walk(DATA_DIR):
    level = root.replace(DATA_DIR, "").count(os.sep)
    indent = "  " * level
    print(f"{indent}{os.path.basename(root) or root}/")
    for f in files:
        print(f"{indent}  {f}")

        


uber/
  Dasboard.gif
  ncr_ride_bookings.csv
  uber-ride-analytics-dashboard.zip
  Uber.pbix


In [37]:

import os, re, glob
import numpy as np
import pandas as pd
import re
import unicodedata

DATA_DIR = "D:/etl_project - uber/data/uber"  # <-- cambia si lo tienes en otra carpeta

def _norm_colname(c: str) -> str:
    c = c.strip().lower()
    c = c.replace("%", "pct")
    c = re.sub(r"[^\w]+", "_", c)          # no alfanum → _
    c = re.sub(r"_+", "_", c).strip("_")   # múltiple _ → _
    return c

def _clean_text_series(s: pd.Series) -> pd.Series:
    """Limpia comillas, espacios raros y normaliza acentos."""
    if s.dtype not in ("object", "string"):
        return s
    out = (s.astype("string")
             .str.replace(r'["“”‘’\']', "", regex=True)  # quita comillas
             .str.strip()
             .str.replace(r"\s+", " ", regex=True)
             .str.lower())
    # normaliza acentos (mantiene NaN)
    return out.map(lambda x: unicodedata.normalize("NFKD", x).encode("ascii","ignore").decode("utf-8") if (x is not None and x is not pd.NA) else x)

def _strip_string_cols(df: pd.DataFrame) -> pd.DataFrame:
    for col in df.select_dtypes(include=["object","string"]).columns:
        df[col] = _clean_text_series(df[col])
    return df

def _normalize_booleans(df: pd.DataFrame) -> pd.DataFrame:
    mapping = {
        "true": True, "false": False,
        "yes": True, "no": False,
        "y": True, "n": False,
        "si": True, "sí": True, "s": True
    }
    obj_cols = df.select_dtypes(include=["object","string"]).columns
    for col in obj_cols:
        # si la cardinalidad es baja y parece booleano, normaliza
        vals = set(df[col].dropna().astype(str).str.lower().unique())
        if vals and vals.issubset(set(mapping.keys()) | { "nan", "none", "null" }):
            df[col] = df[col].astype(str).str.lower().map(mapping).astype("boolean")
    return df

def _coerce_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Convierte columnas object que parecen numéricas (con comas/puntos/monedas) a float."""
    obj_cols = df.select_dtypes(include=["object","string"]).columns
    money_regex = re.compile(r"^[\s$€£S/]*-?\d{1,3}([.,]\d{3})*([.,]\d+)?\s*$")
    for col in obj_cols:
        s = df[col].astype("string")
        # si la mayoría parecen número/moneda, convertimos
        sample = s.dropna().head(100).str.strip()
        if len(sample) and (sample.str.match(money_regex).mean() > 0.6):
            # quitar símbolos y normalizar separadores
            tmp = s.str.replace(r"[^\d,.\-]", "", regex=True)
            # si hay comas como miles y punto decimal, o viceversa
            # heurística: si más puntos que comas → usaremos punto decimal
            if (tmp.str.count(r"\.").mean() >= tmp.str.count(",").mean()):
                tmp = tmp.str.replace(",", "", regex=False)
            else:
                tmp = tmp.str.replace(".", "", regex=False).str.replace(",", ".", regex=False)
            df[col] = pd.to_numeric(tmp, errors="coerce")
    return df

def _parse_datetimes(df: pd.DataFrame) -> pd.DataFrame:
    """Parsea columnas de fecha/hora de manera más precisa."""
    for col in df.columns:
        name = col.lower()

        # Caso especial: columna llamada exactamente "time"
        if name == "time":
            try:
                df[col] = pd.to_datetime(df[col], format="%H:%M:%S", errors="coerce").dt.time
            except Exception:
                pass
            continue  # salta a la siguiente columna

        # Columnas que realmente son fecha/datetime
        if any(k in name for k in ["date", "datetime", "timestamp"]):
            try:
                df[col] = pd.to_datetime(df[col], errors="coerce")
            except Exception:
                pass
    return df



def _fix_impossible_values(df: pd.DataFrame) -> pd.DataFrame:
    """Ejemplos típicos para Uber-like: distancias/tarifas negativas → NaN."""
    for cand in ["distance_km","distance","trip_distance"]:
        if cand in df.columns:
            df.loc[df[cand] < 0, cand] = np.nan
    for cand in ["fare","fare_amount","price","amount","revenue","cost"]:
        if cand in df.columns:
            df.loc[df[cand] < 0, cand] = np.nan
    return df

def _winsorize_numeric(df: pd.DataFrame, lower=0.01, upper=0.99) -> pd.DataFrame:
    """Recorte percentílico para mitigar outliers. No altera NaN."""
    num_cols = df.select_dtypes(include=[np.number]).columns
    for col in num_cols:
        s = df[col]
        if s.notna().sum() < 20:  # evita percentiles con muy pocas filas
            continue
        lo = s.quantile(lower)
        hi = s.quantile(upper)
        df[col] = s.clip(lo, hi)
    return df

def _impute_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Imputación simple: num = mediana, cat = 'unknown' (strings), bool → False."""
    # numéricas
    for col in df.select_dtypes(include=[np.number]).columns:
        if df[col].isna().any():
            med = df[col].median()
            df[col] = df[col].fillna(med)
    # categóricas
    for col in df.select_dtypes(include=["object","string"]).columns:
        if df[col].isna().any():
            df[col] = df[col].fillna("unknown")
    # booleanas
    for col in df.select_dtypes(include=["boolean"]).columns:
        if df[col].isna().any():
            df[col] = df[col].fillna(False)
    return df

def _derive_useful_features(df: pd.DataFrame) -> pd.DataFrame:
    """Crea features comunes: ts, hour, dow, month, duration_min si hay pickup/dropoff."""
    # --- timestamp base (fecha completa) ---
    ts_col = None
    for cand in ["pickup_datetime","datetime","timestamp","date","request_datetime","start_time"]:
        if cand in df.columns and pd.api.types.is_datetime64_any_dtype(df[cand]):
            ts_col = cand
            break

    if ts_col is not None:
        df["ts"]   = pd.to_datetime(df[ts_col]).dt.floor("h")
        df["hour"] = df["ts"].dt.hour
        df["dow"]  = df["ts"].dt.weekday
        df["is_we"]= (df["dow"]>=5).astype("int8")
        df["month"]= df["ts"].dt.month

    # --- si existe columna "time" aislada (solo HH:MM:SS) ---
    if "time" in df.columns and df["time"].dtype == "object":  
        # si está como texto tipo "12:34:56"
        df["hour"] = pd.to_datetime(df["time"], format="%H:%M:%S", errors="coerce").dt.hour
    elif "time" in df.columns and str(df["time"].dtype).startswith("datetime"):
        # si ya es datetime.time
        df["hour"] = pd.to_datetime(df["time"].astype(str), format="%H:%M:%S", errors="coerce").dt.hour

    # --- duración si existe dropoff ---
    if "dropoff_datetime" in df.columns and ts_col is not None:
        mask = df["dropoff_datetime"].notna() & df[ts_col].notna()
        df["duration_min"] = np.nan
        if mask.any():
            df.loc[mask, "duration_min"] = (
                (pd.to_datetime(df.loc[mask,"dropoff_datetime"]) - pd.to_datetime(df.loc[mask,ts_col]))
                .dt.total_seconds() / 60.0
            )
            df.loc[df["duration_min"] < 0, "duration_min"] = np.nan

    return df


def read_clean_all_csvs_for_ml(data_dir: str) -> pd.DataFrame:
    """Lee TODOS los CSV recursivamente y aplica limpieza ‘pro-ML’ + features."""
    csvs = glob.glob(f"{data_dir}/**/*.csv", recursive=True)
    if not csvs:
        raise FileNotFoundError(f"No se encontraron CSV en {data_dir} (¿descargaste/descomprimiste?).")
    frames = []
    for f in csvs:
        try:
            df = pd.read_csv(f)
        except UnicodeDecodeError:
            df = pd.read_csv(f, encoding="latin-1")

        # 1) normaliza nombres
        df.columns = [_norm_colname(c) for c in df.columns]

        # 2) vacíos → NaN + strings limpias
        df = df.replace(r"^\s*$", np.nan, regex=True)
        df = _strip_string_cols(df)

        # 3) fechas/horas
        df = _parse_datetimes(df)

        # 4) numéricos (coma/punto/moneda)
        df = _coerce_numeric(df)

        # 5) booleanos
        df = _normalize_booleans(df)

        frames.append(df)
        print(f"✓ {os.path.basename(f)} – filas: {len(df)}, cols: {len(df.columns)}")

    raw = pd.concat(frames, ignore_index=True)

    # 6) duplicados exactos
    before_dups = len(raw)
    raw = raw.drop_duplicates()
    after_dups = len(raw)
    print(f"Deduplicadas: {before_dups - after_dups} filas")

    # 7) arregla imposibles (dist/fare < 0)
    raw = _fix_impossible_values(raw)

    # 8) winsorize (recorte outliers)
    raw = _winsorize_numeric(raw, lower=0.01, upper=0.99)

    # 9) features derivadas
    raw = _derive_useful_features(raw)

    # 10) imputación final
    raw = _impute_missing(raw)

    # 11) tipos finales ‘amigables’ para Postgres
    # - int suaves (si no hay decimales)
    for col in raw.select_dtypes(include=[np.number]).columns:
        if np.allclose(raw[col].dropna() % 1, 0):  # casi enteros
            raw[col] = raw[col].round().astype("Int64")  # nulos compatibles
        else:
            raw[col] = raw[col].astype("float64")

    return raw

# Ejecuta la limpieza total
df_clean = read_clean_all_csvs_for_ml(DATA_DIR)
print("df_clean shape:", df_clean.shape)
df_clean.head()


✓ ncr_ride_bookings.csv – filas: 150000, cols: 21
Deduplicadas: 0 filas
df_clean shape: (150000, 26)


Unnamed: 0,date,time,booking_id,booking_status,customer_id,vehicle_type,pickup_location,drop_location,avg_vtat,avg_ctat,...,booking_value,ride_distance,driver_ratings,customer_rating,payment_method,ts,hour,dow,is_we,month
0,2024-03-23,12:29:38,cnr5884300,no driver found,cid1982111,ebike,palam vihar,jhilmil,8.3,28.8,...,414.0,23.72,4.3,4.5,unknown,2024-03-23,12,5,1,3
1,2024-11-29,18:01:39,cnr1326809,incomplete,cid4604802,go sedan,shastri nagar,gurgaon sector 56,4.9,14.0,...,237.0,5.73,4.3,4.5,upi,2024-11-29,18,4,0,11
2,2024-08-23,08:56:10,cnr8494506,completed,cid9202816,auto,khandsa,malviya nagar,13.4,25.8,...,627.0,13.58,4.9,4.9,debit card,2024-08-23,8,4,0,8
3,2024-10-21,17:17:25,cnr8906825,completed,cid2610914,premier sedan,central secretariat,inderlok,13.1,28.5,...,416.0,34.02,4.6,5.0,upi,2024-10-21,17,0,0,10
4,2024-09-16,22:08:00,cnr1950162,completed,cid9933542,bike,ghitorni village,khan market,5.3,19.6,...,737.0,48.21,4.1,4.3,upi,2024-09-16,22,0,0,9


In [34]:
def data_quality_report(df: pd.DataFrame) -> pd.DataFrame:
    rep = pd.DataFrame({
        "dtype": df.dtypes.astype(str),
        "non_null": df.notna().sum(),
        "nulls": df.isna().sum(),
        "null_pct": (df.isna().mean() * 100).round(2),
        "n_unique": df.nunique(dropna=True)
    })
    return rep.sort_values(["null_pct","n_unique"], ascending=[False, True])

dq = data_quality_report(df_clean)
dq.head(20)


Unnamed: 0,dtype,non_null,nulls,null_pct,n_unique
cancelled_rides_by_customer,Int64,150000,0,0.0,1
cancelled_rides_by_driver,Int64,150000,0,0.0,1
incomplete_rides,Int64,150000,0,0.0,1
is_we,Int64,150000,0,0.0,2
incomplete_rides_reason,string,150000,0,0.0,4
booking_status,string,150000,0,0.0,5
driver_cancellation_reason,string,150000,0,0.0,5
reason_for_cancelling_by_customer,string,150000,0,0.0,6
payment_method,string,150000,0,0.0,6
vehicle_type,string,150000,0,0.0,7


In [38]:
from sqlalchemy import create_engine, text


TABLE_NAME = "uber_clean_ml"  # cámbiala si quieres otra

engine = create_engine(PG_URI, pool_pre_ping=True)
with engine.begin() as con:
    con.execute(text("SELECT 1"))

# Guarda (replace = sobreescribe; usa "append" si quieres acumular)
df_clean.to_sql(TABLE_NAME, engine, if_exists="replace", index=False)
print(f"✅ Cargado {len(df_clean)} filas en {TABLE_NAME}")


✅ Cargado 150000 filas en uber_clean_ml


In [61]:
# --- COMÚN PARA AMBAS OPCIONES ---
import os
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

# sklearn
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import RidgeCV

# Conexión (usa tu cadena)
PG_URI = os.getenv("PG_URI", "postgresql+psycopg2://postgres:P%40ss2022@localhost:5432/postgres")
engine = create_engine(PG_URI, pool_pre_ping=True)

with engine.begin() as con:
    con.execute(text("SELECT 1"))
print("✅ Conectado a Postgres")


✅ Conectado a Postgres


In [62]:
# ====== A) DEMANDA POR HORA ======

HORIZON_HOURS = 168  # predice 7 días
with engine.connect() as con:
    sql = """
    SELECT f.datetime_hour_id, d.ts_hour, d.hour, d.dow, d.is_weekend, d.month,
           f.trips
    FROM public.vw_hourly_demand f
    JOIN public.dim_datetime_hour d
      ON d.datetime_hour_id = f.datetime_hour_id
    ORDER BY d.ts_hour
    """
    hist = pd.read_sql(sql, con)
hist.head(), hist.shape


(   datetime_hour_id             ts_hour  hour  dow  is_weekend  month  trips
 0            101676 2024-01-01 00:19:34     0    1           0      1      1
 1            133623 2024-01-01 01:35:18     1    1           0      1      1
 2             51916 2024-01-01 01:37:50     1    1           0      1      1
 3             61694 2024-01-01 01:48:03     1    1           0      1      1
 4             23913 2024-01-01 01:49:56     1    1           0      1      1,
 (149532, 7))

In [74]:

# === Opción A: Demanda por hora ===
import os, numpy as np, pandas as pd
from sqlalchemy import create_engine, text
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import RidgeCV

PG_URI = os.getenv("PG_URI", "postgresql+psycopg2://postgres:P%40ss2022@localhost:5432/postgres")
engine = create_engine(PG_URI, pool_pre_ping=True)

HORIZON_HOURS = 168  # predice 7 días (24*7)

with engine.connect() as con:
    sql = """
    SELECT f.datetime_hour_id, d.ts_hour, d.hour, d.dow, d.is_weekend, d.month,
           f.trips
    FROM public.vw_hourly_demand f
    JOIN public.dim_datetime_hour d
      ON d.datetime_hour_id = f.datetime_hour_id
    ORDER BY d.ts_hour
    """
    hist = pd.read_sql(sql, con)

df = hist.sort_values("ts_hour").reset_index(drop=True).copy()

# Fuerza a toparse en la hora exacta
df["ts_hour"] = pd.to_datetime(df["ts_hour"]).dt.floor("H")

# Asegura numérico
df["trips"] = pd.to_numeric(df["trips"], errors="coerce")

last_ts = df["ts_hour"].max()
last_ts = pd.to_datetime(last_ts).floor("H")  # en punto

future_ts = pd.date_range(last_ts + pd.Timedelta(hours=1),
                          periods=HORIZON_HOURS, freq="H")



def add_lags(frame, col="trips", lags=(24,168), rolls=(24,)):
    f = frame.copy()
    if isinstance(lags, (int, np.integer)): lags = [lags]
    if isinstance(rolls, (int, np.integer)): rolls = [rolls]

    f = f.sort_values("ts_hour").reset_index(drop=True)
    f[col] = pd.to_numeric(f[col], errors="coerce")

    for L in lags:
        f[f"{col}_lag{L}"] = f[col].shift(L)
    for R in rolls:
        f[f"{col}_roll{R}"] = f[col].rolling(R, min_periods=1).mean()
    return f

df = add_lags(df, "trips", lags=(24,168), rolls=(24,))
df_train = df.dropna(subset=["trips_lag24","trips_lag168"]).copy()


X_cols = ["hour","dow","is_weekend","month","trips_lag24","trips_lag168","trips_roll24"]
X, y = df_train[X_cols].values, df_train["trips"].values

tscv = TimeSeriesSplit(n_splits=5)
model = Pipeline([("scaler", StandardScaler()),
                  ("ridge", RidgeCV(alphas=np.logspace(-2,2,15), cv=tscv))])
model.fit(X, y)

y_hat = model.predict(X)
print("A) MAE:", round(mean_absolute_error(y, y_hat), 2),
      "| R²:", round(r2_score(y, y_hat), 3))

# Horizonte futuro
last_ts = df["ts_hour"].max()
future_ts = pd.date_range(last_ts + pd.Timedelta(hours=1), periods=HORIZON_HOURS, freq="H")
future = pd.DataFrame({
    "ts_hour": future_ts,
    "hour": future_ts.hour,
    "dow": future_ts.weekday,
    "is_weekend": (future_ts.weekday >= 5).astype(int),
    "month": future_ts.month
})

df_full = pd.concat([df, future], ignore_index=True).sort_values("ts_hour").reset_index(drop=True)
df_full = add_lags(df_full, "trips", lags=(24,168), rolls=(24,))
future_ready = df_full[df_full["ts_hour"].isin(future_ts)][["ts_hour"] + X_cols].copy()

# Imputa NaN del futuro con medianas del entrenamiento si faltan lags
future_ready[X_cols] = future_ready[X_cols].fillna(df_train[X_cols].median())

pred = model.predict(future_ready[X_cols].values)
pred_a = future_ready[["ts_hour"]].copy()
pred_a["trips_pred"] = np.clip(pred, 0, None)  # sin round todavía
pred_a.head(10)


# Guarda en Postgres (upsert)
with engine.begin() as con:
    con.execute(text("""
    CREATE TABLE IF NOT EXISTS public.hourly_demand_forecast (
        ts_hour    TIMESTAMP PRIMARY KEY,
        trips_pred INT
    );
    """))
    for _, r in pred_a.iterrows():
        con.execute(text("""
        INSERT INTO public.hourly_demand_forecast (ts_hour, trips_pred)
        VALUES (:ts, :tp)
        ON CONFLICT (ts_hour) DO UPDATE SET trips_pred = EXCLUDED.trips_pred;
        """), {"ts": r["ts_hour"], "tp": int(r["trips_pred"])})
print("✅ Opción A: guardado en public.hourly_demand_forecast")


  df["ts_hour"] = pd.to_datetime(df["ts_hour"]).dt.floor("H")
  last_ts = pd.to_datetime(last_ts).floor("H")  # en punto
  future_ts = pd.date_range(last_ts + pd.Timedelta(hours=1),


A) MAE: 1.99 | R²: 0.771


  future_ts = pd.date_range(last_ts + pd.Timedelta(hours=1), periods=HORIZON_HOURS, freq="H")


✅ Opción A: guardado en public.hourly_demand_forecast


In [75]:
hist['trips'].describe(), hist['trips'].value_counts().head(10)


(count    149206.000000
 mean         14.499698
 std           6.033184
 min           1.000000
 25%          10.000000
 50%          15.000000
 75%          19.000000
 max          38.000000
 Name: trips, dtype: float64,
 trips
 15    10137
 13     9936
 17     9858
 14     9608
 16     9470
 12     9268
 11     8177
 18     7876
 19     7090
 20     6807
 Name: count, dtype: int64)

In [85]:
# ⬅️ CELDA 1: extracción de datos para el modelo (B)
import pandas as pd

SQL_B = """
SELECT 
    f.trip_id,
    f.booking_id,
    -- Targets
    f.booking_value,         -- tarifa
    f.duration_min,          -- duración

    -- Features numéricas
    f.ride_distance_km,
    f.arrive_time_min,
    d.hour, d.dow, d.is_weekend, d.month,

    -- Categóricas (proxy)
    f.product_id

    -- Si quieres usar zonas (cuidado con cardinalidad):
    -- , f.pickup_location_id, f.drop_location_id
FROM public.fact_trip f
JOIN public.dim_datetime_hour d 
  ON d.datetime_hour_id = f.datetime_hour_id
WHERE f.booking_status ILIKE 'completed'
  AND f.ride_distance_km IS NOT NULL
  AND f.duration_min IS NOT NULL
  AND f.booking_value IS NOT NULL
"""
with engine.connect() as con:
    df = pd.read_sql(SQL_B, con)

print(df.shape)
df.head()


(93000, 11)


Unnamed: 0,trip_id,booking_id,booking_value,duration_min,ride_distance_km,arrive_time_min,hour,dow,is_weekend,month,product_id
0,2,cnr8612910,259.0,35.2,26.66,11.5,12,6,1,8,1
1,5,cnr2694235,896.0,29.7,44.46,8.3,8,3,0,2,7
2,7,cnr1539677,637.0,30.1,26.01,10.0,10,6,1,6,7
3,12,cnr9452807,78.0,19.2,39.61,6.6,22,5,0,5,1
4,14,cnr5022515,292.0,33.5,36.65,13.0,17,5,0,8,1


In [96]:
import pandas as pd

# Supongamos que ya cargaste tu DataFrame
df = df.dropna(subset=["ride_distance_km","booking_value","duration_min","hour","dow"]).copy()

X = df[["ride_distance_km","hour","dow"]]   # features
y_fare = df["booking_value"]                # target 1
y_dur  = df["duration_min"]                 # target 2

print (y_dur)


0        35.2
1        29.7
2        30.1
3        19.2
4        33.5
         ... 
92995    43.4
92996    30.6
92997    30.9
92998    26.6
92999    16.7
Name: duration_min, Length: 93000, dtype: float64


In [97]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train_f, y_test_f = train_test_split(X, y_fare, test_size=0.2, random_state=42)
_, _, y_train_d, y_test_d = train_test_split(X, y_dur, test_size=0.2, random_state=42)


In [98]:
from sklearn.ensemble import RandomForestRegressor

# Modelo para tarifa
fare_model = RandomForestRegressor(n_estimators=300, max_depth=15, random_state=42, n_jobs=-1)
fare_model.fit(X_train, y_train_f)

# Modelo para duración
dur_model = RandomForestRegressor(n_estimators=300, max_depth=15, random_state=42, n_jobs=-1)
dur_model.fit(X_train, y_train_d)


In [99]:
from sklearn.metrics import mean_absolute_error, r2_score

fare_pred = fare_model.predict(X_test)
dur_pred  = dur_model.predict(X_test)

print("Predicción de TARIFA:")
print("  MAE:", mean_absolute_error(y_test_f, fare_pred))
print("  R²:",  r2_score(y_test_f, fare_pred))

print("Predicción de DURACIÓN:")
print("  MAE:", mean_absolute_error(y_test_d, dur_pred))
print("  R²:",  r2_score(y_test_d, dur_pred))


Predicción de TARIFA:
  MAE: 270.1748563563962
  R²: 0.04225349024714942
Predicción de DURACIÓN:
  MAE: 7.529181618573846
  R²: -0.011115407232626273


In [100]:
df["fare_pred"] = fare_model.predict(X)
df["duration_pred"] = dur_model.predict(X)

df[["ride_distance_km","hour","dow","booking_value","fare_pred","duration_min","duration_pred"]].head(10)


Unnamed: 0,ride_distance_km,hour,dow,booking_value,fare_pred,duration_min,duration_pred
0,26.66,12,6,259.0,601.09411,35.2,29.954814
1,44.46,8,3,896.0,493.353542,29.7,29.375695
2,26.01,10,6,637.0,638.745622,30.1,29.939783
3,39.61,22,5,78.0,428.046047,19.2,29.850893
4,36.65,17,5,292.0,443.678244,33.5,30.140398
5,23.44,9,3,498.0,453.224192,21.3,29.358818
6,21.31,16,6,729.0,680.107199,27.6,29.80157
7,21.1,9,2,652.0,449.851666,43.7,30.185773
8,30.57,21,1,270.0,414.61935,30.6,29.758688
9,29.68,9,2,155.0,437.615426,15.2,29.404366


In [93]:
print("Predicciones tarifa nulas:", np.isnan(fare_all).sum())
print("Predicciones duración nulas:", np.isnan(dur_all).sum())


Predicciones tarifa nulas: 0
Predicciones duración nulas: 0


In [101]:
# CELDA 1B — si necesitas unir con la dimensión temporal
import pandas as pd

SQL = """
SELECT
  f.trip_id, f.booking_id,
  f.booking_value, f.duration_min,
  f.ride_distance_km,
  d.hour, d.dow
FROM public.fact_trip f
JOIN public.dim_datetime_hour d
  ON d.datetime_hour_id = f.datetime_hour_id
WHERE f.booking_status ILIKE 'completed'
  AND f.ride_distance_km IS NOT NULL
  AND f.booking_value   IS NOT NULL
  AND f.duration_min    IS NOT NULL;
"""
with engine.connect() as con:
    df = pd.read_sql(SQL, con)

df.shape, df.head()


((93000, 7),
    trip_id  booking_id  booking_value  duration_min  ride_distance_km  hour  \
 0        1  cnr5421706          826.0          18.0             36.15    15   
 1        2  cnr8612910          259.0          35.2             26.66    12   
 2        4  cnr4038553          131.0          16.6             39.16    22   
 3       19  cnr2871526          498.0          21.3             23.44     9   
 4       22  cnr2769060          729.0          27.6             21.31    16   
 
    dow  
 0    2  
 1    6  
 2    0  
 3    3  
 4    6  )

In [102]:
# CELDA 2 — limpieza y features
import numpy as np

# filtros razonables para evitar outliers bravos
df = df[(df["ride_distance_km"] > 0) & (df["ride_distance_km"] <= 60)]
df = df[(df["booking_value"]   >= 0) & (df["booking_value"]   <= 2000)]
df = df[(df["duration_min"]    >  0) & (df["duration_min"]    <= 240)]
df = df[(df["hour"].between(0,23)) & (df["dow"].between(0,6))]

# features y targets
X = df[["ride_distance_km", "hour", "dow"]].copy()
y_fare = df["booking_value"].copy()
y_dur  = df["duration_min"].copy()

print(X.isna().sum(), y_fare.isna().sum(), y_dur.isna().sum())


ride_distance_km    0
hour                0
dow                 0
dtype: int64 0 0


In [103]:
# CELDA 3 — entrenamiento y evaluación
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score

# split
X_train, X_test, y_train_f, y_test_f = train_test_split(X, y_fare, test_size=0.2, random_state=42)
_,       _,       y_train_d, y_test_d = train_test_split(X, y_dur,  test_size=0.2, random_state=42)

# modelos
fare_model = RandomForestRegressor(n_estimators=300, max_depth=16, random_state=42, n_jobs=-1)
dur_model  = RandomForestRegressor(n_estimators=300, max_depth=16, random_state=42, n_jobs=-1)

fare_model.fit(X_train, y_train_f)
dur_model.fit(X_train,  y_train_d)

fare_pred = fare_model.predict(X_test)
dur_pred  = dur_model.predict(X_test)

print("🔵 TARIFA  → MAE: {:.2f} | R²: {:.3f}".format(mean_absolute_error(y_test_f, fare_pred),
                                                    r2_score(y_test_f, fare_pred)))
print("🟣 DURACIÓN → MAE: {:.2f} | R²: {:.3f}".format(mean_absolute_error(y_test_d, dur_pred),
                                                    r2_score(y_test_d, dur_pred)))


🔵 TARIFA  → MAE: 275.02 | R²: 0.042
🟣 DURACIÓN → MAE: 7.45 | R²: -0.014


In [104]:
# CELDA 4 — predicción total + clipping a rangos razonables
fare_all = np.clip(fare_model.predict(X), 0, 2000)
dur_all  = np.clip(dur_model.predict(X),  0, 240)

df_pred = df[["trip_id","booking_id","ride_distance_km","hour","dow",
              "booking_value","duration_min"]].copy()
df_pred["fare_pred"]        = fare_all
df_pred["duration_pred_min"] = dur_all

df_pred.head(10)


Unnamed: 0,trip_id,booking_id,ride_distance_km,hour,dow,booking_value,duration_min,fare_pred,duration_pred_min
0,1,cnr5421706,36.15,15,2,826.0,18.0,435.907772,29.49276
1,2,cnr8612910,26.66,12,6,259.0,35.2,604.821115,29.825991
2,4,cnr4038553,39.16,22,0,131.0,16.6,601.810979,29.115389
3,19,cnr2871526,23.44,9,3,498.0,21.3,434.107492,28.817751
4,22,cnr2769060,21.31,16,6,729.0,27.6,647.926994,29.910682
5,26,cnr4712285,21.1,9,2,652.0,43.7,450.876749,30.345261
6,27,cnr9456552,30.57,21,1,270.0,30.6,416.237521,30.158875
7,28,cnr6837270,42.6,7,3,225.0,41.5,443.638239,32.28631
8,30,cnr3879580,29.68,9,2,155.0,15.2,454.194213,29.92782
9,32,cnr9032859,45.3,18,3,823.0,28.1,487.657618,29.397223


In [105]:
# CELDA 5 — guardar predicciones en Postgres
from sqlalchemy import text

DDL = """
CREATE TABLE IF NOT EXISTS public.trip_predictions (
  trip_id            BIGINT PRIMARY KEY,
  booking_id         TEXT,
  fare_pred          NUMERIC,
  duration_pred_min  NUMERIC
);
"""
UPSERT = """
INSERT INTO public.trip_predictions (trip_id, booking_id, fare_pred, duration_pred_min)
VALUES (:tid, :bid, :fp, :dp)
ON CONFLICT (trip_id) DO UPDATE
SET booking_id = EXCLUDED.booking_id,
    fare_pred  = EXCLUDED.fare_pred,
    duration_pred_min = EXCLUDED.duration_pred_min;
"""

with engine.begin() as con:
    con.execute(text(DDL))
    for _, r in df_pred.iterrows():
        con.execute(text(UPSERT), {
            "tid": int(r["trip_id"]),
            "bid": r["booking_id"],
            "fp":  float(r["fare_pred"]),
            "dp":  float(r["duration_pred_min"]),
        })

print(f"✅ Guardadas {len(df_pred)} filas en public.trip_predictions")


✅ Guardadas 93000 filas en public.trip_predictions
