### Desarrollo Taller 04 - Databricks
#### Mario Alonso Vento Alvarado

#### Arquitectura Medallion

In [0]:
# Unity Catalog object names used by every cell below: one catalog and
# one schema per Medallion layer.
catalog_name = "dmc_taller04"
schema_bronze, schema_silver, schema_gold = "bronze", "silver", "gold"

In [0]:
# Create the workshop catalog; IF NOT EXISTS makes the cell safe to re-run.
create_catalog_sql = f"CREATE CATALOG IF NOT EXISTS {catalog_name}"
spark.sql(create_catalog_sql)

In [0]:
# Create one schema per Medallion layer (bronze, silver, gold) inside the catalog.
for layer_schema in (schema_bronze, schema_silver, schema_gold):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{layer_schema}")

In [0]:
# Create the volume that holds the input CSV files (read later from
# /Volumes/<catalog>/default/input).
input_volume = f"{catalog_name}.default.input"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {input_volume}")

### Bronze

In [0]:
# Bronze table for customers. Every source column is declared as string;
# ingest_at is the only typed (timestamp) column.
ddl_clientes_raw = f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.clientes_raw(
  id_cliente string,
  nombre string,
  apellido string,
  email string,
  segmento string,
  fecha_registro string,
  ingest_at timestamp
)
"""
spark.sql(ddl_clientes_raw)

In [0]:
# Bronze table for the base shipments load. Every source column is declared
# as string; ingest_at is the only typed (timestamp) column.
ddl_envios_base_raw = f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.envios_base_raw(
  id_envio string,
  id_linea string,
  id_cliente string,
  id_sucursal string,
  id_ruta string,
  fecha_envio string,
  estado string,
  peso_kg string,
  costo_envio string,
  updated_at string,
  ingest_at timestamp
)
"""
spark.sql(ddl_envios_base_raw)

In [0]:
# Bronze table for routes. Every source column is declared as string;
# ingest_at is the only typed (timestamp) column.
ddl_rutas_raw = f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.rutas_raw(
  id_ruta string,
  origen string,
  destino string,
  distancia_km string,
  tiempo_estimado_horas string,
  ingest_at timestamp
)
"""
spark.sql(ddl_rutas_raw)

In [0]:
# Bronze table for branches. Every source column is declared as string;
# ingest_at is the only typed (timestamp) column.
ddl_sucursales_raw = f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.sucursales_raw(
  id_sucursal string,
  ciudad string,
  distrito string,
  region string,
  tipo string,
  fecha_apertura string,
  ingest_at timestamp
)
"""
spark.sql(ddl_sucursales_raw)

In [0]:
# Bronze table for the incremental shipments load. Same column list as
# envios_base_raw: all source columns as string plus an ingest_at timestamp.
ddl_envios_incremento_raw = f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_bronze}.envios_incremento_raw(
  id_envio string,
  id_linea string,
  id_cliente string,
  id_sucursal string,
  id_ruta string,
  fecha_envio string,
  estado string,
  peso_kg string,
  costo_envio string,
  updated_at string,
  ingest_at timestamp
)
"""
spark.sql(ddl_envios_incremento_raw)

In [0]:
# Bronze ingestion: read each landing CSV, stamp it with the ingestion time and
# overwrite the matching <name>_raw Delta table.
#
# The DataFrames are read here instead of relying on variables defined in a
# later cell, so the notebook works on a fresh kernel with "Run All".
# The import is local because the notebook's import cell runs after this one.
from pyspark.sql.functions import current_timestamp

# Same volume that is created above as {catalog_name}.default.input.
bronze_input_dir = f"/Volumes/{catalog_name}/default/input"
bronze_sources = ["clientes", "envios_base", "rutas", "sucursales", "envios_incremento"]

for source_name in bronze_sources:
    (
        spark.read.option("header", True).csv(f"{bronze_input_dir}/{source_name}.csv")
        .withColumn("ingest_at", current_timestamp())
        .write.format("delta").mode("overwrite")
        .saveAsTable(f"{catalog_name}.{schema_bronze}.{source_name}_raw")
    )

### Silver

In [0]:
from pyspark.sql.functions import (
    avg,
    col,
    concat_ws,
    count,
    current_timestamp,
    dayofmonth,
    initcap,
    lag,
    lit,
    month,
    row_number,
    sum as spark_sum,
    trim,
    when,
    year,
)
from pyspark.sql.window import Window

In [0]:
# Location of the input CSV files. The path is derived from catalog_name so it
# always points at the volume created earlier ({catalog_name}.default.input)
# instead of repeating the catalog name as a literal.
path_base = f"/Volumes/{catalog_name}/default/input"

# One CSV file per source entity.
path_clientes = f"{path_base}/clientes.csv"
path_envios_base = f"{path_base}/envios_base.csv"
path_rutas = f"{path_base}/rutas.csv"
path_sucursales = f"{path_base}/sucursales.csv"
path_envios_incremento = f"{path_base}/envios_incremento.csv"

In [0]:
# Read the source CSV files.
def _read_csv_with_ingest_ts(path):
    """Read a CSV file that has a header row and add an ``ingest_at`` column.

    No schema is supplied and schema inference is not enabled, so every
    source column is read as a string. ``ingest_at`` holds the timestamp at
    which the query is evaluated.

    Args:
        path: Location of the CSV file.

    Returns:
        A Spark DataFrame with the CSV columns plus ``ingest_at``.
    """
    return (
        spark.read.option("header", True).csv(path)
        .withColumn("ingest_at", current_timestamp())
    )


clientes = _read_csv_with_ingest_ts(path_clientes)
envios_base = _read_csv_with_ingest_ts(path_envios_base)
rutas = _read_csv_with_ingest_ts(path_rutas)
sucursales = _read_csv_with_ingest_ts(path_sucursales)
envios_incremento = _read_csv_with_ingest_ts(path_envios_incremento)

In [0]:
# Customers: trim and title-case nombre/apellido, drop rows whose id_cliente
# is null, and keep a single row per id_cliente.
clientes_normalizados = spark.table(f"{catalog_name}.{schema_bronze}.clientes_raw")
for name_col in ("nombre", "apellido"):
    clientes_normalizados = clientes_normalizados.withColumn(name_col, initcap(trim(col(name_col))))

silver_clientes = (
    clientes_normalizados
    .na.drop(subset=["id_cliente"])
    .dropDuplicates(["id_cliente"])
)

In [0]:
# Routes: cast distance and estimated time to double, drop rows whose id_ruta
# is null, and keep a single row per id_ruta.
rutas_tipadas = spark.table(f"{catalog_name}.{schema_bronze}.rutas_raw")
for numeric_col in ("distancia_km", "tiempo_estimado_horas"):
    rutas_tipadas = rutas_tipadas.withColumn(numeric_col, col(numeric_col).cast("double"))

silver_rutas = (
    rutas_tipadas
    .na.drop(subset=["id_ruta"])
    .dropDuplicates(["id_ruta"])
)

In [0]:
# Branches: trim and title-case ciudad/distrito/region, drop rows whose
# id_sucursal is null, and keep a single row per id_sucursal.
sucursales_normalizadas = spark.table(f"{catalog_name}.{schema_bronze}.sucursales_raw")
for place_col in ("ciudad", "distrito", "region"):
    sucursales_normalizadas = sucursales_normalizadas.withColumn(place_col, initcap(trim(col(place_col))))

silver_sucursales = (
    sucursales_normalizadas
    .na.drop(subset=["id_sucursal"])
    .dropDuplicates(["id_sucursal"])
)

In [0]:
# Base shipments: cast peso_kg and costo_envio to double and derive
# monto_envio = costo_envio * peso_kg.
envios_tipados = (
    spark.table(f"{catalog_name}.{schema_bronze}.envios_base_raw")
    .withColumn("peso_kg", col("peso_kg").cast("double"))
    .withColumn("costo_envio", col("costo_envio").cast("double"))
)
envios_con_monto = envios_tipados.withColumn("monto_envio", col("costo_envio") * col("peso_kg"))

# Drop rows without id_envio and keep a single row per id_envio.
# NOTE(review): the incremental MERGE further down matches on
# (id_envio, id_linea); if a shipment can have several lines, de-duplicating
# on id_envio alone drops all but one of them - TODO confirm the intended key.
silver_envios_base = (
    envios_con_monto
    .na.drop(subset=["id_envio"])
    .dropDuplicates(["id_envio"])
)

In [0]:
# Preview the raw envios_base DataFrame (the CSV read, not the Silver result).
# NOTE(review): the previous cell builds silver_envios_base; confirm whether
# that DataFrame was the intended preview.
display(envios_base)

In [0]:
# Incremental shipments, de-duplicated by business key (id_envio, id_linea):
# for each key only the row with the greatest updated_at is kept.
# updated_at is a string column in Bronze, so the ordering is lexicographic;
# this assumes a sortable format such as ISO-8601 - TODO confirm.
window_spec = Window.partitionBy("id_envio", "id_linea").orderBy(col("updated_at").desc())
silver_envios_incremento_dedup = (
    spark.table(f"{catalog_name}.{schema_bronze}.envios_incremento_raw")
    .withColumn("row_number", row_number().over(window_spec))
    .filter(col("row_number") == 1)
    .drop("row_number")
)

# Source for the MERGE: apply the same casts and the same derived column as
# silver_envios_base, so merged rows carry a correct monto_envio instead of a
# NULL (inserts) or a stale value (updates). silver_envios_incremento_dedup is
# left untouched because it is displayed and saved as-is later on.
incremento_merge_source = (
    silver_envios_incremento_dedup
    .withColumn("peso_kg", col("peso_kg").cast("double"))
    .withColumn("costo_envio", col("costo_envio").cast("double"))
    .withColumn("monto_envio", col("costo_envio") * col("peso_kg"))
)
incremento_merge_source.createOrReplaceTempView("incremento_dedup")

# The MERGE target must exist before merging. On a fresh run it has not been
# written yet, so it is created from the base load first.
silver_envios_table = f"{catalog_name}.{schema_silver}.envios_base"
if not spark.catalog.tableExists(silver_envios_table):
    silver_envios_base.write.format("delta").mode("overwrite").saveAsTable(silver_envios_table)

# Consolidate the incremental rows into the Silver shipments table.
# NOTE(review): the "Guardado de tablas Silver" cell later overwrites this
# table with silver_envios_base, which discards the rows merged here; the base
# write should happen before this MERGE, not after it.
spark.sql(f"""
MERGE INTO {silver_envios_table} AS target
USING incremento_dedup AS source
ON target.id_envio = source.id_envio AND target.id_linea = source.id_linea
WHEN MATCHED THEN
  UPDATE SET
    id_cliente = source.id_cliente,
    id_sucursal = source.id_sucursal,
    id_ruta = source.id_ruta,
    fecha_envio = source.fecha_envio,
    estado = source.estado,
    peso_kg = source.peso_kg,
    costo_envio = source.costo_envio,
    monto_envio = source.monto_envio,
    updated_at = source.updated_at,
    ingest_at = source.ingest_at
WHEN NOT MATCHED THEN
  INSERT (
    id_envio,
    id_linea,
    id_cliente,
    id_sucursal,
    id_ruta,
    fecha_envio,
    estado,
    peso_kg,
    costo_envio,
    monto_envio,
    updated_at,
    ingest_at
  ) VALUES (
    source.id_envio,
    source.id_linea,
    source.id_cliente,
    source.id_sucursal,
    source.id_ruta,
    source.fecha_envio,
    source.estado,
    source.peso_kg,
    source.costo_envio,
    source.monto_envio,
    source.updated_at,
    source.ingest_at
  )
""")

In [0]:
# Preview the de-duplicated incremental shipments.
display(silver_envios_incremento_dedup)

In [0]:
# Customer ranking by total number of shipments: count id_envio per customer,
# then number the customers from the highest count downwards.
envios_por_cliente = (
    silver_envios_base
    .groupBy("id_cliente")
    .agg(count("id_envio").alias("total_envios"))
)

# No partitionBy: the ranking is global across all customers.
window_spec_rank = Window.orderBy(col("total_envios").desc())
ranking_clientes = envios_por_cliente.withColumn("rank", row_number().over(window_spec_rank))

In [0]:
# Preview the customer ranking.
display(ranking_clientes)

In [0]:
# Total shipped weight per customer: sum of peso_kg, cast to decimal(10,2).
peso_total_expr = spark_sum("peso_kg").cast("decimal(10,2)").alias("peso_acumulado")
peso_acumulado_clientes = silver_envios_base.groupBy("id_cliente").agg(peso_total_expr)

In [0]:
# Preview the total weight per customer.
display(peso_acumulado_clientes)

In [0]:
# Month-over-month change of the average shipping cost per branch.
# Step 1: average costo_envio per (id_sucursal, year, month).
costo_mensual = (
    silver_envios_base
    .withColumn("year", year(col("fecha_envio")))
    .withColumn("month", month(col("fecha_envio")))
    .groupBy("id_sucursal", "year", "month")
    .agg(avg("costo_envio").alias("costo_promedio"))
)

# Step 2: compare each month with the previous row of the same branch
# (ordered by year, month). The first row of every branch has no previous
# row, so its lag and its variation are null.
window_spec_variation = Window.partitionBy("id_sucursal").orderBy("year", "month")
costo_con_variacion = (
    costo_mensual
    .withColumn("costo_promedio_anterior", lag("costo_promedio").over(window_spec_variation))
    .withColumn("variacion_costo_promedio", col("costo_promedio") - col("costo_promedio_anterior"))
)

# Step 3: round the three monetary columns to decimal(10,2) only after the
# variation has been computed on the unrounded values.
costo_promedio_sucursal = costo_con_variacion
for money_col in ("costo_promedio", "costo_promedio_anterior", "variacion_costo_promedio"):
    costo_promedio_sucursal = costo_promedio_sucursal.withColumn(money_col, col(money_col).cast("decimal(10,2)"))

In [0]:
# Preview the monthly average cost per branch and its variation.
display(costo_promedio_sucursal)

In [0]:
# Persist every Silver DataFrame as a Delta table (overwrite mode), in the
# order listed below; the dict key is the table name inside the silver schema.
# NOTE(review): overwriting envios_base here replaces whatever the earlier
# MERGE INTO wrote to that table - confirm the intended order of the two steps.
silver_outputs = {
    "clientes": silver_clientes,
    "sucursales": silver_sucursales,
    "rutas": silver_rutas,
    "envios_base": silver_envios_base,
    "envios_incremento_dedup": silver_envios_incremento_dedup,
    "ranking_clientes": ranking_clientes,
    "peso_acumulado_clientes": peso_acumulado_clientes,
    "costo_promedio_sucursal": costo_promedio_sucursal,
}

for table_name, silver_df in silver_outputs.items():
    silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.{table_name}")

### Gold

In [0]:
#se crea la dimensión TIEMPO en base a la data de la tabla q vemos
#q es transaccional (donde hay mayor cantidad de registros)

# eliminar valores nulos = .dropna()
# eliminar valores duplicados = .distinct() || .dropDuplicates(subset=["columna"])
# eliminar columna = .drop("columna")

dim_tiempo = (
    spark.table(f"{catalog_name}.{schema_silver}.matriculas")
    .select(
        col("fecha_matricula").alias("fecha")
    ).dropna().distinct()
    .withColumn("anio", year(col("fecha")))
    .withColumn("mes", month(col("fecha")))
    .withColumn("dia", dayofmonth(col("fecha")))
    .withColumn("semestre", concat_ws("-",col("anio"), when(col("mes") <=7, lit("01")).otherwise(lit("02"))))
    .withColumn("id_tiempo", (col("anio")*10000 + col("mes")*100 + col("dia")).cast("int"))
)

display(dim_tiempo)

In [0]:
dim_estudiante = (
    spark.table(f"{catalog_name}.{schema_silver}.estudiantes")
    .dropDuplicates(["id_estudiante"])
)

display(dim_estudiante)

In [0]:
dim_cursos = (
    spark.table(f"{catalog_name}.{schema_silver}.cursos")
    .dropDuplicates(["id_curso"])
)

display(dim_cursos)

In [0]:
dim_profesor = (
    spark.table(f"{catalog_name}.{schema_silver}.profesores")
    .dropDuplicates(["id_profesor"])
)

display(dim_profesor)

In [0]:
fact_matriculas = (
    spark.table(f"{catalog_name}.{schema_silver}.matriculas").alias("m")
    .join(
        dim_tiempo.alias("t"),
        col("m.fecha_matricula") == col("t.fecha"),
        "left"
    )
    .join(
        dim_cursos.alias("c"),
        col("m.id_curso") == col("c.id_curso"),
        "left"
    )
    .select(
        col("m.id_matricula"),
        col("t.id_tiempo"),
        col("m.id_curso"),
        col("m.id_estudiante"),
        col("m.id_profesor"),
        col("c.creditos").alias("creditos_curso"),
        col("m.nota_final"),
        col("m.aprobado")
    )
)

display(fact_matriculas)

In [0]:
dim_tiempo.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_tiempo")
dim_estudiante.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_estudiante")
dim_cursos.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_cursos")
dim_profesor.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_profesor")
fact_matriculas.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.fact_matriculas")

In [0]:
%sql

Sele....

In [0]:
spark.sql("select.....")

In [0]:
%sql

CREATE OR REPLACE VIEW IF NOT EXISTS sesion_0701.gold.vw_kpi_carrera
AS


In [0]:
spark.sql(f"""
    CREATE OR REPLACE VIEW {catalog_name}.{schema_gold}.vw_kpi_carrera
    AS
    SELECT 
    de.carrera,
    COUNT(DISTINCT fm.id_estudiante) AS cantidad_alumnos,
    AVG(fm.nota_final) AS promedio_notas,
    SUM(fm.creditos_curso) AS creditos_totales
    FROM {catalog_name}.{schema_gold}.fact_matriculas fm
    LEFT JOIN {catalog_name}.{schema_gold}.dim_estudiante de ON fm.id_estudiante = de.id_estudiante
    GROUP BY de.carrera
""") 

In [0]:
%sql

select * from sesion_0701.gold.vw_kpi_carrera