In [0]:
# Select the catalog first, then the schema, for the statements that follow.
for _use_statement in ("USE CATALOG bankcatalog", "USE SCHEMA banksilver"):
    spark.sql(_use_statement)

In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, concat_ws

In [0]:
@dlt.table(
    name="silver_bank_churn",
    comment="Bank churn main enriched with reference tables",
    table_properties={"quality": "silver"},
)
def silver_bank_churn():
    """Build the silver churn table from the bronze churn and reference tables.

    Reads the bronze churn table plus five bronze reference tables, left-joins
    each reference table onto the churn rows, removes the reference-side key
    column after every join, and finally drops ``_rescued_data``.

    Returns:
        The joined DataFrame that backs the ``silver_bank_churn`` table.
    """
    churn = spark.read.table("bankcatalog.bankbronze.bronze_bank_churn")
    active_lookup = spark.read.table("bankcatalog.bankbronze.bronze_active_customers")
    gender_lookup = spark.read.table("bankcatalog.bankbronze.bronze_gender")
    geography_lookup = spark.read.table("bankcatalog.bankbronze.bronze_geography")
    exit_lookup = spark.read.table("bankcatalog.bankbronze.bronze_exit_customer")
    card_lookup = spark.read.table("bankcatalog.bankbronze.bronze_credit_card")

    # (reference table, churn-side column, reference-side key), in join order.
    join_plan = (
        (active_lookup, "IsActiveMember", "ActiveID"),
        (gender_lookup, "GenderID", "GenderID"),
        (card_lookup, "HasCrCard", "CreditID"),
        (geography_lookup, "GeographyID", "GeographyID"),
        (exit_lookup, "Exited", "ExitID"),
    )

    enriched = churn
    for lookup, churn_column, lookup_key in join_plan:
        matches = churn[churn_column] == lookup[lookup_key]
        # The drop is given the reference table's Column, not a bare name,
        # so it targets that table's key column.
        enriched = enriched.join(lookup, matches, "left").drop(lookup[lookup_key])

    return enriched.drop("_rescued_data")

In [0]:
@dlt.table(
    name="silver_customerinfo",
    comment="CustomerInfo carried as-is from bronze",
    table_properties={"quality": "silver"},
)
def silver_customerinfo():
    """Pass the bronze customer-info table through without ``_rescued_data``.

    Returns:
        The bronze customer-info DataFrame after dropping ``_rescued_data``.
    """
    customer_info = spark.read.table("bankcatalog.bankbronze.bronze_customer_info")
    return customer_info.drop("_rescued_data")