In [0]:
# PART 1: INGESTION
import os
from google.cloud import storage

# CONFIGURATION
# Source object in Google Cloud Storage.
BUCKET_NAME = "paysim-datalake"
FILE_NAME = "paysim.csv"

# GCP credentials key file: where it is stored on DBFS, and the temporary
# copy on the driver's local disk.
# NOTE(review): the key lives under dbfs:/FileStore; consider whether a
# secret scope would be a safer home for it.
DBFS_KEY_PATH = "dbfs:/FileStore/tables/gcp_key.json"
LOCAL_KEY_PATH = "/tmp/gcp_key.json"

# Data file: temporary download location on the driver (local), then its
# destination on DBFS so that Spark can read it.
LOCAL_DATA_PATH = f"/tmp/{FILE_NAME}"
DBFS_DATA_PATH = f"dbfs:/FileStore/raw/{FILE_NAME}"

print("Step 1: Secure Ingestion")

try:
    # 1. Setup Credentials (Copy Key from DBFS -> Local)
    dbutils.fs.cp(DBFS_KEY_PATH, f"file:{LOCAL_KEY_PATH}")
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = LOCAL_KEY_PATH
    print("Credentials Loaded.")

    # 2. Download Data from GCS to Local Driver
    print(f"Downloading {FILE_NAME}")
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET_NAME)
    blob = bucket.blob(FILE_NAME)
    blob.download_to_filename(LOCAL_DATA_PATH)
    
    # 3. Move Data to DBFS (เพื่อให้ใช้ spark.read ได้)
    dbutils.fs.mv(f"file:{LOCAL_DATA_PATH}", DBFS_DATA_PATH)
    
    # 4. Read with Spark
    print("Reading CSV with Spark")
    df = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(DBFS_DATA_PATH)
    
    print("Ingestion successful.")

    # Preview Data
    print("Previewing Data:")
    display(df.limit(10))

In [0]:
# PART 2: BRONZE LAYER
    print("Step 1: Creating Bronze Layer")
    
    # 1. Check Schema
    print("Schema:")
    df.printSchema()
    
    # 2. Count Rows
    print(f"Total Rows: {df.count():,}")
    
    # 3. Save as Delta Table
    print("Creating Bronze Table: 'paysim_bronze'")
    spark.sql("DROP TABLE IF EXISTS paysim_bronze")
    df.write.format("delta").mode("overwrite").saveAsTable("paysim_bronze")
    
    print("Bronze Layer Created")
    
    # Cleanup Local Key
    if os.path.exists(LOCAL_KEY_PATH): os.remove(LOCAL_KEY_PATH)

except Exception as e:
    print(f"Error: {e}")
    if os.path.exists(LOCAL_KEY_PATH): os.remove(LOCAL_KEY_PATH)

In [0]:
# PART 3: SILVER LAYER (CLEANSED & QUALITY CHECK)
from pyspark.sql.functions import col, lit, when

print("Step 2: Creating Silver Layer")

# 1. Read from Bronze Delta
df_bronze = spark.table("paysim_bronze")

# Step A: Data Quality Check
# Rule: amount must be present and non-negative.
# The explicit isNotNull() is required: `amount >= 0` evaluates to NULL for a
# NULL amount, and both filter(cond) and filter(~cond) discard NULL results,
# so such rows would otherwise disappear from every output table.
dq_condition = col("amount").isNotNull() & (col("amount") >= 0)

# Separate bad data (quarantine) from valid data; together they cover every
# bronze row.
df_valid_amount = df_bronze.filter(dq_condition)
df_quarantine = df_bronze.filter(~dq_condition).withColumn(
    "dq_issue",
    when(col("amount").isNull(), lit("Missing Amount")).otherwise(lit("Negative Amount")),
)

# Step B: Business Scope Separation
# Rule: only TRANSFER and CASH_OUT are in scope for fraud detection.
# Every other type is valid but out of scope for this model. A NULL type is
# treated as out of scope (same NULL-handling reason as above), so the
# silver/others split is exhaustive.
scope_condition = col("type").isNotNull() & col("type").isin("TRANSFER", "CASH_OUT")

# 1. Target data (Silver): input for the ML stages
df_silver = df_valid_amount.filter(scope_condition)

# 2. Out-of-scope data (Others): kept in its own table
df_others = df_valid_amount.filter(~scope_condition)

# Step C: Type casting (applied to the Silver table only)
# Explicit casts pin the numeric column types regardless of what schema
# inference produced when the CSV was read.
df_silver = df_silver.withColumn("amount", col("amount").cast("double")) \
                   .withColumn("oldbalanceOrg", col("oldbalanceOrg").cast("double")) \
                   .withColumn("newbalanceOrig", col("newbalanceOrig").cast("double")) \
                   .withColumn("oldbalanceDest", col("oldbalanceDest").cast("double")) \
                   .withColumn("newbalanceDest", col("newbalanceDest").cast("double")) \
                   .withColumn("isFraud", col("isFraud").cast("integer"))

# Step D: Save as Delta tables
print("Creating Silver Table (Target): 'paysim_silver'")
spark.sql("DROP TABLE IF EXISTS paysim_silver")
df_silver.write.format("delta").mode("overwrite").saveAsTable("paysim_silver")

print("Creating Others Table (Out of Scope): 'paysim_others'")
spark.sql("DROP TABLE IF EXISTS paysim_others")
df_others.write.format("delta").mode("overwrite").saveAsTable("paysim_others")

print("Creating Quarantine Table (Bad Data): 'paysim_quarantine'")
spark.sql("DROP TABLE IF EXISTS paysim_quarantine")
df_quarantine.write.format("delta").mode("overwrite").saveAsTable("paysim_quarantine")

# Print statistics (counts are read back from the saved tables)
print(f"""
Stats Summary:
- Silver (ML Ready):   {spark.table('paysim_silver').count():,} rows
- Others (Ignored):    {spark.table('paysim_others').count():,} rows
- Quarantine (Bad):    {spark.table('paysim_quarantine').count():,} rows
""")

In [0]:
# PART 4: GOLD LAYER (FEATURES & RISK)
from pyspark.sql.functions import col, when, current_date, lit, max as spark_max

print("Step 3: Creating Gold Layer")

# 1. Load the Silver table.
df_silver = spark.table("paysim_silver")

# A: Feature Engineering
# Each derived column is expressed once below, then attached to the frame:
# - errorBalanceOrig / errorBalanceDest: balance-equation residuals on the
#   origin and destination side.
# - type_index: 0 for TRANSFER, 1 for anything else.
# - hourOfDay: step modulo 24.
# - amountRatio: amount relative to the origin's old balance; the 0.001
#   offset keeps the denominator non-zero when the balance is 0.
error_balance_orig = col("newbalanceOrig") + col("amount") - col("oldbalanceOrg")
error_balance_dest = col("oldbalanceDest") + col("amount") - col("newbalanceDest")
type_index = when(col("type") == "TRANSFER", 0).otherwise(1)
hour_of_day = col("step") % 24
amount_ratio = col("amount") / (col("oldbalanceOrg") + 0.001)

df_gold = df_silver
for feature_name, feature_expr in [
    ("errorBalanceOrig", error_balance_orig),
    ("errorBalanceDest", error_balance_dest),
    ("type_index", type_index),
    ("hourOfDay", hour_of_day),
    ("amountRatio", amount_ratio),
]:
    df_gold = df_gold.withColumn(feature_name, feature_expr)

# Columns kept for the ML table, in output order.
final_columns = [
    "step",
    "type_index",
    "amount",
    "amountRatio",
    "hourOfDay",
    "oldbalanceOrg",
    "newbalanceOrig",
    "errorBalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    "errorBalanceDest",
    "isFraud",
]

df_gold_ml = df_gold.select(*final_columns)

print("Creating Gold Table: 'paysim_gold'")
spark.sql("DROP TABLE IF EXISTS paysim_gold")
df_gold_ml.write.format("delta").mode("overwrite").saveAsTable("paysim_gold")

# B: Risk Profile (customer snapshot)
# One row per nameOrig with its largest transaction amount; a customer is
# "High" risk when that maximum exceeds 1,000,000 and "Low" otherwise.
# Every customer is kept (not only the high-risk ones), and the table is
# fully overwritten on each run.
max_amount_per_customer = df_silver.groupBy("nameOrig").agg(
    spark_max("amount").alias("max_txn_amount")
)
risk_level = when(col("max_txn_amount") > 1000000, "High").otherwise("Low")

df_risk_profile = (
    max_amount_per_customer
    .withColumn("risk_level", risk_level)
    .withColumn("effective_date", current_date())
    .withColumn("is_current", lit(True))
)

df_risk_profile.write.format("delta").mode("overwrite").saveAsTable("dim_customer_risk")

# For demonstration, show only the HIGH RISK customers.
print("Displaying Sample of HIGH RISK Customers:")
high_risk_customers = df_risk_profile.filter(col("risk_level") == "High")
display(high_risk_customers)

print("Gold Layer Created Successfully")

In [0]:
# PART 5: MACHINE LEARNING PIPELINE & EVALUATION
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

print("Starting ML Pipeline processing...")

# 1. Load the Gold feature table.
df_gold = spark.table("paysim_gold")

# 2. Temporal split on `step`:
#    rows with step <= split_step train the model, later rows are held out.
split_step = 600
train_raw = df_gold.filter(col("step") <= split_step)
test_raw = df_gold.filter(col("step") > split_step)

# 3. Sampling
# A. Training set: keep every fraud row, and a 10% seeded sample (without
#    replacement) of the non-fraud rows.
print("Preparing Training Data (Undersampling Normal class)")
fraud_rows = train_raw.filter(col("isFraud") == 1)
normal_sample = train_raw.filter(col("isFraud") == 0).sample(
    withReplacement=False, fraction=0.10, seed=1234
)
train_data = fraud_rows.union(normal_sample)

# B. Test set: the held-out rows, unsampled.
print("Preparing Test Data...")
test_data = test_raw

# Report the size of both sets.
print(f"Training Set Count: {train_data.count():,}")
print(f"Test Set Count:     {test_data.count():,}")

# 4. Pipeline: assemble the feature vector, then fit a random forest on it.
feature_cols = [
    "type_index",
    "amount",
    "amountRatio",
    "hourOfDay",
    "oldbalanceOrg",
    "newbalanceOrig",
    "errorBalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    "errorBalanceDest",
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="isFraud",
    numTrees=20,
    maxDepth=10,
    seed=1234,
)

pipeline = Pipeline(stages=[assembler, rf])

# 5. Fit on the undersampled training set.
print("Training Random Forest Model...")
pipeline_model = pipeline.fit(train_data)
print("Training Complete.")

# 6. Score the held-out test set.
print("Predicting on Test Data...")
predictions = pipeline_model.transform(test_data)

# 7. Show Full Confusion Matrix
print("Confusion Matrix Table:")
actual_counts = predictions.groupBy("isFraud", "prediction").count()

# Template holding all four (label, prediction) combinations, so that a
# combination that never occurs still appears in the table with a count of 0.
template_data = [
    (1, 1.0), # TP
    (0, 1.0), # FP
    (1, 0.0), # FN
    (0, 0.0)  # TN
]
template_df = spark.createDataFrame(template_data, ["isFraud", "prediction"])

# Left-join the template with the observed counts; missing counts become 0.
full_matrix = template_df.join(actual_counts, on=["isFraud", "prediction"], how="left") \
                         .na.fill(0) \
                         .orderBy(col("isFraud").desc(), col("prediction").desc())

full_matrix.show()

# 8. Calculate Manual Metrics
# The matrix has at most four rows, so collecting it to the driver is cheap.
conf_matrix = full_matrix.collect()

tp = fp = fn = tn = 0
for row in conf_matrix:
    if row['isFraud'] == 1 and row['prediction'] == 1.0: tp = row['count']
    elif row['isFraud'] == 0 and row['prediction'] == 1.0: fp = row['count']
    elif row['isFraud'] == 1 and row['prediction'] == 0.0: fn = row['count']
    elif row['isFraud'] == 0 and row['prediction'] == 0.0: tn = row['count']

# Every ratio is guarded against a zero denominator explicitly, so no
# exception handler is needed; an empty test set yields 0 for all metrics.
total = tp + tn + fp + fn
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
accuracy = (tp + tn) / total if total > 0 else 0

# 9. Final Report
print("\n" + "="*50)
print("FINAL MODEL PERFORMANCE REPORT")
print("-"*50)
print(f"Accuracy:  {accuracy:.2%}")
print(f"Precision: {precision:.2%}")
print(f"Recall:    {recall:.2%}")
print(f"F1-Score:  {f1:.2%}")
print("-" * 50)
print(f"True Positives (Caught Fraud): {tp}")
print(f"False Negatives (Missed Fraud): {fn}")
print(f"False Positives (False Alarm):  {fp}")
print(f"True Negatives (Correct Normal): {tn}")
print("-"*50)


# PART 6: OVERFITTING CHECK (TRAIN vs TEST)
# Section banner: title line followed by a 50-character rule.
print("OVERFITTING CHECK (TRAIN vs TEST)\n" + "-" * 50)

def quick_evaluate(dataset, name, model=None):
    """Score `dataset` with a fitted model and print its recall and precision.

    Args:
        dataset: Data to score. It is passed to ``model.transform`` and the
            result must expose ``isFraud`` and ``prediction`` columns.
        name: Label used in the "Evaluating on ..." progress line.
        model: Fitted model exposing ``transform``. Defaults to the
            notebook-level ``pipeline_model``, so existing two-argument
            calls behave as before.

    Returns:
        The recall, TP / (TP + FN), or 0 when there are no positive rows.
    """
    if model is None:
        # Resolved at call time so the function can be defined before training.
        model = pipeline_model

    print(f"Evaluating on {name}")
    pred = model.transform(dataset)

    # Aggregate to (label, prediction) -> count; at most four rows come back.
    metrics = pred.groupBy("isFraud", "prediction").count().collect()
    counts = {(row['isFraud'], row['prediction']): row['count'] for row in metrics}

    # Combinations that never occurred are absent from the result, hence 0.
    tp = counts.get((1, 1.0), 0)
    fp = counts.get((0, 1.0), 0)
    fn = counts.get((1, 0.0), 0)

    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0

    print(f"Recall:    {recall:.2%}")
    print(f"Precision: {precision:.2%}")
    return recall

# 1. Recall on the (undersampled) training set.
train_recall = quick_evaluate(train_data, "TRAINING SET")

# 2. Recall on the held-out test set.
test_recall = quick_evaluate(test_data, "TEST SET")

# 3. Compare: a positive gap means the model recalls more on training data.
gap = train_recall - test_recall
print("-" * 50)
print(f"GAP ANALYSIS (Train - Test): {gap:.2%}")

# Pick the verdict lines from the gap, then print them in order.
if gap > 0.15:
    verdict = [
        "High Overfitting Risk",
        "(Model memorized the training data too much)",
    ]
elif gap < -0.05:
    verdict = ["Test performed better (Unusual but okay due to sampling)"]
else:
    verdict = [
        "Good Generalization (Not Overfitting)",
        "(Model is robust and ready for production)",
    ]

for line in verdict:
    print(line)

In [0]:
# FEATURE IMPORTANCE VISUALIZATION
# The last pipeline stage is the fitted random forest; pair each input
# feature name with its importance score and rank them, highest first.
importances = pipeline_model.stages[-1].featureImportances
importance_rows = list(zip(feature_cols, importances.toArray()))
feat_imp = pd.DataFrame(importance_rows, columns=['Feature', 'Importance'])
feat_imp = feat_imp.sort_values(by='Importance', ascending=False)

# Horizontal bar chart, one bar per feature.
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(data=feat_imp, x="Importance", y="Feature", palette="magma", ax=ax)
ax.set_title('Why the Model thinks it is Fraud?', fontsize=14)
plt.show()