### SILVER LAYER -> Banking Data - Validated & Cleaned

  Convert raw Bronze data into trusted, business-ready data by enforcing rules, data types, and quality checks.

#### Input (from Bronze)

We will read Delta tables, NOT CSVs.
- apextrust_catalog.bronze.customers
- apextrust_catalog.bronze.accounts
- apextrust_catalog.bronze.transactions
- apextrust_catalog.bronze.branches
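
Because every read and write below repeats the same three-part name, a single mistyped schema is easy to miss. The following is a minimal sketch of one way to centralize the names. The helper `table_name` and the constants `CATALOG` and `LAYERS` are hypothetical names introduced for illustration; they are not part of the original notebook.

```python
# Hypothetical helper: build fully qualified Unity Catalog table names in one place.
CATALOG = "apextrust_catalog"
LAYERS = {"bronze", "silver"}  # layers referenced in this notebook


def table_name(layer: str, table: str) -> str:
    """Return '<catalog>.<layer>.<table>', rejecting unknown layer names."""
    if layer not in LAYERS:
        raise ValueError(f"Unknown layer {layer!r}; expected one of {sorted(LAYERS)}")
    return f"{CATALOG}.{layer}.{table}"


# Possible usage inside a notebook cell (requires an active Spark session):
# customers_df = spark.read.table(table_name("bronze", "customers"))
```

With a helper like this, a misspelled layer such as "bronz" fails immediately with a `ValueError` instead of surfacing later as a table-not-found error.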


#### Customers
- Keep only rows where kyc_status = 'VERIFIED'
- Remove duplicates by customer_id
- Cast created_date to a DATE


In [0]:
from pyspark.sql.functions import col, to_date, current_timestamp

# Read Bronze table
customers_df = spark.read.table("apextrust_catalog.bronze.customers")

# Apply business rules
silver_customers_df = (
    customers_df
    .filter(col("kyc_status") == "VERIFIED")
    .dropDuplicates(["customer_id"])
    .withColumn("created_date", to_date("created_date"))
    .withColumn("silver_processed_ts", current_timestamp())
)

# Write to Silver
silver_customers_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("apextrust_catalog.silver.customers")


display(silver_customers_df)



In [0]:
display(spark.sql("DESCRIBE DETAIL apextrust_catalog.silver.customers"))

#### Accounts
- Balance must be >= 0
- Valid account types only (SAVINGS, CURRENT)
- Remove duplicates by account_id
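
The two filter rules above can be mirrored in plain Python so they can be unit-tested without a cluster. This is only a sketch: `passes_account_rules` and `VALID_ACCOUNT_TYPES` are hypothetical names, and the function is meant as a reference for tests, not a replacement for the Spark filters. It assumes Spark's default behavior, where a NULL comparison result causes `filter` to drop the row and string comparison is case-sensitive.

```python
from typing import Optional

# Hypothetical constant: the account types allowed by the Silver rules.
VALID_ACCOUNT_TYPES = {"SAVINGS", "CURRENT"}


def passes_account_rules(balance: Optional[float], account_type: Optional[str]) -> bool:
    """Plain-Python mirror of the Accounts filters, intended for unit tests.

    A None value stands in for SQL NULL: the predicate would evaluate to NULL
    in Spark, and filter() keeps only rows where the predicate is true.
    """
    if balance is None or account_type is None:
        return False
    return balance >= 0 and account_type in VALID_ACCOUNT_TYPES


# A zero balance is allowed; negative, NULL, or lowercase values are rejected.
print(passes_account_rules(0.0, "SAVINGS"))
print(passes_account_rules(-5.0, "CURRENT"))
```

One consequence worth noting is that rows with a missing balance or account type never reach Silver, even though no rule mentions NULLs explicitly.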

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Read Bronze table
accounts_df = spark.read.table("apextrust_catalog.bronze.accounts")

# Apply business rules
silver_accounts_df = (
    accounts_df
    .filter(col("balance") >= 0)  # also drops rows with a NULL balance
    .filter(col("account_type").isin("SAVINGS", "CURRENT"))
    .dropDuplicates(["account_id"])
    .withColumn("silver_processed_ts", current_timestamp())
)

# Write to Silver
silver_accounts_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("apextrust_catalog.silver.accounts")

display(silver_accounts_df)


In [0]:
display(spark.sql("DESCRIBE DETAIL apextrust_catalog.silver.accounts"))

#### Transactions

- Amount must be > 0
- Only DEBIT / CREDIT transaction types
- Cast txn_date to a DATE
- Remove duplicates by txn_id


In [0]:
from pyspark.sql.functions import col, to_date, current_timestamp

# Read Bronze table
txn_df = spark.read.table("apextrust_catalog.bronze.transactions")

# Apply business rules
silver_txn_df = (
    txn_df
    .filter(col("amount") > 0)
    .filter(col("txn_type").isin("DEBIT", "CREDIT"))
    .withColumn("txn_date", to_date("txn_date"))
    .dropDuplicates(["txn_id"])
    .withColumn("silver_processed_ts", current_timestamp())
)

# Write to Silver
silver_txn_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("apextrust_catalog.silver.transactions")


display(silver_txn_df)

In [0]:
display(spark.sql("DESCRIBE DETAIL apextrust_catalog.silver.transactions"))

#### Branches

- Standardize region (UPPER case)
- Deduplicate by branch_id

In [0]:
from pyspark.sql.functions import upper, current_timestamp

# Read Bronze table
branches_df = spark.read.table("apextrust_catalog.bronze.branches")

# Apply business rules
silver_branches_df = (
    branches_df
    .dropDuplicates(["branch_id"])
    .withColumn("region", upper("region"))
    .withColumn("silver_processed_ts", current_timestamp())
)

# Write to Silver
silver_branches_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("apextrust_catalog.silver.branches")


display(silver_branches_df)
display(spark.sql("DESCRIBE DETAIL apextrust_catalog.silver.branches"))

#### Summary

- The Silver layer is where business rules are enforced
- Read only from Bronze Delta tables
- Write results as managed Delta tables with saveAsTable
- Apply filters, type casts, and deduplication
- Avoid CSVs completely at this layer
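
Since the Silver layer is essentially a set of business rules, one option is to keep those rules as data and turn them into a single SQL filter string. The sketch below is an illustration under assumptions, not the notebook's method: `SILVER_RULES` and `filter_expression` are hypothetical names, and the rule strings restate the filters used in the cells above.

```python
# Hypothetical rule registry: one list of SQL predicates per Silver table.
SILVER_RULES = {
    "customers": ["kyc_status = 'VERIFIED'"],
    "accounts": ["balance >= 0", "account_type IN ('SAVINGS', 'CURRENT')"],
    "transactions": ["amount > 0", "txn_type IN ('DEBIT', 'CREDIT')"],
}


def filter_expression(table: str) -> str:
    """Combine a table's rules into one SQL predicate joined with AND.

    Tables with no registered rules get the always-true predicate 'true'.
    """
    rules = SILVER_RULES.get(table, [])
    if not rules:
        return "true"
    return " AND ".join(f"({rule})" for rule in rules)


# Possible usage inside a notebook cell (requires an active Spark session),
# relying on DataFrame.filter accepting a SQL expression string:
# silver_accounts_df = accounts_df.filter(filter_expression("accounts"))
print(filter_expression("accounts"))
```

Keeping the rules in one dictionary makes them easy to review alongside the written rule lists, at the cost of losing the column-level checks that `col(...)` expressions give at definition time.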
