# Pipeline Config-Driven: Notebook E2E

Este notebook implementa un pipeline de datos completo y funcional con:
- Ingesta multi-fuente (CSV/JSON/JDBC/API) con autenticación y manejo de errores.
- Transformaciones: renombres, casts con validación, limpieza de nulos/duplicados y normalización.
- Flujo Raw → Silver → Gold con escritura en bucket (S3A/MinIO) y base de datos (PostgreSQL).
- Pruebas unitarias básicas y verificación de outputs.
- Métricas de ejecución por etapa (tiempos y conteos).

## 0. Setup e Imports
Se crean utilidades, métricas y se configura un `run_id` para trazabilidad.

In [1]:
import os, json, glob, time
from datetime import datetime
from pyspark.sql import SparkSession, functions as F

from pipelines.config.loader import load_env_config, load_dataset_config
from pipelines.utils.logger import get_logger
from pipelines.transforms.apply import apply_sql_transforms, apply_udf_transforms
from pipelines.validation.quality import apply_quality
from pipelines.io.reader import read_parquet
from pipelines.io.writer import write_parquet
from pipelines.io.s3a import configure_s3a
from pipelines.sources import load_sources_or_source, sanitize_nulls, project_columns, flatten_json
from pipelines.common import safe_cast, maybe_config_s3a
from pipelines.database.db_manager import create_database_manager_from_file
from pipelines.spark_job_with_db import create_gold_table_name

RUN_ID = f"notebook-{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
logger = get_logger('docs_pipeline', run_id=RUN_ID)
METRICS = {}
logger.info('Notebook inicializado', extra={'run_id': RUN_ID})


ModuleNotFoundError: No module named 'pipelines'

## Quick Start (ejecución directa)
Ejecuta todo el flujo con fuentes de ejemplo locales/JDBC/API y escribe en rutas locales (`data/output/quick_start`).
Puedes ajustar las rutas en `QS_SOURCES` y `QS_OUTPUT_*`. Usa la variable `USE_QS` para activar/usar este flujo.

In [2]:
USE_QS = True  # Cambia a False si prefieres usar un dataset.yml real

QS_SOURCES = [
  {'type': 'file', 'input_format': 'csv', 'path': 'data/raw/sample_payments.csv', 'options': {'header': True, 'inferSchema': True}},
  {'type': 'file', 'input_format': 'json', 'path': 'data/raw/sample_events.json', 'options': {'multiline': True}},
  {'type': 'jdbc', 'jdbc': {'url': 'jdbc:postgresql://localhost:5432/testdb', 'table': 'finanzas.payments_db_only_src', 'user': 'testuser', 'password': 'testpass'}},
  {'type': 'api', 'api': {'method': 'GET', 'endpoint': 'https://httpbin.org/json', 'items_key': 'slideshow/slides', 'pagination': {'enabled': False}}}
]
QS_OUTPUT_SILVER = 'data/output/quick_start/silver'
QS_OUTPUT_GOLD = 'data/output/quick_start/gold'
QS_QUARANTINE = 'data/output/quick_start/quarantine'

qs_cfg = {
  'id': 'quick_start',
  'sources': QS_SOURCES,
  'output': {
    'silver': {'path': QS_OUTPUT_SILVER, 'partition_by': ['year','month'], 'partition_from': 'created_at'},
    'gold': {
      'bucket': {'enabled': True, 'path': QS_OUTPUT_GOLD, 'exclude_columns': ['_run_id','_ingestion_ts']},
      'db': {'enabled': False}
    }
  },
  'standardization': {
    'renames': {'id': 'payment_id'},
    'casts': [ {'column': 'created_at', 'to': 'timestamp'} ]
  },
  'json_normalization': {'flatten': True},
  'null_handling': {'fills': {'currency': 'USD'}, 'drop_if_null': {'payment_id': True}},
  'quality': {
    'quarantine': QS_QUARANTINE,
    'rules': [
      {'name': 'non_negative_amount', 'filter': 'amount >= 0', 'action': 'drop'},
      {'name': 'valid_currency', 'filter': "currency IN ('USD','EUR','CLP','COP')", 'action': 'quarantine'}
    ]
  }
}

def quick_run(cfg, env):
    global spark, METRICS
    # Ingesta
    t0 = time.time()
    df_raw = load_sources_or_source(cfg, spark, env)
    df_raw.cache()
    METRICS['ingesta_rows'] = df_raw.count()
    METRICS['ingesta_sec'] = time.time() - t0
    print('[QS][ingesta] filas:', METRICS['ingesta_rows'])
    # Standardización
    t1 = time.time()
    df = df_raw
    std = cfg.get('standardization', {})
    renames = std.get('renames', {})
    casts = std.get('casts', [])
    if renames:
        df = df.toDF(*[renames.get(c, c) for c in df.columns])
    df = flatten_json(df)
    df = sanitize_nulls(df, fills=cfg.get('null_handling',{}).get('fills'), drop_if_null=cfg.get('null_handling',{}).get('drop_if_null'))
    for cdef in casts:
        df = safe_cast(df, cdef.get('column'), cdef.get('to'), on_error=cdef.get('on_error'))
    METRICS['transform_sec'] = time.time() - t1
    # Calidad
    t2 = time.time()
    qcfg = cfg.get('quality', {})
    df_q, bad_df, stats = apply_quality(df, qcfg.get('rules',[]), qcfg.get('quarantine'), run_id=RUN_ID)
    METRICS['quality_sec'] = time.time() - t2
    METRICS['quality_stats'] = stats
    # Particionado y escritura
    out_silver = cfg.get('output',{}).get('silver',{})
    parts = out_silver.get('partition_by', [])
    base_col_name = out_silver.get('partition_from')
    df_s = df_q
    if parts:
        base_col = F.col(base_col_name) if base_col_name and base_col_name in df_s.columns else None
        if base_col is not None:
            for p in parts:
                lp = p.lower()
                if lp == 'year' and 'year' not in df_s.columns:
                    df_s = df_s.withColumn('year', F.year(base_col))
                if lp == 'month' and 'month' not in df_s.columns:
                    df_s = df_s.withColumn('month', F.month(base_col))
    # Heurística de coalesce (datasets pequeños)
    rows_s = df_s.count()
    METRICS['silver_rows'] = rows_s
    coalesce_val = 1 if rows_s <= 100000 else None
    t3 = time.time()
    write_parquet(df_s, QS_OUTPUT_SILVER, mode='overwrite', partition_by=parts, coalesce=coalesce_val)
    METRICS['silver_write_sec'] = time.time() - t3
    exclude_cols = {'_run_id','_ingestion_ts'}
    df_g = df_s.select([c for c in df_s.columns if c not in exclude_cols])
    rows_g = df_g.count()
    METRICS['gold_rows'] = rows_g
    t4 = time.time()
    write_parquet(df_g, QS_OUTPUT_GOLD, mode='overwrite', partition_by=parts, coalesce=coalesce_val)
    METRICS['gold_write_sec'] = time.time() - t4
    print('[QS] Silver:', QS_OUTPUT_SILVER)
    print('[QS] Gold:', QS_OUTPUT_GOLD)
    return {'df_raw': df_raw, 'df_silver': df_s, 'df_gold': df_g, 'bad_df': bad_df, 'stats': stats}


## 1. Carga de Configuración
Se cargan `env.yml` y el `dataset.yml`. Fallback inline si no existen.
Incluye rutas de salida, parámetros Spark y credenciales (S3A/DB/API).

In [3]:
ENV_PATH = 'config/env.yml'
DATASET_PATH = 'config/datasets/finanzas/payments_v1/dataset_with_gold.yml'
DB_CONFIG_PATH = 'config/database.yml'

try:
    env = load_env_config(ENV_PATH)
except Exception as e:
    env = {}
    print('Aviso: env.yml no encontrado, usando defaults. Error:', e)

try:
    dataset_cfg = load_dataset_config(DATASET_PATH)
except Exception as e:
    dataset_cfg = {
        'id': 'demo_multi',
        'sources': QS_SOURCES,
        'output': {
            'silver': {'path': QS_OUTPUT_SILVER, 'partition_by': ['year','month'], 'partition_from': 'created_at'},
            'gold': {
                'bucket': {'enabled': True, 'path': QS_OUTPUT_GOLD, 'exclude_columns': ['_run_id','_ingestion_ts']},
                'db': {'enabled': False}
            }
        },
        'standardization': {
            'renames': {'id': 'payment_id'},
            'casts': [ {'column': 'created_at', 'to': 'timestamp'} ]
        },
        'json_normalization': {'flatten': True},
        'null_handling': {'fills': {'currency': 'USD'}, 'drop_if_null': {'payment_id': True}},
        'quality': {
            'quarantine': QS_QUARANTINE,
            'rules': [
                {'name': 'non_negative_amount', 'filter': 'amount >= 0', 'action': 'drop'},
                {'name': 'valid_currency', 'filter': "currency IN ('USD','EUR','CLP','COP')", 'action': 'quarantine'}
            ]
        }
    }
    print('Aviso: dataset.yml no encontrado, usando fallback inline. Error:', e)

logger.info('Config cargada', extra={'env_keys': list(env.keys()), 'dataset_id': dataset_cfg.get('id')})


Aviso: env.yml no encontrado, usando defaults. Error: name 'load_env_config' is not defined
Aviso: dataset.yml no encontrado, usando fallback inline. Error: name 'load_dataset_config' is not defined


NameError: name 'logger' is not defined

## 2. Inicializar Spark y parámetros
Detecta JARs en `jars/` y configura S3A si se usan rutas `s3a://`.
Reduce `spark.sql.shuffle.partitions` para mejorar performance en datasets pequeños.

In [4]:
spark = (
    SparkSession.builder
    .master(env.get('spark',{}).get('master','local[2]'))
    .appName(f"notebook::{dataset_cfg.get('id','demo')}")
    .config('spark.sql.session.timeZone', env.get('timezone','UTC'))
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    .getOrCreate()
)
spark.conf.set('spark.sql.shuffle.partitions', '8')
print('Spark version:', spark.version)

# JARs locales (si existen)
jars_dir = 'jars'
jar_files = glob.glob(os.path.join(jars_dir, '*.jar')) if os.path.isdir(jars_dir) else []
if jar_files:
    spark.sparkContext.addPyFile(jar_files[0])
    print('JARs detectados:', len(jar_files))

# Configurar S3A según rutas
silver_path = dataset_cfg.get('output',{}).get('silver',{}).get('path','')
gold_bucket_path = dataset_cfg.get('output',{}).get('gold',{}).get('bucket',{}).get('path','')
configure_s3a(spark, silver_path, env)
configure_s3a(spark, gold_bucket_path, env)


KeyboardInterrupt: 

## 3. Ingesta Multi-Fuente (o Quick Start)
Si `USE_QS=True`, ejecuta `quick_run` y salta a verificación; de lo contrario, usa la configuración del dataset.

In [None]:
if USE_QS:
    results = quick_run(qs_cfg, env)
    df_raw = results['df_raw']; df_s = results['df_silver']; df_g = results['df_gold']; bad_df = results['bad_df']; stats = results['stats']
else:
    try:
        t0 = time.time()
        df_raw = load_sources_or_source(dataset_cfg, spark, env)
        df_raw.cache()
        METRICS['ingesta_rows'] = df_raw.count()
        METRICS['ingesta_sec'] = time.time() - t0
        print('[ingesta] filas leídas:', METRICS['ingesta_rows'])
    except Exception as e:
        print('[ingesta][ERROR] No se pudo leer fuentes:', e)
        df_raw = spark.createDataFrame([(1, 10.0, 'USD', '2025-09-01T10:00:00Z')], ['id','amount','currency','created_at'])

    df_raw.printSchema()
    df_raw.show(5)


## 4. Transformaciones (Renombres, Casts, Limpieza, Normalización)
Aplicamos operaciones base y opcionalmente `SQL/UDF` declarativas si `transforms_ref` está configurado.

In [None]:
if not USE_QS:
    t1 = time.time()
    df = df_raw
    std = dataset_cfg.get('standardization', {})
    renames = std.get('renames', {})
    casts = std.get('casts', [])
    if renames:
        df = df.toDF(*[renames.get(c, c) for c in df.columns])
    json_norm = dataset_cfg.get('json_normalization', {})
    if json_norm.get('flatten', False):
        df = flatten_json(df)
    nulls_cfg = dataset_cfg.get('null_handling', {})
    df = sanitize_nulls(df, fills=nulls_cfg.get('fills'), drop_if_null=nulls_cfg.get('drop_if_null'))
    for cdef in casts:
        df = safe_cast(df, cdef.get('column'), cdef.get('to'), on_error=cdef.get('on_error'))
    key_cols = [c for c in ['payment_id','created_at'] if c in df.columns]
    if key_cols:
        df = df.dropDuplicates(key_cols)
    keep_cols = dataset_cfg.get('output',{}).get('gold',{}).get('bucket',{}).get('keep_columns')
    df = project_columns(df, keep=keep_cols)
    tpath = dataset_cfg.get('transforms_ref')
    if tpath:
        try:
            transforms_cfg = load_dataset_config(tpath)
            df = apply_sql_transforms(df, transforms_cfg)
            df = apply_udf_transforms(df, transforms_cfg)
            print('[transforms] aplicadas desde', tpath)
        except Exception as e:
            print('[transforms][WARN] No se aplicaron transforms:', e)
    METRICS['transform_sec'] = time.time() - t1
    df.printSchema()
    df.show(10)


## 5. Validaciones de Calidad y Cuarentena
Se aplican reglas con acciones `quarantine`, `drop`, `warn`, `fail`. Los inválidos se escriben en `quarantine` si está configurado.
Se muestra un sample de `bad_df` con `_failed_rules` si existen registros inválidos.

In [None]:
if not USE_QS:
    quality_cfg = dataset_cfg.get('quality', {})
    quarantine_path = quality_cfg.get('quarantine')
    if quarantine_path:
        maybe_config_s3a(spark, quarantine_path, env)
    rules = quality_cfg.get('rules', [])
    t2 = time.time()
    df_q, bad_df, stats = apply_quality(df, rules, quarantine_path, run_id=RUN_ID)
    METRICS['quality_sec'] = time.time() - t2
    METRICS['quality_stats'] = stats
    print('[quality] stats:', stats)
    df_q.show(10)
    if bad_df is not None:
        print('[quality] sample bad_df:')
        bad_df.select('_failed_rules').show(5)


## 6. Escritura a Silver y Gold (Bucket)
Configura particionado `year/month` a partir de `created_at` si existe. Usa heurística de coalesce para datasets pequeños.

In [None]:
if not USE_QS:
    out_silver = dataset_cfg.get('output',{}).get('silver',{})
    silver_path = out_silver.get('path')
    parts = out_silver.get('partition_by', [])
    base_col_name = out_silver.get('partition_from')
    df_s = df_q
    if parts:
        base_col = F.col(base_col_name) if base_col_name and base_col_name in df_s.columns else None
        if base_col is not None:
            for p in parts:
                lp = p.lower()
                if lp == 'year' and 'year' not in df_s.columns:
                    df_s = df_s.withColumn('year', F.year(base_col))
                if lp == 'month' and 'month' not in df_s.columns:
                    df_s = df_s.withColumn('month', F.month(base_col))
    rows_s = df_s.count()
    METRICS['silver_rows'] = rows_s
    coalesce_val = 1 if rows_s <= 100000 else None
    t3 = time.time()
    if silver_path:
        write_parquet(df_s, silver_path, mode='overwrite', partition_by=parts, coalesce=coalesce_val)
        METRICS['silver_write_sec'] = time.time() - t3
        print('[silver] escrito en', silver_path)
    gold_bucket_cfg = dataset_cfg.get('output',{}).get('gold',{}).get('bucket',{})
    gold_path = gold_bucket_cfg.get('path')
    exclude_cols = set(gold_bucket_cfg.get('exclude_columns', []))
    df_g = df_s.select([c for c in df_s.columns if c not in exclude_cols])
    rows_g = df_g.count()
    METRICS['gold_rows'] = rows_g
    t4 = time.time()
    if gold_path:
        write_parquet(df_g, gold_path, mode='overwrite', partition_by=parts, coalesce=coalesce_val)
        METRICS['gold_write_sec'] = time.time() - t4
        print('[gold][bucket] escrito en', gold_path)


## 7. Publicación en Gold (Base de Datos)
Opcional: si existe `config/database.yml`, usa `DatabaseManager` para escritura/UPSERT y logging de ejecución.

In [None]:
if not USE_QS:
    try:
        if os.path.exists(DB_CONFIG_PATH):
            db_manager = create_database_manager_from_file(DB_CONFIG_PATH, environment='default')
            table_settings = load_env_config(DB_CONFIG_PATH).get('table_settings', {})
            table_name = create_gold_table_name(dataset_cfg.get('id','demo'), table_settings) or f"{dataset_cfg.get('id','demo')}_gold"
            exec_id = db_manager.log_pipeline_execution(dataset_name=dataset_cfg.get('id','demo'), pipeline_type='etl', status='started')
            df_gdb = df_s.select([c for c in df_s.columns if c not in exclude_cols])
            tdb = time.time()
            success = db_manager.write_dataframe(df_gdb, table_name, mode='append', upsert_keys=table_settings.get('upsert_keys'))
            METRICS['gold_db_write_sec'] = time.time() - tdb
            db_manager.log_pipeline_execution(dataset_name=dataset_cfg.get('id','demo'), status=('completed' if success else 'failed'), execution_id=exec_id)
            print('[gold][db] escrito en tabla', table_name, 'éxito=', success)
        else:
            print('[gold][db] Config DB no encontrado, se omite.')
    except Exception as e:
        print('[gold][db][ERROR] escritura falló:', e)


## 8. Verificación de Outputs
Lista archivos generados en Silver/Gold y muestra un sample desde Gold si es posible.

In [None]:
silver_base = silver_path if 'silver_path' in globals() and silver_path else QS_OUTPUT_SILVER
gold_base = gold_path if 'gold_path' in globals() and gold_path else QS_OUTPUT_GOLD
print('Silver base:', silver_base)
print('Gold base:', gold_base)

def list_files(base):
    for p in glob.glob(os.path.join(base, '**', '*'), recursive=True):
        if os.path.isfile(p):
            print(p)

if silver_base:
    print('[verify] Archivos en Silver:')
    list_files(silver_base)
if gold_base:
    print('[verify] Archivos en Gold:')
    list_files(gold_base)

try:
    df_ver = spark.read.parquet(gold_base)
    print('[verify] Rows en Gold:', df_ver.count())
    df_ver.show(5)
except Exception as e:
    print('[verify][WARN] No se pudo leer desde Gold:', e)


## 9. Pruebas Unitarias Básicas
Validan ingesta, transformaciones, calidad y escritura.

In [None]:
assert (METRICS.get('ingesta_rows', 1) >= 1) or USE_QS, 'Ingesta vacía'
if 'payment_id' in (df_s.columns if USE_QS else df_q.columns):
    colname = 'payment_id'
    target_df = df_s if USE_QS else df_q
    assert target_df.select(colname).where(F.col(colname).isNull()).count() == 0, 'payment_id no debe ser nulo tras limpieza'
assert isinstance(METRICS.get('quality_stats', {}), dict), 'Stats debe ser dict'
try:
    df_s2 = read_parquet(spark, silver_base)
    assert df_s2.count() == (METRICS.get('silver_rows') or df_s.count()), 'Conteo en silver debe coincidir'
except Exception as e:
    print('[test][WARN] Lectura silver falló:', e)
print('Pruebas básicas OK')


## 10. Métricas de Ejecución
Resumen de tiempos y conteos clave por etapa.

In [None]:
summary = {
  'ingesta_rows': METRICS.get('ingesta_rows'),
  'ingesta_sec': METRICS.get('ingesta_sec'),
  'transform_sec': METRICS.get('transform_sec'),
  'quality_sec': METRICS.get('quality_sec'),
  'silver_rows': METRICS.get('silver_rows'),
  'silver_write_sec': METRICS.get('silver_write_sec'),
  'gold_rows': METRICS.get('gold_rows'),
  'gold_write_sec': METRICS.get('gold_write_sec'),
  'gold_db_write_sec': METRICS.get('gold_db_write_sec'),
  'quality_stats': METRICS.get('quality_stats'),
}
print(json.dumps(summary, indent=2))


## 11. Cierre
Detener Spark y próximos pasos (Delta Lake, métricas avanzadas, orquestación).

In [None]:
spark.stop()
print('Spark detenido, pipeline notebook finalizado.')
