In [0]:
# Databricks notebook source
# =======================================================================================
# 05_fraud_summary_pro
#
# Description:
#   1. Reads the daily transaction-level fraud predictions generated by the ML model.
#   2. Derives necessary time-based features (hour, day_of_week) from the timestamp.
#   3. Computes key performance indicators (KPIs) and business metrics by aggregating
#      the prediction data. This includes fraud rates by count and amount, total value
#      at risk, and average model scores.
#   4. Writes the final, aggregated summary table to a curated Delta table in a clean,
#      idempotent manner using dynamic partition overwrites.
#   5. Generates a business-focused visualization of the hourly fraud summary.
#
# What's New (Professional Enhancements):
#   - SIMPLICITY: Removed complex, defensive code for column normalization, as the
#     input schema from the previous notebook is now standardized and trusted.
#   - ENHANCED METRICS: Calculates more valuable business KPIs, such as fraud rate
#     by transaction value, not just by count.
#   - IDEMPOTENT WRITES: Uses dynamic partition overwrites to ensure that re-running
#     the job for a specific day safely replaces the data without creating duplicates.
#   - MODULARITY: Encapsulates logic into functions for clarity and reusability.
#   - VISUALIZATION: Adds a clear bar chart to visualize the financial impact of fraud.
# =======================================================================================

from pyspark.sql import SparkSession, functions as F
from datetime import datetime, timedelta, timezone
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# --------------------------------------------------------------------------------------
# 1. Setup & Parameters
# --------------------------------------------------------------------------------------

spark = SparkSession.builder.appName("FraudSummary").getOrCreate()
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# --- Widgets for parameterization ---
dbutils.widgets.text("ingest_date", "", "YYYY-MM-DD (leave blank to auto-detect latest)")
dbutils.widgets.text("prediction_container", "prediction", "ADLS container for prediction data")
dbutils.widgets.text("curated_container", "curated", "ADLS container for summary data")

# --- Read parameters from widgets ---
raw_ingest_date = dbutils.widgets.get("ingest_date")
prediction_container = dbutils.widgets.get("prediction_container")
curated_container = dbutils.widgets.get("curated_container")
storage_account = "finlakeadlsa3b3"
scope = "finlake_scope"

# --- Define paths ---
prediction_path = f"abfss://{prediction_container}@{storage_account}.dfs.core.windows.net/delta/predicted_fraud"
curated_summary_path = f"abfss://{curated_container}@{storage_account}.dfs.core.windows.net/delta/fraud_summary_hourly"

# --------------------------------------------------------------------------------------
# 2. Helper Functions
# --------------------------------------------------------------------------------------

def setup_spark_adls_auth(spark, storage_account, scope):
    """Configures Spark to authenticate with ADLS Gen2 using Key Vault secrets."""
    print(f"Configuring authentication for storage account: {storage_account}...")
    client_id = dbutils.secrets.get(scope=scope, key="finlake-sp-client-id")
    tenant_id = dbutils.secrets.get(scope=scope, key="finlake-sp-tenant-id")
    client_secret = dbutils.secrets.get(scope=scope, key="finlake-sp-client-secret")
    
    spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", client_id)
    spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", client_secret)
    spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
    print("Authentication configured successfully.")

def get_ingest_date(widget_date, source_path):
    """Determines the target ingest date, using the widget value or inferring the latest from the source."""
    if widget_date and widget_date.strip():
        print(f"Using provided ingest_date from widget: {widget_date}")
        return widget_date.strip()
    else:
        print(f"Widget for ingest_date is empty. Auto-detecting latest date from: {source_path}")
        try:
            latest_date = (
                spark.read.format("delta").load(source_path)
                .select(F.max("ingest_date"))
                .collect()[0][0]
            )
            if latest_date:
                print(f"Auto-detected latest ingest_date: {latest_date}")
                return latest_date
        except Exception as e:
            print(f"Warning: Could not auto-detect latest date. Error: {e}")

        fallback_date = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
        print(f"Falling back to yesterday's date (UTC): {fallback_date}")
        return fallback_date

def create_summary_visualization(summary_df, ingest_date):
    """Converts summary data to Pandas and creates a bar chart visualization."""
    print("Generating summary visualization...")
    
    # The summary DataFrame is small, so .toPandas() is safe
    pdf = summary_df.orderBy("hour").toPandas()
    
    # --- Plotting Setup ---
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(14, 7))

    # --- Bar Chart ---
    # Plot total transaction amount in a light color
    sns.barplot(x="hour", y="total_transaction_amount", data=pdf, color="lightblue", label="Total Amount ($)")
    
    # Overlay the fraud amount in a darker color
    sns.barplot(x="hour", y="total_fraud_amount", data=pdf, color="salmon", label="Fraud Amount ($)")

    # --- Formatting ---
    ax.set_title(f"Total vs. Fraud Transaction Amount by Hour for {ingest_date}", fontsize=16, fontweight='bold')
    ax.set_xlabel("Hour of Day", fontsize=12)
    ax.set_ylabel("Transaction Amount ($)", fontsize=12)
    
    # Format Y-axis to be readable (e.g., $2.5M)
    formatter = plt.FuncFormatter(lambda x, pos: f'${x/1e6:.1f}M')
    ax.yaxis.set_major_formatter(formatter)
    
    ax.legend()
    plt.tight_layout()
    
    # Display the plot in the Databricks notebook
    display(fig)

# --------------------------------------------------------------------------------------
# 3. Main ETL Logic
# --------------------------------------------------------------------------------------

def run_summary_job(ingest_date):
    """Loads prediction data, computes summaries, and writes to a curated Delta table."""
    
    # --- Load Data ---
    print(f"Loading prediction data for ingest_date = {ingest_date}")
    df_pred = (
        spark.read.format("delta")
        .load(prediction_path)
        .filter(F.col("ingest_date") == ingest_date)
    )

    if df_pred.rdd.isEmpty():
        dbutils.notebook.exit(f"No prediction data found for ingest_date = {ingest_date}. Job exiting.")
    
    print(f"Loaded {df_pred.count()} records.")

    # --- Feature Derivation for Aggregation ---
    df_with_features = df_pred.withColumn("hour", F.hour("detection_ts")).withColumn("day_of_week", F.date_format("detection_ts", "E"))
    
    df_with_fraud_amount = df_with_features.withColumn(
        "fraud_amount",
        F.when(F.col("predicted_fraud") == 1, F.col("Amount")).otherwise(0)
    )

    # --- Compute Hourly Summary ---
    print("Computing hourly fraud summary...")
    df_summary = (
        df_with_fraud_amount.groupBy("ingest_date", "hour", "day_of_week")
        .agg(
            F.count("*").alias("total_transactions"),
            F.sum("predicted_fraud").alias("fraud_transactions"),
            F.sum("Amount").alias("total_transaction_amount"),
            F.sum("fraud_amount").alias("total_fraud_amount"),
            F.avg("score").alias("avg_fraud_score"),
            F.avg(F.when(F.col("predicted_fraud") == 1, F.col("score"))).alias("avg_score_of_flagged_frauds")
        )
        .withColumn("fraud_rate_by_count", F.col("fraud_transactions") / F.col("total_transactions"))
        .withColumn("fraud_rate_by_amount", F.col("total_fraud_amount") / F.col("total_transaction_amount"))
        .withColumn("summary_ts", F.current_timestamp())
    )

    print("Summary computed successfully.")
    
    # --- Write to Curated Delta Table ---
    print(f"Writing summary data to: {curated_summary_path}")
    (
        df_summary.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("ingest_date")
        .save(curated_summary_path)
    )
    print("Write operation completed successfully.")
    
    # --- Visualization Step ---
    create_summary_visualization(df_summary, ingest_date)
    
# --------------------------------------------------------------------------------------
# 4. Job Execution
# --------------------------------------------------------------------------------------

if __name__ == "__main__":
    setup_spark_adls_auth(spark, storage_account, scope)
    target_ingest_date = get_ingest_date(raw_ingest_date, prediction_path)
    run_summary_job(target_ingest_date)
    print("Fraud summary job finished.")