In [1]:
# Instalar librer√≠as
%pip install google-genai pandas openpyxl pdfplumber unidecode

import os
import json
import re
import time
import pandas as pd
import pdfplumber
from unidecode import unidecode
from google import genai
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as max_col, trim, upper, when, lit, udf, current_timestamp
from pyspark.sql.types import StringType
from notebookutils import mssparkutils 

# Iniciar Spark
spark = SparkSession.builder.getOrCreate()

# --- üìç RUTAS CORREGIDAS (Directo a la ra√≠z) ---
# Apuntamos a Files/bronze directamente, sin la carpeta intermedia 'lakehouse'
BASE_LAKE = "/lakehouse/default/Files" 

BRONZE_PATH = f"{BASE_LAKE}/bronze"
ARCHIVE_PATH = f"{BASE_LAKE}/bronze/procesados" 
SILVER_PATH = f"{BASE_LAKE}/silver" # Solo referencia por si acaso

# Asegurar carpeta de archivado
try:
    mssparkutils.fs.mkdirs(ARCHIVE_PATH)
except:
    pass

# --- üîë API KEY ---
os.environ["GEMINI_API_KEY"] = "AIzaSyDs_ZZ6xPugNrw1fRAMA7JgP_MpQlZhqBk" 

print(f"‚úÖ Entorno listo.\nüìÇ Origen (Bronze): {BRONZE_PATH}\nüìÇ Archive: {ARCHIVE_PATH}")

StatementMeta(, cf145edb-5df2-4ea7-83b4-7bc6209b2520, 8, Finished, Available, Finished)

Collecting google-genai
  Downloading google_genai-1.52.0-py3-none-any.whl.metadata (46 kB)
[?25l     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/46.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m46.8/46.8 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Collecting pdfplumber
  Downloading pdfplumber-0.11.8-py3-none-any.whl.metadata (43 kB)
[?25l     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m43.6/43.6 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
Collecting anyio<5.0.0,>=4

In [2]:
# --- FUNCI√ìN DE LIMPIEZA DE TEXTO ---
def normalizar_texto(texto):
    """Limpia nombres de productos para mejorar cruces"""
    if not texto: return None
    # 1. May√∫sculas y sin tildes
    t = unidecode(str(texto)).upper().strip()
    
    # 2. ARREGLAR LAS 'X' (Ej: "2 x 4" -> "2X4")
    t = re.sub(r'(\d)\s*[xX]\s*(\d)', r'\1X\2', t)
    
    # 3. Solo dejar caracteres seguros
    t = re.sub(r'[^A-Z0-9\s\.\/]', '', t) 
    t = re.sub(r'\s+', ' ', t)
    return t

# Registrar como UDF para Spark
udf_normalizar = udf(normalizar_texto, StringType())

# --- FUNCI√ìN DE EXTRACCI√ìN CON GEMINI ---
def extraer_items_con_ia(texto_pdf, nombre_archivo):
    """Usa Gemini 2.5 Flash para sacar datos estructurados del PDF"""
    client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
    
    prompt = f"""
    Analiza este documento comercial (Orden de Compra/Proforma).
    Extrae la lista de productos con su PRECIO UNITARIO.
    
    Archivo: {nombre_archivo}
    Texto:
    {texto_pdf[:8000]} 
    
    Responde SOLO un JSON con esta estructura de lista:
    [
      {{"producto": "Nombre del item", "precio": 10.50}},
      {{"producto": "Otro item", "precio": 15.00}}
    ]
    Ignora items con precio 0 o vac√≠os.
    """
    
    try:
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt,
            config={"response_mime_type": "application/json"}
        )
        return json.loads(response.text)
    except Exception as e:
        print(f"‚ö†Ô∏è Error IA en {nombre_archivo}: {e}")
        return []

print("‚úÖ Funciones auxiliares cargadas.")

StatementMeta(, cf145edb-5df2-4ea7-83b4-7bc6209b2520, 10, Finished, Available, Finished)

‚úÖ Funciones auxiliares cargadas.


In [3]:
def mover_a_procesados(nombre_archivo):
    """Mueve el archivo para no procesarlo doble (Corregido con prefijo file:)"""
    # Fabric requiere 'file:' al inicio para movimientos internos
    origen = f"file:{BRONZE_PATH}/{nombre_archivo}"
    destino = f"file:{ARCHIVE_PATH}/{nombre_archivo}"
    try:
        mssparkutils.fs.mv(origen, destino) 
        print(f"   üì¶ Archivado: {nombre_archivo}")
    except Exception as e:
        # Si falla moverlo, no rompemos el flujo, solo avisamos
        print(f"   ‚ö†Ô∏è No se pudo mover (pero s√≠ se proces√≥): {e}")

def etapa_bronze_a_silver():
    print("\nüî® --- ETAPA 1: PROCESANDO ARCHIVOS ---")
    
    # --- 1. MAESTRO ---
    print("üìÇ Buscando Maestro...")
    files = os.listdir(BRONZE_PATH)
    maestro_files = [f for f in files if "maestro" in f.lower()]
    
    if maestro_files:
        archivo = maestro_files[0]
        path = f"{BRONZE_PATH}/{archivo}"
        print(f"   -> Leyendo Maestro: {archivo}")
        try:
            if archivo.endswith(".csv"):
                pdf = pd.read_csv(path, encoding='utf-8', on_bad_lines='skip')
            else:
                pdf = pd.read_excel(path)
            
            # Normalizaci√≥n
            pdf = pdf.iloc[:, [0, 1]]
            pdf.columns = ['producto', 'precio_base']
            if pdf['precio_base'].dtype == 'O':
                pdf['precio_base'] = pdf['precio_base'].astype(str).str.replace('$', '').str.replace(',', '')
            pdf['precio_base'] = pd.to_numeric(pdf['precio_base'], errors='coerce').fillna(0)
            
            df_maestro = spark.createDataFrame(pdf)
            df_maestro.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("silver_maestro_productos")
            print("   ‚úÖ Tabla 'silver_maestro_productos' actualizada.")
        except Exception as e:
            print(f"   ‚ùå Error leyendo maestro: {e}")

    # --- 2. NUEVAS VENTAS (PDFs) ---
    print("üïµÔ∏è‚Äç‚ôÄÔ∏è Buscando √ìrdenes y Proformas (PDFs)...")
    pdf_files = [f for f in files if f.lower().endswith(".pdf")]
    
    ventas = []
    if not pdf_files:
        print("   (No hay documentos nuevos)")
    else:
        for i, f in enumerate(pdf_files):
            path = f"{BRONZE_PATH}/{f}"
            print(f"   [{i+1}/{len(pdf_files)}] Procesando: {f}...")
            
            try:
                texto = ""
                with pdfplumber.open(path) as pdf:
                    for page in pdf.pages: texto += page.extract_text() or ""
                
                if texto:
                    # LLAMADA A LA IA
                    items = extraer_items_con_ia(texto, f)
                    
                    # --- EL FRENO DE MANO (IMPORTANTE) ---
                    # Esperamos 10 segundos entre cada llamada para no enojar a Google
                    print("      ‚è≥ Esperando 10s para respetar l√≠mites de API...")
                    time.sleep(10) 
                    
                    for item in items:
                        if item.get('producto'):
                            ventas.append({
                                "producto": item['producto'],
                                "precio_detectado": float(item.get('precio', 0)),
                                "origen": f,
                                "fecha_carga": time.strftime("%Y-%m-%d")
                            })
                    
                    mover_a_procesados(f)
            except Exception as e:
                print(f"   ‚ùå Error en {f}: {e}")
                time.sleep(5) # Esperamos un poco si falla

        # Guardar Historial
        if ventas:
            df_ventas = spark.createDataFrame(pd.DataFrame(ventas))
            df_ventas.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("silver_historial_ventas")
            print(f"   ‚úÖ {len(ventas)} items agregados a 'silver_historial_ventas'.")

print("‚úÖ Funciones de Etapa 1 listas (Con freno de mano).")

StatementMeta(, cf145edb-5df2-4ea7-83b4-7bc6209b2520, 11, Finished, Available, Finished)

‚úÖ Funciones de Etapa 1 listas (Con freno de mano).


In [4]:
def etapa_silver_a_gold():
    print("\nü•á --- ETAPA 2: GENERANDO GOLD (FUSI√ìN) ---")
    
    # 1. ESTRATEGIA DE MEMORIA (Feedback Loop)
    try:
        df_base = spark.table("gold_maestro_inteligente")
        print("   üß† Memoria activada: Usando conocimiento previo de Gold.")
        df_maestro = df_base.select(col("Producto").alias("producto"), col("Precio_Final").alias("precio_base"))
    except:
        print("   üå± Inicio fresco: Usando CSV Maestro original (Silver).")
        try:
            df_maestro = spark.table("silver_maestro_productos")
        except:
            print("‚ùå Error cr√≠tico: No hay datos ni en Gold ni en Silver.")
            return

    # 2. Cargar Ventas Acumuladas
    try:
        df_ventas = spark.table("silver_historial_ventas")
    except:
        print("‚ö†Ô∏è No hay historial de ventas. El Gold ser√° igual al Maestro.")
        df_ventas = None

    # 3. Normalizaci√≥n y Cruce
    df_maestro = df_maestro.withColumn("producto_norm", udf_normalizar(col("producto")))
    
    if df_ventas is not None:
        df_ventas = df_ventas.withColumn("producto_norm", udf_normalizar(col("producto")))

        df_precios_mercado = df_ventas.groupBy("producto_norm").agg(
            max_col("precio_detectado").alias("precio_mercado_max"),
            max_col("producto").alias("nombre_ejemplo")
        )

        df_final = df_maestro.join(
            df_precios_mercado, 
            df_maestro.producto_norm == df_precios_mercado.producto_norm, 
            "full_outer"
        )
    else:
        df_final = df_maestro.withColumn("precio_mercado_max", lit(None)).withColumn("nombre_ejemplo", lit(None))

    # 4. Reglas Gold: "El Precio M√°s Alto Gana"
    df_gold = df_final.withColumn(
        "Producto", 
        when(col("producto").isNotNull(), col("producto")).otherwise(col("nombre_ejemplo"))
    ).withColumn(
        "Precio_Final",
        when(
            (col("precio_base") > 0) & (col("precio_mercado_max") > col("precio_base")),
            col("precio_mercado_max") 
        ).when(
            col("precio_base") > 0, col("precio_base")
        ).otherwise(col("precio_mercado_max")) 
    ).withColumn(
        "Estado",
        when(col("precio_base").isNull(), lit("NUEVO DEL MERCADO"))
        .when((col("precio_mercado_max") > col("precio_base")), lit("PRECIO ACTUALIZADO"))
        .otherwise(lit("VIGENTE"))
    ).withColumn("Fecha_Actualizacion", current_timestamp())

    # 5. Guardar Gold Definitivo
    gold_final = df_gold.select("Producto", "Precio_Final", "Estado", "Fecha_Actualizacion")
    
    gold_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("gold_maestro_inteligente")
    
    print("‚úÖ TABLA FINAL GENERADA:")
    gold_final.show(10, truncate=False)
    print("üíæ Tabla 'gold_maestro_inteligente' lista para Power BI.")

print("‚úÖ Funciones de Etapa 2 listas.")

StatementMeta(, cf145edb-5df2-4ea7-83b4-7bc6209b2520, 12, Finished, Available, Finished)

‚úÖ Funciones de Etapa 2 listas.


In [5]:
# Ejecutar el Pipeline Completo
etapa_bronze_a_silver()
etapa_silver_a_gold()

StatementMeta(, cf145edb-5df2-4ea7-83b4-7bc6209b2520, 13, Finished, Available, Finished)


üî® --- ETAPA 1: PROCESANDO ARCHIVOS ---
üìÇ Buscando Maestro...
   -> Leyendo Maestro: maestro_productos.csv
   ‚úÖ Tabla 'silver_maestro_productos' actualizada.
üïµÔ∏è‚Äç‚ôÄÔ∏è Buscando √ìrdenes y Proformas (PDFs)...
   [1/32] Procesando: PROFORMA 7346.pdf...
      ‚è≥ Esperando 10s para respetar l√≠mites de API...
   ‚ö†Ô∏è No se pudo mover (pero s√≠ se proces√≥): An error occurred while calling z:notebookutils.fs.mv.
: org.apache.hadoop.fs.PathNotFoundException: `file:/lakehouse/default/Files/bronze/procesados/PROFORMA 7346.pdf not found': No such file or directory
	at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.$anonfun$mv$2(MSFsUtilsImpl.scala:350)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.fsTSG(MSFsUtilsImpl.scala:223)
	at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.$anonfun$mv$1(MSFsUtilsImpl.scala:331)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunctio