# Silver Notebook
### Transforms files in the bronze layer and returns clean silver Delta tables

In [1]:
## Import libraries and create a helper read function to avoid repetition of code

from pyspark.sql.functions import *
from pyspark.sql.types import *

## helper read function to be used in reading in files from bronze layer to avoid repetition of code.
## This will be used in reading all files from bronze layers.

def read_bronze(table_name):
    """Load one bronze-layer CSV file through the session's ``spark`` reader.

    Args:
        table_name: File name without the ``.csv`` extension; it is resolved
            to ``Files/bronze_layer/<table_name>.csv``.

    Returns:
        Whatever ``spark.read...csv(path)`` returns for that file. The reader
        is configured with a header row and ``;`` as the delimiter; no schema
        or ``inferSchema`` option is set here, so typing is left to the caller.
    """
    bronze_path = f"Files/bronze_layer/{table_name}.csv"
    reader = spark.read.option("header", "true")
    reader = reader.option("delimiter", ";")
    return reader.csv(bronze_path)


print("Helper created to read from Files/bronze_layer")

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 3, Finished, Available, Finished)

Helper created to read from Files/bronze_layer


In [2]:

## account dimension table
# Column expressions are built first, then projected in a single select.
account_raw = read_bronze("account")

# The Czech frequency codes are compared after trimming; anything else is "unknown".
frequency_code = trim(col("frequency"))
frequency_label = (
    when(frequency_code == "POPLATEK MESICNE", "monthly")
    .when(frequency_code == "POPLATEK TYDNE", "weekly")
    .when(frequency_code == "POPLATEK PO OBRATU", "after_transaction")
    .otherwise("unknown")
)

# `date` is prefixed with the century "19" and parsed as yyyyMMdd.
opened_on = to_date(concat(lit("19"), col("date")), "yyyyMMdd")

dim_account = account_raw.select(
    col("account_id").cast("int").alias("account_id"),
    col("district_id").cast("int").alias("district_id"),
    frequency_label.alias("frequency_en"),
    opened_on.alias("account_date"),
)

# Persist as a managed Delta table, replacing any previous version.
(dim_account.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_dim_account"))

print(f"silver_dim_account created with {dim_account.count():,} rows")
display(dim_account.limit(10))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 4, Finished, Available, Finished)

silver_dim_account created with 4,500 rows


SynapseWidget(Synapse.DataFrame, 47c59274-ab2a-4560-8262-1c5a9809a3fc)

In [3]:
# card dimension
dim_card = (read_bronze("card")
    .withColumn("card_id", col("card_id").cast("int"))
    .withColumn("disp_id", col("disp_id").cast("int"))
    .withColumn("card_type", trim(col("type")))
    # The date lives in the first six characters of `issued` (yyMMdd), optionally
    # followed by a time part. Taking those characters directly avoids depending
    # on the time being exactly " 00:00:00", which previously turned any other
    # time-of-day into a NULL date.
    .withColumn("issued_date",
                to_date(
                    concat(lit("19"), substring(trim(col("issued")), 1, 6)),
                    "yyyyMMdd"))
    # The former `issued_raw` helper column was never selected and has been dropped.
    .select("card_id", "disp_id", "card_type", "issued_date")
)

# Persist as a managed Delta table, replacing any previous version.
dim_card.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_dim_card")

print(f"silver_dim_card created with {dim_card.count():,} rows")
display(dim_card.limit(10))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 5, Finished, Available, Finished)

silver_dim_card created with 892 rows


SynapseWidget(Synapse.DataFrame, 160f8964-3db4-4114-aa1e-8104d9c6b839)

In [4]:
##clients table
dim_client = (read_bronze("client")
    .withColumn("client_id", col("client_id").cast("int"))
    .withColumn("district_id", col("district_id").cast("int"))
    # Left-pad to six characters so the fixed substring offsets below stay valid
    # even when leading zeros are missing from birth_number.
    .withColumn("birth_number_str", lpad(col("birth_number"), 6, "0"))

    # Characters 3-4 hold the month (51–62 for women, 01–12 for men)
    .withColumn("month_raw", substring(col("birth_number_str"), 3, 2).cast("int"))

    # Derive gender from the month offset, then remove the +50 offset
    .withColumn("gender", when(col("month_raw") > 50, "F").otherwise("M"))
    .withColumn("month", when(col("month_raw") > 50, col("month_raw") - 50).otherwise(col("month_raw")))

    # Assemble 19YYMMDD and parse it into a date
    .withColumn("birth_date",
                to_date(
                    concat(
                        lit("19"), substring(col("birth_number_str"), 1, 2),   # year
                        lpad(col("month").cast("string"), 2, "0"),             # corrected month
                        substring(col("birth_number_str"), 5, 2)               # day
                    ), "yyyyMMdd"))

    # Age in completed years as of 1999-12-31 (end of dataset).
    # Whole months / 12 is used instead of days / 365.25: the day-based formula
    # under-counts by one year for birthdays in the last days of December
    # (e.g. born 1980-12-31 -> 6939 days / 365.25 = 18.998 -> 18 instead of 19).
    # floor() keeps the column type as bigint, matching the previous schema.
    .withColumn("age_1999",
                floor(months_between(to_date(lit("1999-12-31")), col("birth_date")) / 12))
    .select("client_id", "district_id", "gender", "birth_date", "age_1999")
)


# Persist as a managed Delta table, replacing any previous version.
dim_client.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_dim_client")

print(f"silver_dim_client created with {dim_client.count():,} rows")

## display(dim_client.limit(10))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 6, Finished, Available, Finished)

silver_dim_client created with 5,369 rows


In [5]:
## Disposition table

dim_disposition = (read_bronze("disp")
    .withColumn("disp_id", col("disp_id").cast("int"))
    .withColumn("client_id", col("client_id").cast("int"))
    .withColumn("account_id", col("account_id").cast("int"))
    .withColumn("disp_type", trim(col("type")))                 # "OWNER" or "DISPONENT"
    # Derive the flag from the trimmed value so it always agrees with disp_type;
    # comparing the raw `type` would yield False for a padded "OWNER ".
    .withColumn("is_owner", col("disp_type") == "OWNER")
    .select("disp_id", "client_id", "account_id", "disp_type", "is_owner")
)

# Persist as a managed Delta table, replacing any previous version.
dim_disposition.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_dim_disposition")

print(f"silver_dim_disposition created with {dim_disposition.count():,} rows")
## display(dim_disposition.limit(20))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 7, Finished, Available, Finished)

silver_dim_disposition created with 5,369 rows


In [6]:
## district demography table

from pyspark.sql.functions import *

# Positional rename of all 16 source columns, following the data dictionary at
# https://webpages.charlotte.edu/mirsad/itcs6265/group1/demographic_preprocessing.html
district_named = read_bronze("district").toDF(
    "district_id", "district_name", "region",
    "inhabitants", "muni_lt_500", "muni_500_1999", "muni_2000_9999", "muni_gt_10000",
    "no_of_cities", "urban_ratio", "avg_salary",
    "unemployment_1995", "unemployment_1996", "entrepreneurs_per_1000",
    "crimes_1995", "crimes_1996",
)

# Target type for every numeric column. Columns not listed here
# (district_name, region) are text and are only trimmed.
district_numeric_types = {
    "district_id": "int",
    "inhabitants": "int",
    "muni_lt_500": "int",
    "muni_500_1999": "int",
    "muni_2000_9999": "int",
    "muni_gt_10000": "int",
    "no_of_cities": "int",
    "urban_ratio": "double",
    "avg_salary": "int",
    "unemployment_1995": "double",
    "unemployment_1996": "double",
    "entrepreneurs_per_1000": "int",
    "crimes_1995": "int",
    "crimes_1996": "int",
}

# One projection over the renamed columns, preserving their original order.
dim_district = district_named.select([
    col(name).cast(district_numeric_types[name]).alias(name)
    if name in district_numeric_types
    else trim(col(name)).alias(name)
    for name in district_named.columns
])

# Persist as a managed Delta table, replacing any previous version.
(dim_district.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_dim_district"))

print(f"silver_dim_district created with {dim_district.count():,} rows")
## display(dim_district.limit(15))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 8, Finished, Available, Finished)

silver_dim_district created with 77 rows


In [7]:
## loan table

## category bands gotten from https://webpages.charlotte.edu/mirsad/itcs6265/group1/loan_preprocessing.html
## as detailed in my readme summary

from pyspark.sql.functions import *

loan_raw = read_bronze("loan")

# Typed / cleaned inputs that the derived columns below are built on.
status_code = trim(col("status"))
amount_int = col("amount").cast("int")
duration_int = col("duration").cast("int")

# Readable label for each single-letter status code.
loan_status_label = (
    when(status_code == "A", "Finished - OK")
    .when(status_code == "B", "Finished - Defaulted")
    .when(status_code == "C", "Running - OK")
    .when(status_code == "D", "Running - In debt")
    .otherwise("Unknown")
)

# Duration buckets in months; the first matching upper bound wins.
duration_bucket = (
    when(duration_int <= 30, "≤30")
    .when(duration_int <= 42, "31-42")
    .when(duration_int <= 54, "43-54")
    .otherwise("55+")
)

# Amount buckets; the first matching upper bound wins.
amount_bucket = (
    when(amount_int <= 30708, "≤30k")
    .when(amount_int <= 49380, "30k-49k")
    .when(amount_int <= 76926, "49k-76k")
    .when(amount_int <= 230310, "76k-230k")
    .otherwise(">230k")
)

fact_loan = loan_raw.select(
    col("loan_id").cast("int").alias("loan_id"),
    col("account_id").cast("int").alias("account_id"),
    # `date` is prefixed with the century "19" and parsed as yyyyMMdd.
    to_date(concat(lit("19"), col("date")), "yyyyMMdd").alias("loan_date"),
    amount_int.alias("amount"),
    duration_int.alias("duration_months"),
    col("payments").cast("double").alias("monthly_payment"),
    status_code.alias("status"),
    loan_status_label.alias("loan_status"),
    # Flag is true only for status "B".
    (status_code == "B").cast("boolean").alias("is_default"),
    duration_bucket.alias("duration_band"),
    amount_bucket.alias("amount_band"),
)

# Persist as a managed Delta table, replacing any previous version.
(fact_loan.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_fact_loan"))

print(f"silver_fact_loan created → {fact_loan.count():,} rows")

## display(fact_loan.orderBy("loan_date").limit(20))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 9, Finished, Available, Finished)

silver_fact_loan created → 682 rows


In [8]:
## permanent order table (fact table – recurring payments)

from pyspark.sql.functions import *

fact_order = (read_bronze("order")
    .withColumn("order_id", col("order_id").cast("int"))
    .withColumn("account_id", col("account_id").cast("int"))
    .withColumn("bank_to", trim(col("bank_to")))
    .withColumn("account_to", col("account_to").cast("long"))
    .withColumn("amount", col("amount").cast("double"))
    .withColumn("k_symbol_raw", trim(col("k_symbol")))

    # Translate the Czech k_symbol codes into English (the most common ones).
    # A missing k_symbol is checked with isNull() as well as == "": comparing
    # NULL to "" evaluates to NULL, which previously fell through to "Other"
    # instead of "Unknown / Not provided".
    .withColumn("payment_type",
                when(col("k_symbol_raw") == "POJISTNE", "Insurance")
                .when(col("k_symbol_raw") == "SIPO", "Household")
                .when(col("k_symbol_raw") == "LEASING", "Leasing")
                .when(col("k_symbol_raw") == "UVER", "Loan payment")
                .when(col("k_symbol_raw").isNull() | (col("k_symbol_raw") == ""),
                      "Unknown / Not provided")
                .otherwise("Other"))

    .select("order_id", "account_id", "bank_to", "account_to",
            "amount", "payment_type")
)

# Persist as a managed Delta table, replacing any previous version.
fact_order.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_fact_order")

print(f"silver_fact_order created with {fact_order.count():,} rows")
## display(fact_order.limit(20))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 10, Finished, Available, Finished)

silver_fact_order created with 6,471 rows


In [9]:
## transaction table (largest fact table)

from pyspark.sql.functions import *
from pyspark.sql.types import *

fact_transaction = (read_bronze("trans")
    # Cast & clean core columns
    .withColumn("trans_id", col("trans_id").cast("long"))
    .withColumn("account_id", col("account_id").cast("int"))
    .withColumn("date", to_date(concat(lit("19"), col("date")), "yyyyMMdd"))  # 930101 → 1993-01-01
    .withColumn("amount", col("amount").cast("double"))
    .withColumn("balance", col("balance").cast("double"))

    # Trim the code columns once so every comparison below sees the same
    # normalised value (previously only the empty-k_symbol check was trimmed).
    .withColumn("type", trim(col("type")))
    .withColumn("operation", trim(col("operation")))
    .withColumn("k_symbol", trim(col("k_symbol")))

    # Translate Czech values to English as explained in data dictionary
    # https://webpages.charlotte.edu/mirsad/itcs6265/group1/transaction_preprocessing.html

    # NOTE(review): any type other than PRIJEM / VYDAJ is labelled "Other" —
    # confirm against the data dictionary that no further type code needs mapping.
    .withColumn("type",
                when(col("type") == "PRIJEM", "Credit")
                .when(col("type") == "VYDAJ", "Debit")
                .otherwise("Other"))

    .withColumn("operation",
                when(col("operation") == "VKLAD", "Credit in Cash")
                .when(col("operation") == "VYBER", "Withdrawal in Cash")
                .when(col("operation") == "VYBER KARTOU", "Credit Card Withdrawal")
                .when(col("operation") == "PREVOD Z UCTU", "Collection from Another Bank")
                .when(col("operation") == "PREVOD NA UCET", "Remittance to Another Bank")
                .otherwise("Other"))

    # A missing k_symbol is checked with isNull() as well as == "": comparing
    # NULL to "" evaluates to NULL, which previously fell through to "Other"
    # instead of "Unknown/Not Specified".
    .withColumn("k_symbol",
                when(col("k_symbol") == "POJISTNE", "Insurance Payment")
                .when(col("k_symbol") == "SLUZBY", "Payment on Statement")
                .when(col("k_symbol") == "UROK", "Interest Credited")
                .when(col("k_symbol") == "SANKC. UROK", "Sanction Interest")
                .when(col("k_symbol") == "SIPO", "Household")
                .when(col("k_symbol") == "DUCHOD", "Old-age Pension")
                .when(col("k_symbol") == "UVER", "Loan Payment")
                .when(col("k_symbol").isNull() | (col("k_symbol") == ""),
                      "Unknown/Not Specified")
                .otherwise("Other"))

    # Clean partner bank & account (can be null/empty)
    .withColumn("partner_bank", trim(col("bank")))
    .withColumn("partner_account", col("account").cast("long"))

    .select("trans_id", "account_id", "date", "type", "operation",
            "amount", "balance", "k_symbol", "partner_bank", "partner_account")
)

# Persist as a managed Delta table, replacing any previous version.
(fact_transaction.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silver_fact_transaction"))

print(f"silver_fact_transaction created with {fact_transaction.count():,} rows")
## display(fact_transaction.limit(20))

StatementMeta(, b06478a5-d080-4f5e-ac63-304fb55f94de, 11, Finished, Available, Finished)

silver_fact_transaction created with 1,056,320 rows
