In [1]:
import os

# Set JAVA_HOME to the correct Java 17 path
os.environ["JAVA_HOME"] = "/opt/homebrew/Cellar/openjdk@17/17.0.16/libexec/openjdk.jdk/Contents/Home"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

# Then import PySpark
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/13 18:03:09 WARN Utils: Your hostname, Arshdeeps-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.161 instead (on interface en0)
25/09/13 18:03:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/13 18:03:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
data = spark.read.csv("credit_card.csv", header=True, inferSchema=True)
data.printSchema()
data.show(5)

                                                                                

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PA

### Dataset Column Descriptions

- **step**: Time step of the transaction.
- **type**: Type of transaction (PAYMENT, TRANSFER, CASH_OUT, etc.).
- **amount**: Amount of money involved in the transaction.
- **nameOrig**: Account ID of the transaction originator.
- **oldbalanceOrg**: Balance of the originator before the transaction.
- **newbalanceOrig**: Balance of the originator after the transaction.
- **nameDest**: Account ID of the transaction receiver.
- **oldbalanceDest**: Balance of the destination account before the transaction.
- **newbalanceDest**: Balance of the destination account after the transaction.
- **isFraud**: 1 if the transaction is fraudulent, 0 otherwise (target variable).
- **isFlaggedFraud**: 1 if the system flagged the transaction as fraud, 0 otherwise.


In [3]:
data.groupBy("isFraud").count().show()

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+



                                                                                

Data is Highly Imbalanced

In [4]:
from pyspark.sql.functions import col, sum

data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()

[Stage 6:>                                                        (0 + 10) / 10]

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



                                                                                

In [5]:
from pyspark.sql.functions import col, when

# Total counts
total = data.count()
fraud_count = data.filter(col("isFraud") == 1).count()
normal_count = data.filter(col("isFraud") == 0).count()

# Assign weight
data = data.withColumn(
    "classWeightCol",
    when(col("isFraud") == 1, normal_count / total)
    .otherwise(fraud_count / total)
)
data.select("isFraud", "classWeightCol").show(5)

+-------+--------------------+
|isFraud|      classWeightCol|
+-------+--------------------+
|      0|0.001290820448180152|
|      0|0.001290820448180152|
|      1|  0.9987091795518198|
|      1|  0.9987091795518198|
|      0|0.001290820448180152|
+-------+--------------------+
only showing top 5 rows


In [6]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Convert 'type' column to numeric
indexer = StringIndexer(inputCol="type", outputCol="typeIndex")
data = indexer.fit(data).transform(data)

# Select numeric columns only
feature_cols = [c for c in data.columns if c not in ("isFraud", "classWeightCol", "nameOrig", "nameDest", "type")]

# Include the encoded 'typeIndex'
feature_cols.append("typeIndex")

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_data = assembler.transform(data).select("features", "isFraud", "classWeightCol")
final_data.show(5)


                                                                                

+--------------------+-------+--------------------+
|            features|isFraud|      classWeightCol|
+--------------------+-------+--------------------+
|[1.0,9839.64,1701...|      0|0.001290820448180152|
|[1.0,1864.28,2124...|      0|0.001290820448180152|
|[1.0,181.0,181.0,...|      1|  0.9987091795518198|
|(9,[0,1,2,4],[1.0...|      1|  0.9987091795518198|
|[1.0,11668.14,415...|      0|0.001290820448180152|
+--------------------+-------+--------------------+
only showing top 5 rows


In [7]:
train, test = final_data.randomSplit([0.8, 0.2], seed=42)


In [8]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    labelCol="isFraud",
    featuresCol="features",
    weightCol="classWeightCol"  # important for imbalance
)

model = lr.fit(train)

25/09/13 18:03:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/09/13 18:03:28 WARN MemoryStore: Not enough space to cache rdd_87_7 in memory! (computed 17.0 MiB so far)
25/09/13 18:03:28 WARN BlockManager: Persisting block rdd_87_7 to disk instead.
25/09/13 18:03:28 WARN MemoryStore: Not enough space to cache rdd_87_3 in memory! (computed 17.0 MiB so far)
25/09/13 18:03:28 WARN BlockManager: Persisting block rdd_87_3 to disk instead.
25/09/13 18:03:28 WARN MemoryStore: Not enough space to cache rdd_87_2 in memory! (computed 17.0 MiB so far)
25/09/13 18:03:28 WARN BlockManager: Persisting block rdd_87_2 to disk instead.
25/09/13 18:03:28 WARN MemoryStore: Not enough space to cache rdd_87_8 in memory! (computed 17.0 MiB so far)
25/09/13 18:03:28 WARN BlockManager: Persisting block rdd_87_8 to disk instead.
25/09/13 18:03:28 WARN MemoryStore: Not enough space to cache rdd_87_6 in memory! (computed 17.0 MiB so far)
25/09/13 18:03:28 WARN Block

In [9]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="isFraud")
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

                                                                                

AUC: 0.9833267974553004


In [10]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="isFraud", predictionCol="prediction", metricName="f1"
)
f1 = evaluator.evaluate(predictions)
print("F1 Score:", f1)




F1 Score: 0.9833641556525715


                                                                                

In [11]:
model.save("fraud_detection_model_spark")