In [2]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [4]:
# Create the Spark session used by every cell below, or reuse one that
# already exists.
spark = (
    SparkSession.builder
    .appName("FinancialFraudDetection")
    .getOrCreate()
)

In [17]:
# Load the transactions CSV: the first line is treated as the header and
# column types are inferred from the data.
df = spark.read.csv(
    "Synthetic_Financial_datasets_log.csv",
    header=True,
    inferSchema=True,
)

In [19]:
# Peek at the first row to sanity-check the column names and inferred types.
df.head()

Row(step=1, type='PAYMENT', amount=9839.64, nameOrig='C1231006815', oldbalanceOrg=170136.0, newbalanceOrig=160296.36, nameDest='M1979787155', oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0, isFlaggedFraud=0)

In [20]:
# Replace nulls with 0. A numeric fill value only applies to numeric columns,
# so string columns such as `type` are left as they are.
df = df.na.fill(0)

In [23]:
# Encode the categorical `type` column as a numeric index column
# (`type_indexed`) so it can be used as a model feature.
indexer = StringIndexer(
    inputCol="type",
    outputCol="type_indexed",
)

In [25]:
# Balance-consistency features. Each value is 0 when the reported balances
# reconcile exactly with the transaction amount, and non-zero otherwise.
df = (
    df
    .withColumn(
        "errorBalanceOrig",
        col("newbalanceOrig") + col("amount") - col("oldbalanceOrg"),
    )
    .withColumn(
        "errorBalanceDest",
        col("oldbalanceDest") + col("amount") - col("newbalanceDest"),
    )
)

In [27]:
# Model inputs, packed by the assembler into a single `features` vector column.
feature_columns = [
    "type_indexed",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    "errorBalanceOrig",
    "errorBalanceDest",
]
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)

In [29]:
# 70/30 train/test split; the seed is fixed so the split is repeatable.
train_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [31]:
# Random forest (100 trees) over the assembled `features` vector, predicting
# the `isFraud` label. The seed is pinned so the forest's random sampling is
# repeatable across runs, in line with the seeded train/test split.
rf = RandomForestClassifier(
    labelCol="isFraud",
    featuresCol="features",
    numTrees=100,
    seed=42,
)


In [33]:
# Chain the stages so they are fitted and applied together, in this order.
pipeline = Pipeline(
    stages=[
        indexer,    # `type` -> `type_indexed`
        assembler,  # feature columns -> `features` vector
        rf,         # classifier trained on `features`
    ]
)


In [35]:
# Fit every pipeline stage on the training split.
model = pipeline.fit(dataset=train_data)

In [40]:
# Score the held-out split, then preview the label, predicted class and class
# probabilities for the first ten rows without truncating the vectors.
predictions = model.transform(test_data)
(
    predictions
    .select("isFraud", "prediction", "probability")
    .show(n=10, truncate=False)
)


+-------+----------+------------------------------------------+
|isFraud|prediction|probability                               |
+-------+----------+------------------------------------------+
|0      |0.0       |[0.9998902261216203,1.0977387837974097E-4]|
|0      |0.0       |[0.9998885608815268,1.114391184733045E-4] |
|0      |0.0       |[0.999878547586007,1.2145241399292756E-4] |
|0      |0.0       |[0.9998881900831456,1.1180991685445569E-4]|
|0      |0.0       |[0.9998881900831456,1.1180991685445569E-4]|
|0      |0.0       |[0.9998902261216203,1.0977387837974097E-4]|
|0      |0.0       |[0.999878547586007,1.2145241399292756E-4] |
|0      |0.0       |[0.9998881900831456,1.1180991685445569E-4]|
|0      |0.0       |[0.9998881900831456,1.1180991685445569E-4]|
|0      |0.0       |[0.999878547586007,1.2145241399292756E-4] |
+-------+----------+------------------------------------------+
only showing top 10 rows



In [41]:
# Overall accuracy of the predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="isFraud",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")

# Accuracy alone can look near-perfect when one label dominates, so also show
# the label-vs-prediction counts. Misclassified fraud rows (isFraud=1,
# prediction=0.0) are visible here even if they barely move the accuracy.
# NOTE(review): confirm the class balance of `isFraud` in this dataset.
(
    predictions
    .groupBy("isFraud", "prediction")
    .count()
    .orderBy("isFraud", "prediction")
    .show()
)

Test Accuracy: 0.9999587592018531


In [49]:
# Stop the Spark session now that the analysis is finished.
spark.stop()

