Tarea 2 For-each

In [None]:
# --- CELDA DE PRUEBA MANUAL ---
# (Borra esta celda antes de ponerla en el Job final)

# 1. Pega la configuración COMPLETA Y VÁLIDA
test_config_json = """
{
  "name": "df1",
  "sources": [
    {
      "name": "person_inputs",
      "path": "/Volumes/workspace/prueba_tecnica/data/inputs/events/person/*",
      "format": "JSON"
    },
    {
      "name": "employees_inputs",
      "path": "/Volumes/workspace/prueba_tecnica/data/inputs/events/employees/*",
      "format": "JSON"
    }
  ],
  "transformations": [
    {
      "name": "validation",
      "type": "validate_fields",
      "params": { 
        "input": "person_inputs", 
        "validations": [
          {
            "field": "office",
            "validations": ["notEmpty"]
          },
          {
            "field": "age",
            "validations": ["notNull"]
          }
        ] 
      }
    },
    {
      "name": "ok_with_date",
      "type": "add_fields",
      "params": { 
        "input": "validation_ok", 
        "addFields": [
          {
            "name": "dt",
            "function": "current_timestamp"
          }
        ] 
      }
    }
  ],
  "sinks": [
    {
      "input": "ok_with_date",
      "name": "raw_ok",
      "path": "/Volumes/workspace/prueba_tecnica/silver/person",
      "format": "DELTA",
      "saveMode": "OVERWRITE"
    },
    {
      "input": "validation_ko",
      "name": "raw_ko",
      "path": "/Volumes/workspace/prueba_tecnica/discards/person",
      "format": "DELTA",
      "saveMode": "OVERWRITE"
    }
  ]
}
"""

# 2. Esto simula lo que hace el Job:
dbutils.widgets.text("dataflow_config_json", test_config_json)
print("Widget 'dataflow_config_json' rellenado con datos de prueba VÁLIDOS.")

In [None]:
import sys
import os

# 1. Obtener la raíz del proyecto (en Repos, os.getcwd() es la raíz)
project_root = os.getcwd()

# 2. Añadir la raíz al path de Python
if project_root not in sys.path:
    sys.path.append(project_root)
  
  
# Databricks Notebook: 02_task_run_dataflow
import json
import logging

# IMPORTANTE: Asume que tu código 'src' está disponible
# (ej. porque estás usando Databricks Repos)
from src import orchestrator
from src import utils

# Configuración del logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
log = logging.getLogger(__name__)

# --- 1. Definir el Widget de Entrada ---
# Este widget recibirá la configuración de UN solo dataflow.
# El Job "For-Each" de Databricks rellenará este widget.
# IMPORTANTE: Databricks pasa los diccionarios como un string de JSON.
dbutils.widgets.text("dataflow_config_json", "{}", "Configuración del Dataflow (JSON String)")

# --- 2. Leer el Widget ---
dataflow_json_string = dbutils.widgets.get("dataflow_config_json")
log.info("Tarea 2 (For-Each) iniciada.")

try:
    # --- 3. Convertir el string JSON de vuelta a un diccionario ---
    dataflow_config = json.loads(dataflow_json_string)
    
    if not dataflow_config or 'name' not in dataflow_config:
        raise ValueError("Configuración de dataflow vacía o inválida recibida.")
        
    dataflow_name = dataflow_config.get("name")
    log.info(f"Procesando dataflow: {dataflow_name}")

    # --- 4. Obtener Spark y Ejecutar el Framework ---
    
    # utils.get_spark_session() detectará que está en Databricks
    # y simplemente obtendrá la sesión 'spark' existente.
    spark = utils.get_spark_session()
    
    # ¡Aquí es donde se llama a todo tu código de 'src' (orchestrator, writers, etc.)!
    orchestrator.run_single_dataflow(spark, dataflow_config)
    
    log.info(f"Dataflow {dataflow_name} completado exitosamente.")

except Exception as e:
    log.error(f"Error fatal durante la ejecución del dataflow: {e}", exc_info=True)
    # Lanzamos el error para que la Tarea del Job de Databricks falle
    raise