In [0]:
# Make sure the gold-layer schema exists before any table is written into it.
# IF NOT EXISTS keeps this cell safe to re-run.
create_gold_schema_sql = """
CREATE SCHEMA IF NOT EXISTS angad_kumar91.fraud_detection_goldlayer
"""
spark.sql(create_gold_schema_sql)


In [0]:
import mlflow
import mlflow.pyfunc

# The model is addressed through its "prod" alias in the registry, so the
# notebook picks up whichever version that alias currently points to.
model_uri = "models:/fraud_isolation_forest@prod"

# Loaded once on the driver; score_batch() reuses this object for every micro-batch.
fraud_model = mlflow.pyfunc.load_model(model_uri=model_uri)

print("Model loaded successfully")


In [0]:
# Streaming read of the silver transactions table; this is the stream that
# gets scored further down in the notebook.
silver_stream_table = "angad_kumar91.fraud_detection_silverlayer.silver_transactions_base"
silver_stream_df = spark.readStream.table(silver_stream_table)


In [0]:
from pyspark.sql.functions import window, count, avg, stddev, approx_count_distinct

silver_base_batch = spark.read.table(
    "angad_kumar91.fraud_detection_silverlayer.silver_transactions_base"
)

batch_features_df = (
    silver_base_batch
        .groupBy(
            "card1",
            window("event_timestamp", "5 minutes")
        )
        .agg(
            count("*").alias("txn_count_5min"),
            avg("TransactionAmt").alias("avg_amount_5min"),
            stddev("TransactionAmt").alias("stddev_amount_5min"),
            approx_count_distinct("ProductCD").alias("product_diversity_5min")
        )
        .select(
            "card1",
            "txn_count_5min",
            "avg_amount_5min",
            "stddev_amount_5min",
            "product_diversity_5min"
        )
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

w = Window.partitionBy("card1").orderBy(col("txn_count_5min").desc())

latest_features_df = (
    batch_features_df
        .withColumn("rn", row_number().over(w))
        .filter(col("rn") == 1)
        .drop("rn")
)

print("Latest feature rows:", latest_features_df.count())
latest_features_df.show(5)


In [0]:
# Stream-static left join: every streamed transaction is kept, and the
# per-card window features are attached where a matching card1 exists.
# Transactions without a match get nulls in the feature columns.
scoring_stream_df = silver_stream_df.join(latest_features_df, on="card1", how="left")


In [0]:
# %sql
# CREATE TABLE IF NOT EXISTS angad_kumar91.fraud_detection_goldlayer.fraud_predictions (
#     TransactionID BIGINT,
#     event_timestamp TIMESTAMP,
#     card1 INT,
#     TransactionAmt DOUBLE,
#     fraud_score DOUBLE,
#     is_anomaly INT
# )
# USING DELTA;


In [0]:
# %sql
# DROP TABLE IF EXISTS angad_kumar91.fraud_detection_goldlayer.fraud_predictions;

In [0]:
# dbutils.fs.rm(
#     "/Volumes/angad_kumar91/fraud_detection_raw_data_files/checkpoints/fraud_scoring/",
#     recurse=True
# )

In [0]:
import pandas as pd
import numpy as np

# Columns taken directly from the transaction row.
_transaction_feature_cols = [
    "TransactionAmt",
    "log_transaction_amount",
    "is_high_value_txn",
    "is_international_txn",
]

# Columns produced by the 5-minute window aggregation.
_window_feature_cols = [
    "txn_count_5min",
    "avg_amount_5min",
    "stddev_amount_5min",
    "product_diversity_5min",
]

# Ordered model input columns; the order here is the order passed to the model.
feature_cols = [*_transaction_feature_cols, *_window_feature_cols]



In [0]:
def score_batch(batch_df, batch_id):
    """Score one streaming micro-batch and append the results to the gold table.

    Intended to be passed to ``DataStreamWriter.foreachBatch``.

    Args:
        batch_df: The micro-batch DataFrame. Must contain the gold output
            columns (TransactionID, event_timestamp, card1, TransactionAmt)
            and every column listed in the module-level ``feature_cols``.
        batch_id: Micro-batch id supplied by Structured Streaming (unused).

    Returns:
        None. Rows are appended to
        ``angad_kumar91.fraud_detection_goldlayer.fraud_predictions``.

    Notes:
        The whole micro-batch is collected to the driver as a pandas frame.
        NOTE(review): the append is not idempotent if a batch is retried;
        Delta's txnAppId/txnVersion writer options could address that, but
        they interact badly with checkpoint resets -- confirm before adding.
    """
    gold_cols = [
        "TransactionID",
        "event_timestamp",
        "card1",
        "TransactionAmt",
        "fraud_score",
        "is_anomaly",
    ]
    computed_cols = ("fraud_score", "is_anomaly")

    # Pull only the columns that are actually used (gold passthrough columns
    # plus model features) instead of every silver column.
    passthrough_cols = [c for c in gold_cols if c not in computed_cols]
    needed_cols = passthrough_cols + [
        c for c in feature_cols if c not in passthrough_cols
    ]

    # Collect once and test emptiness on the pandas side; a separate
    # batch_df.count() would evaluate the micro-batch (and its join) twice.
    pdf = batch_df.select(*needed_cols).toPandas()
    if pdf.empty:
        return

    # -------------------------
    # Fill missing features
    # -------------------------
    # Rows whose card had no match in the left join arrive with nulls in the
    # window features; they are scored as "no recent activity" (zeros).
    pdf["txn_count_5min"] = pdf["txn_count_5min"].fillna(0).astype("int64")
    pdf["avg_amount_5min"] = pdf["avg_amount_5min"].fillna(0.0).astype("float64")
    pdf["stddev_amount_5min"] = pdf["stddev_amount_5min"].fillna(0.0).astype("float64")
    pdf["product_diversity_5min"] = pdf["product_diversity_5min"].fillna(0).astype("int64")

    # -------------------------
    # MLflow schema casting
    # -------------------------
    # Explicit dtypes, presumably matching the model's logged input
    # signature -- TODO confirm against the registered model.
    X = pdf[feature_cols].astype({
        "TransactionAmt": "float64",
        "log_transaction_amount": "float64",
        "is_high_value_txn": "int32",
        "is_international_txn": "int32",
        "txn_count_5min": "int64",
        "avg_amount_5min": "float64",
        "stddev_amount_5min": "float64",
        "product_diversity_5min": "int64"
    })

    anomaly_scores = fraud_model.predict(X)

    # The model output is negated to form the fraud score (presumably so that
    # larger means more anomalous -- verify against how the model was logged).
    pdf["fraud_score"] = -anomaly_scores
    # Scores strictly above 0.6 are flagged as anomalies.
    pdf["is_anomaly"] = (pdf["fraud_score"] > 0.6).astype("int32")

    # -------------------------
    # SELECT ONLY GOLD COLUMNS
    # -------------------------
    gold_pdf = pdf[gold_cols]

    # Use the session attached to the micro-batch rather than the notebook
    # global, as recommended for code running inside foreachBatch.
    batch_df.sparkSession.createDataFrame(gold_pdf) \
        .write \
        .mode("append") \
        .saveAsTable(
            "angad_kumar91.fraud_detection_goldlayer.fraud_predictions"
        )


In [0]:
# Checkpoint directory that records which input the scoring stream has
# already processed.
scoring_checkpoint_path = "/Volumes/angad_kumar91/fraud_detection_raw_data_files/checkpoints/fraud_scoring/"

# Configure the sink step by step: score each micro-batch with score_batch,
# track progress in the checkpoint, and use the availableNow trigger so the
# query processes the data available at start and then stops.
scoring_writer = scoring_stream_df.writeStream.foreachBatch(score_batch)
scoring_writer = scoring_writer.option("checkpointLocation", scoring_checkpoint_path)
scoring_writer = scoring_writer.trigger(availableNow=True)

query = scoring_writer.start()

# Block the cell until the query has finished.
query.awaitTermination()




In [0]:
%sql
-- List the tables that currently exist in the gold-layer schema.
SHOW TABLES FROM angad_kumar91.fraud_detection_goldlayer;


In [0]:
%sql
-- Total number of rows written to the gold predictions table.
SELECT
  COUNT(*)
FROM
  angad_kumar91.fraud_detection_goldlayer.fraud_predictions;


In [0]:
%sql
-- The 50 predictions with the highest fraud_score, highest first.
SELECT
  *
FROM
  angad_kumar91.fraud_detection_goldlayer.fraud_predictions
ORDER BY
  fraud_score DESC
LIMIT 50;
