In [None]:
# Basic imports
import pickle
from io import BytesIO
import pandas as pd

# Pyspark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import functions as F
from pyspark.dbutils import DBUtils



In [None]:
# Create the SparkSession for this notebook, or reuse the active one if it
# already exists (getOrCreate never builds a second session).
spark = (
    SparkSession.builder
    .appName("FraudDetection")
    .getOrCreate()
)

In [None]:
# Import trained model: a pickled estimator stored on DBFS.
model_path = "dbfs:/FileStore/tables/models/IFMODEL_1_0_0.pkl"

# The binaryFile source yields one row per file, with the raw bytes in `content`;
# take the content of the first (only) row.
binary_df = spark.read.format("binaryFile").load(model_path)
binary_data = (
    binary_df
    .select("content")
    .collect()[0][0]
)

# SECURITY NOTE: unpickling executes arbitrary code embedded in the file, so only
# load model artifacts from a trusted, access-controlled location.
isolation_forest = pickle.loads(binary_data)

# Confirm the model is ready to use.
print("Model loaded successfully.")

Model loaded successfully.


In [None]:
# Storage locations: the source Delta table, the streaming checkpoint directory,
# and where scored transactions are written.
delta_table_path, checkpoint_path, predictions_output_path = (
    "/transactions/table",
    "/transactions/checkpoint/dir",
    "/transactions/table_with_predictions",
)

In [None]:
# Model input columns. This list also fixes the column order of the frame
# handed to the model at prediction time, so keep the order stable.
feature_columns = (
    ["transaction_amount", "merchant_country_code", "settlement_amount"]
    + ["transaction_code", "transaction_currency", "settlement_currency"]
    + ["transaction_type", "payment_provider", "action_code", "cycle"]
    + ["mean_by_cardid", "day", "month", "year"]
)

In [None]:
# Data schema for the transactions. Every field is nullable, and the field order
# here is the column order produced when the parsed struct is expanded.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("transaction_amount", DoubleType()),
        ("merchant_country_code", StringType()),
        ("transaction_date", StringType()),
        ("transaction_code", StringType()),
        ("transaction_currency", IntegerType()),
        ("transaction_type", StringType()),
        ("settlement_amount", DoubleType()),
        ("settlement_currency", IntegerType()),
        ("payment_provider", StringType()),
        ("processing_code", StringType()),
        ("card_id", StringType()),
        ("action_code", DoubleType()),
        ("merchant_name", StringType()),
        ("cycle", DoubleType()),
    )
])

In [None]:
# Stream new rows from the Delta table and unpack the JSON payload into columns.
# NOTE(review): assumes each Delta row carries the JSON record in a `value`
# column, as the parsing below requires; confirm against the table's writer.
streaming_df = spark.readStream.format("delta").load(delta_table_path)
parsed_df = (
    streaming_df
    .withColumn("parsed_value", F.from_json(F.col("value").cast("string"), schema))
    .select("parsed_value.*")
)

In [None]:
# Archive the entries currently in the Delta table directory.
# WARNING(review): the table's `_delta_log/` keeps referencing data files after
# they are moved, so later reads of the table (including the stream above) can
# fail on missing files. Confirm this step is really wanted; Delta's own
# DELETE/VACUUM, or a separate landing directory, is usually the safer route.
# TODO: only move the files ingested by this execution.
dbutils = DBUtils(spark)
archive_path = delta_table_path.rstrip("/") + "_archive/"  # TODO(review): confirm destination
for file_info in dbutils.fs.ls(delta_table_path):
    # `.path` is the full URI, while `.name` is the bare entry name (with a
    # trailing "/" for directories), so the log check must use `.name`.
    # Never move the transaction log itself.
    if file_info.name.rstrip("/") == "_delta_log":
        continue
    dst_file_path = archive_path + file_info.name
    # Move this single entry, not the whole table directory; directories
    # (e.g. partition folders) need a recursive move.
    dbutils.fs.mv(file_info.path, dst_file_path, recurse=file_info.name.endswith("/"))

In [None]:
# Keep only the first three characters (1-based) of the merchant country code,
# the width used by its label encoding.
parsed_df = parsed_df.withColumn(
    "merchant_country_code",
    F.col("merchant_country_code").cast("string").substr(1, 3),
)

In [None]:
# NOTE(review): despite its name, `mean_by_cardid` is filled with each row's own
# settlement_amount rather than an average per card_id; confirm this matches how
# the feature was built for training.
parsed_df = parsed_df.withColumn("mean_by_cardid", parsed_df["settlement_amount"])

In [None]:
# Label encodings produced when the training data was label encoded, applied to
# the categorical columns before scoring. Each inner dict maps a raw column value
# to its integer code; a value with no entry is encoded as NULL by the lookup.
mappings = {
    "transaction_type": {
        "Purchase Domestic": 0,
        "Withdrawal Domestic": 5,
        "Refund Domestic": 2,
        "Purchase International": 1,
        "Unique Transfer Domestic": 4,
        "Refund International": 3,
        "Withdrawal International": 6
    },
    # Only currency code 826 is encoded (int key, IntegerType column).
    "settlement_currency": {
        826: 0
    },
    # Keys are 3-character strings, matching the substring(1, 3) truncation
    # applied to this column before encoding.
    "merchant_country_code": {
        "826": 15,
        "250": 2,
        "372": 5,
        "831": 16,
        "840": 17,
        "528": 7,
        "276": 3,
        "196": 0,
        "344": 4,
        "724": 11,
        "208": 1,
        "554": 8,
        "620": 10,
        "756": 12,
        "764": 13,
        "440": 6,
        "566": 9,
        "792": 14
    },
    # Raw strings are matched exactly, so "1"/"01", "4"/"04" and "5"/"05" are
    # distinct categories with different codes.
    "transaction_code": {
        "1": 3,
        "01": 0,
        "4": 5,
        "20": 4,
        "04": 1,
        "05": 2,
        "5": 6
    },
    # Int keys for an IntegerType column.
    "transaction_currency": {
        826: 4,
        784: 3,
        124: 0,
        978: 6,
        764: 2,
        840: 5,
        566: 1
    },
    # Only the empty string is encoded; any other provider value becomes NULL.
    "payment_provider": {
        "": 0
    },
    # action_code is a DoubleType column; the int key still matches because
    # Python treats 0 and 0.0 as the same dict key.
    # NOTE(review): Spark hands SQL NULLs to Python UDFs as None, not the string
    # "null", so this "null" entry is only reached if the lookup translates
    # None to "null"; confirm nulls are encoded as intended.
    "action_code": {
        0: 0,
        "null": 1
    },
    # cycle is also DoubleType; int keys match 1.0, 2.0, ... for the same reason.
    # NOTE(review): same "null" vs None caveat as action_code.
    "cycle": {
        4: 3,
        2: 1,
        3: 2,
        1: 0,
        5: 4,
        6: 5,
        "null": 6
    }
}

In [None]:
# Apply mappings that we generated while label encoding the data
def create_mapping_udf(mapping_dict):
    def map_value(value):
        return mapping_dict.get(value, None)
    return F.udf(map_value, IntegerType())

In [None]:
# Replace each categorical column with its label-encoded integer, one UDF per column.
for col_name, encoding in mappings.items():
    encode_udf = create_mapping_udf(encoding)
    parsed_df = parsed_df.withColumn(col_name, encode_udf(F.col(col_name)))

In [None]:
# Parse the raw date string into a timestamp. No explicit pattern is given, so
# Spark's default timestamp parsing applies.
parsed_df = parsed_df.withColumn(
    "transaction_date", F.to_timestamp("transaction_date")
)

In [None]:
# Derive the calendar-part features (day of month, month, year) from the timestamp.
parsed_df = (
    parsed_df
    .withColumn("day", F.dayofmonth("transaction_date"))
    .withColumn("month", F.month("transaction_date"))
    .withColumn("year", F.year("transaction_date"))
)

In [None]:
# Keep only the model's input columns, in the same order as `feature_columns`.
# Deriving pred_cols from feature_columns (instead of repeating the list) keeps
# this projection from drifting away from the columns the scoring step selects.
pred_cols = list(feature_columns)
parsed_df = parsed_df.select(*pred_cols)

In [None]:
def apply_isolation_forest(batch_df, batch_id):
    """Score one streaming micro-batch with the Isolation Forest and persist it.

    Registered below as the ``foreachBatch`` callback, so it runs on the driver
    once per micro-batch.

    Args:
        batch_df: Spark DataFrame with the micro-batch rows; must contain every
            column listed in ``feature_columns``.
        batch_id: Micro-batch id passed by Structured Streaming (unused here).
    """
    # Convert to a Pandas DataFrame (collects the whole micro-batch on the driver).
    pandas_df = batch_df.toPandas()

    # Nothing to score or write for an empty micro-batch. Assuming a
    # scikit-learn-style model, predict() would also raise on zero rows.
    if pandas_df.empty:
        return

    # Apply the Isolation Forest model.
    # NOTE(review): presumably a scikit-learn IsolationForest, whose predict()
    # returns -1 for anomalies and 1 for normal rows; confirm for this artifact.
    predictions = isolation_forest.predict(pandas_df[feature_columns])

    # Add predictions back to the Pandas DataFrame.
    pandas_df['predictions'] = predictions

    # Convert the Pandas DataFrame back to a Spark DataFrame.
    result_df = spark.createDataFrame(pandas_df)

    # Append to the dedicated predictions location, outside the source Delta
    # table directory, so scored output never mixes with the table's data files.
    result_df.write.mode("append").parquet(predictions_output_path)


# Apply the function on each batch. The checkpoint records which source versions
# were processed, so a restarted query resumes where it stopped instead of
# re-scoring (and re-appending) the whole table.
query = (
    parsed_df.writeStream
    .foreachBatch(apply_isolation_forest)
    .option("checkpointLocation", checkpoint_path)
    .start()
)
query.awaitTermination()