In [0]:
from pyspark.sql import functions as F

# 1) Read Silver table
silver_df = spark.table("catalog_anushka.pdf_fin_silver.pdf_silver")

# Transactions whose absolute amount exceeds this value are flagged as large
# (same units as the `amount` column; tune as needed).
LARGE_TX_THRESHOLD = 1000.0

# Number of characters left readable at each end of a masked IBAN.
IBAN_VISIBLE_CHARS = 4

# 2) Gold transformation
#    Adds: iban_masked, direction, amount_abs, is_large_tx, category_std
#    Drops: iban (raw PII)
gold_df = (
    silver_df
    # IBAN masking: keep first 4 & last 4, mask the middle.
    # Only reveal the ends when the value is long enough that the two visible
    # slices cannot overlap; otherwise a short/malformed value would be exposed
    # in full, so it is masked completely. NULL IBANs stay NULL.
    .withColumn(
        "iban_masked",
        F.when(
            F.length(F.col("iban")) > 2 * IBAN_VISIBLE_CHARS,
            F.concat(
                F.substring("iban", 1, IBAN_VISIBLE_CHARS),
                F.lit("XXXXXX"),
                F.substring("iban", -IBAN_VISIBLE_CHARS, IBAN_VISIBLE_CHARS),
            ),
        ).when(F.col("iban").isNotNull(), F.lit("XXXXXX")),
    )
    # Direction: debit vs credit.
    # A NULL amount matches neither branch and yields a NULL direction instead
    # of being silently classified as CREDIT.
    .withColumn(
        "direction",
        F.when(F.col("amount") < 0, F.lit("DEBIT"))
         .when(F.col("amount") >= 0, F.lit("CREDIT")),
    )
    # Absolute amount for analytics
    .withColumn("amount_abs", F.abs(F.col("amount")))
    # Simple large transaction flag
    .withColumn("is_large_tx", F.col("amount_abs") > F.lit(LARGE_TX_THRESHOLD))
    # Standardized category names; unmapped categories pass through unchanged
    .withColumn(
        "category_std",
        F.when(F.col("category") == "LEBENSHALTUNG", F.lit("GROCERIES"))
         .when(F.col("category") == "FREIZEIT", F.lit("LEISURE"))
         .when(F.col("category") == "ONLINE", F.lit("ONLINE_SHOPPING"))
         .otherwise(F.col("category")),
    )
    # Drop raw IBAN from Gold (PII)
    .drop("iban")
)

# 3) Create Gold schema if not exists
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS catalog_anushka.pdf_fin_gold
""")

# 4) Write Gold Delta table
# The table is fully rebuilt on every run. `overwriteSchema` lets the rebuild
# also replace the table schema, so adding or changing a derived column in the
# Gold transformation does not make the overwrite fail on a schema mismatch.
(
    gold_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("catalog_anushka.pdf_fin_gold.pdf_gold")
)


In [0]:
# --------------------------------------------------
# 1️⃣ EXECUTIVE SUMMARY KPIs (PHYSICAL TABLES)
# --------------------------------------------------

# ✅ Total Income
# Single-row KPI: sum of absolute amounts over CREDIT rows.
# COALESCE reports 0 (not NULL) when there are no CREDIT rows, matching how
# kpi_net_savings treats a missing side as 0.
spark.sql("""
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_total_income AS
    SELECT
        COALESCE(SUM(amount_abs), 0) AS total_income
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    WHERE direction = 'CREDIT'
""")

# ✅ Total Expenses
# Single-row KPI: sum of absolute amounts over DEBIT rows.
# COALESCE reports 0 (not NULL) when there are no DEBIT rows, matching how
# kpi_net_savings treats a missing side as 0.
spark.sql("""
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_total_expenses AS
    SELECT
        COALESCE(SUM(amount_abs), 0) AS total_expenses
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    WHERE direction = 'DEBIT'
""")

# ✅ Net Savings / Net Cash Flow
# Single-row KPI: total CREDIT amount minus total DEBIT amount over the whole
# Gold table. Rows of the other direction contribute 0 to each sum.
net_savings_ctas = """
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_net_savings AS
    SELECT
        SUM(CASE WHEN direction = 'CREDIT' THEN amount_abs ELSE 0 END)
      - SUM(CASE WHEN direction = 'DEBIT' THEN amount_abs ELSE 0 END)
        AS net_savings
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
"""
spark.sql(net_savings_ctas)

# ✅ Savings Rate (%)
# Single-row KPI: (income - expenses) / income * 100, rounded to 2 decimals.
# NULLIF turns a zero income into NULL so the division yields NULL instead of
# failing with a divide-by-zero error when ANSI mode is enabled; the KPI is
# therefore NULL (undefined) when there is no income.
spark.sql("""
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_savings_rate AS
    SELECT
        ROUND(
            (
                (
                    SUM(CASE WHEN direction = 'CREDIT' THEN amount_abs ELSE 0 END)
                  - SUM(CASE WHEN direction = 'DEBIT' THEN amount_abs ELSE 0 END)
                )
                /
                NULLIF(SUM(CASE WHEN direction = 'CREDIT' THEN amount_abs ELSE 0 END), 0)
            ) * 100, 2
        ) AS savings_rate_percent
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
""")

# ✅ Total Transactions
# Single-row KPI: number of rows in the Gold table, regardless of direction.
total_transactions_ctas = """
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_total_transactions AS
    SELECT COUNT(*) AS total_transactions
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
"""
spark.sql(total_transactions_ctas)

# --------------------------------------------------
# 2️⃣ TIME SERIES KPIs
# --------------------------------------------------

# ✅ Daily Income vs Expense
# One row per booking_date with that day's CREDIT total and DEBIT total.
# NOTE(review): the stored table presumably does not guarantee the ORDER BY
# ordering on read — consumers should sort explicitly; confirm.
daily_cashflow_ctas = """
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_daily_income_vs_expense AS
    SELECT
        booking_date,
        SUM(CASE WHEN direction = 'CREDIT' THEN amount_abs ELSE 0 END) AS daily_income,
        SUM(CASE WHEN direction = 'DEBIT' THEN amount_abs ELSE 0 END) AS daily_expense
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    GROUP BY booking_date
    ORDER BY booking_date
"""
spark.sql(daily_cashflow_ctas)

# ✅ Monthly Income vs Expense
# One row per calendar month (booking_date formatted as 'yyyy-MM') with the
# month's CREDIT total and DEBIT total.
monthly_cashflow_ctas = """
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_monthly_income_vs_expense AS
    SELECT
        DATE_FORMAT(booking_date, 'yyyy-MM') AS month,
        SUM(CASE WHEN direction = 'CREDIT' THEN amount_abs ELSE 0 END) AS income,
        SUM(CASE WHEN direction = 'DEBIT' THEN amount_abs ELSE 0 END) AS expense
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    GROUP BY month
    ORDER BY month
"""
spark.sql(monthly_cashflow_ctas)

# ✅ Running Balance (Cumulative)
# One row per booking_date holding the cumulative net amount (credits minus
# debits) up to and including that day.
# Transactions are first collapsed to a daily net amount; running the window
# directly over raw transactions produced one identical (date, balance) row per
# transaction, which inflates any downstream aggregation of this table.
# The explicit ROWS frame is safe here because booking_date is unique after the
# GROUP BY.
spark.sql("""
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_running_balance AS
    SELECT
        booking_date,
        SUM(daily_net) OVER (
            ORDER BY booking_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_balance
    FROM (
        SELECT
            booking_date,
            SUM(
                CASE
                    WHEN direction = 'CREDIT' THEN amount_abs
                    ELSE -amount_abs
                END
            ) AS daily_net
        FROM catalog_anushka.pdf_fin_gold.pdf_gold
        GROUP BY booking_date
    ) AS daily
    ORDER BY booking_date
""")

# --------------------------------------------------
# 3️⃣ CATEGORY & MERCHANT KPIs
# --------------------------------------------------

# ✅ Expense by Category
# One row per standardized category with the total absolute amount of its
# DEBIT transactions, largest total first.
category_expense_ctas = """
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_category_expense AS
    SELECT
        category_std AS category,
        SUM(amount_abs) AS total_expense
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    WHERE direction = 'DEBIT'
    GROUP BY category_std
    ORDER BY total_expense DESC
"""
spark.sql(category_expense_ctas)

# ✅ Top 10 Merchants by Spend
# Groups DEBIT transactions by `description` (used as the merchant label) and
# keeps the 10 largest totals.
# The secondary sort key makes the cut-off deterministic when several
# merchants have the same total spend, so re-runs select the same 10 rows.
spark.sql("""
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_top_merchants AS
    SELECT
        description AS merchant,
        SUM(amount_abs) AS total_spend
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    WHERE direction = 'DEBIT'
    GROUP BY description
    ORDER BY total_spend DESC, description ASC
    LIMIT 10
""")

# --------------------------------------------------
# 4️⃣ RISK & BEHAVIOR KPIs
# --------------------------------------------------

# ✅ Burn Rate (Average Daily Expense)
# Single-row KPI: DEBIT amounts are summed per booking_date first, then those
# daily totals are averaged. Averaging amount_abs directly would give the mean
# size of a single transaction, not the spend per day.
# Only days with at least one DEBIT transaction enter the average.
spark.sql("""
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_burn_rate AS
    SELECT
        ROUND(AVG(daily_expense), 2) AS avg_daily_burn
    FROM (
        SELECT
            booking_date,
            SUM(amount_abs) AS daily_expense
        FROM catalog_anushka.pdf_fin_gold.pdf_gold
        WHERE direction = 'DEBIT'
        GROUP BY booking_date
    ) AS daily
""")

# ✅ Large Transaction Count
# Single-row KPI: number of Gold rows whose is_large_tx flag is TRUE.
large_tx_count_ctas = """
    CREATE OR REPLACE TABLE catalog_anushka.pdf_fin_gold.kpi_large_tx_count AS
    SELECT COUNT(*) AS large_tx_count
    FROM catalog_anushka.pdf_fin_gold.pdf_gold
    WHERE is_large_tx = TRUE
"""
spark.sql(large_tx_count_ctas)

# --------------------------------------------------
# ✅ ALL KPI TABLES MATERIALIZED
# --------------------------------------------------

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]