In [0]:
# Fraud Detection Model that will scan through transaction records and identify suspicious transactions
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, sum, avg, count, when, lit, current_timestamp, date_format, hour, dayofmonth
from pyspark.sql.window import Window

# --- 1. Define Database and Table Names, and Read Existing Data ---
# IMPORTANT: the database and table names below must match the actual Databricks setup.
# Both tables are assumed to exist already with the schemas listed further down.

database_name = "fraud_detection_db" # Example: "your_data_warehouse"
customer_table_name = "customers"
transactions_table_name = "transactions"


def _read_delta_table(table_name, label):
    """Load `database_name.table_name` as a Delta table and report the outcome.

    `label` is the human-readable table description used in the printed
    messages (capitalised on success, lower-cased in the error message).
    Any exception raised while loading is printed and then re-raised so the
    script stops when a table cannot be read.
    """
    qualified_name = f"{database_name}.{table_name}"
    try:
        table_df = spark.read.format("delta").table(qualified_name)
        print(f"{label} table '{qualified_name}' loaded successfully.")
    except Exception as e:
        print(f"Error loading {label.lower()} table: {e}")
        print("Please ensure the database and table exist and are accessible with the correct schema.")
        raise  # Stop execution: nothing downstream works without the table
    return table_df


# Expected Customer Table Schema:
# - Id (LongType)
# - First Name (StringType)
# - Last Name (StringType)
# - Age (IntegerType)
# - Location (StringType)
# - Annual Income (DoubleType)
# - Debt-To-Income Ratio (DTI) (DoubleType)
# - Loan-to-Value Ratio (LTV) (DoubleType)
# - Average Monthly Spending (DoubleType)
# - Credit Score (IntegerType)
spark_customer_df = _read_delta_table(customer_table_name, "Customer")

# Expected Transactions Table Schema:
# - Customer ID (LongType)
# - Transaction Date (TimestampType)
# - Amount (DoubleType)
# - Recipient (StringType)
# - Device Type (StringType)
spark_transactions_df = _read_delta_table(transactions_table_name, "Transactions")

# Quick visual sanity check of both inputs
print("\n--- Raw Customer Data (first 5 rows) ---")
spark_customer_df.show(5)

print("\n--- Raw Transactions Data (first 5 rows) ---")
spark_transactions_df.show(5, truncate=False)

# --- 2. Feature Engineering using Spark SQL ---
# Join customers to transactions and compute per-customer aggregates plus
# rolling-window features for every transaction row.

# Register temporary views so the DataFrames can be queried with SQL
spark_customer_df.createOrReplaceTempView("all_customers")
spark_transactions_df.createOrReplaceTempView("all_transactions")
print("\nTemporary views 'all_customers' and 'all_transactions' created.")

# NOTE: column names containing spaces or punctuation are quoted with
# backticks. In Spark SQL's default mode double quotes delimit *string
# literals*, so t."Customer ID" does not parse as a column reference.
features_df = spark.sql("""
    SELECT
        t.`Customer ID`, -- Customer key as stored on the transactions table
        c.Id AS customer_id_from_customer_table, -- Same key as stored on the customers table
        c.`First Name` AS customer_first_name,
        c.`Last Name` AS customer_last_name,
        t.Amount,
        t.`Transaction Date` AS transaction_timestamp,
        t.Recipient,
        t.`Device Type`,
        c.Age,
        c.Location,
        c.`Annual Income`,
        c.`Debt-To-Income Ratio (DTI)` AS dti,
        c.`Loan-to-Value Ratio (LTV)` AS ltv,
        c.`Average Monthly Spending` AS avg_monthly_spending,
        c.`Credit Score` AS credit_score,
        -- Whole-history aggregates per customer
        SUM(t.Amount) OVER (PARTITION BY t.`Customer ID`) AS total_amount_per_customer,
        AVG(t.Amount) OVER (PARTITION BY t.`Customer ID`) AS avg_amount_per_customer,
        MAX(t.Amount) OVER (PARTITION BY t.`Customer ID`) AS max_amount_per_customer,
        COUNT(1) OVER (PARTITION BY t.`Customer ID`) AS num_transactions_per_customer,
        -- Rolling windows ending at the current transaction (current row included)
        COUNT(1) OVER (
            PARTITION BY t.`Customer ID`
            ORDER BY t.`Transaction Date`
            RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW
        ) AS transaction_count_last_hour,
        SUM(t.Amount) OVER (
            PARTITION BY t.`Customer ID`
            ORDER BY t.`Transaction Date`
            RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW
        ) AS transaction_sum_last_hour,
        COUNT(1) OVER (
            PARTITION BY t.`Customer ID`
            ORDER BY t.`Transaction Date`
            RANGE BETWEEN INTERVAL 24 HOURS PRECEDING AND CURRENT ROW
        ) AS transaction_count_last_day,
        SUM(t.Amount) OVER (
            PARTITION BY t.`Customer ID`
            ORDER BY t.`Transaction Date`
            RANGE BETWEEN INTERVAL 24 HOURS PRECEDING AND CURRENT ROW
        ) AS transaction_sum_last_day,
        HOUR(t.`Transaction Date`) AS transaction_hour_of_day
    FROM
        all_transactions t
    JOIN
        all_customers c ON t.`Customer ID` = c.Id -- Inner join: transactions without a customer row are dropped
    ORDER BY
        t.`Customer ID`, t.`Transaction Date`
""")

print("\n--- Features DataFrame (first 10 rows) ---")
# Pass the row count explicitly; show() with no count prints 20 rows,
# which did not match the heading above.
features_df.show(10, truncate=False)

# --- 3. Prepare Data for Machine Learning Model ---
# For demonstration, a dummy fraud label is derived from simple rules.
# In a real scenario this label would come from historical fraud data.

# Hour-of-day window [start, end) treated as "unusual" by the rule below.
# NOTE(review): the original script referenced these two names without ever
# defining them (NameError at runtime). 0 and 5 (i.e. 00:00-04:59) are
# placeholder values - confirm the intended window with the business owners.
UNUSUAL_HOUR_START = 0
UNUSUAL_HOUR_END = 5

# A row is labelled 1.0 as soon as any one rule matches; otherwise 0.0.
data_for_ml_df = features_df.withColumn(
    "label", # 'label' is the default target column name in Spark ML
    when(col("Amount") >= 1500.00, lit(1.0)) # Very high value transaction
    .when(col("transaction_count_last_hour") >= 4, lit(1.0)) # 4 or more transactions within the last hour
    .when(col("transaction_sum_last_hour") >= 2000.00, lit(1.0)) # $2000 or more spent within the last hour
    .when(
        (col("transaction_hour_of_day") >= UNUSUAL_HOUR_START)
        & (col("transaction_hour_of_day") < UNUSUAL_HOUR_END)
        & (col("Amount") > 800),
        lit(1.0),
    ) # High value at an unusual hour
    .otherwise(lit(0.0)) # Not fraudulent
)

print("\n--- Data with Dummy 'label' for ML Training ---")
data_for_ml_df.select("Customer ID", "Amount", "transaction_count_last_hour", "transaction_sum_last_hour", "transaction_hour_of_day", "label").show(truncate=False)

# Columns fed to the model, split by how they must be pre-processed
numerical_features = [
    "Amount",
    "Age",
    "Annual Income",
    "dti",
    "ltv",
    "avg_monthly_spending",
    "credit_score",
    "total_amount_per_customer",
    "avg_amount_per_customer",
    "max_amount_per_customer",
    "num_transactions_per_customer",
    "transaction_count_last_hour",
    "transaction_sum_last_hour",
    "transaction_count_last_day",
    "transaction_sum_last_day",
    "transaction_hour_of_day",
]
categorical_features = ["Location", "Device Type", "Recipient"] # Note: Recipient might have high cardinality

# For every categorical column build a StringIndexer (string -> index) and a
# OneHotEncoder (index -> one-hot vector). Output columns are named
# "<column>_indexed" and "<column>_indexed_encoded".
indexers = []
encoders = []
for feature_name in categorical_features:
    indexed_name = feature_name + "_indexed"
    indexers.append(
        StringIndexer(inputCol=feature_name, outputCol=indexed_name, handleInvalid="keep")
    )
    encoders.append(
        OneHotEncoder(inputCol=indexed_name, outputCol=indexed_name + "_encoded")
    )

# Numerical columns first, then the one-hot vectors, combined into 'features'
assembler_inputs = list(numerical_features)
assembler_inputs.extend(stage.getOutputCol() for stage in encoders)
vector_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features", handleInvalid="keep")

# Stage order matters: index, then encode, then assemble
feature_pipeline = Pipeline(stages=[*indexers, *encoders, vector_assembler])

# Fit on the labelled data and add the 'features' column required by ML models
feature_model = feature_pipeline.fit(data_for_ml_df)
ml_data = feature_model.transform(data_for_ml_df)

print("\n--- ML Data with 'features' Vector Column (first 5 rows) ---")
ml_data.select("Customer ID", "Amount", "label", "features").show(5, truncate=False)

# --- 4. Train and Evaluate Machine Learning Model ---

# Random 70/30 split with a fixed seed for reproducibility.
# In a real-world scenario, splitting by time helps avoid data leakage.
train_data, test_data = ml_data.randomSplit([0.7, 0.3], seed=42)
print(f"\nTraining data count: {train_data.count()}")
print(f"Test data count: {test_data.count()}")

# Elastic-net regularised logistic regression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model
print("\n--- Training Logistic Regression Model ---")
lr_model = lr.fit(train_data)
print("Model training complete.")

# Score the held-out rows; adds rawPrediction, probability and prediction columns
predictions = lr_model.transform(test_data)

print("\n--- Predictions on Test Data (first 10 rows) ---")
predictions.select("Customer ID", "Amount", "label", "prediction", "probability").show(10, truncate=False)

# Ranking quality: area under the ROC curve, computed from the raw scores
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"\nArea Under ROC (AUC) on test data: {auc}")

# BinaryClassificationEvaluator only supports areaUnderROC / areaUnderPR, so
# accuracy is computed with MulticlassClassificationEvaluator, which compares
# the hard 'prediction' column against the label.
evaluator_accuracy = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator_accuracy.evaluate(predictions)
print(f"Accuracy on test data: {accuracy}")

# --- 5. Apply the Trained Model for Fraud Prediction ---
# Score the entire dataset. ml_data already carries the 'features' column;
# new incoming data would first have to go through the feature pipeline.
final_predictions_df = lr_model.transform(ml_data)

# Keep the columns of interest and give the model outputs descriptive names
fraud_results_df = final_predictions_df.select(
    col("Customer ID"),
    col("Amount"),
    col("transaction_timestamp"),
    col("Recipient"),
    col("Device Type"),
    col("label").alias("actual_fraud_label"), # The dummy label used for training
    col("prediction").cast("boolean").alias("is_fraudulent_predicted"), # Convert 0.0/1.0 to boolean
    # 'probability' is an ML vector column, which cannot be indexed directly;
    # convert it to an array first, then take element 1 (class 1 = fraud).
    vector_to_array(col("probability"))[1].alias("fraud_probability"),
)

print("\n--- Final Fraud Predictions on All Data ---")
fraud_results_df.orderBy(col("fraud_probability").desc()).show(truncate=False)

print("\n--- Potentially Fraudulent Transactions (Predicted by ML Model) ---")
# The boolean column is itself a valid filter condition
fraud_results_df.filter(col("is_fraudulent_predicted")).orderBy(col("fraud_probability").desc()).show(truncate=False)

# --- Optional: Save Predicted Fraudulent Transactions to a separate table ---
# Only rows the model flagged as fraud are written; mode("overwrite")
# replaces any previous contents of the output table.
ml_fraud_output_table_name = "ml_potential_fraud_transactions"
fraud_results_df.filter(col("is_fraudulent_predicted") == True) \
    .write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.{ml_fraud_output_table_name}")

print(f"\nML-predicted fraudulent transactions saved to '{database_name}.{ml_fraud_output_table_name}'.")

print("\n--- Verify ML Fraud Output Table ---")
# Read the table back through its name variable (the original interpolated an
# undefined identifier here, which raised NameError).
spark.sql(f"SELECT * FROM {database_name}.{ml_fraud_output_table_name}").show(truncate=False)

# --- Clean up temporary views (optional) ---
# Drop the views registered for the feature-engineering query, in the same
# order as before; IF EXISTS keeps this safe to re-run.
for temp_view_name in ("all_transactions", "all_customers"):
    spark.sql(f"DROP VIEW IF EXISTS {temp_view_name}")
print("\nTemporary views dropped.")