# _Inicio Silver_

### Aquí vale aclarar que usaremos un *initialization buffer*
Es decir, un tiempo de espera al inicio para asegurar que los recursos estén disponibles (montajes, paths), ya que bronce y silver se ejecutan en el mismo job.

In [0]:
import time

# Initialization buffer: Bronze and Silver run in the same job, so wait before
# starting Silver to give mounts/paths time to become available.
wait_seconds = 180      # total wait: 3 minutes
progress_every = 30     # print a progress line every 30 seconds

print(f"Esperando {wait_seconds} segundos antes de iniciar el procesamiento Silver...")
for elapsed in range(0, wait_seconds, progress_every):
    print(f"Esperando... {elapsed}/{wait_seconds} segundos")
    # Sleep until the next progress mark, or only the remainder on the last step.
    # (Deliberately avoids the builtin `min`: the later
    # `from pyspark.sql.functions import *` shadows it when cells are re-run.)
    remaining = wait_seconds - elapsed
    time.sleep(progress_every if remaining > progress_every else remaining)

print("Espera completada. Iniciando módulo Silver...")

In [0]:
# Storage locations for the Bronze (input) and Silver (output) sales tables.
silver_mount_path = "/mnt/silver/sales"
# NOTE(review): the two checkpoint paths below are not referenced by the
# streaming writes in this notebook (they checkpoint under silver_mount_path);
# confirm whether they are still needed.
checkpoint_path_silver = "abfss://silver@mistorageprincipal.dfs.core.windows.net/checkpoints"
bronze_checkpoint_path = "abfss://bronze@mistorageprincipal.dfs.core.windows.net/checkpoints"
bronze_mount_path = "/mnt/bronze/sales"

### Lectura de archivos planos CSV para el enriquecimiento posterior

In [0]:
# Directory holding the raw CSV reference files used to enrich the Silver stream.
_RAW_CSV_DIR = "/mnt/rawmarket/csv-raw"


def _read_raw_csv(file_name):
    """Read one raw CSV reference file from ``_RAW_CSV_DIR`` as a batch DataFrame.

    Every reference file is read the same way: header row, comma delimiter,
    ISO-8859-1 encoding, and Spark-inferred column types.
    """
    return (
        spark.read
        .format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .option("delimiter", ",")
        .option("encoding", "ISO-8859-1")
        .load(f"{_RAW_CSV_DIR}/{file_name}")
    )


df_productos = _read_raw_csv("products.csv")
df_stock = _read_raw_csv("stock.csv")
df_stores = _read_raw_csv("stores.csv")
df_channels = _read_raw_csv("channels.csv")
df_customers = _read_raw_csv("customers.csv")

## Transformaciones y limpiezas

#### 1 - Creación de alias para cada df: evita conflictos con nombres de columna repetidos.

#### 2 - Creación de Joins entre los dfs y broadcasting para optimizar el rendimiento.

#### 3 - Limpieza y filtros básicos

#### 4 - Generación de columnas de tiempo (event_timestamp, event_date)

#### 5 - Normalización de nombres de sucursales (expresión regular)

#### 6 - Creación de Hash único por transacción

#### 7 - Categorización del ticket

#### 8 - Escritura en formato Delta de 2 dfs (simple, sin enriquecer, y enriquecido)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 1. Streaming read of the Bronze Delta table.
bronze_stream_df = spark.readStream.format("delta").load(bronze_mount_path)


def _with_prefixed_columns(frame, qualifier, renames):
    """Alias ``frame`` as ``qualifier`` and keep only the renamed columns.

    ``renames`` maps source column -> new column name; the output column
    order follows the mapping's insertion order. Giving every source its own
    prefix keeps column names unique across the joins below.
    """
    projected = [col(source).alias(target) for source, target in renames.items()]
    return frame.alias(qualifier).select(*projected)


# 2. Prefix every column with its source to avoid ambiguous names after joining.
bronze_alias = _with_prefixed_columns(bronze_stream_df, "b", {
    "transaction_id": "b_transaction_id",
    "timestamp": "b_timestamp",
    "product_id": "b_product_id",
    "store": "b_store",
    "customer_id": "b_customer_id",
    "amount": "b_amount",
    "channel": "b_channel",
    "ingestion_time": "b_ingestion_time",
})

# Reference (batch) tables; source names must match the CSV headers.
df_productos_alias = _with_prefixed_columns(df_productos, "p", {
    "product_id": "p_product_id",
    "name": "p_product_name",
    "category": "p_category",
    "price": "p_price",
})

df_stock_alias = _with_prefixed_columns(df_stock, "st", {
    "product_id": "st_product_id",
    "available_stock": "st_available_stock",
})

df_stores_alias = _with_prefixed_columns(df_stores, "s", {
    "store": "s_store_info",
    "store_name": "s_store_name",
    "location": "s_location",
    "region": "s_region",
})

df_channels_alias = _with_prefixed_columns(df_channels, "ch", {
    "channel": "ch_channel_info",
    "channel_name": "ch_channel_name",
    "is_digital": "ch_is_digital",
})

df_customers_alias = _with_prefixed_columns(df_customers, "cu", {
    "customer_id": "cu_customer_info_id",
    "full_name": "cu_full_name",
    "age": "cu_age",
    "gender": "cu_gender",
    "loyalty_tier": "cu_loyalty_tier",
})

# 3. Left-join each reference table with an explicit key condition, applied in
# this order. Each is broadcast, which assumes the reference tables are small.
# NOTE(review): stock is joined on product_id only; if stock.csv holds several
# rows per product (e.g. per store) this join fans out — confirm.
joined_df = bronze_alias
for lookup_df, join_condition in [
    (df_productos_alias, col("b_product_id") == col("p_product_id")),
    (df_stock_alias, col("b_product_id") == col("st_product_id")),
    (df_stores_alias, col("b_store") == col("s_store_info")),
    (df_channels_alias, col("b_channel") == col("ch_channel_info")),
    (df_customers_alias, col("b_customer_id") == col("cu_customer_info_id")),
]:
    joined_df = joined_df.join(broadcast(lookup_df), join_condition, "left")

# 4. Clean, derive and project the enriched Silver columns.
silver_enriched_df = (
    joined_df
    # Drop invalid rows BEFORE deduplicating: otherwise an invalid copy of a
    # transaction could be the one dropDuplicates keeps, and the valid copy
    # would be discarded.
    .filter(col("b_transaction_id").isNotNull())
    .filter(col("b_amount").isNotNull() & (col("b_amount") > 0))
    # NOTE(review): streaming dropDuplicates without a watermark keeps every
    # transaction_id seen in state indefinitely; consider a watermark if state grows.
    .dropDuplicates(["b_transaction_id"])
    .withColumn("event_timestamp", to_timestamp(col("b_timestamp")))
    .withColumn("event_date", to_date(col("event_timestamp")))
    .drop("b_timestamp")
    .withColumn("channel", upper(trim(col("b_channel"))))
    .withColumn("store", trim(col("b_store")))
    # Normalize "sucursal x" variants to "Sucursal X". Match on the trimmed
    # value (as silver_simple_df does) so trailing spaces don't defeat `$`.
    .withColumn("store_letter", regexp_extract(lower(col("store")), r"sucursal\s*([a-z])$", 1))
    .withColumn(
        "store",
        when(col("store_letter") != "", concat(lit("Sucursal "), upper(col("store_letter"))))
        .otherwise(initcap(col("store")))
    )
    .drop("store_letter")
    .withColumn("customer_id", trim(col("b_customer_id")))
    # Hash the trimmed customer_id so the hash equals silver_simple_df's for the
    # same transaction. NOTE(review): rows already written with a hash of an
    # untrimmed customer_id will not match; backfill if that matters.
    .withColumn(
        "transaction_hash",
        sha2(concat_ws("|", col("b_transaction_id"), col("customer_id"), col("b_product_id"), col("event_timestamp")), 256),
    )
    # Ticket size buckets: < 50 BAJO, 50..200 inclusive MEDIO, otherwise ALTO.
    .withColumn(
        "ticket_category",
        when(col("b_amount") < 50, "BAJO")
        .when(col("b_amount").between(50, 200), "MEDIO")
        .otherwise("ALTO")
    )
    .select(
        col("b_transaction_id").alias("transaction_id"),
        col("event_date"),
        col("store"),
        col("channel"),
        col("b_product_id").alias("product_id"),
        col("customer_id"),
        col("b_amount").alias("amount"),
        col("ticket_category"),
        col("transaction_hash"),
        col("event_timestamp"),
        col("b_ingestion_time").alias("ingestion_time"),
        # Enriched columns from the reference tables
        col("p_product_name").alias("product_name"),
        col("st_available_stock").alias("stock_quantity"),
        col("s_store_name").alias("store_name"),
        col("s_location").alias("store_location"),
        col("s_region").alias("store_region"),
        col("ch_channel_name").alias("channel_name"),
        col("ch_is_digital").alias("channel_is_digital"),
        col("cu_full_name").alias("customer_name"),
        col("cu_age").alias("customer_age"),
        col("cu_gender").alias("customer_gender"),
        col("cu_loyalty_tier").alias("customer_loyalty_tier")
    )
)

# 5. Plain Silver table (cleaned, not enriched), kept for reference.
silver_simple_df = (
    bronze_stream_df
    # Drop invalid rows BEFORE deduplicating: otherwise an invalid copy of a
    # transaction could be the one dropDuplicates keeps, and the valid copy
    # would be discarded.
    .filter(col("transaction_id").isNotNull())
    .filter(col("amount").isNotNull() & (col("amount") > 0))
    # NOTE(review): streaming dropDuplicates without a watermark keeps every
    # transaction_id seen in state indefinitely; consider a watermark if state grows.
    .dropDuplicates(["transaction_id"])
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))
    .withColumn("event_date", to_date(col("event_timestamp")))
    .drop("timestamp")
    .withColumn("channel", upper(trim(col("channel"))))
    .withColumn("store", trim(col("store")))
    # Normalize "sucursal x" variants (matched on the trimmed value) to "Sucursal X".
    .withColumn("store_letter", regexp_extract(lower(col("store")), r"sucursal\s*([a-z])$", 1))
    .withColumn(
        "store",
        when(col("store_letter") != "", concat(lit("Sucursal "), upper(col("store_letter"))))
        .otherwise(initcap(col("store")))
    )
    .drop("store_letter")
    .withColumn("customer_id", trim(col("customer_id")))
    # customer_id is already trimmed at this point, so the hash uses the trimmed value.
    .withColumn("transaction_hash", sha2(concat_ws("|", "transaction_id", "customer_id", "product_id", "event_timestamp"), 256))
    # Ticket size buckets: < 50 BAJO, 50..200 inclusive MEDIO, otherwise ALTO.
    .withColumn(
        "ticket_category",
        when(col("amount") < 50, "BAJO")
        .when(col("amount").between(50, 200), "MEDIO")
        .otherwise("ALTO")
    )
    .select(
        "transaction_id", "event_date",
        "store", "channel", "product_id", "customer_id", "amount",
        "ticket_category", "transaction_hash", "event_timestamp", "ingestion_time"
    )
)

def _start_silver_stream(source_df, table_name):
    """Start an append-mode Delta streaming write of ``source_df``.

    Rows are written to ``<silver_mount_path>/<table_name>`` and the stream's
    checkpoint to ``<silver_mount_path>/_checkpoint_<table_name>``.
    Returns the started streaming query handle.
    """
    writer = source_df.writeStream.format("delta")
    writer = writer.option("checkpointLocation", silver_mount_path + "/_checkpoint_" + table_name)
    writer = writer.option("path", silver_mount_path + "/" + table_name)
    return writer.outputMode("append").start()


# Plain Silver stream first, then the enriched one.
query_simple = _start_silver_stream(silver_simple_df, "simple")
query_enriched = _start_silver_stream(silver_enriched_df, "enriched")

# NOTE(review): on Databricks, display() of a streaming DataFrame starts its own
# additional streaming query; confirm this is wanted when run as a job.
display(silver_enriched_df)


In [0]:
# Inspect the enriched Silver Delta table, most recently ingested rows first.
# Derived from silver_mount_path so it always matches where the enriched
# stream writes (silver_mount_path + "/enriched").
silver_mount_path_enriched = f"{silver_mount_path}/enriched"
spark.sql(f"SELECT * FROM delta.`{silver_mount_path_enriched}` ORDER BY ingestion_time DESC").display()

In [0]:
# display(silver_simple_df)

%md
# _Fin Silver_