In [None]:
from pyspark.sql.functions import col, count, lower, sum, trim, when
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [None]:
# Load the bronze-layer Parquet tables that feed the silver layer.
# One read per table; the paths are listed in the same order as the targets.
(
    df_transactions,
    df_accounts,
    df_demographics,
    df_merch_categories,
) = (
    spark.read.parquet(bronze_path)
    for bronze_path in (
        "FileStore/bronze/credit_transactions",
        "FileStore/bronze/customer_accounts",
        "FileStore/bronze/customer_demographics",
        "FileStore/bronze/merchant_categories",
    )
)

In [None]:
# SILVER LAYER

# Type conversions: date columns are cast to date, the transaction amount to double.
df_transactions_silver = (
    df_transactions
    .withColumn("transaction_date", col("transaction_date").cast("date"))
    .withColumn("amount", col("amount").cast("double"))
)

df_accounts_silver = (
    df_accounts
    .withColumn("account_open_date", col("account_open_date").cast("date"))
    .withColumn("last_payment_date", col("last_payment_date").cast("date"))
)



In [None]:
# Intermediate validations:
# 1. Remove duplicates.
#    Transactions are de-duplicated on transaction_id; the other tables only
#    drop rows that are identical across every column.
df_transactions_silver = df_transactions_silver.drop_duplicates(["transaction_id"])
df_demographics_silver = df_demographics.distinct()
df_merch_categories_silver = df_merch_categories.distinct()

# 2. Null handling: a missing credit_score is replaced with 0
#    (applied to the accounts table right after its de-duplication).
df_accounts_silver = df_accounts_silver.distinct().fillna({"credit_score": 0})

In [None]:
# Integrity validations

# 1. Keep only transactions whose customer exists in customer_accounts.
#    A left-semi join acts as a pure existence filter: it adds no columns and
#    does not multiply transaction rows when a customer_id appears on several
#    account rows (an inner join would).
valid_customers = df_accounts_silver.select("customer_id")
df_transactions_silver = df_transactions_silver.join(
    valid_customers, "customer_id", "left_semi"
)

# 2. Consistent dates: a transaction must not precede the account opening date.
#    The filter discards rows whose account_open_date is null, so this is an
#    inner join in effect and is written as one. account_open_date remains on
#    the transactions frame.
#    NOTE(review): if customer_id is not unique in customer_accounts, this join
#    still yields one row per matching account row - confirm the key is unique.
df_transactions_silver = df_transactions_silver.join(
    df_accounts_silver.select("customer_id", "account_open_date"),
    "customer_id",
    "inner",
).filter(col("transaction_date") >= col("account_open_date"))

# 3. Keep only transactions whose merchant category is a known category.
#    The lookup frame exposes the key as "category", not "merchant_category",
#    so the join needs an explicit condition; joining on the column name
#    "merchant_category" cannot resolve on the lookup side.
#    NOTE(review): values are compared as stored; trimming/lower-casing of
#    merchant_category only happens in the later normalization cell.
valid_categories = df_merch_categories_silver.select("category")
df_transactions_silver = df_transactions_silver.join(
    valid_categories,
    df_transactions_silver["merchant_category"] == valid_categories["category"],
    "left_semi",
)

# 4. Minimum payment must not exceed the total due.
#    Both sides are cast to double inside the comparison because total_due is
#    only cast in the later normalization cell and min_payment is never cast;
#    without the cast, string-typed columns would be compared lexicographically.
#    Rows where either value is null fall through to "valid" (null comparison).
df_accounts_silver = df_accounts_silver.withColumn(
    "valid_min_payment",
    when(
        col("min_payment").cast("double") > col("total_due").cast("double"),
        "invalid",
    ).otherwise("valid"),
)

In [None]:
# Normalization of categories and values

# 1. Enforce the expected data types.
#    transactions.amount is not re-cast here: it is already cast to double
#    when df_transactions_silver is first created.
df_accounts_silver = (
    df_accounts_silver
    .withColumn("credit_limit", col("credit_limit").cast("double"))
    .withColumn("income", col("income").cast("double"))
    .withColumn("total_due", col("total_due").cast("double"))
)

# 2. Clean categorical columns: strip surrounding whitespace and lower-case.
#    risk_level is cast to string first, then normalized.
#    NOTE(review): merchant_categories.category is left untouched while
#    transactions.merchant_category is lower-cased - confirm downstream joins
#    between the two silver tables do not rely on matching case.
df_merch_categories_silver = (
    df_merch_categories_silver
    .withColumn("risk_level", col("risk_level").cast("string"))
    .withColumn("risk_level", lower(trim(col("risk_level"))))
)
df_transactions_silver = df_transactions_silver.withColumn(
    "merchant_category", lower(trim(col("merchant_category")))
)


In [None]:
# Persist the silver tables as Parquet, replacing any previous output.
df_transactions_silver.write.parquet(
    "FileStore/silver/credit_transactions", mode="overwrite"
)
df_accounts_silver.write.parquet(
    "FileStore/silver/customer_accounts", mode="overwrite"
)
df_demographics_silver.write.parquet(
    "FileStore/silver/customer_demographics", mode="overwrite"
)
df_merch_categories_silver.write.parquet(
    "FileStore/silver/merchant_categories", mode="overwrite"
)