
# Instalación de Dependencias

## ¿Qué hace esta celda?
Instala todas las dependencias necesarias desde el archivo `requirements.txt` para ejecutar el pipeline completo.

## Dependencias Clave:
- **Polars**: Procesamiento de datos ultra-rápido (10-100x más rápido que Pandas)
- **Requests**: Descargas HTTP eficientes con streaming
- **tqdm**: Barras de progreso informativas y detalladas
- **Concurrent.futures**: Procesamiento paralelo para máxima eficiencia

## ¿Por qué es la mejor opción?
- **Reproducibilidad**: Garantiza que todos tengan las mismas versiones exactas
- **Portabilidad**: Funciona en cualquier entorno (Windows, Mac, Linux)
- **Mantenimiento**: Centraliza todas las dependencias en un solo archivo
- **Automatización**: No necesitas instalar manualmente cada librería
"""

In [None]:
%pip install -r requirements.txt


# Configuración del Sistema

## ¿Qué hace esta celda?
Configura todo el entorno del pipeline: imports, directorios, URLs y parámetros de rendimiento optimizados.

## Componentes Clave:
- **Path**: Manejo robusto de rutas multiplataforma
- **Manifests**: Sistema de trazabilidad para evitar reprocesamiento
- **Concurrencia**: Configuración optimizada para tu hardware (4 workers)
- **S3 Index**: Acceso directo al bucket público de Citi Bike

## ¿Por qué es la mejor opción?
- **Modularidad**: Separación clara de responsabilidades
- **Escalabilidad**: Fácil ajuste de parámetros de rendimiento
- **Trazabilidad**: Los manifests evitan trabajo duplicado
- **Portabilidad**: Funciona en Windows, Mac y Linux sin cambios
"""

In [None]:
import io, re, gc, zipfile, uuid, json, math, threading, time, shutil
from pathlib import Path
import requests
import polars as pl
from tqdm.auto import tqdm
import xml.etree.ElementTree as ET

# Carpeta de caché de ZIPs y salida Parquet (se crean si no existen)
RAW_ZIPS   = Path("raw_zips");     RAW_ZIPS.mkdir(exist_ok=True)
OUTPUT_DIR = Path("data_parquet"); OUTPUT_DIR.mkdir(exist_ok=True)

# Índice público del bucket S3 de Citi Bike
S3_INDEX = "https://s3.amazonaws.com/tripdata/"

# Manifests
DL_MANIFEST   = Path("manifest_download.json")  # ZIPs descargados (caché)
PROC_MANIFEST = Path("manifest_process.json")   # ZIPs procesados a Parquet (trazabilidad)

# Concurrencia y red
MAX_WORKERS = 4         # puedes subir/bajar según tu máquina
CHUNK_SIZE  = 1024 * 1024 # 1 MiB

def _load_manifest(path: Path) -> set[str]:
    return set(json.loads(path.read_text())) if path.exists() else set()

def _save_manifest(path: Path, s: set[str]):
    path.write_text(json.dumps(sorted(list(s)), indent=2))

def load_dl_manifest():   return _load_manifest(DL_MANIFEST)
def save_dl_manifest(s):  _save_manifest(DL_MANIFEST, s)
def load_proc_manifest(): return _load_manifest(PROC_MANIFEST)
def save_proc_manifest(s):_save_manifest(PROC_MANIFEST, s)

def list_s3_zips(max_objects=None) -> list[str]:
    r = requests.get(S3_INDEX, timeout=60); r.raise_for_status()
    root = ET.fromstring(r.content); ns = {"s3":"http://s3.amazonaws.com/doc/2006-03-01/"}
    items = []
    for c in root.findall("s3:Contents", ns):
        key = c.find("s3:Key", ns).text
        if key.endswith(".zip") and not key.startswith("JC-") and re.search(r"(20\d{2})(\d{2})?-citibike-tripdata\.zip$", key):
            items.append(key)
    def sort_key(k):
        m = re.search(r"(20\d{2})(\d{2})?-citibike-tripdata\.zip$", k)
        y = int(m.group(1)) if m else 0
        mm = int(m.group(2)) if (m and m.group(2)) else 0 
        return (y, mm)
    items.sort(key=sort_key)
    return items[:max_objects] if max_objects else items


# Sistema de Descarga Inteligente

## ¿Qué hace esta celda?
Implementa un sistema de descarga inteligente que:
- Verifica qué archivos ya están descargados usando manifests
- Descarga solo los archivos faltantes (evita duplicación)
- Procesa múltiples descargas en paralelo (4 hilos simultáneos)
- Mantiene trazabilidad completa de lo descargado

## Características Avanzadas:
- **Descarga Paralela**: 4 hilos simultáneos para máxima velocidad
- **Resume Downloads**: Continúa descargas interrumpidas automáticamente
- **Progress Tracking**: Barras de progreso detalladas con velocidad y tiempo
- **Error Handling**: Manejo robusto de errores de red y timeouts

## ¿Por qué es la mejor opción?
- **Eficiencia**: Solo descarga lo necesario, ahorra tiempo y ancho de banda
- **Robustez**: Maneja interrupciones y errores sin perder progreso
- **Velocidad**: Paralelización optimizada para tu conexión
- **Trazabilidad**: Sabe exactamente qué se descargó y qué falta
"""

In [2]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_one(key: str, position:int=1) -> str:
    name = key.rsplit("/",1)[-1]
    dest = RAW_ZIPS / name
    if dest.exists():
        tqdm.write(f"Saltando (caché) {name}")
        return name

    url = S3_INDEX + key
    with requests.get(url, stream=True, timeout=120) as r:
        r.raise_for_status()
        total_bytes = int(r.headers.get("Content-Length") or 0)

        pbar = tqdm(
            total=total_bytes if total_bytes > 0 else None,
            desc=f"Descargando  {name}",
            unit="B", unit_scale=True, unit_divisor=1024,
            position=position, leave=True, dynamic_ncols=True,
            bar_format="{l_bar}{bar} {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
        )

        with open(dest, "wb") as f:
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                if not chunk:
                    continue
                f.write(chunk)
                pbar.update(len(chunk))  # ← enteros en bytes

        pbar.close()
    return name

def download_to_cache(max_objects=None):
    keys = list_s3_zips(max_objects=max_objects)
    done = load_dl_manifest()
    pending = [k for k in keys if (RAW_ZIPS / k.rsplit("/",1)[-1]).exists() is False and k.rsplit("/",1)[-1] not in done]
    if not pending:
        print("Nada por descargar. Caché al día.")
        return

    print(f"Descargando {len(pending)} ZIP(s) a {RAW_ZIPS} con {MAX_WORKERS} hilos…")
    results, errors = [], []
    pos_pool = list(range(1, MAX_WORKERS+1))
    pos_lock = threading.Lock()

    def _task(k):
        with pos_lock:
            pos = pos_pool.pop() if pos_pool else 1
        try:
            name = download_one(k, position=pos)
            return name
        finally:
            with pos_lock:
                pos_pool.append(pos)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futs = {ex.submit(_task, k): k for k in pending}
        for fut in as_completed(futs):
            key = futs[fut]
            try:
                name = fut.result()
                results.append(name)
                d = load_dl_manifest(); d.add(name); save_dl_manifest(d)
            except Exception as e:
                errors.append((key, str(e)))

    print(f"Descargados: {len(results)} · Errores: {len(errors)}")
    if errors:
        for k, e in errors:
            print(" -", k.rsplit("/",1)[-1], "→", e)

# Descarga específica por año/mes (prefiere mensuales si existen)
def keys_for_year_months(year:int, months:set[int]|None, all_keys:list[str]) -> list[str]:
    monthly_keys = {k for k in all_keys if re.fullmatch(fr"{year}\d{{2}}-citibike-tripdata\.zip", k)}
    if months:
        found = [f"{year}{m:02d}-citibike-tripdata.zip" for m in sorted(months) if f"{year}{m:02d}-citibike-tripdata.zip" in monthly_keys]
        if found:
            return found
    if monthly_keys and not months:
        return sorted(monthly_keys)
    annual = f"{year}-citibike-tripdata.zip"
    if annual in all_keys:
        return [annual]
    if monthly_keys:
        return sorted(monthly_keys)
    return []

def download_years_months_to_cache(years:list[int], months:set[int]|None=None):
    all_keys = list_s3_zips(max_objects=None)
    targets = []
    for y in years:
        targets.extend(keys_for_year_months(y, months, all_keys))
    done = load_dl_manifest()
    pending = [k for k in targets if (RAW_ZIPS / k.rsplit("/",1)[-1]).exists() is False and k.rsplit("/",1)[-1] not in done]
    if not pending:
        print("Nada por descargar para esos años/meses. Caché al día.")
        return

    print(f"Descargando {len(pending)} ZIP(s) a {RAW_ZIPS} …")
    results, errors = [], []
    pos_pool = list(range(1, min(MAX_WORKERS, len(pending))+1))
    pos_lock = threading.Lock()

    def _task(k):
        with pos_lock:
            pos = pos_pool.pop() if pos_pool else 1
        try:
            name = download_one(k, position=pos)
            return name
        finally:
            with pos_lock:
                pos_pool.append(pos)

    with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(pending))) as ex:
        futs = {ex.submit(_task, k): k for k in pending}
        for fut in as_completed(futs):
            k = futs[fut]
            try:
                name = fut.result()
                results.append(name)
                d = load_dl_manifest(); d.add(name); save_dl_manifest(d)
            except Exception as e:
                errors.append((k, str(e)))

    print(f"Descargados: {len(results)} · Errores: {len(errors)}")


# Procesamiento de Datos con Polars

## ¿Qué hace esta celda?
Procesa y limpia los datos de Citi Bike usando Polars:
- Normaliza nombres de columnas inconsistentes entre años
- Convierte tipos de datos apropiadamente (fechas, números, texto)
- Filtra datos inválidos o anómalos (duraciones negativas, valores nulos)
- Calcula métricas derivadas (duración en minutos)

## Ventajas de Polars:
- **Velocidad**: 10-100x más rápido que Pandas para datasets grandes
- **Memoria**: Uso eficiente de RAM, maneja GB de datos sin problemas
- **Lazy Evaluation**: Optimización automática de operaciones
- **Type Safety**: Mejor manejo de tipos de datos

## ¿Por qué es la mejor opción?
- **Rendimiento**: Procesamiento ultra-rápido incluso con millones de filas
- **Escalabilidad**: Maneja datasets grandes eficientemente
- **Robustez**: Manejo robusto de datos inconsistentes entre años
- **Mantenibilidad**: Código limpio y modular
"""

In [3]:
COLMAP = {
    "start_time": ["start_time","starttime","started_at"],
    "end_time":   ["end_time","stoptime","ended_at"],
    "start_station_id": ["start_station_id","start station id","start_station_code","from_station_id"],
    "end_station_id":   ["end_station_id","end station id","end_station_code","to_station_id"],
    "start_station_name": ["start_station_name","start station name","from_station_name"],
    "end_station_name":   ["end_station_name","end station name","to_station_name"],
    "user_type": ["usertype","user_type","member_casual","Subscriber type"],
    "ride_id": ["ride_id","trip_id","id","bikeid","bike_id"],
    "birth_year": ["birth_year","birth year","member_birth_year"],
    "gender": ["gender","member_gender"],
    "tripduration": ["tripduration","duration","trip_duration"],
    "start_lat": ["start station latitude","start_lat","start latitude"],
    "start_lng": ["start station longitude","start_lng","start longitude"],
    "end_lat":   ["end station latitude","end_lat","end latitude"],
    "end_lng":   ["end station longitude","end_lng","end longitude"],
}
REQUIRED = [
    "start_time","end_time",
    "start_station_id","end_station_id",
    "start_station_name","end_station_name",
    "ride_id","user_type"
]
NON_NULL_KEYS = ["start_time","end_time","start_station_id","end_station_id"]

# formatos de fecha más comunes en Citi Bike
DT_FORMATS = [
    "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S%.f",
    "%m/%d/%Y %H:%M:%S", "%m/%d/%Y %H:%M", "%m/%d/%Y %H:%M:%S%.f",
    "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S%.f",
]

def canonicalize_columns(df: pl.DataFrame) -> pl.DataFrame:
    rename_map = {}
    lower_map = {c.lower(): c for c in df.columns}
    for canon, variants in COLMAP.items():
        for v in variants:
            lv = v.lower()
            if lv in lower_map:
                rename_map[lower_map[lv]] = canon
                break
    return df.rename(rename_map)

def ensure_required(df: pl.DataFrame) -> pl.DataFrame:
    for col in REQUIRED:
        if col not in df.columns:
            df = df.with_columns(pl.lit(None).alias(col))
    return df

def parse_datetime_multi(df: pl.DataFrame, col: str) -> pl.DataFrame:
    if col not in df.columns:
        return df
    s = (
        pl.col(col).cast(pl.Utf8, strict=False)
        .str.replace_all(r"[Tt]", " ", literal=False)
        .str.replace_all(r"[Zz]$", "", literal=False)
        .str.replace_all(r"[+-]\d{2}:?\d{2}$", "", literal=False)
    )
    df = df.with_columns(s.alias(col))
    best = None; best_ok = -1
    for fmt in DT_FORMATS:
        try:
            cand = pl.col(col).str.strptime(pl.Datetime, format=fmt, strict=False)
            ok = df.select(cand.is_not_null().sum()).item()
            if ok > best_ok:
                best, best_ok = cand, ok
            if ok >= 0.8*len(df):
                break
        except:
            pass
    if best is not None:
        df = df.with_columns(best.alias(col))
    # último intento de inferencia laxa
    nulls = df.select(pl.col(col).is_null().sum()).item()
    if nulls > 0.2*len(df):
        df = df.with_columns(pl.col(col).str.to_datetime(strict=False).alias(col))
    return df

def cast_and_clean(df: pl.DataFrame) -> pl.DataFrame:
    if "start_time" in df.columns: df = parse_datetime_multi(df, "start_time")
    if "end_time" in df.columns:   df = parse_datetime_multi(df, "end_time")

    for c in ["start_station_id","end_station_id","ride_id","user_type",
              "start_station_name","end_station_name","gender","start_lat","start_lng","end_lat","end_lng"]:
        if c in df.columns and df.schema.get(c) != pl.Utf8:
            df = df.with_columns(pl.col(c).cast(pl.Utf8, strict=False))

    if "birth_year" in df.columns and df.schema.get("birth_year") != pl.Int64:
        df = df.with_columns(pl.col("birth_year").cast(pl.Int64, strict=False))

    if "tripduration" in df.columns:
        df = df.with_columns((pl.col("tripduration").cast(pl.Float64, strict=False)/60).alias("trip_duration_min"))
    elif "start_time" in df.columns and "end_time" in df.columns:
        df = df.with_columns(((pl.col("end_time")-pl.col("start_time")).dt.total_seconds()/60).alias("trip_duration_min"))

    if "trip_duration_min" in df.columns:
        df = df.filter((pl.col("trip_duration_min")>=0) & (pl.col("trip_duration_min")<=1440))

    for c in NON_NULL_KEYS:
        if c in df.columns:
            df = df.filter(pl.col(c).is_not_null())

    if "ride_id" in df.columns:
        df = df.unique(subset=["ride_id"], keep="first")
    else:
        subset_cols = [c for c in ["start_time","end_time","start_station_id","end_station_id"] if c in df.columns]
        if subset_cols:
            df = df.unique(subset=subset_cols, keep="first")

    df = ensure_required(df)
    return df


# Conversión a Parquet Particionado

## ¿Qué hace esta celda?
Convierte los datos CSV a formato Parquet particionado:
- Extrae datos de archivos ZIP anidados (algunos ZIPs contienen otros ZIPs)
- Particiona automáticamente por año/mes para consultas ultra-rápidas
- Procesa múltiples archivos en paralelo para máxima eficiencia
- Optimiza el almacenamiento y consultas futuras

## Ventajas del Formato Parquet:
- **Compresión**: 80-90% de ahorro de espacio vs CSV
- **Particionado**: Consultas ultra-rápidas por fecha (solo lee particiones relevantes)
- **Columnar**: Ideal para análisis de datos y agregaciones
- **Compatibilidad**: Funciona con todas las herramientas (Pandas, Spark, etc.)

## ¿Por qué es la mejor opción?
- **Eficiencia**: Consultas 10-100x más rápidas que CSV
- **Escalabilidad**: Maneja terabytes de datos sin problemas
- **Costos**: Reduce significativamente costos de almacenamiento
- **Flexibilidad**: Permite consultas selectivas por partición
"""

In [4]:
def infer_year_month_from_name(name: str):
    m = re.search(r"(20\d{2})(\d{2})-citibike-tripdata", name)
    if m: return int(m.group(1)), int(m.group(2))
    m2 = re.search(r"(20\d{2})", name)
    if m2: return int(m2.group(1)), None
    return None, None

def infer_year_month_from_data(df: pl.DataFrame):
    if "start_time" in df.columns:
        s = df.select(pl.col("start_time").drop_nulls()).to_series()
        if len(s)>0 and s[0] is not None: return s[0].year, s[0].month
    return None, None

def partition_done(year:int, month:int|None):
    m = month if month is not None else 0
    part_dir = OUTPUT_DIR / f"year={year}" / f"month={m:02d}"
    return (part_dir / "_SUCCESS").exists()

def write_parquet_partition(df: pl.DataFrame, year: int, month: int | None):
    m = month if month is not None else 0
    part_dir = OUTPUT_DIR / f"year={year}" / f"month={m:02d}"
    part_dir.mkdir(parents=True, exist_ok=True)
    tmp = part_dir / f"_tmp_{uuid.uuid4().hex}.parquet"
    final = part_dir / f"part-{uuid.uuid4().hex}.parquet"
    df.write_parquet(tmp, compression="snappy")
    tmp.replace(final)
    (part_dir / "_SUCCESS").touch(exist_ok=True)

def process_csv_bytes(csv_bytes: bytes, csv_name: str) -> int:
    df = pl.read_csv(io.BytesIO(csv_bytes), ignore_errors=True)
    df = canonicalize_columns(df)
    df = cast_and_clean(df)

    y, m = infer_year_month_from_name(csv_name)
    if y is None: y, m = infer_year_month_from_data(df)
    if y is None: return 0

    if partition_done(y, m):
        return 0

    write_parquet_partition(df, y, m)
    n = df.height
    del df; gc.collect()
    return n

def months_filter_ok(csv_name: str, allowed_months: set[int]|None) -> bool:
    if not allowed_months:
        return True
    m = re.search(r"(20\d{2})(\d{2})-citibike-tripdata", csv_name)
    if m:
        mm = int(m.group(2))
        return mm in allowed_months
    return True

def process_zip_file(zip_path: Path, allowed_months: set[int]|None=None, position:int=1) -> int:
    total_rows = 0
    with zipfile.ZipFile(zip_path, "r") as outer:
        csv_members = []
        for member in outer.namelist():
            if "__MACOSX" in member or member.endswith(".DS_Store") or member.startswith("._"):
                continue
            if member.endswith(".zip"):
                with outer.open(member) as inner_bytes:
                    with zipfile.ZipFile(io.BytesIO(inner_bytes.read()), "r") as inner_zip:
                        for inner_member in inner_zip.namelist():
                            if inner_member.endswith(".csv") and months_filter_ok(inner_member, allowed_months):
                                csv_members.append(("inner", member, inner_member))
            elif member.endswith(".csv") and months_filter_ok(member, allowed_months):
                csv_members.append(("outer", None, member))

        pbar = tqdm(total=len(csv_members), desc=f"Procesando {zip_path.name}", position=position, unit="csv", leave=True, dynamic_ncols=True)

        for kind, parent, csvm in csv_members:
            try:
                if kind == "inner":
                    with outer.open(parent) as inner_bytes:
                        with zipfile.ZipFile(io.BytesIO(inner_bytes.read()), "r") as inner_zip:
                            with inner_zip.open(csvm) as f:
                                total_rows += process_csv_bytes(f.read(), csvm)
                else:
                    with outer.open(csvm) as f:
                        total_rows += process_csv_bytes(f.read(), csvm)
            except Exception as e:
                tqdm.write(f"Error: {zip_path.name} / {csvm}: {e}")
            finally:
                pbar.update(1)
        pbar.close()
    return total_rows

def process_cache_to_parquet(years: list[int]|None=None, months: set[int]|None=None):
    """
    Recorre los ZIPs en raw_zips/ y procesa a Parquet.
    Si 'years' se especifica, filtra ZIPs por año en su nombre.
    Si 'months' se especifica, solo escribe esos meses (particiones).
    """
    zips = sorted(RAW_ZIPS.glob("*.zip"))
    if years:
        zips = [z for z in zips if any(str(y) in z.name for y in years)]
    if not zips:
        print("No hay ZIPs en raw_zips/. Descarga primero en la Celda 2.")
        return

    # Filtrar solo los ZIPs que no han sido procesados
    processed = load_proc_manifest()
    zips = [z for z in zips if z.name not in processed]

    print(f"Procesando {len(zips)} ZIP(s) desde caché hacia parquet…")
    pos_pool = list(range(1, min(MAX_WORKERS, len(zips)) + 1))
    pos_lock = threading.Lock()

    results, errors = [], []
    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(zips))) as ex:
        def _task(z: Path):
            with pos_lock:
                pos = pos_pool.pop() if pos_pool else 1
            try:
                rows = process_zip_file(z, allowed_months=months, position=pos)
                return z.name, rows
            finally:
                with pos_lock:
                    pos_pool.append(pos)

        futs = {ex.submit(_task, z): z for z in zips}
        for fut in as_completed(futs):
            z = futs[fut]
            try:
                name, rows = fut.result()
                results.append((name, rows))
                d = load_proc_manifest(); d.add(name); save_proc_manifest(d)
            except Exception as e:
                errors.append((z.name, str(e)))

    print("\nResumen:")
    for n, r in results:
        print(f" - {n}: {r} filas")
    if errors:
        print("\nErrores:")
        for n, e in errors:
            print(f" - {n}: {e}")


# Funciones de Utilidad y Mantenimiento

## ¿Qué hace esta celda?
Define funciones auxiliares para el mantenimiento y gestión del pipeline:
- **list_cache_for_year()**: Muestra qué archivos están en caché para un año específico
- **drop_year()**: Elimina completamente los datos de un año (para reprocesamiento)
- **drop_months()**: Elimina meses específicos de un año
- **reprocess_year()**: Reprocesa un año completo desde cero
- **reprocess_months()**: Reprocesa meses específicos de un año

## Funcionalidades Clave:
- **Inspección**: Ver qué datos están disponibles en caché
- **Limpieza Selectiva**: Eliminar datos específicos sin afectar el resto
- **Reprocesamiento**: Reprocesar datos cuando hay errores o cambios
- **Debugging**: Herramientas para identificar y corregir problemas

## ¿Por qué es la mejor opción?
- **Control Granular**: Puedes trabajar con años o meses específicos
- **Debugging Eficiente**: Fácil identificación y corrección de problemas
- **Mantenimiento**: Operaciones comunes predefinidas y seguras
- **Flexibilidad**: Adapta el procesamiento a tus necesidades específicas
"""

In [5]:
def list_cache_for_year(year: int):
    zips = sorted(RAW_ZIPS.glob("*.zip"))
    monthly = [z.name for z in zips if re.fullmatch(fr"{year}\d{{2}}-citibike-tripdata\.zip", z.name)]
    annual  = [z.name for z in zips if z.name == f"{year}-citibike-tripdata.zip"]
    print(f"Año {year} en caché:")
    if annual:
        print(" - ZIP anual:", ", ".join(annual))
    if monthly:
        print(" - ZIPs mensuales:", ", ".join(sorted(monthly)))
    if not annual and not monthly:
        print(" - No hay ZIPs de este año en raw_zips/")

def drop_year(year: int):
    ydir = OUTPUT_DIR / f"year={year}"
    if ydir.exists():
        shutil.rmtree(ydir, ignore_errors=True)
        print(f"Eliminado: {ydir}")
    else:
        print(f"No existe: {ydir}")

def drop_months(year: int, months: list[int]):
    for m in months:
        p = OUTPUT_DIR / f"year={year}" / f"month={m:02d}"
        if p.exists():
            shutil.rmtree(p, ignore_errors=True)
            print(f"Eliminado: {p}")
        else:
            print(f"No existe: {p}")

def reprocess_year(year: int):
    list_cache_for_year(year)
    process_cache_to_parquet(years=[year], months=None)

def reprocess_months(year: int, months: list[int]):
    list_cache_for_year(year)
    process_cache_to_parquet(years=[year], months=set(months))


# Comandos de Ejecución y Utilidades

## ¿Qué hace esta celda?
Proporciona comandos de alto nivel para ejecutar el pipeline completo:
- **Descarga completa**: Descarga todos los datos disponibles
- **Procesamiento completo**: Convierte todos los datos a Parquet
- **Comandos comentados**: Ejemplos de operaciones específicas por año/mes

## Comandos Disponibles:
- **download_to_cache()**: Descarga todos los archivos faltantes
- **process_cache_to_parquet()**: Procesa todos los archivos a Parquet
- **download_years_months_to_cache()**: Descarga años/meses específicos
- **drop_year() / reprocess_year()**: Limpia y reprocesa años completos
- **drop_months() / reprocess_months()**: Limpia y reprocesa meses específicos

## ¿Por qué es la mejor opción?
- **Simplicidad**: Comandos de una línea para operaciones complejas
- **Flexibilidad**: Control total sobre qué procesar y cuándo
- **Eficiencia**: Solo procesa lo que necesitas
- **Debugging**: Fácil identificación y corrección de problemas
- **Mantenimiento**: Operaciones comunes predefinidas y documentadas
"""


In [6]:
# Descargar todo (o pon un número para prueba rápida)
download_to_cache(max_objects=None)
# download_to_cache(max_objects=5)

# Descargar enfocado por año/mes
# download_years_months_to_cache([2017], months={3,4,5})

# Procesar todo lo que hay en caché → Parquet
process_cache_to_parquet(years=None, months=None)

# Borrar y reprocesar un año completo
# drop_year(2017)
# reprocess_year(2017)

# Borrar y reprocesar meses puntuales
# drop_months(2017, [3,4,5])
# reprocess_months(2017, [3,4,5]) 

Descargando 3 ZIP(s) a raw_zips con 4 hilos…


Descargando  2015-citibike-tripdata.zip:   0%|           0.00/264M [00:00<?, ?B/s]

Descargando  2013-citibike-tripdata.zip:   0%|           0.00/315M [00:00<?, ?B/s]

Descargando  2014-citibike-tripdata.zip:   0%|           0.00/214M [00:00<?, ?B/s]

Descargados: 3 · Errores: 0
Procesando 3 ZIP(s) desde caché hacia parquet…


Procesando 2015-citibike-tripdata.zip:   0%|          | 0/16 [00:00<?, ?csv/s]

Procesando 2014-citibike-tripdata.zip:   0%|          | 0/12 [00:00<?, ?csv/s]

Procesando 2013-citibike-tripdata.zip:   0%|          | 0/17 [00:00<?, ?csv/s]


Resumen:
 - 2013-citibike-tripdata.zip: 40458 filas
 - 2014-citibike-tripdata.zip: 71754 filas
 - 2015-citibike-tripdata.zip: 74942 filas
