In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date, when
from pyspark.sql.types import FloatType

# Start (or reuse) the Spark session that backs every DataFrame below
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

# CSV locations in cloud storage
file_location_cards = "/FileStore/tables/cards_data.csv"
file_location_users = "/FileStore/tables/users_data.csv"
file_location_amex = "/FileStore/tables/amex_filtered_transactions.csv"

# Step 1: Load the datasets from cloud storage
# Every file is read the same way: the first row is the header and Spark
# infers the column types.
csv_options = {"header": True, "inferSchema": True}

# Card details (credit limit, PIN change year, ...)
df_cards = spark.read.csv(file_location_cards, **csv_options)
# User details (income, age, ...)
df_users = spark.read.csv(file_location_users, **csv_options)
# Amex-filtered transactions produced by earlier work
df_amex_transactions = spark.read.csv(file_location_amex, **csv_options)

# Step 2: Preprocess cards data (kept as DataFrame)
# Drop the 'card_on_dark_web' column since it is not needed
df_cards = df_cards.drop("card_on_dark_web")

# Clean 'credit_limit' by removing '$' and ',' and converting to float.
# The pattern is a raw string: "\$" in a normal string literal is an invalid
# Python escape sequence (a warning today, an error in future versions).
df_cards = df_cards.withColumn(
    "credit_limit",
    regexp_replace(col("credit_limit"), r"[\$,]", "").cast(FloatType())
)

# Parse 'acct_open_date' using the MM/yyyy pattern into a date column
df_cards = df_cards.withColumn(
    "acct_open_date",
    to_date(col("acct_open_date"), "MM/yyyy")
)

# Flag cards whose PIN was last changed more than 7 (5 + 2) years before
# current_year. Rows where the comparison is null fall through to "No".
current_year = 2025  # fixed reference year, not the system clock
PIN_CHANGE_MAX_AGE_YEARS = 7
df_cards = df_cards.withColumn(
    "PIN_Change_Due",
    when(
        col("year_pin_last_changed") < current_year - PIN_CHANGE_MAX_AGE_YEARS,
        "Yes"
    ).otherwise("No")
)

# Step 3: Preprocess users data (kept as DataFrame)
# Clean financial columns by removing '$' and ',' and converting to float.
# The pattern is a raw string: "\$" in a normal string literal is an invalid
# Python escape sequence (a warning today, an error in future versions).
currency_pattern = r"[\$,]"
for money_column in ("per_capita_income", "yearly_income", "total_debt"):
    df_users = df_users.withColumn(
        money_column,
        regexp_replace(col(money_column), currency_pattern, "").cast(FloatType())
    )

# Add 'retirement_status' - Retired if current age meets or exceeds retirement age
# (rows where the comparison is null fall through to "Not Retired")
df_users = df_users.withColumn(
    "retirement_status",
    when(col("current_age") >= col("retirement_age"), "Retired").otherwise("Not Retired")
)

# Categorize 'age_group' based on current age.
# Branches are checked in order, so each upper bound only needs one comparison;
# anything not matched (including a null age) lands in "60+".
df_users = df_users.withColumn(
    "age_group",
    when(col("current_age") <= 30, "18-30")
    .when(col("current_age") <= 45, "31-45")
    .when(col("current_age") <= 60, "46-60")
    .otherwise("60+")
)

# Calculate 'Debt_to_Income_Ratio' as total debt divided by yearly income.
# A zero or missing income yields null: that is what Spark's non-ANSI division
# already returns, and the explicit guard avoids a divide-by-zero error when
# ANSI mode is enabled.
df_users = df_users.withColumn(
    "Debt_to_Income_Ratio",
    when(col("yearly_income") != 0, col("total_debt") / col("yearly_income"))
)

# Step 4: Preprocess amex_filtered_transactions (kept as DataFrame)
# Clean 'amount' by removing '$' and ',' and converting to float.
# Raw string: "\$" in a normal string literal is an invalid Python escape.
df_amex_transactions = df_amex_transactions.withColumn(
    "amount",
    regexp_replace(col("amount"), r"[\$,]", "").cast(FloatType())
)

# Step 5: Show a preview of the preprocessed DataFrames
# Print the first five rows of each frame, untruncated, to confirm the changes
previews = (
    ("Preview of preprocessed cards data:", df_cards),
    ("Preview of preprocessed users data:", df_users),
    ("Preview of preprocessed Amex transactions (with cleaned amount):", df_amex_transactions),
)
for heading, frame in previews:
    print(heading)
    frame.show(5, truncate=False)

# Step 6: Count rows and confirm we are done
# All three counts are computed first, then reported
cards_count, users_count, amex_count = (
    frame.count() for frame in (df_cards, df_users, df_amex_transactions)
)
print(f"Cards DataFrame has {cards_count} rows")
print(f"Users DataFrame has {users_count} rows")
print(f"Amex Transactions DataFrame has {amex_count} rows")

# Note: the Spark session is intentionally left running because the
# DataFrames are used by later cells.
# Call spark.stop() once you are done with these DataFrames.

# **LOGISTIC REGRESSION**

In [None]:
import mlflow
import dagshub
# Connect this session to the DagsHub repository with MLflow integration enabled.
dagshub.init(repo_owner='mayankpareek740', repo_name='aihi', mlflow=True)
# Send MLflow runs to the repository's hosted tracking server.
# NOTE(review): dagshub.init(mlflow=True) presumably configures this already - confirm.
mlflow.set_tracking_uri("https://dagshub.com/mayankpareek740/aihi.mlflow")
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
import matplotlib.pyplot as plt

# Helper behind the Spark UDF: pulls the class-1 entry out of a probability vector
def get_class1_prob(prob):
    """Return the element at index 1 of ``prob`` as a Python float."""
    class1_value = prob[1]
    return float(class1_value)
# Register the helper as a Spark UDF whose declared return type is FloatType.
get_class1_prob_udf = udf(get_class1_prob, FloatType())

# Data joining: enrich every Amex transaction with card and user attributes.
# Aliases keep column references unambiguous, since all three frames have an "id".
amex = df_amex_transactions.alias("amex")
cards = df_cards.alias("cards")
users = df_users.alias("users")

# Left join: transactions without a matching card are kept (credit_limit is null)
df_temp = amex.join(
    cards,
    amex["card_id"] == cards["id"],
    "left"
).select(
    amex["id"].alias("transaction_id"),
    amex["client_id"],
    amex["amount"],
    amex["fraud_label"],
    amex["mcc"],
    cards["credit_limit"]
)

# Left join: transactions without a matching user are kept (user columns are null)
df_combined = df_temp.join(
    users,
    df_temp["client_id"] == users["id"],
    "left"
).select(
    df_temp["transaction_id"],
    df_temp["client_id"],
    df_temp["amount"],
    df_temp["fraud_label"],
    df_temp["mcc"],
    df_temp["credit_limit"],
    users["gender"],
    users["retirement_status"],
    users["current_age"],
    users["per_capita_income"],
    users["yearly_income"],
    users["total_debt"],  # was `usersn[...]`, an undefined name (NameError)
    users["Debt_to_Income_Ratio"]
)

# Select the MLflow experiment that the runs below are logged under
mlflow.set_experiment('logisticmodel')

# Feature columns, split by how they are encoded
categorical_cols = ["mcc", "gender", "retirement_status"]
numerical_cols = ["amount", "current_age", "per_capita_income", "yearly_income", "total_debt", "Debt_to_Income_Ratio", "credit_limit"]

# Feature preprocessing stages (fitted later as part of the Pipeline)
# Each categorical column <name> is indexed into <name>_index.
indexed_names = [f"{name}_index" for name in categorical_cols]
indexers = [
    StringIndexer(inputCol=name, outputCol=indexed, handleInvalid="keep")
    for name, indexed in zip(categorical_cols, indexed_names)
]
# The target is indexed too, producing the "label" column used by the model.
# NOTE(review): handleInvalid="keep" on the label presumably adds an extra
# bucket for null/unseen labels - confirm this is intended for a binary target.
fraud_indexer = StringIndexer(inputCol="fraud_label", outputCol="label", handleInvalid="keep")
# Indexed categoricals come first, then the numeric columns
assembler_inputs = indexed_names + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="raw_features", handleInvalid="keep")
# Standardize the assembled vector (centered and scaled) into "features"
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withStd=True, withMean=True)

# Calculate class weights: weight = total rows / rows carrying that fraud_label,
# so the rarer label receives the larger weight
total = df_combined.count()
label_counts = df_combined.groupBy("fraud_label").count()
inverse_frequency = (total / col("count")).cast("float")
label_weights = label_counts.withColumn("weight", inverse_frequency).select("fraud_label", "weight")
# Attach the per-label weight to every row
df_combined = df_combined.join(label_weights, on="fraud_label", how="left")

# Split data 80/20 with a fixed seed
train_data, test_data = df_combined.randomSplit([0.8, 0.2], seed=42)

# Define parameter grid for multiple runs
# Each dict is one MLflow run; its keys are passed to LogisticRegression as
# maxIter, regParam and elasticNetParam, and logged as the run's parameters.
param_grid = [
    {"maxIter": 50, "regParam": 0.1, "elasticNetParam": 0.0},
    {"maxIter": 100, "regParam": 0.01, "elasticNetParam": 0.5},
    {"maxIter": 150, "regParam": 0.001, "elasticNetParam": 1.0}
]

# Multiple runs with different parameters
# The evaluator does not depend on the run, so it is built once. It scores the
# "probability" vector: AUC needs a continuous score, and pointing the evaluator
# at the hard 0/1 "prediction" column collapses the ROC curve to one operating
# point, giving an AUC that disagrees with the curve plotted below.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="probability",
    metricName="areaUnderROC"
)

for i, params in enumerate(param_grid):
    with mlflow.start_run(run_name=f"run_{i+1}"):
        # Create Logistic Regression model with this run's hyperparameters;
        # rows are weighted by the "weight" column
        lr = LogisticRegression(
            featuresCol="features",
            labelCol="label",
            weightCol="weight",
            maxIter=params["maxIter"],
            regParam=params["regParam"],
            elasticNetParam=params["elasticNetParam"]
        )

        # Build and fit pipeline: indexers -> label indexer -> assembler -> scaler -> model
        pipeline = Pipeline(stages=indexers + [fraud_indexer, assembler, scaler, lr])
        model = pipeline.fit(train_data)

        # Make predictions on the held-out split
        predictions = model.transform(test_data)

        # Evaluate model
        auc = evaluator.evaluate(predictions)
        print(f"Run {i+1} - Area Under ROC: {auc}")

        # Confusion Matrix: one collected row per (label, prediction) pair;
        # pairs that never occur default to 0
        confusion_matrix = predictions.groupBy("label", "prediction").count().collect()
        cell_counts = {
            (row["label"], row["prediction"]): row["count"]
            for row in confusion_matrix
        }
        tn = cell_counts.get((0.0, 0.0), 0)
        fp = cell_counts.get((0.0, 1.0), 0)
        fn = cell_counts.get((1.0, 0.0), 0)
        tp = cell_counts.get((1.0, 1.0), 0)
        print(f"Run {i+1} - Confusion Matrix:")
        print(f"TN: {tn} | FP: {fp}")
        print(f"FN: {fn} | TP: {tp}")

        # Calculate additional metrics (0 when the denominator is empty)
        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0

        # Log parameters and metrics to MLflow
        mlflow.log_params(params)
        mlflow.log_param("seed", 42)
        mlflow.log_metrics({
            "auc": auc,
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "true_positives": tp,
            "true_negatives": tn,
            "false_positives": fp,
            "false_negatives": fn
        })

        # ROC Curve: rank test rows by class-1 probability, highest first
        roc_data = predictions.select(
            col("label"),
            get_class1_prob_udf(col("probability")).alias("prob_class1")
        ).orderBy(col("prob_class1").desc())

        # Walk down the ranking, accumulating true/false positives to trace
        # one (FPR, TPR) point per row
        total_positives = tp + fn
        total_negatives = tn + fp
        roc_points = [(0.0, 0.0)]
        cum_tp, cum_fp = 0, 0
        for row in roc_data.collect():
            if row["label"] == 1.0:
                cum_tp += 1
            else:
                cum_fp += 1
            tpr = cum_tp / total_positives if total_positives > 0 else 0
            fpr = cum_fp / total_negatives if total_negatives > 0 else 0
            roc_points.append((fpr, tpr))

        # Plot and save ROC curve
        fpr, tpr = zip(*roc_points)
        plt.figure()
        try:
            plt.plot(fpr, tpr, label=f"AUC = {auc:.4f}", color="blue")
            plt.plot([0, 1], [0, 1], "k--", label="Random Guess")
            plt.xlabel("False Positive Rate")
            plt.ylabel("True Positive Rate")
            plt.title(f"ROC Curve - Run {i+1}")
            plt.legend()
            plt.grid(True)
            roc_file = f"roc_curve_run_{i+1}.png"
            plt.savefig(roc_file)
            mlflow.log_artifact(roc_file)
        finally:
            # Release the figure even if saving or uploading fails
            plt.close()

## **OUTPUT**
- 2025/03/29 06:00:39 INFO mlflow.tracking.fluent: Experiment with name 'logisticmodel' does not exist. Creating a new experiment.

## RUN 1
- Run 1 - Area Under ROC: 0.6787976710557039
- Run 1 - Confusion Matrix:
- TN: 94955 | FP: 19311
- FN: 89 | TP: 99

## RUN 2
- Run 2 - Area Under ROC: 0.6733341222105493
- Run 2 - Confusion Matrix:
- TN: 94922 | FP: 19344
- FN: 91 | TP: 97

## RUN 3
- Run 3 - Area Under ROC: 0.7067603736112564
- Run 3 - Confusion Matrix:
- TN: 93444 | FP: 20822
- FN: 76 | TP: 112

# **RANDOM FOREST**

In [None]:
import mlflow
import dagshub
# Connect this session to the DagsHub repository with MLflow integration enabled.
dagshub.init(repo_owner='mayankpareek740', repo_name='aihi', mlflow=True)
# Send MLflow runs to the repository's hosted tracking server.
# NOTE(review): dagshub.init(mlflow=True) presumably configures this already - confirm.
mlflow.set_tracking_uri("https://dagshub.com/mayankpareek740/aihi.mlflow")
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
import matplotlib.pyplot as plt

# Helper behind the Spark UDF: pulls the class-1 entry out of a probability vector
def get_class1_prob(prob):
    """Return the element at index 1 of ``prob`` as a Python float."""
    class1_value = prob[1]
    return float(class1_value)
# Register the helper as a Spark UDF whose declared return type is FloatType.
get_class1_prob_udf = udf(get_class1_prob, FloatType())

# Data joining: enrich every Amex transaction with card and user attributes.
# Aliases keep column references unambiguous, since all three frames have an "id".
amex = df_amex_transactions.alias("amex")
cards = df_cards.alias("cards")
users = df_users.alias("users")

# Transactions LEFT JOIN cards: keep the transaction fields plus the card's credit limit
card_match = amex["card_id"] == cards["id"]
transaction_columns = [
    amex["id"].alias("transaction_id"),
    amex["client_id"],
    amex["amount"],
    amex["fraud_label"],
    amex["mcc"],
    cards["credit_limit"],
]
df_temp = amex.join(cards, on=card_match, how="left").select(*transaction_columns)

# Result LEFT JOIN users: carry everything over and append the user attributes
user_match = df_temp["client_id"] == users["id"]
carried_over = [
    df_temp[name]
    for name in (
        "transaction_id",
        "client_id",
        "amount",
        "fraud_label",
        "mcc",
        "credit_limit",
    )
]
user_attributes = [
    users[name]
    for name in (
        "gender",
        "retirement_status",
        "current_age",
        "per_capita_income",
        "yearly_income",
        "total_debt",
        "Debt_to_Income_Ratio",
    )
]
df_combined = df_temp.join(users, on=user_match, how="left").select(
    *carried_over, *user_attributes
)

# Select the MLflow experiment that the runs below are logged under
mlflow.set_experiment('randomforest')

# Feature columns, split by how they are encoded
categorical_cols = ["mcc", "gender", "retirement_status"]
numerical_cols = ["amount", "current_age", "per_capita_income", "yearly_income", "total_debt", "Debt_to_Income_Ratio", "credit_limit"]

# Feature preprocessing stages (fitted later as part of the Pipeline)
# Each categorical column <name> is indexed into <name>_index.
indexed_names = [f"{name}_index" for name in categorical_cols]
indexers = [
    StringIndexer(inputCol=name, outputCol=indexed, handleInvalid="keep")
    for name, indexed in zip(categorical_cols, indexed_names)
]
# The target is indexed too, producing the "label" column used by the model.
# NOTE(review): handleInvalid="keep" on the label presumably adds an extra
# bucket for null/unseen labels - confirm this is intended for a binary target.
fraud_indexer = StringIndexer(inputCol="fraud_label", outputCol="label", handleInvalid="keep")
# Indexed categoricals come first, then the numeric columns
assembler_inputs = indexed_names + numerical_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="raw_features", handleInvalid="keep")
# Standardize the assembled vector (centered and scaled) into "features"
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withStd=True, withMean=True)

# Calculate class weights: weight = total rows / rows carrying that fraud_label,
# so the rarer label receives the larger weight
total = df_combined.count()
label_counts = df_combined.groupBy("fraud_label").count()
inverse_frequency = (total / col("count")).cast("float")
label_weights = label_counts.withColumn("weight", inverse_frequency).select("fraud_label", "weight")
# Attach the per-label weight to every row
df_combined = df_combined.join(label_weights, on="fraud_label", how="left")

# Split data 80/20 with a fixed seed
train_data, test_data = df_combined.randomSplit([0.8, 0.2], seed=24)

# Multiple runs with different parameters
# Each dict is one MLflow run; its keys are passed to RandomForestClassifier as
# numTrees, maxDepth and subsamplingRate, and logged as the run's parameters.
param_grid = [
    {"numTrees": 20, "maxDepth": 5, "subsamplingRate": 0.8},
    {"numTrees": 50, "maxDepth": 10, "subsamplingRate": 0.9},
    {"numTrees": 100, "maxDepth": 15, "subsamplingRate": 1.0}
]

# The evaluator does not depend on the run, so it is built once. It scores the
# "probability" vector: AUC needs a continuous score, and pointing the evaluator
# at the hard 0/1 "prediction" column collapses the ROC curve to one operating
# point, giving an AUC that disagrees with the curve plotted below.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="probability",
    metricName="areaUnderROC"
)

for i, params in enumerate(param_grid):
    with mlflow.start_run(run_name=f"run_{i+1}"):
        # Create Random Forest model with this run's hyperparameters;
        # rows are weighted by the "weight" column and the seed is fixed
        rf = RandomForestClassifier(
            featuresCol="features",
            labelCol="label",
            weightCol="weight",
            numTrees=params["numTrees"],
            maxDepth=params["maxDepth"],
            subsamplingRate=params["subsamplingRate"],
            seed=24
        )

        # Build and fit pipeline: indexers -> label indexer -> assembler -> scaler -> model
        pipeline = Pipeline(stages=indexers + [fraud_indexer, assembler, scaler, rf])
        model = pipeline.fit(train_data)

        # Make predictions on the held-out split
        predictions = model.transform(test_data)

        # Evaluate model
        auc = evaluator.evaluate(predictions)
        print(f"Run {i+1} - Area Under ROC: {auc}")

        # Confusion Matrix: one collected row per (label, prediction) pair;
        # pairs that never occur default to 0
        confusion_matrix = predictions.groupBy("label", "prediction").count().collect()
        cell_counts = {
            (row["label"], row["prediction"]): row["count"]
            for row in confusion_matrix
        }
        tn = cell_counts.get((0.0, 0.0), 0)
        fp = cell_counts.get((0.0, 1.0), 0)
        fn = cell_counts.get((1.0, 0.0), 0)
        tp = cell_counts.get((1.0, 1.0), 0)
        print(f"Run {i+1} - Confusion Matrix:")
        print(f"TN: {tn} | FP: {fp}")
        print(f"FN: {fn} | TP: {tp}")

        # Calculate additional metrics (0 when the denominator is empty)
        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0

        # Log parameters and metrics to MLflow
        mlflow.log_params(params)
        mlflow.log_param("seed", 24)
        mlflow.log_metrics({
            "auc": auc,
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "true_positives": tp,
            "true_negatives": tn,
            "false_positives": fp,
            "false_negatives": fn
        })

        # ROC Curve: rank test rows by class-1 probability, highest first
        roc_data = predictions.select(
            col("label"),
            get_class1_prob_udf(col("probability")).alias("prob_class1")
        ).orderBy(col("prob_class1").desc())

        # Walk down the ranking, accumulating true/false positives to trace
        # one (FPR, TPR) point per row
        total_positives = tp + fn
        total_negatives = tn + fp
        roc_points = [(0.0, 0.0)]
        cum_tp, cum_fp = 0, 0
        for row in roc_data.collect():
            if row["label"] == 1.0:
                cum_tp += 1
            else:
                cum_fp += 1
            tpr = cum_tp / total_positives if total_positives > 0 else 0
            fpr = cum_fp / total_negatives if total_negatives > 0 else 0
            roc_points.append((fpr, tpr))

        # Plot and save ROC curve
        fpr, tpr = zip(*roc_points)
        plt.figure()
        try:
            plt.plot(fpr, tpr, label=f"AUC = {auc:.4f}", color="blue")
            plt.plot([0, 1], [0, 1], "k--", label="Random Guess")
            plt.xlabel("False Positive Rate")
            plt.ylabel("True Positive Rate")
            plt.title(f"ROC Curve - Run {i+1}")
            plt.legend()
            plt.grid(True)
            roc_file = f"roc_curve_run_{i+1}.png"
            plt.savefig(roc_file)
            mlflow.log_artifact(roc_file)
        finally:
            # Release the figure even if saving or uploading fails
            plt.close()

## OUTPUT