In [0]:
# Celda 1: Instalar librerías esenciales
%pip install snowflake-connector-python==3.2.0 pyarrow==10.0.1
dbutils.library.restartPython()  # Reinicia el kernel para aplicar cambios

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
# Celda 2: Configurar conexión
sf_options = {
    "sfUrl": "NDVNNHP-ZM24191.snowflakecomputing.com",
    "sfUser": "KENDERAG",
    "sfPassword": "Hp7g33snkm64alqxlixn",
    "sfDatabase": "INV_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH"
}

# Prueba de conexión rápida
try:
    test_df = spark.read.format("snowflake") \
        .options(**sf_options) \
        .option("query", "SELECT 1 AS conexion_exitosa") \
        .load()
    display(test_df)  # Debe mostrar "1" si todo está bien
except Exception as e:
    print(f"❌ Error de conexión: {str(e)}")
    print("Verifica: 1) Credenciales, 2) Warehouse activo, 3) Internet")

CONEXION_EXITOSA
1


In [0]:
# Celda 3: Cargar datos de origen
try:
    df = spark.read.format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "TECH_INVENTORY") \
        .load()
    print(f"✅ Filas cargadas: {df.count()}")
    display(df.limit(5))  # Muestra las primeras filas
except Exception as e:
    print(f"❌ Error al leer datos: {str(e)}")
    # Guarda un backup local por si falla Snowflake
    df.write.format("csv").save("/FileStore/backup_tech_inventory.csv")

✅ Filas cargadas: 150000


PRODUCT_ID,PRODUCT_NAME,CATEGORY,BRAND,PRICE,CURRENT_STOCK,DAILY_SALES_AVG,REGION,STORE_ID,STOCK_STATUS,ABC_CLASS,LAST_RESTOCK_DATE
P00001,Samsung Laptop Pm,Laptop,Samsung,273.86,242,7,SOUTH,STORE_SOUTH_2,EXCESS,B,2025-03-23T19:04:57.589+0000
P00002,Samsung Smartwatch Civil,Smartwatch,Samsung,1976.55,546,28,NORTH,STORE_NORTH_2,OK,A,2025-02-05T19:04:57.590+0000
P00003,Sony Laptop Skin,Laptop,Sony,943.74,186,17,SOUTH,STORE_SOUTH_2,OK,A,2025-02-05T19:04:57.590+0000
P00004,Sony Smartwatch Back,Smartwatch,Sony,1418.8,131,17,WEST,STORE_WEST_1,OK,A,2025-03-02T19:04:57.590+0000
P00005,Apple Tablet Material,Tablet,Apple,544.64,135,18,NORTH,STORE_NORTH_3,OK,A,2025-02-11T19:04:57.591+0000


In [0]:
# Celda 4: Calcular métricas clave
from pyspark.sql.functions import col, when, round

try:
    df_transform = df.withColumn(
        "dias_stock_restante",
        when(col("DAILY_SALES_AVG") > 0, round(col("CURRENT_STOCK") / col("DAILY_SALES_AVG"), 2))
    ).withColumn(
        "priority_flag",
        (col("ABC_CLASS") == "A") & (col("STOCK_STATUS") == "RISK")
    )
    
    display(df_transform.limit(5))  # Verifica resultados
except Exception as e:
    print(f"❌ Error en transformaciones: {str(e)}")

PRODUCT_ID,PRODUCT_NAME,CATEGORY,BRAND,PRICE,CURRENT_STOCK,DAILY_SALES_AVG,REGION,STORE_ID,STOCK_STATUS,ABC_CLASS,LAST_RESTOCK_DATE,dias_stock_restante,priority_flag
P00001,Samsung Laptop Pm,Laptop,Samsung,273.86,242,7,SOUTH,STORE_SOUTH_2,EXCESS,B,2025-03-23T19:04:57.589+0000,34.57,False
P00002,Samsung Smartwatch Civil,Smartwatch,Samsung,1976.55,546,28,NORTH,STORE_NORTH_2,OK,A,2025-02-05T19:04:57.590+0000,19.5,False
P00003,Sony Laptop Skin,Laptop,Sony,943.74,186,17,SOUTH,STORE_SOUTH_2,OK,A,2025-02-05T19:04:57.590+0000,10.94,False
P00004,Sony Smartwatch Back,Smartwatch,Sony,1418.8,131,17,WEST,STORE_WEST_1,OK,A,2025-03-02T19:04:57.590+0000,7.71,False
P00005,Apple Tablet Material,Tablet,Apple,544.64,135,18,NORTH,STORE_NORTH_3,OK,A,2025-02-11T19:04:57.591+0000,7.5,False


In [0]:
# Celda 5: Escribir resultados en Snowflake
try:
    df_transform.write.format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "INVENTORY_ANALYTICS") \
        .mode("overwrite") \
        .save()
    print("✅ Datos guardados en INVENTORY_ANALYTICS")
    
    # Verificación adicional
    df_check = spark.read.format("snowflake") \
        .options(**sf_options) \
        .option("query", "SELECT COUNT(*) AS filas FROM INVENTORY_ANALYTICS") \
        .load()
    display(df_check)
except Exception as e:
    print(f"❌ Error al guardar: {str(e)}")
    # Respaldar en DBFS por si falla Snowflake
    df_transform.write.format("parquet").save("/FileStore/backup_inventory_analytics")

✅ Datos guardados en INVENTORY_ANALYTICS


FILAS
150000


In [0]:
# Celda 6: Resumen final
print("🎉 Resumen del ETL:")
print(f"- Filas procesadas: {df_transform.count()}")
print(f"- Columnas generadas: {len(df_transform.columns)}")
print(f"- Priority Flags (True): {df_transform.filter(col('priority_flag') == True).count()}")

🎉 Resumen del ETL:
- Filas procesadas: 150000
- Columnas generadas: 14
- Priority Flags (True): 6373
