In [12]:
import pandas as pd
import requests
from io import BytesIO
import os

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
OUTPUT_DIR = "tlc_data_s3"
os.makedirs(OUTPUT_DIR, exist_ok=True)

TYPES = ["yellow", "green", "fhvhv"]
YEARS = range(2022, 2026)

def download_parquet_file(taxi_type, year, month):
    file_name = f"{taxi_type}_tripdata_{year}-{month:02d}.parquet"
    url = f"{BASE_URL}/{file_name}"
    dest_path = os.path.join(OUTPUT_DIR, file_name)

    if os.path.exists(dest_path):
        print(f"‚ö†Ô∏è Ya existe: {file_name}")
        return

    try:
        print(f"‚¨áÔ∏è Descargando {file_name}...")
        resp = requests.get(url, timeout=30)
        if resp.status_code == 200:
            df = pd.read_parquet(BytesIO(resp.content))
            df.to_parquet(dest_path, index=False, compression="snappy")
            print(f"‚úÖ Guardado {file_name} ({len(df):,} filas)")
        else:
            print(f"‚ùå No encontrado ({resp.status_code}): {url}")
    except Exception as e:
        print(f"‚ö†Ô∏è Error con {file_name}: {e}")

for taxi_type in TYPES:
    for year in YEARS:
        for month in range(1, 13):
            download_parquet_file(taxi_type, year, month)


‚¨áÔ∏è Descargando yellow_tripdata_2022-01.parquet...
‚úÖ Guardado yellow_tripdata_2022-01.parquet (2,463,931 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-02.parquet...
‚úÖ Guardado yellow_tripdata_2022-02.parquet (2,979,431 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-03.parquet...
‚úÖ Guardado yellow_tripdata_2022-03.parquet (3,627,882 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-04.parquet...
‚úÖ Guardado yellow_tripdata_2022-04.parquet (3,599,920 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-05.parquet...
‚úÖ Guardado yellow_tripdata_2022-05.parquet (3,588,295 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-06.parquet...
‚úÖ Guardado yellow_tripdata_2022-06.parquet (3,558,124 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-07.parquet...
‚úÖ Guardado yellow_tripdata_2022-07.parquet (3,174,394 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-08.parquet...
‚úÖ Guardado yellow_tripdata_2022-08.parquet (3,152,677 filas)
‚¨áÔ∏è Descargando yellow_tripdata_2022-09.parquet...
‚úÖ Guarda

In [15]:
def normalize_columns(df: pd.DataFrame, trip_type: str) -> pd.DataFrame:
    """
    Mapea campos a un esquema unificado:
    - pickup_datetime, dropoff_datetime
    - pickup_location_id, dropoff_location_id
    - trip_distance
    - fare_amount
    - trip_minutes (si se puede calcular)
    """

    df = df.copy()

    # Datetime: diferentes nombres seg√∫n dataset
    time_map = [
        ("tpep_pickup_datetime", "pickup_datetime"),
        ("lpep_pickup_datetime", "pickup_datetime"),
        ("pickup_datetime", "pickup_datetime"),
        ("request_datetime", "pickup_datetime"),
        ("tpep_dropoff_datetime", "dropoff_datetime"),
        ("lpep_dropoff_datetime", "dropoff_datetime"),
        ("dropoff_datetime", "dropoff_datetime"),
    ]
    for src, dst in time_map:
        if src in df.columns:
            df.rename(columns={src: dst}, inplace=True)

    # ‚úÖ eliminar columnas duplicadas tras renombrar
    df = df.loc[:, ~df.columns.duplicated()]

    # Location IDs
    loc_map = [
        ("pulocationid", "pickup_location_id"),
        ("dolocationid", "dropoff_location_id"),
        ("PULocationID", "pickup_location_id"),
        ("DOLocationID", "dropoff_location_id"),
    ]
    for src, dst in loc_map:
        if src in df.columns:
            df.rename(columns={src: dst}, inplace=True)

    df = df.loc[:, ~df.columns.duplicated()]  # otra limpieza

    # Distancia
    if "trip_distance" not in df.columns:
        if "trip_miles" in df.columns:
            df.rename(columns={"trip_miles": "trip_distance"}, inplace=True)

    # Tarifa
    if "fare_amount" not in df.columns:
        if "base_passenger_fare" in df.columns:
            df.rename(columns={"base_passenger_fare": "fare_amount"}, inplace=True)
        elif "total_amount" in df.columns:
            df.rename(columns={"total_amount": "fare_amount"}, inplace=True)

    # Casting datetime
    for col in ["pickup_datetime", "dropoff_datetime"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Trip minutes
    if "pickup_datetime" in df.columns and "dropoff_datetime" in df.columns:
        dt = (df["dropoff_datetime"] - df["pickup_datetime"]).dt.total_seconds() / 60.0
        df["trip_minutes"] = dt
    else:
        df["trip_minutes"] = np.nan

    # Asegurar columnas m√≠nimas
    keep = [
        "pickup_datetime", "dropoff_datetime",
        "pickup_location_id", "dropoff_location_id",
        "trip_distance", "fare_amount", "trip_minutes"
    ]
    for col in keep:
        if col not in df.columns:
            df[col] = np.nan

    df["trip_type"] = trip_type
    df = df.loc[:, ~df.columns.duplicated()]  # limpieza final
    return df[keep + ["trip_type"]]


In [18]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# pip install pandas pyarrow numpy scipy tqdm

import pandas as pd
import numpy as np
from tqdm import tqdm
from pathlib import Path
import re

# ===============================
# CONFIGURACI√ìN
# ===============================
DATA_DIR = Path("tlc_data_s3")
REFERENCE = (2025, 1)
OUT_DIR = Path("analysis_outputs")
OUT_DIR.mkdir(exist_ok=True)

# ===============================
# FUNCIONES AUXILIARES
# ===============================
def detect_trip_type(name: str):
    name = name.lower()
    if "yellow" in name: return "yellow"
    if "green" in name: return "green"
    if "fhvhv" in name: return "fhvhv"
    return "unknown"

def parse_year_month(name: str):
    m = re.search(r"(\d{4})-(\d{2})", name)
    if not m:
        raise ValueError(f"No puedo extraer a√±o/mes de {name}")
    return int(m.group(1)), int(m.group(2))

def jensen_shannon(p, q):
    p, q = np.asarray(p), np.asarray(q)
    p, q = p / p.sum(), q / q.sum()
    m = 0.5 * (p + q)
    mask = (p > 0) & (q > 0)
    kl = lambda a, b: np.sum(a[mask] * np.log2(a[mask]/b[mask]))
    return np.sqrt(0.5*kl(p, m) + 0.5*kl(q, m))

def summarize_file(path: Path):
    """Lee un solo archivo Parquet (independientemente del tipo) y calcula m√©tricas clave."""
    # Intentamos leer solo columnas relevantes (las que existan realmente)
    desired_cols = [
        "tpep_pickup_datetime", "lpep_pickup_datetime", "request_datetime",
        "tpep_dropoff_datetime", "lpep_dropoff_datetime", "dropoff_datetime",
        "trip_distance", "fare_amount", "base_passenger_fare", "trip_miles",
    ]

    # Detectar columnas existentes sin cargar todo el dataset
    import pyarrow.parquet as pq
    schema = pq.read_schema(path)
    available = [c for c in desired_cols if c in schema.names]

    df = pd.read_parquet(path, columns=available)
    trip_type = detect_trip_type(path.name)
    year, month = parse_year_month(path.name)

    # Determinar columnas de tiempo (pickups y dropoffs)
    pickup_col = next(
        (c for c in ["tpep_pickup_datetime", "lpep_pickup_datetime", "request_datetime", "pickup_datetime"]
         if c in df.columns), None)
    drop_col = next(
        (c for c in ["tpep_dropoff_datetime", "lpep_dropoff_datetime", "dropoff_datetime"]
         if c in df.columns), None)

    if pickup_col is None:
        print(f"‚ö†Ô∏è No hay columna pickup en {path.name}, se omite.")
        return None

    df["pickup"] = pd.to_datetime(df[pickup_col], errors="coerce")
    df["hour"] = df["pickup"].dt.hour
    df["wday"] = df["pickup"].dt.dayofweek

    if drop_col in df.columns:
        df["drop"] = pd.to_datetime(df[drop_col], errors="coerce")
        df["minutes"] = (df["drop"] - df["pickup"]).dt.total_seconds() / 60.0
    else:
        df["minutes"] = np.nan

    # Unificar distancia y tarifa
    if "trip_distance" not in df.columns and "trip_miles" in df.columns:
        df["trip_distance"] = df["trip_miles"]
    if "fare_amount" not in df.columns and "base_passenger_fare" in df.columns:
        df["fare_amount"] = df["base_passenger_fare"]

    valid = (df["trip_distance"] > 0) & (df["fare_amount"] > 0)
    fare_per_mile = (df.loc[valid, "fare_amount"] / df.loc[valid, "trip_distance"]).mean()

    summary = {
        "year": year,
        "month": month,
        "trip_type": trip_type,
        "count": len(df),
        "avg_trip_distance": df["trip_distance"].mean(),
        "avg_minutes": df["minutes"].mean(),
        "avg_fare_per_mile": fare_per_mile,
        "hourly": np.bincount(df["hour"].dropna().astype(int), minlength=24),
        "wday": np.bincount(df["wday"].dropna().astype(int), minlength=7)
    }
    return summary


# ===============================
# PIPELINE PRINCIPAL
# ===============================
def main():
    print("üîÑ Analizando por archivo (streaming)‚Ä¶")
    summaries = []

    files = sorted(DATA_DIR.glob("*.parquet"))
    for f in tqdm(files):
        s = summarize_file(f)
        if s:
            summaries.append(s)

    # Construir DataFrame compacto
    df_sum = pd.DataFrame(summaries)
    df_sum["hourly_norm"] = df_sum["hourly"].apply(lambda x: x/x.sum() if x.sum()>0 else np.ones(24)/24)
    df_sum["wday_norm"] = df_sum["wday"].apply(lambda x: x/x.sum() if x.sum()>0 else np.ones(7)/7)

    # Referencia enero 2025 combinando tipos
    ref_mask = (df_sum["year"]==REFERENCE[0]) & (df_sum["month"]==REFERENCE[1])
    ref = df_sum.loc[ref_mask]
    ref_hour = np.sum(np.stack(ref["hourly_norm"]), axis=0)
    ref_wday = np.sum(np.stack(ref["wday_norm"]), axis=0)
    ref_hour /= ref_hour.sum()
    ref_wday /= ref_wday.sum()

    # Comparar cada mes con referencia
    results = []
    for _, r in df_sum.iterrows():
        js_h = jensen_shannon(r["hourly_norm"], ref_hour)
        js_w = jensen_shannon(r["wday_norm"], ref_wday)
        results.append({
            "year": r["year"],
            "month": r["month"],
            "trip_type": r["trip_type"],
            "count": r["count"],
            "avg_trip_distance": r["avg_trip_distance"],
            "avg_minutes": r["avg_minutes"],
            "avg_fare_per_mile": r["avg_fare_per_mile"],
            "js_hour": js_h,
            "js_wday": js_w
        })
    df_out = pd.DataFrame(results)
    out_path = OUT_DIR / "monthly_patterns_vs_2025_01.csv"
    df_out.to_csv(out_path, index=False)
    print(f"‚úÖ Guardado resumen en {out_path}")

    # Informe por a√±o
    agg = (df_out.groupby(["year"])
           .agg(count_mean=("count","mean"),
                js_hour_mean=("js_hour","mean"),
                js_wday_mean=("js_wday","mean"))
           .reset_index())
    print("\nResumen anual:")
    print(agg)

if __name__ == "__main__":
    main()


üîÑ Analizando por archivo (streaming)‚Ä¶


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 132/132 [01:46<00:00,  1.24it/s]

‚úÖ Guardado resumen en analysis_outputs\monthly_patterns_vs_2025_01.csv

Resumen anual:
   year    count_mean  js_hour_mean  js_wday_mean
0  2022  7.025350e+06      0.075898      0.071348
1  2023  7.544092e+06      0.072559      0.068045
2  2024  7.813900e+06      0.072837      0.072460
3  2025  7.994751e+06      0.074709      0.069334





In [19]:
import pandas as pd
import numpy as np
from pathlib import Path

IN_CSV  = Path("analysis_outputs/monthly_patterns_vs_2025_01.csv")
OUT_DIR = Path("analysis_outputs")
OUT_DIR.mkdir(exist_ok=True)

# =========================
# Cargar datos
# =========================
df = pd.read_csv(IN_CSV)

# Limpieza ligera
# Aseguramos tipos correctos y evitamos NaNs problem√°ticos en c√°lculos
num_cols = ["count", "avg_trip_distance", "avg_minutes", "avg_fare_per_mile", "js_hour", "js_wday"]
for c in num_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

# =========================
# 1) JS por tipo y a√±o
# =========================
js_by_type_year = (df
    .groupby(["year", "trip_type"], as_index=False)
    .agg(js_hour_mean=("js_hour","mean"),
         js_wday_mean=("js_wday","mean"),
         count_mean=("count","mean"))
    .sort_values(["year","trip_type"])
)
js_by_type_year.to_csv(OUT_DIR/"js_by_type_year.csv", index=False)
print("‚úÖ js_by_type_year.csv guardado")

# Versi√≥n agregada (todos los tipos combinados por a√±o)
js_all_year = (df
    .groupby(["year"], as_index=False)
    .agg(js_hour_mean=("js_hour","mean"),
         js_wday_mean=("js_wday","mean"),
         count_mean=("count","mean"))
    .sort_values("year")
)
js_all_year.to_csv(OUT_DIR/"js_all_year.csv", index=False)
print("‚úÖ js_all_year.csv guardado")

# =========================
# 2) Estacionalidad mensual
#    VISTA A: cada (a√±o,mes) vs enero-2025 (todos los tipos)
# =========================
ref = df[(df["year"]==2025) & (df["month"]==1)]
# combinamos tipos (sum/mean ponderado simple por conteo)
def combine_month_block(block):
    # para m√©tricas de nivel: sum count
    total_count = block["count"].sum()
    # para ratios: promedio simple (o ponderado si quieres)
    avg_dist = block["avg_trip_distance"].mean()
    avg_min  = block["avg_minutes"].mean()
    avg_fpm  = block["avg_fare_per_mile"].mean()
    # para distribuci√≥n (JS ya viene relativo al ref del pipeline anterior, as√≠ que aqu√≠ solo guardamos js medios)
    js_h = block["js_hour"].mean()
    js_w = block["js_wday"].mean()
    return pd.Series(dict(count=total_count,
                          avg_trip_distance=avg_dist,
                          avg_minutes=avg_min,
                          avg_fare_per_mile=avg_fpm,
                          js_hour=js_h,
                          js_wday=js_w))

by_ym_alltypes = (df
    .groupby(["year","month"], as_index=False)
    .apply(combine_month_block)
    .reset_index(drop=True)
)

ref_jan25 = by_ym_alltypes[(by_ym_alltypes["year"]==2025) & (by_ym_alltypes["month"]==1)].iloc[0]

def pct_change(a, b):
    if pd.isna(a) or pd.isna(b) or b==0:
        return np.nan
    return (a - b)/b

estacionalidad_vs_jan25 = by_ym_alltypes.copy()
estacionalidad_vs_jan25["count_vs_jan25_pct"]        = estacionalidad_vs_jan25["count"].apply(lambda x: pct_change(x, ref_jan25["count"]))
estacionalidad_vs_jan25["avg_dist_vs_jan25_pct"]     = estacionalidad_vs_jan25["avg_trip_distance"].apply(lambda x: pct_change(x, ref_jan25["avg_trip_distance"]))
estacionalidad_vs_jan25["avg_min_vs_jan25_pct"]      = estacionalidad_vs_jan25["avg_minutes"].apply(lambda x: pct_change(x, ref_jan25["avg_minutes"]))
estacionalidad_vs_jan25["farepm_vs_jan25_pct"]       = estacionalidad_vs_jan25["avg_fare_per_mile"].apply(lambda x: pct_change(x, ref_jan25["avg_fare_per_mile"]))

estacionalidad_vs_jan25.to_csv(OUT_DIR/"seasonality_vs_2025_01_alltypes.csv", index=False)
print("‚úÖ seasonality_vs_2025_01_alltypes.csv guardado")

# =========================
# 2) Estacionalidad mensual
#    VISTA B: cada mes (ene..dic) comparado con el MISMO mes de 2025
#    (√∫til para estacionalidad pura)
# =========================
ref_2025_by_month = (by_ym_alltypes[by_ym_alltypes["year"]==2025]
                     .set_index("month")[["count","avg_trip_distance","avg_minutes","avg_fare_per_mile"]])

def compare_to_2025_month(row):
    m = row["month"]
    base = ref_2025_by_month.loc[m] if m in ref_2025_by_month.index else None
    out = {}
    for k in ["count","avg_trip_distance","avg_minutes","avg_fare_per_mile"]:
        out[f"{k}_vs_same_month_2025_pct"] = pct_change(row[k], base[k]) if base is not None else np.nan
    return pd.Series(out)

estacionalidad_vs_same_month_2025 = by_ym_alltypes.copy()
estacionalidad_vs_same_month_2025 = pd.concat(
    [estacionalidad_vs_same_month_2025, estacionalidad_vs_same_month_2025.apply(compare_to_2025_month, axis=1)],
    axis=1
)
estacionalidad_vs_same_month_2025.to_csv(OUT_DIR/"seasonality_vs_same_month_2025_alltypes.csv", index=False)
print("‚úÖ seasonality_vs_same_month_2025_alltypes.csv guardado")

# =========================
# 3) Cambios YoY en fare_per_mile y minutes
#    (global y por tipo)
# =========================
# Global (todos los tipos combinados por a√±o)
y_agg = (df
    .groupby(["year"], as_index=False)
    .agg(avg_fare_per_mile=("avg_fare_per_mile","mean"),
         avg_minutes=("avg_minutes","mean"),
         count=("count","mean"))  # media de conteo mensual (proxy)
    .sort_values("year")
)
y_agg["farepm_yoy_pct"] = y_agg["avg_fare_per_mile"].pct_change()
y_agg["minutes_yoy_pct"] = y_agg["avg_minutes"].pct_change()
y_agg.to_csv(OUT_DIR/"yoy_global.csv", index=False)
print("‚úÖ yoy_global.csv guardado")

# Por tipo
y_type = (df
    .groupby(["year","trip_type"], as_index=False)
    .agg(avg_fare_per_mile=("avg_fare_per_mile","mean"),
         avg_minutes=("avg_minutes","mean"),
         count=("count","mean"))
    .sort_values(["trip_type","year"])
)
y_type["farepm_yoy_pct"]  = y_type.groupby("trip_type")["avg_fare_per_mile"].pct_change()
y_type["minutes_yoy_pct"] = y_type.groupby("trip_type")["avg_minutes"].pct_change()
y_type.to_csv(OUT_DIR/"yoy_by_type.csv", index=False)
print("‚úÖ yoy_by_type.csv guardado")

# =========================
# Extras: tablas pivot √∫tiles
# =========================
# JS promedio por a√±o y tipo (matriz)
pivot_js_hour = js_by_type_year.pivot(index="trip_type", columns="year", values="js_hour_mean")
pivot_js_wday = js_by_type_year.pivot(index="trip_type", columns="year", values="js_wday_mean")
pivot_js_hour.to_csv(OUT_DIR/"pivot_js_hour.csv")
pivot_js_wday.to_csv(OUT_DIR/"pivot_js_wday.csv")
print("‚úÖ pivot_js_hour.csv y pivot_js_wday.csv guardados")

print("\nListo. Archivos generados en 'analysis_outputs/'.")


‚úÖ js_by_type_year.csv guardado
‚úÖ js_all_year.csv guardado
‚úÖ seasonality_vs_2025_01_alltypes.csv guardado
‚úÖ seasonality_vs_same_month_2025_alltypes.csv guardado
‚úÖ yoy_global.csv guardado
‚úÖ yoy_by_type.csv guardado
‚úÖ pivot_js_hour.csv y pivot_js_wday.csv guardados

Listo. Archivos generados en 'analysis_outputs/'.


  .apply(combine_month_block)


In [20]:
import pandas as pd
from pathlib import Path

OUT_DIR = Path("analysis_outputs")

# ======================
# 1) Resumen de JS por tipo y a√±o
# ======================
js_by_type_year = pd.read_csv(OUT_DIR/"js_by_type_year.csv")
print("üìä --- JS promedio por tipo y a√±o ---")
print(js_by_type_year.round(4))

# ======================
# 2) JS global por a√±o
# ======================
js_all_year = pd.read_csv(OUT_DIR/"js_all_year.csv")
print("\nüåç --- JS global (todos los tipos combinados) ---")
print(js_all_year.round(4))

# ======================
# 3) Cambios anuales en tarifa/milla y minutos (global)
# ======================
yoy_global = pd.read_csv(OUT_DIR/"yoy_global.csv")
print("\nüí∏ --- Cambios YoY globales ---")
print(yoy_global.round(4))

# ======================
# 4) Cambios anuales en tarifa/milla y minutos (por tipo)
# ======================
yoy_by_type = pd.read_csv(OUT_DIR/"yoy_by_type.csv")
print("\nüöï --- Cambios YoY por tipo ---")
print(yoy_by_type.round(4))

# ======================
# 5) Estacionalidad (variaci√≥n mensual respecto a enero 2025)
# ======================
seasonality = pd.read_csv(OUT_DIR/"seasonality_vs_2025_01_alltypes.csv")
print("\nüìÖ --- Ejemplo de estacionalidad (primeros 12 meses) ---")
print(seasonality.head(12).round(3))

# ======================
# 6) (Opcional) Estacionalidad mes a mes vs mismo mes 2025
# ======================
seasonality_same = pd.read_csv(OUT_DIR/"seasonality_vs_same_month_2025_alltypes.csv")
print("\nüìÜ --- Estacionalidad comparada con el mismo mes de 2025 (primeros 12 meses) ---")
print(seasonality_same.head(12).round(3))


üìä --- JS promedio por tipo y a√±o ---
    year trip_type  js_hour_mean  js_wday_mean    count_mean
0   2022     fhvhv        0.0908        0.0878  1.770134e+07
1   2022     green        0.0833        0.0618  7.003350e+04
2   2022    yellow        0.0536        0.0644  3.304675e+06
3   2023     fhvhv        0.0923        0.0865  1.937417e+07
4   2023     green        0.0656        0.0585  6.558833e+04
5   2023    yellow        0.0598        0.0591  3.192519e+06
6   2024     fhvhv        0.0920        0.0859  1.995587e+07
7   2024     green        0.0663        0.0644  5.501817e+04
8   2024    yellow        0.0602        0.0670  3.430810e+06
9   2025     fhvhv        0.0863        0.0778  1.998996e+07
10  2025     green        0.0750        0.0642  4.973975e+04
11  2025    yellow        0.0628        0.0661  3.944555e+06

üåç --- JS global (todos los tipos combinados) ---
   year  js_hour_mean  js_wday_mean    count_mean
0  2022        0.0759        0.0713  7.025350e+06
1  2023      