## 📘 Arquitectura Medallion aplicada a un Lakehouse en Databricks


🛠️ Flujo de datos "" Bronce → Plata → Oro "" usando Delta Lake y Volúmenes de Unity Catalog

🖋️ Autor: Brayan R. Neciosup Bolaños

🎯 Objetivo
Aplicar la arquitectura Medallion construyendo un flujo de datos limpio, modular y gobernable sobre Databricks, empleando volúmenes, tablas Delta y transformaciones Spark SQL.

In [0]:
"""
    💡 COMO SE UTILIZARÁ EL SERVICIO DE UNTY CATALOG, CREAREMOS UN CATALAGO QUE ALMACENARA TODAS LAS CAPAS
        DE LA ARQUITECTURA MEDALLION EN ESQUEMAS INDEPENDIENTES. 
"""
from pyspark.sql import SparkSession # Puerta de acceso a todas las funcionalidades de apache spark
from pyspark.sql.functions import * # Funciones SQL
from pyspark.sql.types import * # Funciones de tipos de datos
spark = SparkSession.builder.appName("LakehouseDatabricksMedallion").getOrCreate()

###✅ CREACIÓN DEL CATÁLAGO
spark.sql("CREATE CATALOG IF NOT EXISTS lakehouse")
print("Catálago creado para almacenar la Arquitectura Medallion")

#### Capa Bronce (Raw Layer) 🥉

In [0]:
"""
  📝 BRONZE LAYER 🥉
"""
## 1️⃣ CREAMOS ESQUEMA DENTRO DEL CATÁLAGO >>LAKEHOUSE<< QUE REPRESENTARÁ BRONZE LAYER
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.bronze_layer")
print("Se creó la capa BRONZE LAYER")
ventas = spark.table("workspace.default.ventas") ### ⬅️ CARGAMOS ARCHIVO DE FUENTE DE DATO.
# ventas.show(6)

## 2️⃣ ALMACENAMOS EL ARCHIVO >>VENTAS<< EN BRONZE LAYER
ventas.write.format("delta").mode("overwrite").saveAsTable("lakehouse.bronze_layer.ventas_bronze")
print("Archivo ventas almacenado en la capa BRONZE LAYER")

## 3️⃣ VERFIFICAMOS LOS TIPOS DE DATOS POR COLUMNA.
ventas_bronze_layer = spark.sql("SELECT * FROM lakehouse.bronze_layer.ventas_bronze")
# ventas_bronze_layer.printSchema()

## 4️⃣ VERIFICAMOS DATOS NULOS EN EL DATASET.
ventas_bronze_layer_nulos = ventas_bronze_layer.select([
    sum(when(col(i).isNull(),1).otherwise(0)).alias(i)
    for i in ventas_bronze_layer.columns
])
# ventas_bronze_layer_nulos.show()

## 🗒️ OBSERVACIONES
## 📌 Los tipos de datos encontrados por columna fueron:
        # |-- id_venta: long (nullable = true)
        # |-- producto: string (nullable = true)
        # |-- categoria: string (nullable = true)
        # |-- cantidad: long (nullable = true)
        # |-- precio_unitario: double (nullable = true)
        # |-- fecha_venta: date (nullable = true)
## 📌 No existen valores nulos en todo el dataset.
print("CÓDIGO DE BRONZE LAYER FINALIZADO")

#### Capa Plata (Staging Layer) 🥈

In [0]:
"""
  📝 SILVER LAYER 🥈
"""
## 1️⃣ CREAMOS ESQUEMA DENTRO DEL CATÁLAGO >>LAKEHOUSE<< QUE REPRESENTARÁ SILVER LAYER
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.silver_layer")
print("Se creó la capa SILVER LAYER")

## 2️⃣ LEEMOS DATOS DEL ARCHIVO ALMACENADO EN BRONZE LAYER 🥉
ventas_bronze = spark.sql("SELECT * FROM lakehouse.bronze_layer.ventas_bronze")
# ventas_bronze.show(6)

## 3️⃣ FILTRAMOS VALORES NULOS (ESTE PASO SE COLOCARÁ DE EJEMPLO, PORQUE ANTERIORMENTE VIMOS QUE NO EXISTÍAN NULOS)
##---- VERFIFICAMOS NULOS
nulos_ventas_bronze = ventas_bronze.select([
  sum(when(col(i).isNull(),1).otherwise(0)).alias(i)
  for i in ventas_bronze.columns
])
# nulos_ventas_bronze.show(5)
##---- FILTRAMOS NULOS Y CREAMOS NUEVO ARCHIVO DE VENTAS
ventas_silver_layer = ventas_bronze.select([
  when(col(i).isNotNull(),col(i)).alias(i)
  for i in ventas_bronze.columns
])

## 4️⃣ ESTANDARIZAMOS VALORES EN LAS COLUMNAS
#------ Quitamos guiones entre la información y capitalizamos.
import re
ventas_silver_layer = ventas_silver_layer.select([
    col(re.sub(r'\s',"_",i).title()).alias(i) 
    for i in ventas_silver_layer.columns
])
# ventas_silver_layer.show(5)

#------ Creamos columna adicional para calcular el total de venta
ventas_silver_layer = ventas_silver_layer.withColumn(
  "total_venta",
  round((col("cantidad")*col("precio_unitario")),2)
)
# ventas_silver_layer.show(5)
ventas_silver_layer.write.format("delta").saveAsTable("lakehouse.silver_layer.ventas_silver")
print("TABLA SILVER LISTA PARA SER UTILZIADA EN LA SIGUIENTE CAPA")
## 🗒️ OBSERVACIONES
## 📌 No existen valores nulos en todo el dataset.
## 📌 Estandarizamos valores en las columnas.
## 📌 Creamos columna adicional para calcular el total de venta.
print("CÓDIGO DE SILVER LAYER FINALIZADO")

#### Capa Oro (Curated Layer) 🥇

In [0]:
"""
  📝 GOLD LAYER 🥇
"""
## 1️⃣ CREAMOS ESQUEMA DENTRO DEL CATÁLAGO >>LAKEHOUSE<< QUE REPRESENTARÁ GOLD LAYER
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.gold_layer")
print("Se creó la capa GOLD LAYER")

## 2️⃣ LEEMOS DATOS DEL ARCHIVO ALMACENADO EN SILVER LAYER 🥉
ventas_silver = spark.sql("SELECT * FROM lakehouse.silver_layer.ventas_silver")
# ventas_silver.show(5)
## 3️⃣ GENERAMOS MÉTRICAS RICAS EN CONOCIMIENTO
"""
  Realizamos los requerimientos que pueden ser consumidos en dashboards o reportes.
  Como sabrás, esta capa está orientada a información rica en conocimiento, lista
  para métricas rápidas y analítica avanzada.
"""
##---------- Total de ventas y Promedio de ventas por categoría 📊
resumen_categorias = ventas_silver.groupBy("categoria").agg(
  round(sum(col("total_venta")),2).alias("total_ventas_categorias"),
  round(avg(col("total_venta")),2).alias("avg_total_ventas_categorias")
)
# resumen_categorias.show()

##---------- Top 5 productos con mayor cantidad vendidas 📊
from pyspark.sql.window import Window ##⬅️ Permite trabajar con ventanas deslizantes
window_especificacion = Window.orderBy(col("total_venta").desc())
ranking_productos = ventas_silver.select(
  "*",
  rank().over(window_especificacion).alias("ranking_productos")
)
# ranking_productos.show() ## Ranking de todos los productos

top_5_productos = ranking_productos.filter(
  (col("ranking_productos")<=5)
)
# top_5_productos.show()

## 4️⃣ ALMACENAMOS AMBAS METRICAS EN GOLD LAYER, PARA UN CONSUMO POSTERIOR 
resumen_categorias.write.format("delta").mode("overwrite").saveAsTable("lakehouse.gold_layer.metricas_categorias")
print("Resumen de ventas de categorías almacenadas en GOLD LAYER")
ranking_productos.write.format("delta").mode("overwrite").saveAsTable("lakehouse.gold_layer.metrica_ranking_productos")
print("Ranking de productos con mayor monto de venta almacenados en GOLD LAYER")
top_5_productos.write.format("delta").mode("overwrite").saveAsTable("lakehouse.gold_layer.metrica_top_5_productos")
print("Top 5 productos con mayor monto de venta almacenados en GOLD LAYER")
print("CÓDIGO DE GOLD LAYER FINALIZADO")