In [0]:
# Databricks notebook source
# 02_processing_uc.py - KYC Risk Processing with Unity Catalog

from pyspark.sql.functions import (
    col,
    when,
    lit,
    to_date,
    concat_ws,
    sum as _sum,
    count,
    max as _max
)
from pyspark.sql import SparkSession

# -----------------------------------------------
# Step 1: Parametrize catalog, schema and eval time
# -----------------------------------------------
# Text widgets let a job run or an interactive user override these defaults.
dbutils.widgets.text("catalog", "governance_risk")
dbutils.widgets.text("schema", "kyc_project")
dbutils.widgets.text("evaluation_timestamp", "2025-06-10 12:00:00")

# Widget values are free text; strip stray whitespace so a trailing space does
# not turn into a "catalog not found" error or a malformed table name later on.
catalog = dbutils.widgets.get("catalog").strip()
schema = dbutils.widgets.get("schema").strip()
evaluation_timestamp = dbutils.widgets.get("evaluation_timestamp").strip()

for _widget_name, _widget_value in (
    ("catalog", catalog),
    ("schema", schema),
    ("evaluation_timestamp", evaluation_timestamp),
):
    if not _widget_value:
        raise ValueError(f"Widget '{_widget_name}' must not be empty.")

# Fail fast on a timestamp Spark cannot parse. The value is later applied with
# lit(...).cast("timestamp"); with ANSI mode off an invalid string casts to
# NULL, which would silently stamp every row with a NULL evaluation_timestamp
# and evaluation_date. The check uses the same Spark cast so that it accepts
# exactly the formats the downstream cast accepts. (With ANSI mode on, the
# cast itself is expected to raise here instead of returning NULL.)
_parsed_evaluation_ts = (
    spark.range(1)
    .select(lit(evaluation_timestamp).cast("timestamp").alias("ts"))
    .first()["ts"]
)
if _parsed_evaluation_ts is None:
    raise ValueError(
        f"Widget 'evaluation_timestamp' value {evaluation_timestamp!r} "
        "cannot be cast to a timestamp."
    )

# -----------------------------------------------
# Step 2: Validate and set catalog/schema context
# -----------------------------------------------
def _quote_identifier(identifier):
    """Backtick-quote a SQL identifier, doubling any embedded backticks."""
    return "`" + identifier.replace("`", "``") + "`"


# List every catalog visible to the current principal.
catalogs = [row.catalog for row in spark.sql("SHOW CATALOGS").collect()]

# Compare case-insensitively: catalog names are not case-sensitive, so a widget
# value such as "Governance_Risk" must still match "governance_risk".
_known_catalogs = {name.lower() for name in catalogs}
if catalog.lower() not in _known_catalogs:
    raise ValueError(f"Catalog '{catalog}' not found. Please create it in the Unity Catalog UI.")

# The names come from free-text widgets, so quote them before interpolating
# them into SQL rather than trusting them to be plain identifiers.
_catalog_sql = _quote_identifier(catalog)
_schema_sql = _quote_identifier(schema)

spark.sql(f"USE CATALOG {_catalog_sql}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {_catalog_sql}.{_schema_sql}")
spark.sql(f"USE SCHEMA {_schema_sql}")

# -----------------------------------------------
# Step 3: Load Delta tables from Unity Catalog (Bronze layer)
# -----------------------------------------------
_bronze_prefix = ".".join((catalog, schema))


def _read_bronze(table_name):
    """Return the table `table_name` from the configured catalog and schema."""
    return spark.read.table(f"{_bronze_prefix}.{table_name}")


# Inputs for the risk enrichment step, read by fully qualified three-part name.
clients_df = _read_bronze("clients")
transactions_df = _read_bronze("transactions")
high_risk_countries_df = _read_bronze("high_risk_countries")

# -----------------------------------------------
# Step 4: Risk enrichment
# -----------------------------------------------
def enrich_risk(
    clients_df,
    transactions_df,
    high_risk_countries_df,
    evaluation_timestamp,
    *,
    high_value_threshold=10000.00,
    country_weight=1,
    high_value_weight=1.5,
    risk_flag_threshold=2,
):
    """Join transactions with clients and the high-risk country list, then score each row.

    Args:
        clients_df: Client DataFrame; must expose `client_id` and
            `residency_country`.
        transactions_df: Transaction DataFrame; must expose `client_id`.
            `transaction_amount` and `transaction_id` are referenced
            unqualified and are presumably supplied by this table.
        high_risk_countries_df: Lookup DataFrame of high-risk countries. Its
            `residency_country` column (if present) is renamed to
            `high_risk_country` before the join.
        evaluation_timestamp: Value stamped on every row; it is wrapped in a
            literal and cast to a Spark timestamp.
        high_value_threshold: A transaction is high-value when
            `transaction_amount` is strictly greater than this amount.
        country_weight: Score contribution of a high-risk residency country.
        high_value_weight: Score contribution of a high-value transaction.
        risk_flag_threshold: `risk_flag` is true when `risk_score` is greater
            than or equal to this value. With the default weights (1 and 1.5)
            and the default threshold (2), the flag is set only when both
            indicators are true.

    Returns:
        The joined DataFrame with these columns added:
        `is_high_risk_country`, `is_high_value_transaction`, `risk_score`,
        `risk_flag`, `evaluation_timestamp`, `evaluation_date`, `event_id`
        (`client_id` and `transaction_id` joined with "_").
    """
    # Rename BEFORE aliasing. A column renamed after `.alias("h")` comes out of
    # a new projection and may no longer carry the `h` qualifier, in which case
    # `col("h.high_risk_country")` cannot be resolved. Aliasing last guarantees
    # every column of the lookup is addressable as `h.<column>`.
    # NOTE(review): if the lookup can contain the same country more than once,
    # this left join will duplicate transaction rows - confirm it is unique.
    high_risk_lookup = (
        high_risk_countries_df
        .withColumnRenamed("residency_country", "high_risk_country")
        .alias("h")
    )

    joined_df = (
        transactions_df.alias("t")
        # Inner join: transactions without a matching client are dropped.
        .join(clients_df.alias("c"), on="client_id", how="inner")
        # Left join: keep every transaction; a match marks a high-risk country.
        .join(
            high_risk_lookup,
            col("c.residency_country") == col("h.high_risk_country"),
            how="left",
        )
        .withColumn("is_high_risk_country", col("h.high_risk_country").isNotNull())
    )

    risk_df = (
        joined_df
        # when/otherwise (rather than the bare comparison) makes a NULL amount
        # yield False instead of NULL.
        .withColumn(
            "is_high_value_transaction",
            when(col("transaction_amount") > high_value_threshold, lit(True)).otherwise(lit(False)),
        )
        .withColumn(
            "risk_score",
            col("is_high_risk_country").cast("int") * country_weight
            + col("is_high_value_transaction").cast("int") * high_value_weight,
        )
        .withColumn(
            "risk_flag",
            when(col("risk_score") >= risk_flag_threshold, lit(True)).otherwise(lit(False)),
        )
        .withColumn("evaluation_timestamp", lit(evaluation_timestamp).cast("timestamp"))
        .withColumn("evaluation_date", to_date(col("evaluation_timestamp")))
        .withColumn("event_id", concat_ws("_", col("client_id"), col("transaction_id")))
    )

    return risk_df

# -----------------------------------------------
# Step 5: Process and save Silver table to Unity Catalog
# -----------------------------------------------
risk_df = enrich_risk(clients_df, transactions_df, high_risk_countries_df, evaluation_timestamp)

# Fully qualified name of the Silver output table.
_silver_table = f"{catalog}.{schema}.client_transactions_risk"

# NOTE(review): mode "overwrite" together with overwriteSchema presumably
# replaces the whole table (all evaluation_date partitions) on each run -
# confirm that keeping only the latest evaluation is intended.
_silver_writer = risk_df.write.format("delta")
_silver_writer = _silver_writer.mode("overwrite").option("overwriteSchema", "true")
_silver_writer = _silver_writer.partitionBy("evaluation_date")
_silver_writer.saveAsTable(_silver_table)

print("✅ Silver table 'client_transactions_risk' successfully written to Unity Catalog.")


In [0]:
# Render the enriched risk DataFrame in the notebook output for inspection.
# NOTE(review): risk_df is not cached above, so this presumably re-executes
# the joins rather than reading back the table that was just written.
display(risk_df)