%md
## Capa Silver - Transformación y validación de transacciones

En esta etapa del pipeline ETL, se realiza la **limpieza y enriquecimiento de las transacciones financieras** provenientes de la capa Bronze, manteniendo un flujo continuo mediante streaming estructurado.

### Transformaciones aplicadas:
- **Validaciones de calidad:**
  - Transacciones con `amount > 0` y campos clave no nulos
  - Eliminación de duplicados por `transaction_id`
- **Conversión de timestamp:** 
  - Se transforma a tipo `timestamp` y se derivan campos de fecha/hora (`year`, `month`, `day`, `hour`)
- **Estandarización de texto:**
  - Campos como `country`, `channel`, `merchant`, `category` se convierten a minúsculas
- **Flag de sospecha de fraude:**
  - Se marca una transacción como `is_fraud_suspected = True` si es en `ATM` y supera los $10,000
- **Clasificación del monto:**
  - Se categorizan las transacciones como `low`, `medium` o `high` según el valor

###  Resultado:
Se genera una tabla Delta estructurada en schema: silver_transactions

In [0]:
# DBTITLE 1: Imports y configuración
from pyspark.sql.functions import (
    col, to_timestamp, year, month, dayofmonth, hour, when, lit, lower
)

In [0]:

# Rutas
bronze_path = "abfss://bronzesmartbank@mistorageprincipal.dfs.core.windows.net/transactions_eventhub/"
silver_path = "abfss://silversmartbank@mistorageprincipal.dfs.core.windows.net/transactions_silver/"

table_name = "silver_transactions_table"

In [0]:

try:
# Crear schema
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS silver")
    spark.sql(f"CREATE TABLE IF NOT EXISTS silver_transactions_table (transaction_id STRING, customer_id STRING, amount DOUBLE, timestamp TIMESTAMP, country STRING, channel STRING, category STRING, merchant STRING, is_fraud_suspected STRING, amount_level STRING)")
except Exception as e:
    dbutils.notebook.exit(f"[ERROR - SILVER INIT] {str(e)}")

try:   
# DBTITLE 1: Leer desde Bronze
    bronze_df = (
        spark.readStream
        .format("delta")
        .load(bronze_path)
    )
except Exception as e:
    dbutils.notebook.exit(f"[ERROR - SILVER READ] {str(e)}")

# Transformaciones en Silver
try:
    silver_df = (
        bronze_df
        # Validaciones básicas
        .filter(col("amount").isNotNull() & (col("amount") > 0))
        .filter(col("transaction_id").isNotNull() & col("customer_id").isNotNull())
    
        # Conversión de timestamp
        .withColumn("timestamp", to_timestamp("timestamp"))
    
        # Derivar campos de fecha/hora
        .withColumn("year", year("timestamp"))
        .withColumn("month", month("timestamp"))
        .withColumn("day", dayofmonth("timestamp"))
        .withColumn("hour", hour("timestamp"))
    
        # Enriquecimiento: normalizar texto
        .withColumn("country", lower(col("country")))
        .withColumn("channel", lower(col("channel")))
        .withColumn("category", lower(col("category")))
        .withColumn("merchant", lower(col("merchant")))
    
        # Detección de posibles fraudes
        .withColumn("is_fraud_suspected", when((col("channel") == "atm") & (col("amount") > 10000), lit(True)).otherwise(lit(False)))
    
        # Categoría de monto
        .withColumn("amount_level", when(col("amount") >= 10000, "high")
                                .when(col("amount") >= 1000, "medium")
                                .otherwise("low"))
    
        # Remover duplicados por transaction_id si llega a pasar
        .dropDuplicates(["transaction_id"])
    )
except Exception as e:
    dbutils.notebook.exit(f"[ERROR- SILVER TRANSFORM] {str(e)}")
# DBTITLE 1: Escritura continua a Silver (Delta Table gestionada)
try:
    (
        silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", silver_path + "/_checkpoint")
        .trigger(once=True)
        .toTable(f"silver_transactions.silver.{table_name}")
        .awaitTermination()
    )
except Exception as e:
    dbutils.notebook.exit(f"[ERROR - SILVER WRITE] {str(e)}")

dbutils.notebook.exit("OK")




In [0]:
spark.sql(
    """
        SELECT * FROM silver_transactions.silver.silver_transactions_table
    """
).display()

In [0]:
spark.sql(
    """
    DESCRIBE HISTORY silver_transactions.silver.silver_transactions_table
    """
).display()