In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of this script
spark = (
    SparkSession.builder
    .appName("Fraud Detection with Logistic Regression")
    .getOrCreate()
)

# Read the transactions CSV: the first row supplies the column names and
# Spark infers each column's type from the data
df = spark.read.csv(
    path='Synthetic_Financial_datasets_log.csv',
    header=True,
    inferSchema=True,
)

# Print the inferred column names and types
df.printSchema()

from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Encode the categorical 'type' column as a numeric index column
indexer = StringIndexer(inputCol='type', outputCol='type_index')

# Columns fed to the model: the numeric transaction fields plus the indexed
# transaction type (adjust this list to experiment with other inputs)
feature_cols = [
    'step',
    'amount',
    'oldbalanceOrg',
    'newbalanceOrig',
    'oldbalanceDest',
    'newbalanceDest',
    'type_index',
]
# Pack the selected columns into the single vector column Spark ML expects
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Chain indexing and assembling, then fit and apply both stages to the full dataset
pipeline = Pipeline(stages=[indexer, assembler])
df_transformed = pipeline.fit(df).transform(df)

# Preview the assembled feature vectors next to their labels
df_transformed.select('features', 'isFraud').show()

# Hold out 20% of the rows for testing; the fixed seed is passed so the
# split uses the same random seed on every run
train_data, test_data = df_transformed.randomSplit(weights=[0.8, 0.2], seed=42)

from pyspark.sql import functions as F

# Handle the class imbalance by down-sampling the majority (non-fraud) class
# so that both classes contribute a similar number of training rows.
#
# Only the training split is rebalanced. Building the balanced set from the
# full dataset would place held-out test rows in the training data and make
# the evaluation on test_data overly optimistic.
fraud_data = train_data.filter(col("isFraud") == 1)
non_fraud_data = train_data.filter(col("isFraud") == 0)

# Class sizes within the training split (not the full dataset)
count_1 = fraud_data.count()
count_0 = non_fraud_data.count()

# Fraction of non-fraud rows to keep. It is capped at 1.0 because sampling
# without replacement cannot return more rows than exist, and it falls back
# to 0.0 when there are no non-fraud rows so the division cannot fail.
keep_fraction = min(1.0, count_1 / count_0) if count_0 else 0.0

# NOTE: despite the historical name, this is the DOWN-sampled non-fraud class;
# the variable name is kept so later code that refers to it still works.
non_fraud_data_oversampled = non_fraud_data.sample(False, keep_fraction, seed=42)

# Combine all fraud rows with the sampled non-fraud rows
balanced_data = fraud_data.union(non_fraud_data_oversampled)

# Logistic Regression Model
from pyspark.ml.classification import LogisticRegression

# Fit a logistic regression on the balanced data, using the assembled
# 'features' vector to predict the 'isFraud' label
lr = LogisticRegression(labelCol='isFraud', featuresCol='features')
model = lr.fit(balanced_data)

# Save the trained model.
# write().overwrite() is used instead of save() because a plain save() raises
# when the target path already exists, which breaks re-running this script.
model.write().overwrite().save("/data/fraud_detection_logistic_regression_model")

# Make predictions on the held-out test data and preview them next to the labels
predictions = model.transform(test_data)
predictions.select('features', 'isFraud', 'prediction').show()

# Evaluate the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Imported here so this evaluation step is self-contained
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Overall accuracy: the share of test rows whose predicted label matches 'isFraud'
evaluator = MulticlassClassificationEvaluator(labelCol='isFraud', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Accuracy alone can look high on a class-imbalanced test set even when few
# fraud rows are caught, so also report the area under the precision-recall
# curve, computed from the model's raw prediction scores.
pr_evaluator = BinaryClassificationEvaluator(
    labelCol='isFraud',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderPR',
)
area_under_pr = pr_evaluator.evaluate(predictions)
print(f"Area under PR curve: {area_under_pr}")


root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+--------------------+-------+
|            features|isFraud|
+--------------------+-------+
|[1.0,9839.64,1701...|      0|
|[1.0,1864.28,2124...|      0|
|[1.0,181.0,181.0,...|      1|
|[1.0,181.0,181.0,...|      1|
|[1.0,11668.14,415...|      0|
|[1.0,7817.71,5386...|      0|
|[1.0,7107.77,1831...|      0|
|[1.0,7861.64,1760...|      0|
|[1.0,4024.36,2671...|      0|
|[1.0,5337.77,4172...|      0|
|[1.0,9644.94,4465...|      0|
|[1.0,3099.97,2077...|      0|
|[1.0,2560.74,5070...|      0|
|[1.0,11633.76,101...|      0|
|[1.0,