In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from notebookutils import mssparkutils
import sys
from datetime import datetime as dt 

StatementMeta(, 6669dea6-fad1-497f-a439-dbf04a00876f, 3, Finished, Available, Finished)

In [None]:
# ============================================
# Configuración
# ============================================
bronze_path_param = "Files/bronze/REDATA/data"
category_param = ""
widget_param = ""
lakehouse_name = "lh_bronze"

# Pipeline parameters (bronze_path / category / widget), when injected into the
# notebook namespace, override the defaults above. Locals win over globals,
# exactly like the explicit locals()/globals() checks this replaces.
_param_scope = {**globals(), **locals()}
bronze_path_param = _param_scope.get("bronze_path", bronze_path_param)
category_param = _param_scope.get("category", category_param)
widget_param = _param_scope.get("widget", widget_param)
del _param_scope

print(f"📂 Bronze path: {bronze_path_param}")
print(f"📦 Category: {category_param or 'Todas'}")
print(f"🔧 Widget: {widget_param or 'Todos'}")


In [None]:
# ============================================
# Carga json_log.py
# ============================================
# Load the shared JSON-ingestion-log helpers from the lakehouse; if that fails,
# define minimal inline fallbacks with the same names so the rest of the
# notebook can keep calling check_json_log_exists / save_json_log.
json_log_path = "Files/code/json_log.py"
try:
    # NOTE(review): the second argument of mssparkutils.fs.head caps how much
    # of the file is read (100000 here); if json_log.py ever grows past it,
    # the code would presumably be truncated before exec.
    json_log_code = mssparkutils.fs.head(json_log_path, 100000)
    # exec runs the file's code in this notebook's namespace; it is expected
    # to define check_json_log_exists and save_json_log (both called later).
    # NOTE(review): this executes arbitrary code stored in the lakehouse.
    exec(json_log_code)
    print("✅ json_log.py cargado correctamente")
except Exception as e:
    print(f"⚠️ Error cargando json_log.py: {e}")
    print("Se usará lógica de log inline como fallback")
    
    # Fallbacks below are only defined when loading json_log.py failed.
    # `spark` is looked up when they are called, not when they are defined.
    def check_json_log_exists(lakehouse_name, medallion_short, source_file, api_name=None):
        """Return True if `<medallion_short>_json_log` already has a row for source_file.

        When api_name is truthy, the row must also match that api_name.
        Returns False when the log table does not exist. lakehouse_name is
        accepted but not used by this fallback.
        """
        log_table = f"{medallion_short}_json_log"
        if not spark._jsparkSession.catalog().tableExists(log_table):
            return False
        df = spark.table(log_table).filter(col("source_file") == source_file)
        if api_name:
            df = df.filter(col("api_name") == api_name)
        # limit(1) stops the scan at the first match.
        return df.limit(1).count() > 0
    
    def save_json_log(api_name, source_file, target_table, ingestion_date, 
                      write_mode="append", lakehouse_name="lh_bronze", medallion_short="brz"):
        """Write one (api_name, source_file, target_table, ingestion_date) row to the log table.

        Skips the write when a row with the same api_name and source_file
        already exists. Errors are printed, not raised; the function returns
        None. lakehouse_name is accepted but not used by this fallback.
        """
        from pyspark.sql.types import StructType, StructField, StringType, TimestampType
        
        schema = StructType([
            StructField("api_name", StringType(), True),
            StructField("source_file", StringType(), True),
            StructField("target_table", StringType(), True),
            StructField("ingestion_date", TimestampType(), True),
        ])
        
        new_row = [(str(api_name), str(source_file), str(target_table), ingestion_date)]
        df_new = spark.createDataFrame(new_row, schema=schema)
        log_table = f"{medallion_short}_json_log"
        
        try:
            # Duplicate guard on (api_name, source_file).
            if spark._jsparkSession.catalog().tableExists(log_table):
                df_existing = spark.table(log_table)
                duplicate = (
                    df_existing
                    .filter((col("api_name") == api_name) & (col("source_file") == source_file))
                    .limit(1).count()
                )
                if duplicate > 0:
                    print(f"\t⚠️ Ya procesado: {source_file}")
                    return None
            
            df_new.write.format("delta").mode(write_mode).saveAsTable(log_table)
            print(f"\t✅ Log guardado: {source_file}")
        except Exception as e:
            print(f"\t⛔ Error guardando log: {str(e)}")


In [None]:
# ============================================
# Inicializar Spark
# ============================================

# getOrCreate() hands back the session already active in the notebook when
# there is one, so this is safe to re-run.
spark = (
    SparkSession.builder
    .appName("REData_JSON_to_Delta")
    .getOrCreate()
)

StatementMeta(, 6669dea6-fad1-497f-a439-dbf04a00876f, 6, Finished, Available, Finished)

In [None]:
# ============================================
# Funciones auxiliares
# ============================================

def list_categories():
    """Return the category folder names found directly under ``bronze_path_param``.

    Returns:
        list[str]: names of the sub-directories (plain files are skipped).
        An empty list is returned when the root cannot be listed; the reason
        is printed instead of being silently discarded.
    """
    try:
        entries = mssparkutils.fs.ls(bronze_path_param)
    except Exception as e:
        # Best-effort: an unreadable root means "nothing to process", but a
        # bare except also hid KeyboardInterrupt/SystemExit and the cause.
        print(f"⚠️ No se pudieron listar categorías en {bronze_path_param}: {e}")
        return []
    return [entry.name for entry in entries if entry.isDir]


def list_widgets(category):
    """Return the widget folder names of one bronze category.

    Args:
        category: category folder name under ``bronze_path_param``.

    Returns:
        list[str]: sub-directory names directly under
        ``<bronze_path_param>/<category>``. An empty list is returned when that
        folder cannot be listed; the reason is printed.
    """
    path = f"{bronze_path_param}/{category}"
    try:
        entries = mssparkutils.fs.ls(path)
    except Exception as e:
        # Best-effort: skip an unreadable category, but report why instead of
        # swallowing everything (including KeyboardInterrupt) with a bare except.
        print(f"⚠️ No se pudieron listar widgets en {path}: {e}")
        return []
    return [entry.name for entry in entries if entry.isDir]


def get_new_json_files(category, widget, time_trunc):
    """Return the JSON files of one widget/time_trunc that are not yet in the ingestion log.

    Only ``time_trunc == "month"`` is handled; any other granularity
    (e.g. ``"day"``) returns an empty list without touching storage.

    Args:
        category: category folder under ``bronze_path_param``.
        widget: widget folder under the category.
        time_trunc: granularity folder name.

    Returns:
        list[str]: ``<bronze_path_param>/<category>/<widget>/<time_trunc>/<file>.json``
        paths for which check_json_log_exists(...) is False. An empty list is
        returned (with a printed warning) when the folder cannot be listed.
        Errors raised by the log check are no longer swallowed, so a failing
        check cannot silently yield a partial file list.
    """
    # ✅ Only monthly data is ingested.
    if time_trunc != "month":
        return []

    api_name = f"{category}/{widget}/{time_trunc}"
    path = f"{bronze_path_param}/{category}/{widget}/{time_trunc}"

    try:
        entries = mssparkutils.fs.ls(path)
    except Exception as e:
        # A missing/unreadable folder just means "no files here"; report it.
        print(f"\t⚠️ No se pudo listar {path}: {e}")
        return []

    json_paths = []
    for entry in entries:
        if not entry.name.endswith('.json'):
            continue
        full_path = f"{path}/{entry.name}"
        # Skip files already recorded in the ingestion log for this api_name.
        already_processed = check_json_log_exists(
            lakehouse_name=lakehouse_name,
            medallion_short="brz",
            source_file=full_path,
            api_name=api_name
        )
        if not already_processed:
            json_paths.append(full_path)

    return json_paths


def extract_metadata(df_raw, category, widget, time_trunc):
    """Read the descriptive fields under ``api_response.data`` from the first row.

    Args:
        df_raw: DataFrame read from the raw JSON files.
        category, widget, time_trunc: copied verbatim into the result.

    Returns:
        dict | None: keys ``category, widget, time_trunc, data_type, data_id,
        data_title, data_last_update, data_description,
        metadata_extraction_timestamp`` (in that order), or None when the
        frame has no rows or the fields cannot be selected (a warning is printed).
    """
    # (output key, source path) pairs, in the order they appear in the result.
    data_fields = [
        ("data_type", "api_response.data.type"),
        ("data_id", "api_response.data.id"),
        ("data_title", "api_response.data.attributes.title"),
        ("data_last_update", "api_response.data.attributes.last-update"),
        ("data_description", "api_response.data.attributes.description"),
    ]
    try:
        projection = [col(source).alias(key) for key, source in data_fields]
        first_row = df_raw.select(*projection).first()

        if first_row:
            metadata = {"category": category, "widget": widget, "time_trunc": time_trunc}
            for key, _source in data_fields:
                metadata[key] = first_row[key]
            metadata["metadata_extraction_timestamp"] = dt.now()
            return metadata
    except Exception as e:
        print(f"\t⚠️ No se pudo extraer metadata: {str(e)}")

    return None


def save_metadata(metadata_dict):
    """Append one row to ``brz_redata_metadata`` unless that widget is already registered.

    Args:
        metadata_dict: dict as produced by extract_metadata; its keys are the
            schema field names below. ``None``/empty dicts are ignored.

    Returns:
        None. Failures are printed, not raised, so a metadata problem does not
        abort the ingestion of the widget's data.
    """
    if not metadata_dict:
        return

    table = "brz_redata_metadata"
    schema = StructType([
        StructField("category", StringType(), True),
        StructField("widget", StringType(), True),
        StructField("time_trunc", StringType(), True),
        StructField("data_type", StringType(), True),
        StructField("data_id", StringType(), True),
        StructField("data_title", StringType(), True),
        StructField("data_last_update", StringType(), True),
        StructField("data_description", StringType(), True),
        StructField("metadata_extraction_timestamp", TimestampType(), True)
    ])

    try:
        label = f"{metadata_dict['category']}/{metadata_dict['widget']}/{metadata_dict['time_trunc']}"

        # Build the row by field name instead of relying on dict insertion
        # order, and do it inside the try so a type mismatch is reported
        # rather than propagating and failing the whole widget.
        row = tuple(metadata_dict.get(field.name) for field in schema.fields)
        df_meta = spark.createDataFrame([row], schema=schema)

        # Only the first row per (category, widget, time_trunc) is kept.
        if spark._jsparkSession.catalog().tableExists(table):
            existing = spark.table(table).filter(
                (col("category") == metadata_dict["category"]) &
                (col("widget") == metadata_dict["widget"]) &
                (col("time_trunc") == metadata_dict["time_trunc"])
            )
            # limit(1): existence check, no need to count every match.
            if existing.limit(1).count() > 0:
                print(f"\t📋 Metadata ya existe para {label}")
                return

        df_meta.write.format("delta").mode("append").saveAsTable(table)
        print(f"\t✅ Metadata guardada: {label}")
    except Exception as e:
        print(f"\t⛔ Error guardando metadata: {str(e)}")


def parse_datetime_local(datetime_str):
    """Drop the UTC offset (and fractional seconds) from an ISO-8601 datetime string column.

    Keeping only the leading ``YYYY-MM-DDTHH:MM:SS`` wall-clock part means a
    later ``to_timestamp`` does not convert the value to UTC, which would move
    e.g. the first hour of a month into the previous month.

    Example:
        "2024-10-01T00:00:00.000+02:00" -> "2024-10-01T00:00:00"
        (the ``T`` separator is kept; ``to_timestamp`` accepts it).

    Args:
        datetime_str: Column (or column name) with the raw datetime string.

    Returns:
        Column: the string cut right after the seconds. Values that do not match
        the pattern are left unchanged by ``regexp_replace``.
    """
    wall_clock_pattern = r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}).*$"
    first_group = "$1"
    return regexp_replace(datetime_str, wall_clock_pattern, first_group)


def process_balance_electrico(df_raw, category, widget, time_trunc):
    """Flatten a balance/balance-electrico response (series with a nested ``content`` array) into rows.

    Three arrays are exploded in turn:
    ``api_response.included`` (one element per series) ->
    ``series.attributes.content`` (one element per metric) ->
    ``metric.attributes.values`` (one element per datapoint).
    Request context columns come from ``request_metadata``.

    Args:
        df_raw: DataFrame read from the raw JSON files; must expose the
            ``request_metadata.*`` and ``api_response.included`` fields selected below.
        category, widget, time_trunc: not used in the body (presumably kept so
            both parsers share one signature); the output ``time_trunc`` column
            is taken from ``request_metadata.time_trunc``.

    Returns:
        DataFrame with one row per (series, metric, datapoint). ``datetime`` keeps
        the local wall-clock time (offset stripped by parse_datetime_local),
        ``value``/``percentage`` are cast to double, ``ingestion_timestamp`` to
        timestamp, and rows with a null ``datetime`` or ``value`` are dropped.
        The select order below fixes the column order of the output.
    """
    # Request context plus the array of series.
    df_meta = df_raw.select(
        col("request_metadata.geo_id").alias("geo_id"),
        col("request_metadata.geo_name").alias("geo_name"),
        col("request_metadata.geo_limit").alias("geo_limit"),
        col("request_metadata.time_trunc").alias("time_trunc"),
        col("request_metadata.ingestion_timestamp").alias("ingestion_timestamp"),
        col("api_response.included").alias("included")
    )
    
    # One row per series (explode emits nothing for a null/empty array).
    df_included = df_meta.select("*", explode("included").alias("series")).drop("included")
    
    df_step1 = df_included.select(
        "geo_id", "geo_name", "geo_limit", "time_trunc", "ingestion_timestamp",
        col("series.type").alias("series_type"),
        col("series.id").alias("series_id"),
        col("series.attributes.title").alias("series_title"),
        col("series.attributes.last-update").alias("series_last_update"),
        col("series.attributes.description").alias("series_description"),
        col("series.attributes.magnitude").alias("series_magnitude"),
        col("series.attributes.content").alias("content")
    )
    
    # One row per metric nested inside each series.
    df_step2 = df_step1.select("*", explode("content").alias("metric")).drop("content")
    
    df_step3 = df_step2.select(
        "geo_id", "geo_name", "geo_limit", "time_trunc", "ingestion_timestamp",
        "series_type", "series_id", "series_title", "series_last_update", "series_description", "series_magnitude",
        col("metric.type").alias("metric_type"),
        col("metric.id").alias("metric_id"),
        col("metric.groupId").alias("metric_group_id"),
        col("metric.attributes.title").alias("metric_title"),
        col("metric.attributes.description").alias("metric_description"),
        col("metric.attributes.color").alias("metric_color"),
        col("metric.attributes.icon").alias("metric_icon"),
        col("metric.attributes.type").alias("metric_attribute_type"),
        col("metric.attributes.magnitude").alias("metric_magnitude"),
        col("metric.attributes.composite").alias("is_composite"),
        col("metric.attributes.last-update").alias("metric_last_update"),
        col("metric.attributes.values").alias("values")
    )
    
    # One row per datapoint.
    df_step4 = df_step3.select("*", explode("values").alias("val")).drop("values")
    
    # ✅ DATE FIX: strip the UTC offset with parse_datetime_local so each point keeps its local date.
    df_final = df_step4.select(
        "geo_id", "geo_name", "geo_limit", "time_trunc",
        "series_type", "series_id", "series_title", "series_last_update", "series_description", "series_magnitude",
        "metric_type", "metric_id", "metric_group_id", "metric_title", "metric_description",
        "metric_color", "metric_icon", "metric_attribute_type", "metric_magnitude", "is_composite", "metric_last_update",
        to_timestamp(parse_datetime_local(col("val.datetime"))).alias("datetime"),  # ✅ FIXED: local wall-clock time, no UTC shift
        col("val.value").cast("double").alias("value"),
        col("val.percentage").cast("double").alias("percentage"),
        # ingestion_timestamp is re-emitted last, cast to timestamp.
        col("ingestion_timestamp").cast("timestamp").alias("ingestion_timestamp")
    )
    
    return df_final.filter(col("datetime").isNotNull() & col("value").isNotNull())


def process_standard_widget(df_raw, category, widget, time_trunc):
    """Flatten a standard widget response (series carry their values directly, no ``content`` array).

    Used for every widget except balance/balance-electrico (e.g. demanda,
    generacion, mercados). Two arrays are exploded:
    ``api_response.included`` (one element per series) ->
    ``series.attributes.values`` (one element per datapoint).

    Args:
        df_raw: DataFrame read from the raw JSON files; must expose the
            ``request_metadata.*`` and ``api_response.included`` fields selected below
            (including ``series.groupId``; NOTE(review): confirm every widget
            routed here actually has that field, otherwise the select fails).
        category, widget, time_trunc: not used in the body; the output
            ``time_trunc`` column is taken from ``request_metadata.time_trunc``.

    Returns:
        DataFrame with one row per (series, datapoint). ``datetime`` keeps the
        local wall-clock time (offset stripped by parse_datetime_local),
        ``value``/``percentage`` are cast to double, ``ingestion_timestamp`` to
        timestamp, and rows with a null ``datetime`` or ``value`` are dropped.
        The select order below fixes the column order of the output.
    """
    # Request context plus the array of series.
    df_meta = df_raw.select(
        col("request_metadata.geo_id").alias("geo_id"),
        col("request_metadata.geo_name").alias("geo_name"),
        col("request_metadata.geo_limit").alias("geo_limit"),
        col("request_metadata.time_trunc").alias("time_trunc"),
        col("request_metadata.ingestion_timestamp").alias("ingestion_timestamp"),
        col("api_response.included").alias("included")
    )
    
    # One row per series (explode emits nothing for a null/empty array).
    df_included = df_meta.select("*", explode("included").alias("series")).drop("included")
    
    df_step1 = df_included.select(
        "geo_id", "geo_name", "geo_limit", "time_trunc", "ingestion_timestamp",
        col("series.type").alias("series_type"),
        col("series.id").alias("series_id"),
        col("series.groupId").alias("series_group_id"),
        col("series.attributes.title").alias("series_title"),
        col("series.attributes.description").alias("series_description"),
        col("series.attributes.color").alias("series_color"),
        col("series.attributes.icon").alias("series_icon"),
        col("series.attributes.type").alias("series_attribute_type"),
        col("series.attributes.magnitude").alias("series_magnitude"),
        col("series.attributes.composite").alias("is_composite"),
        col("series.attributes.last-update").alias("series_last_update"),
        col("series.attributes.values").alias("values")
    )
    
    # One row per datapoint.
    df_step2 = df_step1.select("*", explode("values").alias("val")).drop("values")
    
    # ✅ DATE FIX: strip the UTC offset with parse_datetime_local so each point keeps its local date.
    df_final = df_step2.select(
        "geo_id", "geo_name", "geo_limit", "time_trunc",
        "series_type", "series_id", "series_group_id", "series_title", "series_description",
        "series_color", "series_icon", "series_attribute_type", "series_magnitude",
        "is_composite", "series_last_update",
        to_timestamp(parse_datetime_local(col("val.datetime"))).alias("datetime"),  # ✅ FIXED: local wall-clock time, no UTC shift
        col("val.value").cast("double").alias("value"),
        col("val.percentage").cast("double").alias("percentage"),
        # ingestion_timestamp is re-emitted last, cast to timestamp.
        col("ingestion_timestamp").cast("timestamp").alias("ingestion_timestamp")
    )
    
    return df_final.filter(col("datetime").isNotNull() & col("value").isNotNull())


def process_widget_by_time_trunc(category, widget, time_trunc):
    """Ingest the not-yet-processed JSON files of one widget/time_trunc into a Delta table.

    Steps: find new files, read them, store the widget metadata (once per
    widget), flatten the series with the structure-specific parser, append to
    ``brz_redata_<category>_<widget>_<time_trunc>`` (hyphens -> underscores)
    with ``mergeSchema``, then record every source file in the ingestion log.

    Only ``time_trunc == "month"`` is processed; anything else is skipped.

    Args:
        category: category folder name.
        widget: widget folder name.
        time_trunc: granularity folder name.

    Returns:
        str | None: the target table name, or None when skipped or when there
        are no new files.
    """
    # ✅ Only monthly data is ingested.
    if time_trunc != "month":
        return None

    print(f"\n📦 {category}/{widget}/{time_trunc}")

    api_name = f"{category}/{widget}/{time_trunc}"
    json_paths = get_new_json_files(category, widget, time_trunc)

    if not json_paths:
        print(f"  ✅ Sin archivos nuevos")
        return None

    print(f"  📄 {len(json_paths)} archivos nuevos a procesar")

    df_raw = spark.read.option("multiline", "true").json(json_paths)

    # Metadata is best-effort and stored once per category/widget/time_trunc.
    metadata = extract_metadata(df_raw, category, widget, time_trunc)
    if metadata:
        save_metadata(metadata)

    # balance-electrico nests its metrics one level deeper (series -> content -> values).
    if category == "balance" and widget == "balance-electrico":
        df_final = process_balance_electrico(df_raw, category, widget, time_trunc)
    else:
        df_final = process_standard_widget(df_raw, category, widget, time_trunc)

    table_name = f"brz_redata_{category}_{widget}_{time_trunc}".replace("-", "_")

    # Cache the flattened frame: without it, counting after the write re-read
    # and re-exploded every JSON file a second time.
    df_final = df_final.cache()
    try:
        count = df_final.count()
        df_final.write.format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .saveAsTable(table_name)
    finally:
        df_final.unpersist()

    print(f"  ✅ {count} registros añadidos a {table_name}")

    # Files are logged only after a successful write, with one shared timestamp.
    ingestion_ts = dt.now()
    for json_path in json_paths:
        save_json_log(
            api_name=api_name,
            source_file=json_path,
            target_table=table_name,
            ingestion_date=ingestion_ts,
            lakehouse_name=lakehouse_name
        )

    return table_name

StatementMeta(, 6669dea6-fad1-497f-a439-dbf04a00876f, 7, Finished, Available, Finished)

In [None]:
# ============================================
# Main
# ============================================

def main():
    """Ingest the monthly REData JSON files from bronze into Delta tables.

    The scope comes from the pipeline parameters:
      * category_param and widget_param -> only that widget;
      * only category_param             -> every widget of that category;
      * only widget_param               -> that widget in every category containing it;
      * neither                         -> every widget of every category.

    An error in one widget is printed and does not stop the others.

    Returns:
        dict: ``{"tables_updated": <number of successful widget loads>,
        "tables": <sorted list of distinct table names>}``.
    """
    print("🚀 JSON → Delta v3 (SOLO MENSUALES + FIX FECHAS)")
    print(f"📂 {bronze_path_param}\n")

    if category_param and widget_param:
        categories_to_process = [(category_param, [widget_param])]
    elif category_param:
        categories_to_process = [(category_param, list_widgets(category_param))]
    else:
        categories_to_process = []
        for cat in list_categories():
            widgets = list_widgets(cat)
            if widget_param:
                # A widget filter without a category must still restrict the
                # run to that widget instead of processing everything.
                widgets = [w for w in widgets if w == widget_param]
            categories_to_process.append((cat, widgets))
        if widget_param and not any(ws for _, ws in categories_to_process):
            print(f"⚠️ Widget '{widget_param}' no encontrado en ninguna categoría")

    tables_updated = []

    for category, widgets in categories_to_process:
        for widget in widgets:
            # ✅ Only monthly data.
            try:
                table = process_widget_by_time_trunc(category, widget, "month")
                if table:
                    tables_updated.append(table)
            except Exception as e:
                print(f"  ⛔ Error en {category}/{widget}/month: {str(e)}")

    unique_tables = sorted(set(tables_updated))

    print(f"\n📊 Resumen:")
    print(f"  - Tablas actualizadas: {len(tables_updated)}")
    print(f"  - Tablas únicas: {len(unique_tables)}")
    if tables_updated:
        print(f"  - Tablas: {', '.join(unique_tables)}")

    return {"tables_updated": len(tables_updated), "tables": unique_tables}


if __name__ == "__main__":
    main()