In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [0]:
# Names of the catalog and of the three layer schemas (bronze, silver, gold)
# that every table reference in this notebook is built from.
catalog_name = "taller_03"
schema_bronze = "bronze"
schema_silver = "silver"
schema_gold = "gold"

In [0]:
# Create the catalog if it does not exist yet, so the cell can be re-run safely.
sql_catalog = f"""
    CREATE CATALOG IF NOT EXISTS {catalog_name}
"""

spark.sql(sql_catalog)

In [0]:
# Create the bronze schema inside the catalog if it does not exist yet.
sql_bronze = f"""
    CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_bronze}
"""

spark.sql(sql_bronze)

In [0]:
# Create the silver schema inside the catalog if it does not exist yet.
sql_silver = f"""
    CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_silver}
"""

spark.sql(sql_silver)

In [0]:
# Create the gold schema inside the catalog if it does not exist yet.
sql_gold = f"""
    CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_gold}
"""

spark.sql(sql_gold)

### Bronze

In [0]:
# Folder that holds the five input CSV files, and the full path of each file.
path_base = "/Volumes/dmc_01/default/volumen_01/tareas/tarea_03/input"

path_clientes = path_base + "/clientes.csv"
path_envios_base = path_base + "/envios_base.csv"
path_envios_incremento = path_base + "/envios_incremento.csv"
path_rutas = path_base + "/rutas.csv"
path_sucursales = path_base + "/sucursales.csv"

In [0]:
def _read_input_csv(path):
    """Load one input CSV, using its first row as header and inferring column types."""
    return spark.read.csv(path, header=True, inferSchema=True)


# One raw DataFrame per input file.
clientes = _read_input_csv(path_clientes)
envios_base = _read_input_csv(path_envios_base)
envios_incremento = _read_input_csv(path_envios_incremento)
rutas = _read_input_csv(path_rutas)
sucursales = _read_input_csv(path_sucursales)

In [0]:
# Persist every raw DataFrame as a Delta table in the bronze schema,
# replacing the table contents on each run.
bronze_sources = {
    "clientes": clientes,
    "envios_base": envios_base,
    "envios_incremento": envios_incremento,
    "rutas": rutas,
    "sucursales": sucursales,
}

for bronze_name, bronze_df in bronze_sources.items():
    (
        bronze_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{catalog_name}.{schema_bronze}.{bronze_name}")
    )

In [0]:
# Print the first 10 rows of the bronze customers table to check the load.
spark.table(f"{catalog_name}.{schema_bronze}.clientes").show(10)

In [0]:
# Print the first 10 rows of the bronze base-shipments table to check the load.
spark.table(f"{catalog_name}.{schema_bronze}.envios_base").show(10)

In [0]:
# Print the first 10 rows of the bronze incremental-shipments table to check the load.
spark.table(f"{catalog_name}.{schema_bronze}.envios_incremento").show(10)

In [0]:
# Print the first 10 rows of the bronze routes table to check the load.
spark.table(f"{catalog_name}.{schema_bronze}.rutas").show(10)

In [0]:

# Print the first 10 rows of the bronze branches table to check the load.
spark.table(f"{catalog_name}.{schema_bronze}.sucursales").show(10)

In [0]:
from pyspark.sql.functions import col, trim, initcap, when, lit, year, month, dayofmonth, concat, concat_ws

### Silver

In [0]:
# Cleaned customers: trims and title-cases the name-like text columns,
# normalises e-mail addresses, casts the registration date, and keeps a single
# row per non-null id_cliente.
silver_clientes = (
  spark.table(f"{catalog_name}.{schema_bronze}.clientes")
  .withColumn("nombre", initcap(trim(col("nombre"))))
  .withColumn("apellido", initcap(trim(col("apellido"))))
  # E-mail addresses are lower-cased, not title-cased: initcap would turn
  # "john@mail.com" into "John@mail.com".
  .withColumn("email", F.lower(trim(col("email"))))
  .withColumn("segmento", initcap(trim(col("segmento"))))
  .withColumn("fecha_registro", col("fecha_registro").cast("date"))
  .dropna(subset=["id_cliente"])
  .dropDuplicates(["id_cliente"])
)

In [0]:
# Preview the cleaned customers DataFrame before it is written.
display(silver_clientes)

In [0]:
# Cleaned branches: the four text columns are trimmed and title-cased, the
# opening date is cast to DATE, and one row per non-null id_sucursal is kept.
sucursales_normalizadas = spark.table(f"{catalog_name}.{schema_bronze}.sucursales")
for columna_texto in ("ciudad", "distrito", "region", "tipo"):
  sucursales_normalizadas = sucursales_normalizadas.withColumn(
    columna_texto, F.initcap(F.trim(F.col(columna_texto)))
  )

silver_sucursales = (
  sucursales_normalizadas
  .withColumn("fecha_apertura", F.col("fecha_apertura").cast("date"))
  .dropna(subset=["id_sucursal"])
  .dropDuplicates(["id_sucursal"])
)


In [0]:
# Preview the cleaned branches DataFrame before it is written.
display(silver_sucursales)

In [0]:
# Cleaned routes: origin/destination are trimmed and title-cased, the distance
# is cast to INT and the estimated time to DECIMAL(10,2); one row per non-null
# id_ruta is kept.
rutas_normalizadas = spark.table(f"{catalog_name}.{schema_bronze}.rutas")
for columna_texto in ("origen", "destino"):
  rutas_normalizadas = rutas_normalizadas.withColumn(
    columna_texto, F.initcap(F.trim(F.col(columna_texto)))
  )

silver_rutas = (
  rutas_normalizadas
  .withColumn("distancia_km", F.col("distancia_km").cast("int"))
  .withColumn("tiempo_estimado_horas", F.col("tiempo_estimado_horas").cast("DECIMAL(10,2)"))
  .dropna(subset=["id_ruta"])
  .dropDuplicates(["id_ruta"])
)


In [0]:
# Preview the cleaned routes DataFrame before it is written.
display(silver_rutas)

In [0]:
# Columns that identify a shipment row; rows sharing all of them are treated as
# versions of the same record and only the newest (by updated_at) survives.
keys = ["id_envio", "id_linea", "id_cliente", "id_sucursal", "id_ruta"]
w = Window.partitionBy(*keys).orderBy(F.desc("updated_at"))

# Step 1: cast columns to their final types and discard rows with a null key.
envios_base_tipado = (
  spark.table(f"{catalog_name}.{schema_bronze}.envios_base")
  .withColumn("fecha_envio", F.col("fecha_envio").cast("date"))
  .withColumn("estado", F.initcap(F.trim(F.col("estado"))))
  .withColumn("peso_kg", F.col("peso_kg").cast("DOUBLE"))
  .withColumn("costo_envio", F.col("costo_envio").cast("DOUBLE"))
  .withColumn("updated_at", F.col("updated_at").cast("timestamp"))
  .dropna(subset=keys)
)

# Step 2: rank the versions of each key newest-first and keep rank 1 only.
silver_envios_base = (
  envios_base_tipado
  .withColumn("rn", F.row_number().over(w))
  .where(F.col("rn") == 1)
  .drop("rn")
)


In [0]:
# Preview the deduplicated base shipments before they are written.
display(silver_envios_base)

In [0]:
# Same cleaning as the base shipments, applied to the incremental load: rows
# sharing every key column are versions of one record and only the newest
# (by updated_at) is kept.
keys = ["id_envio", "id_linea", "id_cliente", "id_sucursal", "id_ruta"]
w = Window.partitionBy(*keys).orderBy(F.desc("updated_at"))

# Step 1: cast columns to their final types and discard rows with a null key.
envios_incremento_tipado = (
  spark.table(f"{catalog_name}.{schema_bronze}.envios_incremento")
  .withColumn("fecha_envio", F.col("fecha_envio").cast("date"))
  .withColumn("estado", F.initcap(F.trim(F.col("estado"))))
  .withColumn("peso_kg", F.col("peso_kg").cast("DOUBLE"))
  .withColumn("costo_envio", F.col("costo_envio").cast("DOUBLE"))
  .withColumn("updated_at", F.col("updated_at").cast("timestamp"))
  .dropna(subset=keys)
)

# Step 2: rank the versions of each key newest-first and keep rank 1 only.
silver_envios_incremento = (
  envios_incremento_tipado
  .withColumn("rn", F.row_number().over(w))
  .where(F.col("rn") == 1)
  .drop("rn")
)


In [0]:
# Preview the deduplicated incremental shipments before they are written.
display(silver_envios_incremento)

In [0]:
# Save the cleaned customers as a Delta table in the silver schema, replacing its contents.
silver_clientes.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.clientes")

In [0]:
# Read the silver customers table back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_silver}.clientes").show(10)

In [0]:
# Save the cleaned base shipments as a Delta table in the silver schema, replacing its contents.
silver_envios_base.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.envios_base")


In [0]:
# Read the silver base-shipments table back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_silver}.envios_base").show(10)

In [0]:
# Save the cleaned incremental shipments as a Delta table in the silver schema, replacing its contents.
silver_envios_incremento.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.envios_incremento")

In [0]:
# Read the silver incremental-shipments table back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_silver}.envios_incremento").show(10)

In [0]:
# Save the cleaned routes as a Delta table in the silver schema, replacing its contents.
silver_rutas.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.rutas")

In [0]:
# Read the silver routes table back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_silver}.rutas").show(10)

In [0]:
# Save the cleaned branches as a Delta table in the silver schema, replacing its contents.
silver_sucursales.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_silver}.sucursales")


In [0]:
# Read the silver branches table back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_silver}.sucursales").show(10)

### Gold

In [0]:
# Time dimension: one row per distinct shipment date, with year, month, day,
# a "<year>-<01|02>" semester label and an integer yyyymmdd key.
#
# Dates are collected from BOTH silver shipment tables because fact_envio also
# carries fecha_envio values coming from the incremental load; taking them only
# from envios_base would leave those fact rows without a matching date.
fechas_envio = (
    spark.table(f"{catalog_name}.{schema_silver}.envios_base")
    .select(col("fecha_envio").alias("fecha"))
    .union(
        spark.table(f"{catalog_name}.{schema_silver}.envios_incremento")
        .select(col("fecha_envio").alias("fecha"))
    )
)

dim_tiempo = (
    fechas_envio
    .dropna().distinct()
    .withColumn("anio", year(col("fecha")))
    .withColumn("mes", month(col("fecha")))
    .withColumn("dia", dayofmonth(col("fecha")))
    # First semester is January-June (months 1-6); July belongs to the second.
    .withColumn("semestre", concat_ws("-",col("anio"), when(col("mes") <= 6, lit("01")).otherwise(lit("02"))))
    .withColumn("id_tiempo", (col("anio")*10000 + col("mes")*100 + col("dia")).cast("int"))
)


In [0]:
# Preview the time dimension.
display(dim_tiempo)

In [0]:
# Customer dimension: the silver customers table with one row per id_cliente.
clientes_silver = spark.table(f"{catalog_name}.{schema_silver}.clientes")
dim_cliente = clientes_silver.dropDuplicates(["id_cliente"])

In [0]:
# Preview the customer dimension.
display(dim_cliente)

In [0]:
# Branch dimension: the silver branches table with one row per id_sucursal.
sucursales_silver = spark.table(f"{catalog_name}.{schema_silver}.sucursales")
dim_sucursal = sucursales_silver.dropDuplicates(["id_sucursal"])


In [0]:
# Preview the branch dimension.
display(dim_sucursal)

In [0]:
# Route dimension: the silver routes table with one row per id_ruta.
rutas_silver = spark.table(f"{catalog_name}.{schema_silver}.rutas")
dim_ruta = rutas_silver.dropDuplicates(["id_ruta"])


In [0]:
# Preview the route dimension.
display(dim_ruta)

In [0]:
# Region dimension: one row per branch region, with the number of distinct
# branches that have shipments and the number of base shipments in the region.
#
# id_region is a dense rank over the region name rather than
# monotonically_increasing_id(): this DataFrame is lazy and is evaluated more
# than once (when joined into fact_envio and when written to gold), and
# partition-dependent ids are not guaranteed to be the same on each evaluation.
# A rank over the name yields the same id for the same region every time.
# The un-partitioned window moves all rows to one partition, which is fine for
# the handful of rows left after grouping by region.
dim_region = (
    spark.table(f"{catalog_name}.{schema_silver}.envios_base").alias("e")
    .join(
        dim_sucursal.alias("s"),
        col("s.id_sucursal") == col("e.id_sucursal"),
        "inner"
    )
    .groupBy("region")
    .agg(
        F.countDistinct("e.id_sucursal").alias("num_sucursales"),
        F.count("id_envio").alias("volumen_envio")
    )
    # Cast to long so the column keeps the type produced by the previous
    # monotonically_increasing_id() implementation.
    .withColumn("id_region", F.dense_rank().over(Window.orderBy("region")).cast("long"))
    .select(
        col("id_region"),
        col("region"),
        col("volumen_envio"),
        col("num_sucursales")
    )
)


In [0]:
# Preview the region dimension.
display(dim_region)

In [0]:
# Shipment fact: base and incremental shipments are full-outer-joined on
# id_envio; for every attribute the incremental value wins when present and the
# base value is used otherwise. "incremento" is the rise in costo_envio between
# base and increment (0 when the cost did not go up or one side is missing).
b = spark.table(f"{catalog_name}.{schema_silver}.envios_base").alias("b")
i = spark.table(f"{catalog_name}.{schema_silver}.envios_incremento").alias("i")

# NOTE(review): the silver tables are deduplicated on
# (id_envio, id_linea, id_cliente, id_sucursal, id_ruta) but this join uses
# id_envio alone; if one id_envio can have several lines the join fans out.
# The "duplicados" check on the result exists to surface that case.
fact_envio = (
    b.join(i, col("b.id_envio") == col("i.id_envio"), "full_outer")
    # Resolve the branch with the same precedence as the selected id_sucursal.
    # Joining on b.id_sucursal alone left shipments that exist only in the
    # increment (b side is NULL) without branch, and therefore without id_region.
    .join(
        dim_sucursal.alias("su"),
        F.coalesce(col("i.id_sucursal"), col("b.id_sucursal")) == col("su.id_sucursal"),
        "left"
    )
    .join(dim_region.alias("dreg"), col("su.region") == col("dreg.region"), "left")
    .select(
        F.coalesce(i["id_envio"], b["id_envio"]).alias("id_envio"),
        F.coalesce(i["id_linea"], b["id_linea"]).alias("id_linea"),
        F.coalesce(i["id_cliente"], b["id_cliente"]).alias("id_cliente"),
        F.coalesce(i["id_sucursal"], b["id_sucursal"]).alias("id_sucursal"),
        F.coalesce(i["id_ruta"], b["id_ruta"]).alias("id_ruta"),
        col("dreg.id_region").alias("id_region"),
        F.coalesce(i["fecha_envio"], b["fecha_envio"]).alias("fecha_envio"),
        F.coalesce(i["estado"], b["estado"]).alias("estado"),
        F.coalesce(i["peso_kg"], b["peso_kg"]).alias("peso_kg"),
        F.coalesce(i["costo_envio"], b["costo_envio"]).alias("costo_envio"),
        F.coalesce(i["updated_at"], b["updated_at"]).alias("updated_at"),
        F.when(i["costo_envio"] > b["costo_envio"], i["costo_envio"] - b["costo_envio"])
         .otherwise(F.lit(0)).alias("incremento")
    )
)


In [0]:
# id_envio values that appear on more than one fact row (expected to be empty).
conteo_por_envio = fact_envio.groupBy("id_envio").count()
duplicados = conteo_por_envio.where(F.col("count") > 1)

In [0]:
# Show any id_envio that occurs more than once in the fact DataFrame.
display(duplicados)

In [0]:
# Preview the shipment fact DataFrame.
display(fact_envio)

In [0]:
# Save the time dimension as a Delta table in the gold schema, replacing its contents.
dim_tiempo.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_tiempo")

In [0]:
# Read the gold time dimension back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_gold}.dim_tiempo").show(10)

In [0]:
# Save the customer dimension as a Delta table in the gold schema, replacing its contents.
dim_cliente.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_cliente")

In [0]:
# Read the gold customer dimension back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_gold}.dim_cliente").show(10)

In [0]:
# Save the branch dimension as a Delta table in the gold schema, replacing its contents.
dim_sucursal.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_sucursal")

In [0]:
# Read the gold branch dimension back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_gold}.dim_sucursal").show(10)

In [0]:
# Save the route dimension as a Delta table in the gold schema, replacing its contents.
(
    dim_ruta.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{catalog_name}.{schema_gold}.dim_ruta")
)


In [0]:
# Read the gold route dimension back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_gold}.dim_ruta").show(10)

In [0]:
# Save the region dimension as a Delta table in the gold schema, replacing its contents.
dim_region.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.dim_region")

In [0]:
# Read the gold region dimension back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_gold}.dim_region").show(10)

In [0]:
# Save the shipment fact as a Delta table in the gold schema, replacing its contents.
fact_envio.write.format("delta").mode("overwrite").saveAsTable(f"{catalog_name}.{schema_gold}.fact_envio")

In [0]:
# Read the gold shipment fact back and print its first 10 rows.
spark.table(f"{catalog_name}.{schema_gold}.fact_envio").show(10)

In [0]:
from delta.tables import DeltaTable

# Upsert fact_envio into the gold fact table, matching rows on id_envio:
# matched rows get every non-key column refreshed, unmatched rows are inserted.
# The table name is built from schema_gold (it was hard-coded as "gold") so it
# always points at the same table the rest of the notebook writes to.
gold_table = DeltaTable.forName(spark, f"{catalog_name}.{schema_gold}.fact_envio")

# Single list of fact columns; both the UPDATE and the INSERT mappings are
# derived from it so they cannot drift apart.
fact_columns = [
    "id_envio",
    "id_linea",
    "id_cliente",
    "id_sucursal",
    "id_ruta",
    "id_region",
    "fecha_envio",
    "estado",
    "peso_kg",
    "costo_envio",
    "updated_at",
    "incremento",
]
insert_values = {column: f"src.{column}" for column in fact_columns}
# The merge key is not rewritten on update.
update_values = {
    column: expression
    for column, expression in insert_values.items()
    if column != "id_envio"
}

(
    gold_table.alias("tgt")
    .merge(fact_envio.alias("src"), "tgt.id_envio = src.id_envio")
    .whenMatchedUpdate(set=update_values)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)


### Auditoría

In [0]:
from pyspark.sql.functions import col, current_timestamp
import uuid

# DDL for the audit table that receives one row per ingestion run; created
# only if it is not there yet.
audit_table_ddl = f"""
CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_gold}.audit_ingestion (
    audit_id STRING,
    lote_id STRING,
    source_table STRING,
    records_read LONG,
    records_deduplicated LONG,
    records_inserted LONG,
    records_updated LONG,
    target_table STRING,
    timestamp TIMESTAMP
) USING DELTA
"""

spark.sql(audit_table_ddl)


In [0]:
from delta.tables import DeltaTable

# Identification of this audit record.
audit_id = str(uuid.uuid4())
lote_id = "lote_test_001"
source_table = f"{catalog_name}.{schema_silver}.envios_incremento"
target_table = f"{catalog_name}.{schema_gold}.fact_envio"

# Row counts of the fact DataFrame before and after removing repeated id_envio.
records_read = fact_envio.count()

records_deduplicated = fact_envio.dropDuplicates(["id_envio"]).count()

# Inserted/updated counts come from the metrics Delta recorded for the most
# recent MERGE into the target table, instead of being hard-coded
# (inserted = all rows, updated = 0), so the audit row reflects what the
# upsert actually did.
last_merge = (
    DeltaTable.forName(spark, target_table)
    .history()
    .filter(F.col("operation") == "MERGE")
    .orderBy(F.col("version").desc())
    .select("operationMetrics")
    .first()
)

if last_merge is not None:
    # operationMetrics is a string-to-string map; the values are numeric text.
    merge_metrics = last_merge["operationMetrics"]
    records_inserted = int(merge_metrics.get("numTargetRowsInserted", 0))
    records_updated = int(merge_metrics.get("numTargetRowsUpdated", 0))
else:
    # No MERGE in the table history (only the overwrite ran): every
    # deduplicated row counts as an insert, as the notebook assumed before.
    records_inserted = records_deduplicated
    records_updated = 0


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from pyspark.sql.functions import current_timestamp
import uuid

# Column layout of one audit record; every field is nullable.
audit_fields = [
    ("audit_id", StringType()),
    ("lote_id", StringType()),
    ("source_table", StringType()),
    ("records_read", LongType()),
    ("records_deduplicated", LongType()),
    ("records_inserted", LongType()),
    ("records_updated", LongType()),
    ("target_table", StringType()),
    ("timestamp", TimestampType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in audit_fields]
)

# The timestamp slot is created empty and then filled by Spark with the
# current timestamp.
audit_row = (
    audit_id,
    lote_id,
    source_table,
    records_read,
    records_deduplicated,
    records_inserted,
    records_updated,
    target_table,
    None,
)
audit_df = spark.createDataFrame([audit_row], schema)
audit_df = audit_df.withColumn("timestamp", current_timestamp())


In [0]:
# Append the audit record to the audit table, letting Delta merge the schema
# of the incoming DataFrame into the table schema.
(
    audit_df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable(f"{catalog_name}.{schema_gold}.audit_ingestion")
)

In [0]:
# Read the audit table back and print its rows without truncating long values.
audit_check = spark.table(f"{catalog_name}.{schema_gold}.audit_ingestion")  
audit_check.show(truncate=False)  


In [0]:
# Top 5 customers by number of shipments, shown with their full name.
fc = fact_envio.alias("f")
dc = dim_cliente.alias("c")

# Attach customer names to each shipment; shipments without a matching
# customer are kept (left join).
envios_con_cliente = fc.join(dc, fc["id_cliente"] == dc["id_cliente"], "left")

# Count shipments per customer.
envios_por_cliente = (
    envios_con_cliente
    .groupBy(fc["id_cliente"], dc["nombre"], dc["apellido"])
    .agg(F.count(fc["id_envio"]).alias("total_envios"))
)

# Build the display name, sort by volume and keep the five largest.
top5_clientes = (
    envios_por_cliente
    .withColumn("nombre_completo", F.concat_ws(" ", dc["nombre"], dc["apellido"]))
    .sort(F.col("total_envios").desc())
    .limit(5)
)

top5_clientes.select("id_cliente", "nombre_completo", "total_envios").show(truncate=False)


In [0]:

# Branches ranked by number of shipments, busiest first.
fe = fact_envio.alias("f")
ds = dim_sucursal.alias("s")

# Left join keeps shipments whose branch is missing from the dimension.
envios_con_sucursal = fe.join(ds, fe["id_sucursal"] == ds["id_sucursal"], "left")

ranking_sucursales = (
    envios_con_sucursal
    .groupBy(fe["id_sucursal"])
    .agg(F.count(fe["id_envio"]).alias("total_envios"))
    .sort(F.col("total_envios").desc())
)

ranking_sucursales.show(10, truncate=False)
