In [None]:
import subprocess
import sys

# Install the project requirements with uv before anything else is imported.
# - `sys.executable` runs uv with the interpreter that backs this kernel; the
#   previous `py` launcher only exists on Windows and may resolve to a
#   different Python installation.
# - `check=True` makes a failed install stop the notebook here instead of
#   surfacing later as an unrelated ImportError.
subprocess.run(
    [sys.executable, "-m", "uv", "pip", "install", "-r", "requirements.txt"],
    check=True,
)

In [None]:
import pyspark.sql.functions as spark_func
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml import Transformer, Pipeline
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import mlflow
import mlflow.spark

import shutil
import os
import json
import pprint
import numpy as np
from IPython.core.display import HTML
from IPython.display import display
from uuid import uuid4

import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

In [None]:
# Notebook-wide CSS overrides: stop <pre> blocks from wrapping and let the
# `.container` element span the full page width.
for css_rule in (
    "<style>pre { white-space: pre !important; }</style>",
    "<style>.container { width:100% !important; }</style>",
):
    display(HTML(css_rule))

# Spark

In [None]:
def get_spark():
    """Create a Spark session that uses a fresh, run-specific scratch directory.

    A uniquely named directory is created under ``spark_dir/`` (relative to the
    current working directory) and handed to Spark as ``spark.local.dir``.

    Returns:
        tuple: ``(spark, dirname)`` - the session and the bare name of the
        directory created under ``spark_dir/``. The caller is expected to
        remove ``spark_dir/<dirname>`` once the session has been stopped.
    """
    print("Creating spark session with custom temp dir")
    dirname = str(uuid4())
    local_dir = os.path.join("spark_dir", dirname)
    # makedirs (unlike mkdir) also creates the parent "spark_dir" when it does
    # not exist yet, e.g. on a fresh checkout.
    os.makedirs(local_dir)

    # The scratch directory has to be part of the builder configuration: it is
    # an application-level property, so setting it on an already created
    # session is too late to affect where the context spills. The absolute
    # path of the directory created above is used; the previous code pointed
    # Spark at "/spark_dir/<id>", which is a different location.
    spark = (
        SparkSession.builder
        .appName("Preprocessing")
        .config("spark.local.dir", os.path.abspath(local_dir))
        .getOrCreate()
    )
    return spark, dirname

# Dataframes

In [None]:
def get_dataframes(
    fraud_ratio: float,
    test_limit: int
):
    """Build a down-sampled training set and a random test sample.

    Every fraud row of ``resources/fraudTrain.csv`` is kept and randomly chosen
    non-fraud rows are added until fraud rows make up roughly ``fraud_ratio``
    of the training set. The test set is a random sample of ``test_limit``
    rows from ``resources/fraudTest.csv``.

    Relies on the notebook-level ``spark`` session being defined.

    Args:
        fraud_ratio: Target share of fraud rows in the training set, in (0, 1].
        test_limit: Maximum number of rows in the test sample.

    Returns:
        tuple: ``(train_df, test_df)`` Spark DataFrames.

    Raises:
        ValueError: If ``fraud_ratio`` is outside (0, 1].
    """
    # Reject ratios that would otherwise end in a ZeroDivisionError or in a
    # negative row limit further down.
    if not 0 < fraud_ratio <= 1:
        raise ValueError(f"fraud_ratio must be in (0, 1], got {fraud_ratio}")

    print("Loading dataframes")
    train_all = spark.read.option("header", True).csv("resources/fraudTrain.csv")
    train_fraud = train_all.filter("is_fraud = 1")

    # count() scans the CSV, so compute it once and reuse the value.
    fraud_records = train_fraud.count()
    non_fraud_records = int(fraud_records / fraud_ratio) - fraud_records

    sampled_non_fraud = (
        train_all
        .filter("is_fraud = 0")
        .orderBy(spark_func.rand())
        .limit(non_fraud_records)
    )
    train_df = train_fraud.union(sampled_non_fraud)
    # NOTE(review): rand() is unseeded and train_df is not cached, so each
    # action on train_df may presumably draw a different non-fraud sample -
    # confirm whether that is acceptable for the train-set evaluation.

    fraud = train_df.where(spark_func.col("is_fraud") == "1").count()
    non_fraud = train_df.where(spark_func.col("is_fraud") == "0").count()

    # Guard the ratio so an empty training set is reported instead of crashing.
    total = fraud + non_fraud
    ratio = fraud / total if total else 0.0
    print(f"Train fraud records = {fraud}\nNon fraud records {non_fraud}\nFraud ratio {round(ratio, 4)}")

    test_df = (
        spark.read.option("header", True)
        .csv("resources/fraudTest.csv")
        .orderBy(spark_func.rand())
        .limit(test_limit)
    )
    return train_df, test_df

# Transformers

In [None]:
class LabelTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that casts the ``is_fraud`` label column to an integer."""

    def _transform(self, dataframe):
        # Replace the label column in place with its integer representation.
        label_as_int = spark_func.col("is_fraud").cast("int")
        return dataframe.withColumn("is_fraud", label_as_int)

In [None]:
class DatetimeTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that derives calendar features from ``trans_date_trans_time``.

    Adds the columns ``date``, ``date_time``, ``year``, ``month``, ``day``,
    ``hour`` and ``minute`` (in that order).
    """

    def _transform(self, dataframe):
        raw_stamp = dataframe["trans_date_trans_time"]
        stamp_format = "yyyy-MM-dd HH:mm:ss"

        # First parse the raw string into a date and a timestamp column.
        parsed = (
            dataframe
            .withColumn("date", spark_func.to_date(raw_stamp, stamp_format))
            .withColumn("date_time", spark_func.to_timestamp(raw_stamp, stamp_format))
        )

        # Then split them into individual calendar parts.
        date_part = parsed["date"]
        time_part = parsed["date_time"]
        calendar_features = [
            ("year", spark_func.year(date_part)),
            ("month", spark_func.month(date_part)),
            ("day", spark_func.day(date_part)),
            ("hour", spark_func.hour(time_part)),
            ("minute", spark_func.minute(time_part)),
        ]
        for feature_name, expression in calendar_features:
            parsed = parsed.withColumn(feature_name, expression)
        return parsed

In [None]:
class TransactionsTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that transforms ``amt`` and adds per-group amount deviations.

    ``amt`` is cast to float and raised to ``_power``. For every column ``c`` in
    ``_cat_diff_columns`` two columns are added:

    * ``mean_<c>_amt`` - mean transformed amount of the row's ``c`` group;
    * ``amt_<c>_diff`` - the row's transformed amount minus that group mean.
    """

    # Exponent applied to the transaction amount.
    _power = 0.25
    # Columns whose groups provide the reference mean amounts.
    _cat_diff_columns = ["merchant", "category", "gender", "state", "city"]

    def _transform(self, dataframe):
        amount = spark_func.col("amt").cast("float")
        dataframe = dataframe.withColumn("amt", spark_func.power(amount, self._power))

        for group_col in self._cat_diff_columns:
            # Group and join on the column being processed. The previous code
            # always used "merchant", so all five "diff" features were the same
            # merchant-based value under different names.
            group_means = dataframe.groupBy(group_col).agg(
                spark_func.mean("amt").alias(f"mean_{group_col}_amt")
            )
            # A left join keeps every input row, including rows whose group key
            # is null (a right join would replace those with one all-null row).
            dataframe = dataframe.join(group_means, on=group_col, how="left")
            dataframe = dataframe.withColumn(
                f"amt_{group_col}_diff",
                spark_func.col("amt") - spark_func.col(f"mean_{group_col}_amt"),
            )

        return dataframe

In [None]:
class MerchantTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that adds merchant- and category-based features.

    * ``merchant_lvl_0`` / ``merchant_lvl_1`` / ``merchant_lvl_2`` - 1 when the
      row's merchant is listed in the corresponding JSON resource, else 0;
    * the columns of ``resources/merchant_fraud_history.csv``, joined on
      ``merchant``;
    * ``sig_categories`` - 1 when the row's category is listed in
      ``resources/sig_categories.json``, else 0;
    * the columns of ``resources/categories_fraud_history.csv``, joined on
      ``category``.

    The JSON resources are read once, when the class is defined.
    """

    def _load_json(path):
        """Read a JSON resource, closing the file handle afterwards."""
        with open(path, "r") as handle:
            return json.load(handle)

    _lvl0_values = _load_json("resources/lvl0_merchants.json")
    _lvl1_values = _load_json("resources/lvl1_merchants.json")
    _lvl2_values = _load_json("resources/lvl2_merchants.json")
    _sig_categories = _load_json("resources/sig_categories.json")

    # The loader is only needed while the class body runs; remove it so it is
    # not exposed as a (mis-shaped) method.
    del _load_json

    def _transform(self, dataframe):
        dataframe = self._merchant_levels(dataframe)
        dataframe = self._merchant_history(dataframe)
        dataframe = self._category_levels(dataframe)
        dataframe = self._categories_history(dataframe)
        return dataframe

    def _merchant_levels(self, dataframe):
        """Add the three 0/1 merchant-level membership flags."""
        merchant = spark_func.col("merchant")
        return (
            dataframe
            .withColumn("merchant_lvl_0", spark_func.when(merchant.isin(self._lvl0_values), 1).otherwise(0))
            .withColumn("merchant_lvl_1", spark_func.when(merchant.isin(self._lvl1_values), 1).otherwise(0))
            .withColumn("merchant_lvl_2", spark_func.when(merchant.isin(self._lvl2_values), 1).otherwise(0))
        )

    def _merchant_history(self, dataframe):
        """Attach the merchant fraud-history columns to every transaction."""
        # Use the session that owns the incoming frame instead of relying on a
        # notebook-global `spark` variable being defined.
        history = (
            dataframe.sparkSession.read.option("header", True)
            .csv("resources/merchant_fraud_history.csv")
        )
        # Left join: keep every transaction. The previous right join dropped
        # transactions without a history entry and emitted all-null rows for
        # history entries that had no transaction in this frame.
        return dataframe.join(history, on="merchant", how="left")

    def _category_levels(self, dataframe):
        """Add the 0/1 flag for categories listed in the JSON resource."""
        in_sig_categories = spark_func.col("category").isin(self._sig_categories)
        return dataframe.withColumn(
            "sig_categories",
            spark_func.when(in_sig_categories, 1).otherwise(0),
        )

    def _categories_history(self, dataframe):
        """Attach the category fraud-history columns to every transaction."""
        history = (
            dataframe.sparkSession.read.option("header", True)
            .csv("resources/categories_fraud_history.csv")
        )
        # Left join for the same reason as in _merchant_history.
        return dataframe.join(history, on="category", how="left")

In [None]:
class CityPopulationTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that replaces ``city_pop`` with ``city_pop ** _power``."""

    # Exponent applied to the population value.
    _power = 0.25

    def _transform(self, dataframe):
        # Cast to float first, then apply the power transform in one step.
        population = spark_func.col("city_pop").cast("float")
        return dataframe.withColumn("city_pop", spark_func.power(population, self._power))

# Pipeline

In [None]:
# Name of the target column.
label = "is_fraud"

# Columns selected from the input frames before they enter the pipeline.
origin_cols = [
    "amt", "trans_date_trans_time",
    "merchant", "category", "gender", "state", "city",
    "city_pop",
    label,
]

# Columns assembled into the feature vector, grouped by origin.
_amount_features = ["amt"] + [
    f"amt_{group}_diff"
    for group in ("merchant", "category", "gender", "state", "city")
]
_time_features = ["month", "day", "hour", "minute"]
_encoded_features = ["category_dummy", "gender_dummy"]
final_cols = [*_amount_features, *_time_features, *_encoded_features]

# Columns kept from the model output for evaluation.
evaluation_cols = [label, "rawPrediction", "probability", "prediction"]

In [None]:
def get_pipeline(model_stage):
    """Assemble the preprocessing pipeline and append ``model_stage`` as its last stage.

    Stage order: custom transformers, one-hot encoding of ``category`` and
    ``gender``, vector assembly of ``final_cols``, min-max scaling, model.
    """
    custom_stages = [
        LabelTransformer(),
        DatetimeTransformer(),
        TransactionsTransformer(),
        MerchantTransformer(),
        CityPopulationTransformer(),
    ]

    # One StringIndexer + OneHotEncoder pair per categorical column.
    encoding_stages = []
    for source_col in ("category", "gender"):
        encoding_stages.append(
            StringIndexer(inputCol=source_col, outputCol=f"{source_col}_index")
        )
        encoding_stages.append(
            OneHotEncoder(inputCol=f"{source_col}_index", outputCol=f"{source_col}_dummy")
        )

    vector_stages = [
        VectorAssembler(inputCols=final_cols, outputCol="features"),
        MinMaxScaler(inputCol="features", outputCol="scaled_features"),
    ]

    return Pipeline(
        stages=[*custom_stages, *encoding_stages, *vector_stages, model_stage]
    )

# ML experiment

In [None]:
# Experiment settings.
cross_val_reps = 2     # how many times each model is trained and evaluated
fraud_ratio = 0.5      # passed to get_dataframes as the fraud share of the training set
test_sample = 10_000   # passed to get_dataframes as the test row limit

In [None]:
# Record MLflow runs in a file-based store at /tmp/mlflow.
mlflow.set_tracking_uri(uri="file:///tmp/mlflow")

In [None]:
# Keyword arguments shared by every classifier.
# featuresCol points at "scaled_features", the output of the pipeline's
# MinMaxScaler stage; with "features" the models read the raw assembled vector
# and the scaler stage was computed but never used.
model_kwargs = dict(featuresCol="scaled_features", labelCol=label)

# [estimator class, display name] pairs; the display name is logged to MLflow
# as the "model" parameter prefix.
models = [
    [LogisticRegression, "LogisticRegression"],
    [RandomForestClassifier, "RandomForest"],
    [GBTClassifier, "GBT"],
]

In [None]:
# Train and evaluate every model `cross_val_reps` times. Each repetition gets
# its own Spark session and scratch directory and is logged as one MLflow run.
for model_class, model_name in models:
    for i in range(cross_val_reps):
        spark, dirname = get_spark()
        try:
            train_df, test_df = get_dataframes(fraud_ratio, test_sample)
            # Created per repetition: the evaluator is built after get_spark()
            # so that a live session is available when it is constructed.
            evaluator = BinaryClassificationEvaluator(labelCol=label, metricName="areaUnderPR")
            train_input = train_df.select(*origin_cols)
            test_input = test_df.select(*origin_cols)

            with mlflow.start_run():
                print(f"Working on: {model_name} ({i})")
                pipe = get_pipeline(model_class(**model_kwargs))
                pipe_model = pipe.fit(train_input)
                predictions_train = pipe_model.transform(train_input).select(*evaluation_cols)
                predictions_test = pipe_model.transform(test_input).select(*evaluation_cols)

                auc_train = evaluator.evaluate(predictions_train)
                auc_test = evaluator.evaluate(predictions_test)

                mlflow.log_param("model", f"{model_name}_{i}")
                mlflow.log_metric("train_auc_pr", auc_train)
                mlflow.log_metric("test_auc_pr", auc_test)
        finally:
            # Single cleanup path for success and failure; any exception from
            # the body propagates unchanged with its original traceback.
            try:
                spark.stop()
            finally:
                # ignore_errors: a missing or still-locked scratch directory
                # must not mask the real error or abort the remaining runs.
                shutil.rmtree(f"spark_dir/{dirname}", ignore_errors=True)

# Visualization

In [None]:
def _model_family(run_label):
    """Return the part of a logged model label before the first underscore.

    Falsy labels (e.g. a missing parameter) are passed through unchanged.
    """
    return run_label.split("_")[0] if run_label else run_label


# Load the runs of experiment "0" and derive the model family of each run.
runs_df = mlflow.search_runs(experiment_ids=["0"])
runs_df["params.model_family"] = runs_df["params.model"].apply(_model_family)

# Keep finished runs only, then the first `cross_val_reps * len(models)` of
# them in the order returned by search_runs.
finished_runs = runs_df.loc[runs_df.status == "FINISHED"]
selected_runs = finished_runs.iloc[: cross_val_reps * len(models)]

# Average both metrics per model family.
metric_columns = ("metrics.test_auc_pr", "metrics.train_auc_pr")
runs_df = (
    selected_runs
    .groupby("params.model_family")
    .agg({metric: "mean" for metric in metric_columns})
    .reset_index()
)
runs_df

In [None]:
# Grouped bar chart: one bar trace per data split, one bar per model family.
fig = go.Figure()

x_data = runs_df["params.model_family"].to_list()
for trace_name, metric_column, bar_color in (
    ("Train", "metrics.train_auc_pr", "teal"),
    ("Test", "metrics.test_auc_pr", "orange"),
):
    y_data = runs_df[metric_column].to_list()
    fig.add_trace(
        go.Bar(
            name=trace_name,
            x=x_data,
            y=y_data,
            # Show the metric value, rounded to three decimals, on each bar.
            text=[round(y, 3) for y in y_data],
            marker_color=bar_color,
        )
    )

fig.update_layout(
    title=f"<b>AUC PR values for tested models</b><br>Cross val reps = {cross_val_reps}",
    width=1000,
    height=600,
    yaxis=dict(range=(0, 1.1)),
)

fig.show()