In [21]:
import logging
import os
import random
import re
import time
from collections import Counter
from collections.abc import Sequence, Iterable
from pathlib import Path
from typing import Literal, Any

import polars as pl
import requests
from tqdm.auto import tqdm


In [2]:
logging.basicConfig(
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
    force=True
)

PATH_RAW = Path("../data/raw")
PATH_MONTHLY = Path("../data/monthly")
YELLOW_PATH_PATTERN = "yellow_tripdata*.parquet"

In [3]:
def get_progress_bar(
        data: Iterable,
        desc: str | None = None,
        unit: str | None = None,
        colour: str | None = None,
        leave: bool | None = None,
        position: int | None = None
) -> tqdm:
    """
    Create a tqdm progress bar with optional parameters.

    :param data: Iterable to iterate over.
    :param desc: Description text displayed before the bar.
    :param unit: Unit of iteration (e.g., 'files', 'rows').
    :param colour: Colour of the progress bar.
    :param leave: Whether to leave the bar after completion.
    :param position: Position index for multiple bars.
    :return: Configured tqdm progress bar.
    """

    opts = {k: v for k, v in {
        "desc": desc, "unit": unit, "colour": colour,
        "leave": leave, "position": position
    }.items() if v is not None}

    return tqdm(data, **opts)

# EDA NYC TLC

## Descarga de los datos

In [4]:
def get_dates(
        years: Sequence[str] | None = None,
        months: Sequence[str] | None = None
) -> list[tuple[str, str]]:
    """
    Generate all (year, month) combinations.

    :param years: Sequence of years (e.g., ``["2020", "2021"]``). Defaults to 2011–2025.
    :param months: Sequence of months in ``"MM"`` format. Defaults to 01–12.
    :return: List of (year, month) tuples.
    """
    years = years or [str(i) for i in range(2011, 2025)]
    months = months or [str(i).zfill(2) for i in range(1, 13)]
    return [(year, month) for year in years for month in months]


def print_summary(
        total_downloads: int,
        files_already_downloaded: int,
        errors: list[tuple[str, str]],
        check: bool = False
) -> None:
    """
    Print a summary of the download process.

    :param total_downloads: Number of files successfully downloaded.
    :param files_already_downloaded: Number of files skipped because they already existed.
    :param errors: List of (url, error) tuples for failed downloads.
    :param check: If True, some downloads may have been skipped due to existing files.
    """
    msg = ["\n📥 Download summary:"]
    if check:
        msg.append(f"\t📂 Files already existed: {files_already_downloaded}")
    msg.append(f"\t✅ Files downloaded: {total_downloads}")

    logging.info("\n".join(msg))

    if errors:
        err_msg = [
            f"\n⚠️ Errors in {len(errors)} downloads (showing up to 5):"]
        err_msg += [f"  - {url} → {err}" for url, err in errors[:5]]
        logging.warning("\n".join(err_msg))


def download_nyc_taxi(
        years: Sequence[str] | None = None,
        months: Sequence[str] | None = None,
        check: bool = False
) -> None:
    """
    Download NYC yellow taxi trip data in Parquet format.

    :param years: Sequence of years (e.g., ``["2020", "2021"]``). Defaults to 2011–2025.
    :param months: Sequence of months in ``"MM"`` format. Defaults to 01–12.
    :param check: If True, skip downloads for files that already exist.
    """
    dates = get_dates(years, months)
    total, skipped, errors = 0, 0, []

    progress = get_progress_bar(dates, desc="📂 Downloading files", unit="file")

    for year, month in progress:
        path = Path(PATH_RAW) / f"yellow_tripdata_{year}-{month}.parquet"
        if check and path.exists():
            skipped += 1
            continue

        path.parent.mkdir(parents=True, exist_ok=True)
        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{path.name}"

        try:
            with requests.get(url, stream=True, timeout=60) as r:
                r.raise_for_status()
                with open(path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            total += 1
        except requests.exceptions.RequestException as e:
            errors.append((url, str(e)))

        # Random wait to avoid hammering the server
        time.sleep(random.uniform(1, 3))

    print_summary(total, skipped, errors, check)

In [7]:
download_nyc_taxi(check=True)

📂 Downloading files:   0%|          | 0/168 [00:00<?, ?file/s]

INFO: 
📥 Download summary:
	📂 Files already existed: 0
	✅ Files downloaded: 168


## Inspección de datos

### Diccionario de Variables – Yellow Taxi TLC


- **VendorID**
  Código que indica el proveedor TPEP que proporcionó el registro.
  - 1 = Creative Mobile Technologies, LLC
  - 2 = Curb Mobility, LLC
  - 6 = Myle Technologies Inc
  - 7 = Helix

- **tpep_pickup_datetime**
  Fecha y hora en que se activó el taxímetro.

- **tpep_dropoff_datetime**
  Fecha y hora en que se desactivó el taxímetro.

- **passenger_count**
  Número de pasajeros en el vehículo.

- **trip_distance**
  Distancia recorrida del viaje en millas reportada por el taxímetro.

- **RatecodeID**
  Código de tarifa final en vigor al final del viaje.
  - 1 = Tarifa estándar
  - 2 = JFK
  - 3 = Newark
  - 4 = Nassau o Westchester
  - 5 = Tarifa negociada
  - 6 = Viaje en grupo
  - 99 = Nulo/desconocido

- **store_and_fwd_flag**
  Indicador de sí el registro de viaje se guardó en la memoria del vehículo antes de enviarse al proveedor (“almacenar y reenviar”), debido a que no había conexión al servidor.
  - Y = viaje almacenado y reenviado
  - N = viaje no almacenado y reenviado

- **PULocationID**
  Zona TLC donde se activó el taxímetro.

- **DOLocationID**
  Zona TLC donde se desactivó el taxímetro.

- **payment_type**
  Código numérico que indica cómo pagó el pasajero el viaje.
  - 0 = Viaje Flex Fare
  - 1 = Tarjeta de crédito
  - 2 = Efectivo
  - 3 = Sin cargo
  - 4 = Disputa
  - 5 = Desconocido
  - 6 = Viaje anulado

- **fare_amount**
  Tarifa calculada por tiempo y distancia según el taxímetro.

- **extra**
  Extras y recargos misceláneos.

- **mta_tax**
  Impuesto activado automáticamente según la tarifa en uso.

- **tip_amount**
  Monto de la propina. Este campo se completa automáticamente para pagos con tarjeta de crédito (las propinas en efectivo no se incluyen).

- **tolls_amount**
  Total de peajes pagados en el viaje.

- **improvement_surcharge**
  Recargo por mejora aplicado al inicio del viaje (implementado en 2015).

- **total_amount**
  Importe total cobrado a los pasajeros (no incluye propinas en efectivo).

- **congestion_surcharge**
  Monto total recaudado en el viaje por el recargo de congestión del Estado de Nueva York.

- **airport_fee**
  Cargo por recogida en los aeropuertos LaGuardia y John F. Kennedy.

- **cbd_congestion_fee**
  Cargo por viaje en la Zona de Alivio de Congestión del MTA a partir del 5 de enero de 2025.

### Comprobación tamaño de los ficheros

In [8]:
def fmt_size(mb: float) -> str:
    """
    Format a numeric size dynamically in MB or GB.

    :param mb: File size in megabytes (MB).
    :return: Formatted size as a string, using MB if below 1024 or GB otherwise.
    """
    return f"{mb / 1024:.2f} GB" if mb >= 1024 else f"{mb:.2f} MB"


def check_files_size(
        path: Path | str,
        pattern: str = "yellow_tripdata_*.parquet",
        count_limit: int = 3,
        size_limit: int = 10
) -> None:
    """
    Generate a summary of the number and size of the Parquet files in a directory.

    :param path: Directory path or string containing files.
    :param pattern: Glob pattern for files. Defaults to ``"yellow_tripdata_*.parquet"``.
    :param count_limit: Maximum number of file names to display. Defaults to 3.
    :param size_limit: Maximum number of files to display with size information. Defaults to 10.
    :return: None
    """
    path = Path(path)
    files = sorted(path.glob(pattern))
    msg = [f"🧾 Total number of files: {len(files)}"]

    # File listing (limited)
    listed = [f"📂 {f.name}" for f in files[:count_limit]]
    if len(files) > count_limit:
        listed.append("...")
    msg.append("  " + "\n  ".join(listed))

    if files:
        # Calculate sizes in MB
        sizes = [f.stat().st_size / 1e6 for f in files]
        total, min_s, mean_s, max_s = (
            sum(sizes), min(sizes), sum(sizes) / len(sizes), max(sizes)
        )

        stats = (
            f"📏 Sizes → TOTAL: {fmt_size(total)} | Min: {fmt_size(min_s)} | "
            f"Mean: {fmt_size(mean_s)} | Max: {fmt_size(max_s)}"
        )

        # Detailed file sizes (limited)
        sized = [
            f"📂 {f.name} — {fmt_size(f.stat().st_size / 1e6)}"
            for f in files[:size_limit]
        ]
        if len(files) > size_limit:
            sized.append("...")
        msg.append(stats + "\n  " + "\n  ".join(sized))

    logging.info("\n" + "\n".join(msg))

In [9]:
check_files_size(PATH_RAW)

INFO: 
🧾 Total number of files: 168
  📂 yellow_tripdata_2011-01.parquet
  📂 yellow_tripdata_2011-02.parquet
  📂 yellow_tripdata_2011-03.parquet
  ...
📏 Sizes → TOTAL: 19.18 GB | Min: 4.44 MB | Mean: 116.93 MB | Max: 204.98 MB
  📂 yellow_tripdata_2011-01.parquet — 167.18 MB
  📂 yellow_tripdata_2011-02.parquet — 175.98 MB
  📂 yellow_tripdata_2011-03.parquet — 199.93 MB
  📂 yellow_tripdata_2011-04.parquet — 184.20 MB
  📂 yellow_tripdata_2011-05.parquet — 193.43 MB
  📂 yellow_tripdata_2011-06.parquet — 189.12 MB
  📂 yellow_tripdata_2011-07.parquet — 184.03 MB
  📂 yellow_tripdata_2011-08.parquet — 166.21 MB
  📂 yellow_tripdata_2011-09.parquet — 183.49 MB
  📂 yellow_tripdata_2011-10.parquet — 196.27 MB
  ...


### Comprobación Nombre de Variables

Como *improvement_surcharge* se implantó en 2015 y *cbd_congestion_fee* se implantó en 2025, crearemos dos listas de variables. Una lista con las variables obligatorias y otra con las variables opcionales que no comparten todos los archivos.

In [10]:
# Required columns (present every year since 2011)
REQUIRED_COLS = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID", "store_and_fwd_flag",
    "PULocationID", "DOLocationID", "payment_type", "fare_amount",
    "extra", "mta_tax", "tip_amount", "tolls_amount", "total_amount",
    "congestion_surcharge", "airport_fee"
]

# Optional columns (added in subsequent years)
OPTIONAL_COLS = [
    "improvement_surcharge",  # since 2015
    "cbd_congestion_fee"  # since 2025
]

In [11]:
def check_schema_variations(
        path: Path | str,
        required_cols: Sequence[str],
        optional_cols: Sequence[str] | None = None,
        pattern: str = "yellow_tripdata_*.parquet"
) -> None:
    """
    Inspect Parquet files in a directory, check their schema, and detect missing or inconsistent columns.

    :param path: Path or string pointing to the directory containing the files.
    :param required_cols: List of required column names.
    :param optional_cols: Optional list of column names to check (perhaps missing in older files).
    :param pattern: File search pattern. Defaults to ``"yellow_tripdata_*.parquet"``.
    :return: None
    """
    # --- Logging setup ---
    logger = logging.getLogger("check_schema")
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # --- File collection ---
    path = Path(path)
    files = sorted(path.glob(pattern))
    print(f"🔎 Found {len(files)} file(s) to inspect.")

    # --- Containers for results ---
    required_issues, optional_issues, bad_files = [], [], []

    # -- Main loop ---
    for file in files:
        try:
            lf = pl.scan_parquet(file)
            schema = [c.lower() for c in lf.collect_schema().names()]

            missing_required = (
                [c for c in required_cols if c.lower() not in schema]

            )
            missing_optional = (
                [c for c in optional_cols if c.lower() not in schema]
                if optional_cols else []
            )

            if missing_required:
                required_issues.append((file, missing_required))
            if missing_optional:
                optional_issues.append((file, missing_optional))

        except Exception as e:
            bad_files.append((file, [f"Error: {e}"]))

    # --- Logging summary ---
    if not required_issues:
        print("✅ All files contain the required columns.")
    else:
        logger.warning(
            f"\n❌ {len(required_issues)} file(s) missing required columns:\n"
            + "\n".join(
                f"  📂 {f.name}: {missing}" for f, missing in required_issues)
        )

    if optional_cols:
        if not optional_issues:
            print(
                "✅ All files contain the optional columns (if expected)."
            )
        else:
            logger.warning(
                (f"\n⚠️ {len(optional_issues)} file(s) missing optional "
                 "columns:\n" + "\n".join(
                    f"  📂 {f.name}: {missing}"
                    for f, missing in optional_issues
                )
                 )
            )

    if bad_files:
        logger.error(
            f"\n🚫 {len(bad_files)} file(s) failed to load:\n"
            + "\n".join(f"  📂 {f.name}: {err}" for f, err in bad_files)
        )


check_schema_variations(PATH_RAW, REQUIRED_COLS, OPTIONAL_COLS)

🔎 Found 168 file(s) to inspect.


⚠️ 168 file(s) missing optional columns:
  📂 yellow_tripdata_2011-01.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-02.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-03.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-04.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-05.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-06.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-07.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-08.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-09.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-10.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-11.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2011-12.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2012-01.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2012-02.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2012-03.parquet: ['cbd_congestion_fee']
  📂 yellow_tripdata_2012-04.parquet: ['cbd_congestion_fee']

✅ All files contain the required columns.


Como *cbd_congestion_fee* se implantó en 2025 tiene sentido que no exista en ningún archivo, aunque la variable *improvement_surcharge* si aparece en todos los años. Además, para facilitar el manejo de los datos con polars, vamos a unificar todos los archivos *parquet* con un esquema común por meses como en su versión original. Para ello creamos el Esquema Esperado, especificando para cada variable su yipo. Estos tipos se han elegido de acuerdo a los datos aportados por la documentación.

In [12]:
# Expected schema
EXPECTED_SCHEMA = {
    "vendorid": pl.Categorical,
    "tpep_pickup_datetime": pl.Datetime("ns"),
    "tpep_dropoff_datetime": pl.Datetime("ns"),
    "passenger_count": pl.Int16,
    "trip_distance": pl.Float32,
    "ratecodeid": pl.Categorical,
    "store_and_fwd_flag": pl.Categorical,
    "pulocationid": pl.Int32,
    "dolocationid": pl.Int32,
    "payment_type": pl.Categorical,
    "fare_amount": pl.Float32,
    "extra": pl.Float32,
    "mta_tax": pl.Float32,
    "tip_amount": pl.Float32,
    "tolls_amount": pl.Float32,
    "total_amount": pl.Float32,
    "congestion_surcharge": pl.Float32,
    "airport_fee": pl.Float32,
    # Optional
    "improvement_surcharge": pl.Float32,
    "cbd_congestion_fee": pl.Float32,
}


def normalize_cols(lf: pl.LazyFrame) -> pl.LazyFrame:
    """
    Convert all column names in a Lazyframe to lowercase.

    :param lf: LazyFrame whose column names will be converted to lowercase.
    :return: LazyFrame with all column names converted to lowercase.
    """

    return lf.rename({c: c.lower() for c in lf.collect_schema().names()})


def load_with_schema(path: Path | str) -> pl.LazyFrame:
    """
    Load a Parquet file as a LazyFrame and enforce expected schema.

    :param path: Path or string pointing to the parquet file.
    :return: LazyFrame with the expected schema.
    """

    lf = normalize_cols(pl.scan_parquet(path))
    actual_cols = lf.collect_schema().names()

    # Add any missing columns with proper dtypes
    for col, col_type in EXPECTED_SCHEMA.items():
        if col not in actual_cols:
            lf = lf.with_columns(pl.lit(None).cast(col_type).alias(col))

    # Apply consistent casting
    expressions = [
        pl.col(c).cast(pl.Utf8).cast(col_type)
        if col_type == pl.Categorical else pl.col(c).cast(col_type)
        for c, col_type in EXPECTED_SCHEMA.items()
    ]

    return lf.select(expressions)


def consolidate_parquet_monthly(
        input_path: Path | str,
        output_path: Path | str,
        delete_originals: bool = False
) -> None:
    """
    Consolidate monthly Parquet files into a consistent schema and save them.

    :param input_path: Path or string pointing to the directory  containing input files.
    :param output_path: Path or string where processed files will be saved.
    :param delete_originals: If True, delete the original parquet files.
    :return: None

    **Example:**
        .. code-block:: python

            consolidate_parquet_monthly("../data/raw", "../data/monthly", delete_originals=True)
    """
    files = sorted(Path(input_path).glob("yellow_tripdata_*.parquet"))
    progress = get_progress_bar(
        files, desc="Consolidating parquet files", unit="files"
    )

    for file in progress:
        m = re.search(
            r"yellow_tripdata_(\d{4})-(\d{2})\.parquet", file.name
        )
        if not m:
            continue
        year, month = m.groups()

        print(f"\n📂 Processing {year}-{month} → {file.name}")
        lf = load_with_schema(file)

        out_path = (
                Path(output_path) /
                f"yellow_tripdata_{year}-{month}.parquet"
        )
        out_path.parent.mkdir(parents=True, exist_ok=True)

        lf.sink_parquet(
            out_path,
            compression="zstd",
            statistics=True,
            row_group_size=200_000
        )
        print(f"    ✅ Guardado {out_path}")

        if delete_originals:
            os.remove(file)
            print(f"    🗑️ Eliminado {file}")

In [13]:
consolidate_parquet_monthly(
    "../data/raw",
    "../data/monthly",
    delete_originals=True
)

Consolidating parquet files:   0%|          | 0/168 [00:00<?, ?files/s]


📂 Processing 2011-01 → yellow_tripdata_2011-01.parquet
    ✅ Guardado ../data/monthly/yellow_tripdata_2011-01.parquet
    🗑️ Eliminado ../data/raw/yellow_tripdata_2011-01.parquet

📂 Processing 2011-02 → yellow_tripdata_2011-02.parquet
    ✅ Guardado ../data/monthly/yellow_tripdata_2011-02.parquet
    🗑️ Eliminado ../data/raw/yellow_tripdata_2011-02.parquet

📂 Processing 2011-03 → yellow_tripdata_2011-03.parquet
    ✅ Guardado ../data/monthly/yellow_tripdata_2011-03.parquet
    🗑️ Eliminado ../data/raw/yellow_tripdata_2011-03.parquet

📂 Processing 2011-04 → yellow_tripdata_2011-04.parquet
    ✅ Guardado ../data/monthly/yellow_tripdata_2011-04.parquet
    🗑️ Eliminado ../data/raw/yellow_tripdata_2011-04.parquet

📂 Processing 2011-05 → yellow_tripdata_2011-05.parquet
    ✅ Guardado ../data/monthly/yellow_tripdata_2011-05.parquet
    🗑️ Eliminado ../data/raw/yellow_tripdata_2011-05.parquet

📂 Processing 2011-06 → yellow_tripdata_2011-06.parquet
    ✅ Guardado ../data/monthly/yellow_tripda

## Examen inicial del EDA

In [14]:
# Scan parquet files lazily
lf_yellow = pl.scan_parquet(f'../data/monthly/yellow_tripdata*.parquet')
lf_schema = lf_yellow.collect_schema()

In [19]:
# --- Summary ---
print("📊 DataFrame information:")
n_rows = lf_yellow.select(pl.len()).collect(engine="streaming").item()
print(f'Total number of rows: {n_rows}')

# --- Dtype summary ---
dtype_counts = Counter(lf_schema.values())
print("Unique dtypes:")
for dtype, count in dtype_counts.items():
    print(f"  {dtype}: {count}")

# Schema summary ---
print(f'Data columns (total {len(lf_schema)} columns):')

# Columns and Types
schema_df = pl.DataFrame({
    "Column": list(lf_schema.keys()),
    "Type": [str(v) for v in lf_schema.values()]
})

schema_df

📊 DataFrame information:
Total number of rows: 1438340823
Unique dtypes:
  Categorical(ordering='physical'): 4
  Datetime(time_unit='ns', time_zone=None): 2
  Int16: 1
  Float32: 11
  Int32: 2
Data columns (total 20 columns):


Column,Type
str,str
"""vendorid""","""Categorical(ordering='physical…"
"""tpep_pickup_datetime""","""Datetime(time_unit='ns', time_…"
"""tpep_dropoff_datetime""","""Datetime(time_unit='ns', time_…"
"""passenger_count""","""Int16"""
"""trip_distance""","""Float32"""
…,…
"""total_amount""","""Float32"""
"""congestion_surcharge""","""Float32"""
"""airport_fee""","""Float32"""
"""improvement_surcharge""","""Float32"""


Vemos que hay miles de millones de datos, por lo que para poder hacer un examen inicial, agruparemos los datos por años.

In [16]:
NUMERIC_TYPES = {
    pl.Int8, pl.Int16, pl.Int32, pl.Int64,
    pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64,
    pl.Float32, pl.Float64
}


def is_numeric_dtype(col_type: pl.DataType) -> bool:
    """
    Check if a Polars data type is numeric.

    :param col_type: Polars DataType to check.
    :return: True if col_type is numeric, False otherwise.
    """
    return type(col_type) in NUMERIC_TYPES


def is_datetime_dtype(col_type: pl.DataType) -> bool:
    """
    Check if a Polars data type is datetime-like.

    :param col_type: Polars DataType to check.
    :return: True if col_type is a datetime, False otherwise.
    """
    # cover Datetime("ns"), Datetime("us"), ...
    return str(col_type).startswith("Datetime")


def is_textual_dtype(col_type: pl.DataType) -> bool:
    """
    Check if a Polars data type is textual (Utf8, Categorical, or Boolean).

    :param col_type: Polars DataType to check.
    :return: True if col_type is textual, False otherwise.
    """
    return str(col_type).startswith(("Utf8", "Categorical", "Boolean"))


def count_categories(
        lf: pl.LazyFrame,
        column: str,
        engine: Literal["auto", "in-memory", "streaming", "gpu"] = "streaming"
) -> list[tuple[Any, int, float]]:
    """
    Count the number and percentage of occurrences for each category
    in a specified column of a Polars LazyFrame.

    :param lf: Polars LazyFrame to analyze.
    :param column: Column name whose categorical values will be counted.
    :param engine: Execution engine to use when collecting results
                   (default: ``"streaming"``).
    :return: A list of tuples (category, count, percentage).
    """
    df = (
        lf.group_by(column)
        .agg(pl.len().alias("count"))
        .with_columns(
            (pl.col("count") / pl.sum("count") * 100).alias("pct")
        )
        .sort("count", descending=True)
        .collect(engine=engine)
    )
    if df.is_empty():
        return []
    return list(zip(df[column], df["count"], df["pct"]))


def collect_item(
        lf: pl.LazyFrame,
        exprs: list[pl.Expr],
        engine: Literal["auto", "in-memory", "streaming", "gpu"] = "streaming"
) -> dict[str, Any]:
    """
    Execute multiple Polars expressions on a LazyFrame and return their results.

    :param lf: Polars LazyFrame on which to execute expressions.
    :param exprs: List of Polars expressions (e.g., aggregations) to compute.
    :param engine: Execution engine to use when collecting results.
    :return: A dictionary mapping expression output names to their computed values.
    """
    df = lf.select(exprs).collect(engine=engine)
    return {c: df[c][0] for c in df.columns}


def quantiles_for_column(
        lf: pl.LazyFrame,
        col: str,
) -> dict[str, Any]:
    """
    Calculate the 25th, 50th, and 75th quantiles for a column in a LazyFrame.

    :param lf: Polars LazyFrame on which to compute quantiles.
    :param col: Name of the column to calculate quantiles for.
    :return: A dictionary mapping quantile labels (``"25%"``, ``"50%"``, ``"75%"``) to their values.
    """
    col_type = lf.collect_schema()[col]

    if col_type == pl.Datetime("ns"):
        exprs = [
            pl.col(col)
            .cast(pl.Int64)
            .quantile(q)
            .cast(pl.Datetime("ns"))
            .alias(name)
            for name, q in zip(["25%", "50%", "75%"], (0.25, 0.5, 0.75))
        ]
        res = collect_item(lf, exprs)
        return {k: v.strftime("%Y-%m-%d %H:%M:%S") for k, v in res.items()}

    q_exprs = {
        "25%": pl.col(col).quantile(0.25).alias("25%"),
        "50%": pl.col(col).quantile(0.50).alias("50%"),
        "75%": pl.col(col).quantile(0.75).alias("75%"),
    }
    return {k: collect_item(lf, [expr])[k] for k, expr in q_exprs.items()}


def get_stats(
        years: Iterable[int] = range(2011, 2025)
) -> dict[int, dict[str, list[Any]]]:
    """
    Compute descriptive statistics for NYC taxi data by year.

    :param years: Iterable of years to process (default: 2011–2024).
    :return: Nested dictionary with per-year statistics for each column.
    """
    stat_keys = [
        "column", "dtype", "count", "null_count", "mean", "std",
        "min", "25%", "50%", "75%", "max", "n_values", "cat_count"
    ]
    stats_dict = {year: {k: [] for k in stat_keys} for year in years}

    years_progress = get_progress_bar(
        years,
        desc="📆 Recording annual statistics",
        unit="year",
        position=0
    )

    for year in years_progress:
        lf_p = pl.scan_parquet(
            f'../data/monthly/yellow_tripdata_{year}*.parquet'
        )
        schema = lf_p.collect_schema()

        col_progress = get_progress_bar(
            schema.items(),
            desc=f"  📊 Processing columns for {year}",
            unit="col",
            leave=True,
            position=1
        )

        for col, col_type in col_progress:
            # siempre pedimos count, null_count y n_unique
            exprs = [
                pl.col(col).len().alias("count"),
                pl.col(col).null_count().alias("null_count"),
                pl.col(col).n_unique().alias("n_unique"),
            ]

            if is_numeric_dtype(col_type):
                quant_exprs = [
                    pl.col(col)
                    .quantile(q)
                    .alias(f"{int(q * 100)}%") for q in (0.25, 0.5, 0.75)
                ]
                exprs += [
                    pl.col(col).mean().alias("mean"),
                    pl.col(col).std().alias("std"),
                    pl.col(col).min().alias("min"),
                    pl.col(col).max().alias("max"),
                    *quant_exprs
                ]
            elif is_datetime_dtype(col_type):
                quant_exprs = [
                    pl.col(col)
                    .cast(pl.Int64)
                    .quantile(q)
                    .cast(pl.Datetime("ns"))
                    .alias(f"{int(q * 100)}%") for q in (0.25, 0.5, 0.75)
                ]
                exprs += [
                    pl.col(col).min().alias("min"),
                    pl.col(col).max().alias("max"),
                    *quant_exprs
                ]

            res = collect_item(lf_p, exprs)

            cat_count = None
            if not is_numeric_dtype(col_type) and not is_datetime_dtype(
                    col_type):
                cat_count = count_categories(lf_p, col)

            def fmt(variable: Any) -> Any:
                return (variable.strftime("%Y-%m-%d %H:%M:%S")
                        if hasattr(variable, "strftime") else variable)

            cat_fmt = (
                [f"{c}: {cnt} ({pct:.2f}%)" for c, cnt, pct in cat_count]
                if cat_count else None
            )

            stats = {
                "column": col,
                "dtype": col_type,
                "count": res.get("count"),
                "null_count": res.get("null_count"),
                "mean": res.get("mean"),
                "std": res.get("std"),
                "25%": fmt(res.get("25%")),
                "50%": fmt(res.get("50%")),
                "75%": fmt(res.get("75%")),
                "min": fmt(res.get("min")),
                "max": fmt(res.get("max")),
                "n_values": int(res.get("n_unique") or 0),
                "cat_count": cat_fmt
            }

            for k, v in stats.items():
                stats_dict[year][k].append(v)

    print(f'✅ Statistics for every year recorded')

    return stats_dict

In [17]:
stats_data = get_stats()

📆 Recording annual statistics:   0%|          | 0/14 [00:00<?, ?year/s]

  📊 Processing columns for 2011:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2012:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2013:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2014:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2015:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2016:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2017:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2018:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2019:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2020:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2021:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2022:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2023:   0%|          | 0/20 [00:00<?, ?col/s]

  📊 Processing columns for 2024:   0%|          | 0/20 [00:00<?, ?col/s]

✅ Statistics for every year recorded


In [22]:
stats_2024 = pl.LazyFrame(stats_data[2024], strict=False)

stats_2024.collect()

column,dtype,count,null_count,mean,std,min,25%,50%,75%,max,n_values,cat_count
str,object,i64,i64,f64,f64,str,str,str,str,str,i64,list[str]
"""vendorid""",Categorical(ordering='physical'),41169720,0,,,,,,,,4,"[""2: 31451503 (76.39%)"", ""1: 9715918 (23.60%)"", … ""7: 230 (0.00%)""]"
"""tpep_pickup_datetime""","Datetime(time_unit='ns', time_zone=None)",41169720,0,,,"""2002-12-31 16:46:07""","""2024-04-06 20:07:28""","""2024-07-03 23:35:16""","""2024-10-08 17:33:35""","""2026-06-26 23:53:12""",20049544,
"""tpep_dropoff_datetime""","Datetime(time_unit='ns', time_zone=None)",41169720,0,,,"""2002-12-31 17:24:07""","""2024-04-06 20:23:52""","""2024-07-03 23:50:35""","""2024-10-08 17:53:27""","""2026-06-27 20:59:10""",20030251,
"""passenger_count""",Int16,41169720,4091232,1.333931,0.815824,"""0""","""1.0""","""1.0""","""1.0""","""9""",11,
"""trip_distance""",Float32,41169720,0,4.976101,419.230499,"""0.0""","""1.01""","""1.76""","""3.36""","""398608.625""",8808,
…,…,…,…,…,…,…,…,…,…,…,…,…
"""total_amount""",Float32,41169720,0,27.832813,78.053589,"""-2265.449951""","""15.75""","""21.0""","""30.6""","""335550.9375""",42932,
"""congestion_surcharge""",Float32,41169720,4091232,2.232144,0.874653,"""-2.5""","""2.5""","""2.5""","""2.5""","""2.52""",9,
"""airport_fee""",Float32,41169720,4091232,0.147006,0.502041,"""-1.75""","""0.0""","""0.0""","""0.0""","""1.75""",5,
"""improvement_surcharge""",Float32,41169720,0,0.962993,0.255055,"""-1.0""","""1.0""","""1.0""","""1.0""","""2.0""",7,


## Análisis univariante

### Valores Numéricos