# Pipeline de ingestión de datos grandes

In [None]:
# === Trabajo con dataset >200MB directamente desde la nube (sin guardar local) ===
# En este notebook mostraremos varias estrategias:
# 1. Consultar múltiples archivos Parquet remotos grandes (NYC Taxi Trips) usando DuckDB sin descargarlos por completo.
# 2. Patrones de procesamiento en streaming / por chunks para CSV remoto grande.
# 3. Streaming con Hugging Face Datasets (sin descargar al disco) como demostración opcional.
#
# Requisito: No almacenar el archivo completo en disco local. Las herramientas (DuckDB, fsspec, datasets) permiten acceso lazy.
#
# ---- Dataset principal elegido ----
# NYC Yellow Taxi Trip Records (CloudFront CDN):
#   URL base: https://d37ci6vzurychx.cloudfront.net/trip-data/
#   Archivos mensuales: yellow_tripdata_YYYY-MM.parquet (≈180–250 MB cada uno).
# Procesaremos varios meses hasta superar 200 MB (p.ej. 2024-01, 2024-02, 2024-03).
#
# Ventajas de usar Parquet remoto + DuckDB:
# - DuckDB puede hacer predicate pushdown y column projection, leyendo sólo las columnas necesarias.
# - Evitamos descargar todo el archivo (lee fragmentos vía HTTP range requests).
#
# Si deseas otros datasets grandes: Amazon Reviews (parquet en S3), OpenSky, LA Taxi, etc.


In [None]:
# Instalación (idempotente) de librerías necesarias para acceso remoto
import sys, subprocess, importlib

def ensure(pkg):
    try:
        importlib.import_module(pkg)
    except ImportError:
        print(f"Instalando {pkg}...")
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--quiet', pkg])

for p in ['duckdb', 'pyarrow', 'fsspec', 'datasets']:  # datasets para streaming HF
    ensure(p)

import duckdb, pyarrow as pa, pyarrow.dataset as ds
print('Versiones -> duckdb:', duckdb.__version__, 'pyarrow:', pa.__version__)

  from .autonotebook import tqdm as notebook_tqdm


Versiones -> duckdb: 1.4.1 pyarrow: 21.0.0


In [None]:
# 1. Consulta remota de varios Parquet (>200MB total) con DuckDB (lazy scan)
import duckdb, pandas as pd

base_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
months = ['2024-01', '2024-02', '2024-03']  # 3 meses ~ >500MB combinados
urls = [f"{base_url}/yellow_tripdata_{m}.parquet" for m in months]
print('Archivos remotos seleccionados:')
for u in urls: print(' -', u)

# Nota: El esquema usa tpep_pickup_datetime (no pickup_datetime)
query = f"""
SELECT
  date_trunc('day', tpep_pickup_datetime) AS pickup_date,
  passenger_count,
  trip_distance,
  total_amount
FROM read_parquet([{','.join([repr(u) for u in urls])}])
WHERE trip_distance > 0
LIMIT 1000
"""

print('Ejecutando consulta...')
df_sample = duckdb.query(query).to_df()
df_sample.head()

Archivos remotos seleccionados:
 - https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
 - https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet
 - https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet
Ejecutando consulta...


Unnamed: 0,pickup_date,passenger_count,trip_distance,total_amount
0,2024-01-01,1,1.72,22.7
1,2024-01-01,1,1.8,18.75
2,2024-01-01,1,4.7,31.3
3,2024-01-01,1,1.4,17.0
4,2024-01-01,1,0.8,16.1


In [None]:
# 1.b Agregaciones (sólo lee columnas necesarias)
agg_query = f"""
SELECT
  date_trunc('day', tpep_pickup_datetime) AS day,
  COUNT(*) AS trips,
  AVG(trip_distance) AS avg_distance,
  AVG(total_amount) AS avg_amount
FROM read_parquet([{','.join([repr(u) for u in urls])}])
WHERE trip_distance BETWEEN 0.1 AND 100 -- filtra outliers y fuerza predicate pushdown
GROUP BY 1
ORDER BY 1
LIMIT 15
"""
print('Ejecutando agregaciones...')
df_agg = duckdb.query(agg_query).to_df()
df_agg

Ejecutando agregaciones...


Unnamed: 0,day,trips,avg_distance,avg_amount
0,2002-12-31,4,2.0,10.7875
1,2008-12-31,1,1.62,19.9
2,2009-01-01,4,5.725,36.2125
3,2023-12-31,10,2.601,22.462
4,2024-01-01,76312,4.29612,30.149872
5,2024-01-02,73697,4.189915,30.146731
6,2024-01-03,80709,3.82603,28.556487
7,2024-01-04,101058,3.371255,27.160826
8,2024-01-05,101089,3.244567,26.380395
9,2024-01-06,94634,3.206192,25.013898


In [None]:
# 2. Patrón de lectura por chunks (streaming) para CSV remoto grande
# Ejemplo: usaremos un CSV público relativamente grande. Si no está disponible, mostrar fallback.
import pandas as pd
import io, requests

csv_url = 'https://raw.githubusercontent.com/plotly/datasets/master/2011_february_us_airport_traffic.csv'  # reemplazar por uno más grande si se desea
chunk_size = 5000
acc_rows = 0
sum_metric = 0

print('Descargando stream (simulado) y procesando por chunks...')
try:
    with requests.get(csv_url, stream=True, timeout=60) as r:
        r.raise_for_status()
        buffer = io.StringIO()
        for i, line in enumerate(r.iter_lines(decode_unicode=True)):
            if line is None:
                continue
            buffer.write(line + '\n')
            # Procesar cada N líneas acumuladas (chunk lógico)
            if (i+1) % chunk_size == 0:
                buffer.seek(0)
                df_chunk = pd.read_csv(buffer)
                if 'Count' in df_chunk.columns:
                    sum_metric += df_chunk['Count'].sum()
                acc_rows += len(df_chunk)
                buffer = io.StringIO()
        # Procesar resto
        buffer.seek(0)
        remaining = buffer.getvalue().strip()
        if remaining:
            df_chunk = pd.read_csv(io.StringIO(remaining))
            if 'Count' in df_chunk.columns:
                sum_metric += df_chunk['Count'].sum()
            acc_rows += len(df_chunk)
    print('Filas procesadas:', acc_rows, '| Suma Count:', sum_metric)
except Exception as e:
    print('Fallo streaming CSV:', e)

Descargando stream (simulado) y procesando por chunks...
Filas procesadas: 221 | Suma Count: 0
Filas procesadas: 221 | Suma Count: 0


In [None]:
# Celda extra: inspección de esquema remoto (lee sólo metadatos)
import duckdb
print('Mostrando esquema del primer archivo remoto (sin leer datos completos)...')
# PRAGMA table_info no acepta directamente una table function; usamos DESCRIBE SELECT
info_df = duckdb.query(f"DESCRIBE SELECT * FROM read_parquet('{urls[0]}') LIMIT 0").to_df()
print('Columnas y tipos detectados (primeros 30):')
info_df.head(30)

Mostrando esquema del primer archivo remoto (sin leer datos completos)...
Columnas y tipos detectados (primeros 30):
Columnas y tipos detectados (primeros 30):


Unnamed: 0,column_name,column_type,null,key,default,extra
0,VendorID,INTEGER,YES,,,
1,tpep_pickup_datetime,TIMESTAMP,YES,,,
2,tpep_dropoff_datetime,TIMESTAMP,YES,,,
3,passenger_count,BIGINT,YES,,,
4,trip_distance,DOUBLE,YES,,,
5,RatecodeID,BIGINT,YES,,,
6,store_and_fwd_flag,VARCHAR,YES,,,
7,PULocationID,INTEGER,YES,,,
8,DOLocationID,INTEGER,YES,,,
9,payment_type,BIGINT,YES,,,


## Comparativa Pandas vs Dask para Lectura y Manipulación de Datos

Objetivos:
- Mostrar enfoques de lectura de datos grandes sin cargar todo en memoria.
- Comparar patrón de chunks manual en Pandas vs carga perezosa/particionada en Dask.
- Efectuar operaciones típicas: filtrado, agregados y cálculo de métricas.
- Usar dataset remoto (NYC Yellow Taxi Parquet) y un ejemplo CSV (stream) para chunks.

Resumen conceptual:
- **Pandas**: Opera en memoria; soporta `chunksize` en `read_csv` pero no en `read_parquet` directamente. Para Parquet se puede usar PyArrow / RowGroups manual.
- **Dask DataFrame**: Abstracción paralela de DataFrames (“colección” de particiones de Pandas). Lee múltiples archivos (`read_parquet`, `read_csv`) y ejecuta un DAG; computa solo al llamar `.compute()`.
- Caso de uso: Agregaciones diarias y métricas condicionales sobre millones de filas sin traer todo a memoria a la vez.


In [None]:
# Instalación/verificación Dask
import importlib, sys, subprocess

def ensure(pkg):
    try:
        importlib.import_module(pkg)
    except ImportError:
        print('Instalando', pkg)
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--quiet', pkg])

for p in ['dask[dataframe]']:
    ensure(p.split('[')[0])

import dask, dask.dataframe as dd
print('Version Dask:', dask.__version__)

In [None]:
# Dask: lectura perezosa de múltiples Parquet remotos
# Nota: Dask puede necesitar fsspec + s3fs/httpfs; ya instalamos fsspec.
import dask.dataframe as dd

parquet_pattern = base_url + '/yellow_tripdata_2024-0*.parquet'  # patrón para meses 01..09 (aquí aplican 01,02,03 disponibles)
print('Leyendo patrón remoto con Dask:', parquet_pattern)
# engine=pyarrow para mejor compatibilidad
ddf = dd.read_parquet(parquet_pattern, engine='pyarrow', gather_statistics=False)
print(ddf)

# Seleccionar columnas necesarias (optimiza graph)
cols_needed = ['tpep_pickup_datetime', 'trip_distance', 'total_amount', 'passenger_count']
existing = [c for c in cols_needed if c in ddf.columns]
ddf_small = ddf[existing]

# Agregación diaria (lazy)
ddf_small['pickup_date'] = ddf_small['tpep_pickup_datetime'].dt.date
agg_dd = ddf_small.groupby('pickup_date').agg({
    'trip_distance':'mean',
    'total_amount':'mean',
    'passenger_count':'mean'
}).rename(columns={
    'trip_distance':'avg_distance',
    'total_amount':'avg_amount',
    'passenger_count':'avg_passengers'
})

print('Computando agregación Dask...')
result_dd = agg_dd.head(15).compute()  # head primero reduce costo
result_dd

In [None]:
# Comparativa CSV grande: Pandas chunks vs Dask
# Reutilizamos csv_url (puedes sustituir por un CSV más grande público)
import dask.dataframe as dd

print('Leyendo CSV remoto con Dask (lazy)...')
ddf_csv = dd.read_csv(csv_url, blocksize=256_000)  # blocksize ~256KB ejemplo
print(ddf_csv)

# Ejemplo filtrado y agregado (si existe la columna 'Count')
if 'Count' in ddf_csv.columns:
    dask_csv_metric = ddf_csv['Count'].mean().compute()
    print('Dask mean Count:', dask_csv_metric)
else:
    print('Columna Count no encontrada en CSV remoto para Dask ejemplo.')

# Para comparar: ya calculamos sum_metric en Pandas (chunks). Podríamos normalizar:
# (suma / filas) equivalente a media si la queremos; aquí dejamos referencia.


### Resumen Comparativo Pandas vs Dask

| Aspecto | Pandas | Dask |
|---------|--------|------|
| Modelo de ejecución | Inmediato, en memoria | Lazy (DAG), paralelo |
| Escalado | Memoria RAM de una sola máquina | Multi-core/local cluster; escala mejor |
| Lectura Parquet múltiple | Necesita manejar lista y concat; sin lazy row-group by default | `read_parquet` con comodín/patrón, particiones distribuidas |
| Chunks CSV | `read_csv(chunksize=...)` (iterador manual) | `read_csv` crea particiones; operaciones vectorizadas sobre todas |
| Optimizaciones | Vectorización y C internals | Task graph, predicate pushdown parcial vía motores subyacentes |
| Cuándo elegir | Datos caben en RAM y prototipado rápido | Datos grandes/borderline RAM, pipelines repetibles, paralelismo |

Puntos clave:
- Para datasets > memoria, Dask evita OOM ejecutando en particiones.
- Pandas chunks requiere lógica manual (acumulación de agregados). Dask ofrece API similar a Pandas con `.compute()` al final.
- DuckDB complementa ambos cuando se necesitan consultas SQL/pushdown sobre archivos columnares remotos.

Recomendación práctica:
1. Empezar con Pandas para exploración rápida.
2. Migrar a Dask si: (a) RAM insuficiente, (b) se requieren múltiples archivos grandes, (c) necesidad de paralelizar.
3. Integrar DuckDB para consultas ad-hoc y reducir volumen antes de pasar a Pandas/Dask.


## Pandas vs Dask: Funcionalidad, Ventajas, Limitaciones y Rendimiento

### 1. Funcionalidad Básica
- **Pandas**: Librería de manipulación de datos en memoria (DataFrame/Series) con operaciones vectorizadas, rica API (groupby, merges, reshape) y excelente integración con ecosistema científico (NumPy, scikit-learn, matplotlib). Ideal para exploración interactiva y datasets que caben en RAM.
- **Dask DataFrame**: Capa distribuida/perezosa que parte un dataset grande en múltiples particiones (cada una un DataFrame Pandas) y construye un *DAG* de tareas. Solo ejecuta (descarga/lee/computa) cuando se llama a `.compute()` (o métodos que disparan acción). Permite escalar a datos mayores que la memoria y aprovechar múltiples núcleos.

### 2. Modelo de Memoria
| Aspecto | Pandas | Dask |
|---------|--------|------|
| Carga inicial | Trae el dataset completo (salvo iteradores CSV con `chunksize`) | Crea un plan (DAG) y lectura diferida por partición |
| Memoria pico | Aproximadamente tamaño del dataset (más overhead de estructuras) | Depende de tamaño de partición; puede procesar en bloques y liberar |
| Control granular | Requiere manualmente `chunksize` / filtrado previo | Divisiones automáticas; puede persistir solo pasos intermedios críticos |
| Evitar OOM | Difícil si el dataset completo no cabe | Procesa por partición; se puede ajustar el tamaño de bloque |

### 3. Paralelismo y Ejecución
| Tema | Pandas | Dask |
|------|--------|------|
| Paralelismo CPU | Limitado (Global Interpreter Lock) salvo operaciones en C internas | Paraleliza entre particiones (multi-core) y puede ir a cluster |
| Lazy vs Eager | Eager (inmediato) | Lazy (optimiza y ejecuta al final) |
| Overhead | Mínimo para operaciones simples | Overhead de scheduler (≈10–50 ms + planificación) por cómputo |
| Escenarios ideales | Análisis rápido, prototipos, transformaciones intensivas pero medianas | ETL de muchos archivos, pipelines repetibles, datos >RAM, integración con cluster |

### 4. Facilidad de Uso
- Pandas tiene curva de aprendizaje más corta y más documentación/examples.
- Dask replica gran parte de la API de Pandas, pero exige entender diferencias: ejecuciones diferidas, `.compute()`, particiones, efectos de persistencia/caché.
- Debug: en Pandas se inspecciona inmediatamente; en Dask hay que revisar el DAG ( `.visualize()` ) y a veces forzar materialización parcial.

### 5. Rendimiento (Observaciones Generales)
| Situación | Qué suele ganar | Razón |
|-----------|-----------------|-------|
| Dataset pequeño (<200 MB) | Pandas | Menor overhead; todo cabe en RAM |
| Muchos archivos Parquet/CSV en un patrón | Dask | Lectura distribuida y pushdown columnar por partición |
| Fusión / groupby grande que casi llena RAM | Dask (si bien configurado) | Procesa por particiones y reduce uso pico de memoria |
| Operaciones muy vectorizadas en columnas numéricas moderadas | Pandas | Menos capas; ejecución directa en C/NumPy |
| Pipeline escalable / reejecutable en batch | Dask | DAG reproducible y escalable a cluster |

### 6. Limitaciones
| Pandas | Dask |
|--------|------|
| No escala fuera de RAM sin trabajo manual (particionamiento propio) | Algunas funciones Pandas no están 100% implementadas; requiere cuidado en merges grandes |
| Operaciones paralelas limitadas | Overhead puede volverlo más lento en datasets pequeños |
| Riesgo de OOM en joins / groupbys grandes | Requiere tuning (tamaño de partición, persist, scheduler) |

### 7. Integración con Otros Enfoques
- **DuckDB** + Pandas: Pre-filtrar y seleccionar columnas desde archivos Parquet remotos y luego pasar un subconjunto a Pandas/Dask.
- **Dask + scikit-learn**: Para grandes datos tabulares, usar `dask-ml` o convertir a muestras representativas.
- **Polars** (mención rápida): Otra alternativa columnar/rápida (no incluida en demo) que mezcla lazy + eager.

### 8. Recomendación de Estrategia
1. Usar DuckDB o filtros de origen para reducir columnas/filas iniciales.
2. Si el resultado reducido cabe en memoria → Pandas.
3. Si no cabe o el pipeline recorrerá muchos archivos cada día → Dask.
4. Medir con un subconjunto primero (1–5%) para calibrar costos.

La siguiente celda incluye benchmarks reproducibles (lectura Parquet parcial y agregación) comparando Pandas (vía DuckDB extracción limitada) vs Dask (paralelo). Ejecuta y observa los tiempos en tu entorno.


In [None]:
# Benchmarks comparativos (tiempos) Pandas (vía DuckDB para extraer subset) vs Dask
import time, duckdb, pandas as pd, dask.dataframe as dd
from statistics import mean

# Reutilizamos variables: base_url, urls ya definidos antes (meses 2024-01..03)
# 1. Pandas: extraer un subconjunto con DuckDB (simulando filtrado previo) y luego groupby en Pandas
pandas_times = []
for i in range(3):
    t0 = time.perf_counter()
    # Leer solo columnas necesarias y cierta ventana (limit) para aislar costo lógico
    pdf = duckdb.query(f"SELECT tpep_pickup_datetime, trip_distance, total_amount, passenger_count FROM read_parquet([{','.join([repr(u) for u in urls])}]) WHERE trip_distance > 0 LIMIT 300000").to_df()
    # Groupby Pandas
    pdf['pickup_date'] = pdf['tpep_pickup_datetime'].dt.date
    agg_pdf = pdf.groupby('pickup_date', as_index=False).agg({
        'trip_distance':'mean', 'total_amount':'mean', 'passenger_count':'mean'
    })
    pandas_times.append(time.perf_counter()-t0)

# 2. Dask: usar DataFrame completo (lazy) y computar la misma agregación (head para similar volumen)
dask_times = []
# Construimos solo si no existe
ddf_cols = ['tpep_pickup_datetime','trip_distance','total_amount','passenger_count']
if 'ddf' in globals():
    ddf_bench = ddf[ddf_cols]
else:
    parquet_pattern = base_url + '/yellow_tripdata_2024-0*.parquet'
    ddf_bench = dd.read_parquet(parquet_pattern, engine='pyarrow', gather_statistics=False, columns=ddf_cols)

for i in range(3):
    t0 = time.perf_counter()
    tmp = ddf_bench.assign(pickup_date=ddf_bench.tpep_pickup_datetime.dt.date) \
        .groupby('pickup_date').agg({
            'trip_distance':'mean', 'total_amount':'mean', 'passenger_count':'mean'
        }).head(20).compute()
    dask_times.append(time.perf_counter()-t0)

summary = pd.DataFrame({
    'framework':['pandas']*len(pandas_times)+['dask']*len(dask_times),
    'seconds': pandas_times + dask_times
})
print('Resultados de cada repetición (s):')
print(summary)
print('\nPromedios:')
print(summary.groupby('framework')['seconds'].agg(['count','mean','std']))

print('\nObservaciones (heurísticas):')
print('- Pandas incluye costo de materializar 300k filas + groupby en memoria.')
print('- Dask incluye overhead de scheduler; para volúmenes pequeños puede ser más lento.')
print('- Si aumentas el LIMIT (ej. a millones) el gap puede invertirse a favor de Dask al paralelizar.')

summary.head()