# In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import *

# --- 1. Configuration & Schemas ---
# Register the storage-account access key (fetched from a Databricks secret
# scope) in the Spark session configuration for the ADLS account used below.
spark.conf.set(
    "fs.azure.account.key.anzbanking.dfs.core.windows.net",
    dbutils.secrets.get(scope="az_storage_scope", key="storage_account_key")
)
# NOTE(review): this flag is normally associated with Databricks Connect;
# confirm it is actually needed for this pipeline.
spark.conf.set("spark.databricks.service.server.enabled", "true")

# ADLS path of the raw source data (Auto Loader monitors this directory)
TRANSACTIONS_CSV_SOURCE_PATH = "abfss://datalake@anzbanking.dfs.core.windows.net/rawdata/transactions_csv/"

# ADLS path for the Auto Loader schema location (DLT needs write access here)
TRANSACTIONS_BRONZE_SCHEMA_LOCATION = "abfss://datalake@anzbanking.dfs.core.windows.net/dlt_pipelines/customer_analytics/schemas/transactions_bronze_autoloader_schema/"

# Schema for the transaction CSV data.
# Every column is declared as a nullable string so Bronze ingestion does not
# depend on the values being well-typed; casting happens in the Silver layer.
transaction_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "status",
        "card_present_flag",
        "bpay_biller_code",
        "account",
        "currency",
        "long_lat",
        "txn_description",
        "merchant_id",
        "merchant_code",
        "first_name",
        "balance",
        "date",
        "gender",
        "age",
        "merchant_suburb",
        "merchant_state",
        "extraction",
        "amount",
        "transaction_id",
        "country",
        "customer_id",
        "merchant_long_lat",
        "movement",
    )
])

# --- 2. BRONZE LAYER ---
# Raw data ingested "as-is" with audit columns. Stored in ADLS

@dlt.table(
    name="transactions_bronze",
    comment="Raw transaction data from CSV scource via AutoLoader. Stored in ADLS.",
    table_properties={"quality": "bronze"},
)
def transactions_bronze():
    """Stream raw transaction CSV files into the Bronze table.

    Files under ``TRANSACTIONS_CSV_SOURCE_PATH`` are read with the
    ``cloudFiles`` source using the all-string ``transaction_schema``.
    Two audit columns are appended:

    * ``file_name``        - result of ``input_file_name()`` for the row
    * ``ingest_timestamp`` - ``current_timestamp()`` at processing time

    Returns:
        A streaming DataFrame with the source columns plus the audit columns.
    """
    # NOTE(review): newer Databricks runtimes recommend the `_metadata.file_path`
    # column over `input_file_name()`; confirm against the target runtime.
    autoloader_options = {
        "cloudFiles.format": "csv",
        "header": "true",
        "cloudFiles.schemaLocation": TRANSACTIONS_BRONZE_SCHEMA_LOCATION,
        "badRecordsPath": "abfss://datalake@anzbanking.dfs.core.windows.net/dlt_pipelines/customer_analytics/bad_records/transactions_bronze/",
    }

    raw_stream = (
        spark.readStream.format("cloudFiles")
        .options(**autoloader_options)
        .schema(transaction_schema)
        .load(TRANSACTIONS_CSV_SOURCE_PATH)
    )

    with_source_file = raw_stream.withColumn("file_name", F.input_file_name())
    return with_source_file.withColumn("ingest_timestamp", F.current_timestamp())

# --- 3. SILVER LAYER ---
# Cleaned, typed, and validated transaction data. Stored in ADLS.

@dlt.table(
    name="transactions_silver",
    comment="Cleaned, typed, and validated transaction data. Stored in ADLS.",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_transaction_id", "transaction_id IS NOT NULL")
@dlt.expect("positive_debit_amount", "NOT (LOWER(movement) = 'debit' AND CAST(amount AS DOUBLE) <= 0)")
def transactions_silver():
    """Type, rename, filter and de-duplicate the Bronze transaction stream.

    Steps, in order:

    1. Cast the string columns from Bronze to their target types
       (boolean flag, double amounts, date, integer age, timestamp) and
       project them under their Silver names.
    2. Keep only rows whose ``movement`` is "debit" (case-insensitive),
       whose typed ``amount`` is greater than zero, and whose ``status`` is
       "posted" or "authorized" (case-insensitive).
    3. Drop duplicate rows by ``transaction_id``.

    Returns:
        A streaming DataFrame with the cleaned transaction columns.
    """
    bronze_stream = dlt.read_stream("transactions_bronze")

    # NOTE(review): casting a string to boolean yields NULL for values Spark
    # does not recognise as boolean text; confirm how the source encodes
    # card_present_flag (e.g. "1.0" would become NULL).
    typed = bronze_stream.select(
        F.col("transaction_id"),
        F.col("customer_id"),
        F.col("account"),
        F.to_date(F.col("date"), "M/d/yyyy").alias("transaction_date"),
        F.col("txn_description"),
        F.col("movement"),
        F.col("amount").cast(DoubleType()).alias("amount"),
        F.col("currency"),
        F.col("status"),
        F.col("balance").cast(DoubleType()).alias("balance_after_txn"),
        F.col("card_present_flag").cast(BooleanType()).alias("card_present"),
        F.col("merchant_id"),
        F.col("merchant_suburb"),
        F.col("merchant_state"),
        F.col("merchant_long_lat"),
        F.col("first_name"),
        F.col("gender"),
        F.col("age").cast(IntegerType()).alias("age"),
        F.col("bpay_biller_code"),
        F.col("long_lat").alias("customer_long_lat"),
        F.col("country"),
        F.to_timestamp(F.col("extraction")).alias("extraction_ts"),
        F.col("file_name"),
        F.col("ingest_timestamp"),
    )

    # "amount" here is the typed (double) column produced by the select above.
    is_debit = F.lower(F.col("movement")) == "debit"
    has_positive_amount = F.col("amount") > 0
    has_accepted_status = F.lower(F.col("status")).isin("posted", "authorized")
    kept = typed.filter(is_debit & has_positive_amount & has_accepted_status)

    # NOTE(review): dropDuplicates on a stream without a watermark keeps state
    # for every transaction_id seen so far; confirm that growth is acceptable.
    return kept.dropDuplicates(["transaction_id"])

    
# --- 4. GOLD LAYER ---
# Consolidated and deduplicated customer demographic information, derived from transactions. Stored in ADLS.


@dlt.table(
    name="customer_demographics_gold",
    comment="Consolidated and deduplicated customer demographic information, derived from transactions. Stored in ADLS.",
    table_properties={"quality": "gold"}
)
def customer_demographics_gold():
    """Collapse Silver transactions to one demographic row per customer.

    For each ``customer_id`` the aggregation keeps a ``first`` value of the
    name, gender, location and country columns, the maximum ``age`` and the
    latest ``transaction_date`` (exposed as ``last_seen_date``).

    Returns:
        A DataFrame keyed by ``customer_id``.
    """
    # NOTE(review): first() is applied without any ordering, so which row's
    # value is kept depends on arrival order; confirm this is acceptable.
    per_customer = dlt.read_stream("transactions_silver").groupBy("customer_id")

    demographic_columns = [
        F.first("first_name").alias("first_name"),
        F.first("gender").alias("gender"),
        F.max("age").alias("age"),
        F.first("customer_long_lat").alias("customer_long_lat"),
        F.first("country").alias("country"),
        F.max("transaction_date").alias("last_seen_date"),
    ]
    return per_customer.agg(*demographic_columns)

# --------------------
# Aggregated customer monetary features. Stored in ADLS.
@dlt.table(
    name="customer_monetary_features_gold",
    comment="Aggregated customer monetary features and transaction behaviors.",
    table_properties={"quality": "gold"}
)
def customer_monetary_features_gold():
    """Compute per-customer summary statistics of the transaction amount.

    Produces the sum, mean, approximate median (``percentile_approx`` at
    0.5), maximum, minimum and sample standard deviation of ``amount`` for
    every ``customer_id`` in the Silver transaction stream.

    Returns:
        A DataFrame keyed by ``customer_id``.
    """
    amount_statistics = [
        F.sum("amount").alias("total_spend"),
        F.avg("amount").alias("avg_transaction_amount"),
        F.expr("percentile_approx(amount, 0.5)").alias("median_transaction_amount"),
        F.max("amount").alias("max_transaction_amount"),
        F.min("amount").alias("min_transaction_amount"),
        F.stddev_samp("amount").alias("stddev_transaction_amount"),
    ]
    silver_stream = dlt.read_stream("transactions_silver")
    return silver_stream.groupBy("customer_id").agg(*amount_statistics)


# -----------
# Aggregated customer transaction frequency features. Stored in ADLS.

@dlt.table(
    name="customer_frequency_features_gold",
    comment="Aggregated customer transaction frequency features.",
    table_properties={"quality": "gold"}
)
def customer_frequency_features_gold():
    """Count transactions and distinct transaction days per customer.

    ``transaction_count`` is an exact row count; ``unique_transaction_days``
    is an approximate distinct count of ``transaction_date``.

    Returns:
        A DataFrame keyed by ``customer_id``.
    """
    per_customer = dlt.read_stream("transactions_silver").groupBy("customer_id")
    return per_customer.agg(
        F.count("*").alias("transaction_count"),
        F.approx_count_distinct("transaction_date").alias("unique_transaction_days"),
    )




# -------------
# Function to categorize transaction descriptions (keep as is)
def categorize_txn_description(txn_desc_col):
    """Map a transaction-description column to a category label.

    The description is lower-cased and tested against SQL ``LIKE`` patterns
    in the order listed below; the first matching pattern decides the label
    and descriptions matching none of them become "Other". Because the
    patterns are substring matches, "%pos%" matches any description that
    contains "pos".

    Args:
        txn_desc_col: Column holding the transaction description text.

    Returns:
        A Column expression yielding the category string.
    """
    description = F.lower(txn_desc_col)
    # Order matters: earlier rules take precedence over later ones.
    rules = (
        ("%pos%", "Point of Sale"),
        ("%sales%", "Point of Sale"),
        ("%payment%", "Payment/Transfer"),
        ("%inter bank%", "Inter-Bank Transfer"),
        ("%phone bank%", "Phone Banking"),
    )

    (first_pattern, first_label), *remaining_rules = rules
    category = F.when(description.like(first_pattern), first_label)
    for pattern, label in remaining_rules:
        category = category.when(description.like(pattern), label)
    return category.otherwise("Other")

# Aggregated customer transaction recency features. Stored in ADLS.
@dlt.table(
    name="customer_recency_features_gold",
    comment="Aggregated customer transaction recency and tenure features.",
    table_properties={"quality": "gold"}
)
def customer_recency_features_gold():
    """Compute per-customer recency and tenure features.

    For every ``customer_id`` in the Silver transaction stream this yields:

    * ``last_transaction_date`` / ``first_transaction_date`` - max / min of
      ``transaction_date``
    * ``recency_days`` - days between ``current_date()`` and the last
      transaction date
    * ``customer_tenure_days`` - days between the first and the last
      transaction date

    Returns:
        A DataFrame keyed by ``customer_id``.
    """
    df_silver = dlt.read_stream("transactions_silver")

    # The watermark is kept from the original implementation.
    # NOTE(review): the aggregation below groups by customer_id only (no
    # event-time window), so confirm the watermark has the intended effect.
    df_silver_watermarked = df_silver.withColumn(
        "event_timestamp", F.to_timestamp(F.col("transaction_date"))
    ).withWatermark("event_timestamp", "1 day")

    # The transaction category is not needed here: none of the recency
    # features depend on it, so it is no longer derived in this table.
    df_recency_agg = df_silver_watermarked.groupBy("customer_id").agg(
        F.max("transaction_date").alias("last_transaction_date"),
        F.min("transaction_date").alias("first_transaction_date")
    )

    # recency_days is measured against the processing date (current_date()).
    df_recency_final = df_recency_agg.withColumn(
        "recency_days", F.datediff(F.current_date(), F.col("last_transaction_date"))
    ).withColumn(
        "customer_tenure_days", F.datediff(F.col("last_transaction_date"), F.col("first_transaction_date"))
    )

    return df_recency_final


# ------
# Aggregated customer spending features including unique merchants visited, 
# unique merchant states visited, unique transaction categories used, 
# and card involvement in transactions. Stored in ADLS.
@dlt.table(
    name="customer_spending_features_gold",
    comment="Aggregated customer spending features.",
    table_properties={"quality": "gold"}
)
def customer_spending_features_gold():
    """Compute per-customer spending-pattern features.

    For every ``customer_id`` the result contains approximate distinct
    counts of merchants, merchant states and transaction categories, the
    number of transactions with ``card_present`` true / false, and one
    ``spend_<category>`` total for each pivoted category.

    The ``spend_<category>`` alias is built by lower-casing the category and
    replacing spaces and slashes with underscores, so "Inter-Bank Transfer"
    becomes ``spend_inter-bank_transfer`` (the hyphen is kept).
    NOTE(review): confirm whether the hyphenated column name is intended.

    Returns:
        A DataFrame keyed by ``customer_id``.
    """
    silver_stream = dlt.read_stream("transactions_silver")

    categorized = (
        silver_stream
        .withColumn("event_timestamp", F.to_timestamp(F.col("transaction_date")))
        .withWatermark("event_timestamp", "1 day")
        .withColumn("txn_category", categorize_txn_description(F.col("txn_description")))
    )

    def spend_in(category):
        # Sum of amount for rows in the given category; other rows add 0.
        amount_if_match = F.when(
            F.col("txn_category") == category, F.col("amount")
        ).otherwise(0)
        return F.sum(amount_if_match).alias(
            f"spend_{category.lower().replace(' ', '_').replace('/', '_')}"
        )

    spend_by_category = [
        spend_in(pivot_category)
        for pivot_category in (
            "Point of Sale",
            "Payment/Transfer",
            "Inter-Bank Transfer",
            "Phone Banking",
        )
    ]

    card_present = F.col("card_present")
    # count() ignores NULLs, so each when() without otherwise() counts only
    # the rows where its condition holds.
    return categorized.groupBy("customer_id").agg(
        F.approx_count_distinct("merchant_id").alias("unique_merchants_visited"),
        F.approx_count_distinct("merchant_state").alias("unique_merchant_states_visited"),
        F.approx_count_distinct("txn_category").alias("unique_txn_categories_used"),
        F.count(F.when(card_present == F.lit(True), 1)).alias("card_involved_transactions"),
        F.count(F.when(card_present == F.lit(False), 1)).alias("card_not_involved_transactions"),
        *spend_by_category,
    )


# -----------
#Consolidated customer profile combining demographics, monetary, recency, frequency, and spending pattern features. Stored in ADLS
@dlt.table(
    name="customer_master_profile_gold",
    comment="Consolidated customer profile combining demographics, monetary, recency, frequency, and spending pattern features.",
    table_properties={"quality": "gold"}
)
def customer_master_profile_gold():
    """Join all per-customer Gold feature tables into one wide profile.

    The demographics table is the left side of every join, so the result has
    one row per customer present in ``customer_demographics_gold``; feature
    columns from the other tables are NULL where a customer is missing
    there. Per-category spend totals and transaction counts are computed
    here directly from ``transactions_silver``.

    Returns:
        A DataFrame keyed by ``customer_id``.
    """
    demographics = dlt.read("customer_demographics_gold").select(
        "customer_id", "first_name", "gender", "age", "customer_long_lat", "country"
    )
    monetary = dlt.read("customer_monetary_features_gold").select(
        "customer_id", "total_spend", "avg_transaction_amount",
        "median_transaction_amount", "max_transaction_amount",
        "min_transaction_amount", "stddev_transaction_amount"
    )
    frequency = dlt.read("customer_frequency_features_gold").select(
        "customer_id", "transaction_count", "unique_transaction_days"
    )
    recency = dlt.read("customer_recency_features_gold").select(
        "customer_id", "last_transaction_date", "first_transaction_date",
        "customer_tenure_days", "recency_days"
    )
    spending_patterns = dlt.read("customer_spending_features_gold").select(
        "customer_id", "unique_merchants_visited", "unique_merchant_states_visited",
        "unique_txn_categories_used", "card_involved_transactions",
        "card_not_involved_transactions"
    )

    categorized_txns = dlt.read("transactions_silver").withColumn(
        "txn_category", categorize_txn_description(F.col("txn_description"))
    )

    # (category label, spend column name, count column name)
    category_outputs = (
        ("Point of Sale", "spend_pos", "count_pos"),
        ("Payment/Transfer", "spend_payment_transfer", "count_payment_transfer"),
        ("Inter-Bank Transfer", "spend_inter_bank_transfer", "count_inter_bank_transfer"),
        ("Phone Banking", "spend_phone_banking", "count_phone_banking"),
    )
    in_category = F.col("txn_category")
    spend_totals = [
        F.sum(F.when(in_category == label, F.col("amount")).otherwise(0)).alias(spend_name)
        for label, spend_name, _ in category_outputs
    ]
    txn_counts = [
        F.count(F.when(in_category == label, 1)).alias(count_name)
        for label, _, count_name in category_outputs
    ]
    spending_areas = categorized_txns.groupBy("customer_id").agg(
        *spend_totals, *txn_counts
    )

    # Left-join each feature set onto the demographics, in this fixed order.
    profile = demographics
    for feature_set in (monetary, recency, frequency, spending_patterns, spending_areas):
        profile = profile.join(feature_set, "customer_id", "left_outer")
    return profile


# ------------
# Customer segmentation flags for various marketing campaigns using a Rule based approach
@dlt.table(
    name="customer_campaign_segmentation_gold",
    comment="Customer segmentation flags for various marketing campaigns using a rule based approach.",
    table_properties={"quality": "gold"}
)
def customer_campaign_segmentation_gold():
    """Add rule-based campaign flags to the customer master profile.

    Reads ``customer_master_profile_gold`` and appends six boolean columns,
    in this order: ``is_genuinely_recent``, ``is_premium_cc_candidate``,
    ``is_standard_cc_candidate``, ``is_personal_loan_candidate``,
    ``is_investor_candidate`` and ``is_life_insurance_prospect``.

    Every flag is True only when its whole rule evaluates to True; a rule
    that evaluates to NULL (for example because an input column is NULL)
    yields False.

    Returns:
        The profile DataFrame with the flag columns appended.
    """
    df_profiles = dlt.read("customer_master_profile_gold")

    # --- Thresholds ---
    # Recency is measured against this fixed date rather than the run date.
    # NOTE(review): presumably the last date covered by the source data; confirm.
    RECENCY_REFERENCE_DATE = "2018-10-31"
    RECENT_DAYS_THRESHOLD_GENERAL = 7

    # Premium Credit Card Thresholds
    PREMIUM_CC_TOTAL_SPEND = 8000
    PREMIUM_CC_AVG_TXN_AMOUNT = 55
    PREMIUM_CC_TXN_COUNT = 80
    PREMIUM_CC_MIN_AGE = 26
    PREMIUM_CC_MAX_AGE = 65

    # Standard Credit Card Thresholds
    STANDARD_CC_MIN_TOTAL_SPEND = 3500
    STANDARD_CC_MAX_TOTAL_SPEND = 7999
    STANDARD_CC_TXN_COUNT = 60
    STANDARD_CC_POS_SPEND_RATIO = 0.40
    STANDARD_CC_MIN_AGE = 20
    STANDARD_CC_MAX_AGE = 68

    # Personal Loan Thresholds
    LOAN_MIN_TENURE_DAYS = 80
    LOAN_TOTAL_SPEND_CAPACITY = 6500
    LOAN_MAX_TXN_AMOUNT_INDICATOR = 900
    LOAN_PAYMENT_TRANSFER_SPEND_RATIO_INDICATOR = 0.10
    LOAN_INTER_BANK_TRANSFER_SPEND_RATIO_INDICATOR = 0.05
    LOAN_MIN_AGE = 25
    LOAN_MAX_AGE = 50

    # Potential Investor Thresholds
    INVESTOR_TOTAL_SPEND = 8500
    INVESTOR_MEDIAN_TXN_AMOUNT = 30
    INVESTOR_INTER_BANK_TRANSFER_SPEND_RATIO = 0.04
    INVESTOR_PAYMENT_TRANSFER_SPEND_RATIO = 0.05
    INVESTOR_MIN_AGE = 28
    INVESTOR_MIN_TENURE_DAYS = 70

    # Life Insurance Thresholds
    LIFE_INS_MIN_AGE = 28
    LIFE_INS_MAX_AGE = 55
    LIFE_INS_MIN_TENURE_DAYS = 80
    LIFE_INS_MIN_TOTAL_SPEND = 5000
    LIFE_INS_MAX_TXN_PROXY = 2000
    LIFE_INS_PAYMENT_TRANSFER_PROXY = 1000

    def _as_flag(condition):
        # True where the condition is True; False where it is False or NULL.
        return F.when(condition, True).otherwise(False)

    # Column references shared by several rules. They are resolved lazily, so
    # referring to flag columns that are added further down is fine.
    age = F.col("age")
    total_spend = F.col("total_spend")
    tenure_days = F.col("customer_tenure_days")
    max_txn_amount = F.col("max_transaction_amount")
    spend_payment_transfer = F.col("spend_payment_transfer")
    spend_inter_bank_transfer = F.col("spend_inter_bank_transfer")
    is_recent = F.col("is_genuinely_recent")

    # A last transaction dated after the reference date gives a negative
    # difference and therefore also counts as recent.
    days_since_last_txn = F.datediff(
        F.lit(RECENCY_REFERENCE_DATE), F.col("last_transaction_date")
    )
    df_flags = df_profiles.withColumn(
        "is_genuinely_recent",
        _as_flag(days_since_last_txn <= RECENT_DAYS_THRESHOLD_GENERAL),
    )

    df_flags = df_flags.withColumn(
        "is_premium_cc_candidate",
        _as_flag(
            (total_spend >= PREMIUM_CC_TOTAL_SPEND)
            & (F.col("avg_transaction_amount") >= PREMIUM_CC_AVG_TXN_AMOUNT)
            & (F.col("transaction_count") >= PREMIUM_CC_TXN_COUNT)
            & is_recent
            & age.between(PREMIUM_CC_MIN_AGE, PREMIUM_CC_MAX_AGE)
        ),
    )

    # Standard card is only offered to customers who did not qualify for premium.
    # The ratio's denominator is floored at 1.0 to avoid dividing by zero.
    pos_spend_ratio = F.col("spend_pos") / F.greatest(total_spend, F.lit(1.0))
    df_flags = df_flags.withColumn(
        "is_standard_cc_candidate",
        _as_flag(
            ~F.col("is_premium_cc_candidate")
            & total_spend.between(STANDARD_CC_MIN_TOTAL_SPEND, STANDARD_CC_MAX_TOTAL_SPEND)
            & (F.col("transaction_count") >= STANDARD_CC_TXN_COUNT)
            & (pos_spend_ratio >= STANDARD_CC_POS_SPEND_RATIO)
            & is_recent
            & age.between(STANDARD_CC_MIN_AGE, STANDARD_CC_MAX_AGE)
        ),
    )

    df_flags = df_flags.withColumn(
        "is_personal_loan_candidate",
        _as_flag(
            (tenure_days >= LOAN_MIN_TENURE_DAYS)
            & (total_spend >= LOAN_TOTAL_SPEND_CAPACITY)
            & age.between(LOAN_MIN_AGE, LOAN_MAX_AGE)
            & is_recent
            & (
                (max_txn_amount >= LOAN_MAX_TXN_AMOUNT_INDICATOR)
                | (spend_payment_transfer >= (total_spend * LOAN_PAYMENT_TRANSFER_SPEND_RATIO_INDICATOR))
                | (spend_inter_bank_transfer >= (total_spend * LOAN_INTER_BANK_TRANSFER_SPEND_RATIO_INDICATOR))
            )
        ),
    )

    # Unlike the other campaign flags, the investor rule has no recency condition.
    df_flags = df_flags.withColumn(
        "is_investor_candidate",
        _as_flag(
            (total_spend >= INVESTOR_TOTAL_SPEND)
            & (F.col("median_transaction_amount") >= INVESTOR_MEDIAN_TXN_AMOUNT)
            & (
                (spend_inter_bank_transfer > (total_spend * INVESTOR_INTER_BANK_TRANSFER_SPEND_RATIO))
                | (spend_payment_transfer > (total_spend * INVESTOR_PAYMENT_TRANSFER_SPEND_RATIO))
            )
            & (age >= INVESTOR_MIN_AGE)
            & (tenure_days >= INVESTOR_MIN_TENURE_DAYS)
        ),
    )

    df_final_segmentation = df_flags.withColumn(
        "is_life_insurance_prospect",
        _as_flag(
            age.between(LIFE_INS_MIN_AGE, LIFE_INS_MAX_AGE)
            & (tenure_days >= LIFE_INS_MIN_TENURE_DAYS)
            & (total_spend >= LIFE_INS_MIN_TOTAL_SPEND)
            & (
                F.col("is_personal_loan_candidate")
                | (max_txn_amount >= LIFE_INS_MAX_TXN_PROXY)
                | (spend_payment_transfer >= LIFE_INS_PAYMENT_TRANSFER_PROXY)
            )
            & is_recent
        ),
    )
    return df_final_segmentation