Synthetic Financial Datasets For Fraud Detection (kaggle.com) - https://www.kaggle.com/datasets/ealaxi/paysim1

In [None]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=081009cca3fecfbbd83e8f6b76b93ba2abff3b401cf6cde2cf780ef3e0dfb6c2
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from google.colab import drive
drive.mount('/content/drive')

path = "/content/drive/MyDrive/6th semester Bilbao/Big Data/PS_20174392719_1491204439457_log.csv"

In [None]:
import findspark

findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark

ModuleNotFoundError: No module named 'findspark'

In [None]:
data = spark.read.csv(f'{path}', header=True, quote='"', escape='"', multiLine=True)

In [None]:
display(data.show(5))

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

None

In [None]:
df = data

## 1 - TRANSACTION VOLUME OVER TIME - calculate the total number of transactions (grouped by type) over each time step (1 hour)

In [None]:
from pyspark.sql import functions as F

# Group by 'step' and 'type', then count the number of transactions in each group
transaction_volume_over_time = df.groupBy("step", "type").count()

# Show the results
transaction_volume_over_time.show(5)

+----+--------+-----+
|step|    type|count|
+----+--------+-----+
|  25| CASH_IN|  276|
|  37|CASH_OUT|13131|
| 105|CASH_OUT|    9|
| 107|CASH_OUT|    5|
| 157| CASH_IN| 6851|
+----+--------+-----+
only showing top 5 rows



## 2 - COUNT FRAUDULENT TRANSACTION  BY TYPE OF TRANSACTION - count the number of transitions, which are tagged as fraud and grouped by type of transaction

In [None]:
from pyspark.sql import functions as F

# Filter for fraudulent transactions
fraud_transactions = df.filter(df.isFraud == 1)

# Group by 'type' and count the number of fraudulent transactions for each type
fraud_count_by_type = fraud_transactions.groupBy("type").count()

# Show the results
fraud_count_by_type.show()

+--------+-----+
|    type|count|
+--------+-----+
|TRANSFER| 4097|
|CASH_OUT| 4116|
+--------+-----+



## 3 - MEAN TRANSACTION VALUE FOR ONLY FRAUDULENT TRANSACTIONS - calculate the average of the transaction, which are tagged as fraud

In [None]:
from pyspark.sql import functions as F

# Filter for fraudulent transactions
fraud_transactions = df.filter(df.isFraud == 1)

# Calculate the average amount of fraudulent transactions
average_fraud_transaction_value = fraud_transactions.agg(F.mean("amount").alias("avg_fraud_amount"))

# Show the results
average_fraud_transaction_value.show()

+-----------------+
| avg_fraud_amount|
+-----------------+
|1467967.299140387|
+-----------------+



## 4 - TOP CUSTOMERS BY TOTAL AMOUNT OF ALL THEIR TRANSACTIONS - find the top 10 customers by calculating the sum of every transaction grouped by nameOrig

In [None]:
from pyspark.sql import functions as F

# Group by 'nameOrig', sum the 'amount', and order by the sum in descending order
top_customers_by_total_amount = df.groupBy("nameOrig").agg(F.sum("amount").alias("total_amount")).orderBy(F.desc("total_amount"))

# Show the top 10 customers
top_customers_by_total_amount.show(10)

+-----------+-------------+
|   nameOrig| total_amount|
+-----------+-------------+
|C1715283297|9.244551664E7|
|C2127282686|7.382349036E7|
|C2044643633|7.117248042E7|
|C1425667947| 6.98867313E7|
|C1584456031|6.933731627E7|
| C811810230|6.750076129E7|
| C420748282|6.676127221E7|
|C1139847449|6.423444819E7|
| C300140823|6.384799258E7|
| C372535854|6.329483963E7|
+-----------+-------------+
only showing top 10 rows



## 5 - LARGE TRANSACTIONS, WHICH ARE NOT DESCRIBED AS FRAUD - find transactions with large amounts transferred, which are not flagged as fraud

In [None]:
from pyspark.sql import functions as F

# Define a threshold for what you consider a large transaction
large_amount_threshold = 200000

# Filter for transactions with large amounts that are not marked as fraud
large_non_fraud_transactions = df.filter((df.amount > large_amount_threshold) & (df.isFraud == 0))

# Select relevant columns and show some of the results
large_non_fraud_transactions.select("type", "amount").show()

+--------+----------+
|    type|    amount|
+--------+----------+
|CASH_OUT| 229133.94|
|TRANSFER|  215310.3|
|TRANSFER| 311685.89|
|TRANSFER| 224606.64|
|TRANSFER| 379856.23|
|TRANSFER|1505626.01|
|TRANSFER| 554026.99|
|TRANSFER| 761507.39|
|TRANSFER|1429051.47|
|TRANSFER| 358831.92|
|TRANSFER|  367768.4|
|TRANSFER| 209711.11|
|TRANSFER| 583848.46|
|TRANSFER|1724887.05|
|TRANSFER| 710544.77|
|TRANSFER| 581294.26|
|CASH_OUT| 212228.35|
|CASH_OUT|  419801.4|
|CASH_OUT| 335416.51|
|TRANSFER| 330757.04|
+--------+----------+
only showing top 20 rows



## MACHINE LEARNING ALG

https://chat.openai.com/share/cac93b3c-3d82-4e79-9b2a-d46be2f8423a

In [None]:
df.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [None]:
df.count()

6362620

In [None]:
df.describe().show(vertical=True)

-RECORD 0------------------------------
 summary        | count                
 step           | 6362620              
 type           | 6362620              
 amount         | 6362620              
 nameOrig       | 6362620              
 oldbalanceOrg  | 6362620              
 newbalanceOrig | 6362620              
 nameDest       | 6362620              
 oldbalanceDest | 6362620              
 newbalanceDest | 6362620              
 isFraud        | 6362620              
 isFlaggedFraud | 6362620              
-RECORD 1------------------------------
 summary        | mean                 
 step           | 243.39724563151657   
 type           | NULL                 
 amount         | 179861.90354912292   
 nameOrig       | NULL                 
 oldbalanceOrg  | 833883.1040744851    
 newbalanceOrig | 855113.6685785672    
 nameDest       | NULL                 
 oldbalanceDest | 1100701.6665196999   
 newbalanceDest | 1224996.3982020712   
 isFraud        | 0.001290820448180152 


In [None]:
from pyspark.sql.functions import col

filtered_data = df.filter(col("isFlaggedFraud") == 1)
filtered_data = df.subtract(filtered_data)
filtered_data = filtered_data.drop("isFlaggedFraud", "nameOrig", "nameDest")

In [None]:
filtered_data.show(5)

In [None]:
# =========================================
# 1. Data preprocessing
# =========================================

from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

# Handling missing values
filtered_data.dropna()

DataFrame[step: string, type: string, amount: string, oldbalanceOrg: string, newbalanceOrig: string, oldbalanceDest: string, newbalanceDest: string, isFraud: string]

In [None]:
# Assuming `cut_df` is our small DataFrame

cut_df = filtered_data.sample(fraction=0.005, withReplacement=False, seed=42)

In [None]:
cut_df.show(10)

+----+--------+---------+-------------+--------------+--------------+--------------+-------+
|step|    type|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|
+----+--------+---------+-------------+--------------+--------------+--------------+-------+
| 182|CASH_OUT|329225.74|     100093.0|           0.0|    3713483.09|    4042708.83|      0|
| 183| CASH_IN|244112.01|   5981354.08|    6225466.09|    1872925.31|     1628813.3|      0|
| 184|CASH_OUT| 271104.1|      10228.0|           0.0|           0.0|      271104.1|      0|
| 187| PAYMENT| 11882.55|      82728.0|      70845.45|           0.0|           0.0|      0|
| 191| CASH_IN| 45594.69|    9192018.6|    9237613.28|     275040.48|     229445.79|      0|
| 203| PAYMENT|  1368.59|    241506.58|     240137.99|           0.0|           0.0|      0|
| 205| CASH_IN|110934.41|    1082455.8|    1193390.21|    1353901.39|    1242966.98|      0|
| 210| PAYMENT|  8995.57|          0.0|           0.0|           0.0| 

In [None]:
from pyspark.sql.types import DoubleType, IntegerType

# Cast columns to numeric types
numeric_cols = ["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"]
for col_name in numeric_cols:
    cut_df = cut_df.withColumn(col_name, col(col_name).cast(DoubleType()))

filtered_data = filtered_data.withColumn("isFraud", col("isFraud").cast(IntegerType()))

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Assuming `data` is your DataFrame and `type` is the column we want to preprocess
stringIndexer = StringIndexer(inputCol="type", outputCol="type_index")
model = stringIndexer.fit(cut_df)
indexed_data = model.transform(cut_df)

# Apply OneHotEncoder
encoder = OneHotEncoder(inputCols=["type_index"], outputCols=["type_encoded"])
encoded_data = encoder.fit(indexed_data).transform(indexed_data)

# Drop the original categorical column if desired
encoded_data = encoded_data.drop("type", "type_index")

# Show the preprocessed DataFrame
#encoded_data.show()

In [None]:
# Assemble features
assembler = VectorAssembler(inputCols=["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "type_encoded"], outputCol="features")
scaled_data = assembler.transform(encoded_data)

# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
final_df = scaler.fit(scaled_data).transform(scaled_data)

In [None]:
final_df.show(5)

+----+---------+-------------+--------------+--------------+--------------+-------+-------------+--------------------+--------------------+
|step|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud| type_encoded|            features|      scaledFeatures|
+----+---------+-------------+--------------+--------------+--------------+-------+-------------+--------------------+--------------------+
| 182|329225.74|     100093.0|           0.0|    3713483.09|    4042708.83|    0.0|(4,[0],[1.0])|[329225.74,100093...|[0.49887896717863...|
| 183|244112.01|   5981354.08|    6225466.09|    1872925.31|     1628813.3|    0.0|(4,[2],[1.0])|[244112.01,598135...|[0.36990530395558...|
| 184| 271104.1|      10228.0|           0.0|           0.0|      271104.1|    0.0|(4,[0],[1.0])|(9,[0,1,4,5],[271...|(9,[0,1,4,5],[0.4...|
| 187| 11882.55|      82728.0|      70845.45|           0.0|           0.0|    0.0|(4,[1],[1.0])|(9,[0,1,2,6],[118...|(9,[0,1,2,6],[0.0...|
| 191| 45594.69|    

In [None]:
final_df = final_df.withColumn("isFraud", col("isFraud").cast("integer"))

In [None]:
# =========================================
# 2. Split Data
# =========================================

train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)


In [None]:
# =========================================
# 3. Build and Train Models
# =========================================

# RANDOM FOREST
# -------------
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Create a RandomForest model.
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="isFraud", seed=42)

# Train model.
model_rf = rf.fit(train_data)

# SVM
# ---
from pyspark.ml.classification import LinearSVC

# Create an SVM model.
svm = LinearSVC(labelCol="isFraud", featuresCol="scaledFeatures", maxIter=10)

# Train model.
model_svm = svm.fit(train_data)


In [None]:

# =========================================
# 4. Evaluate the Models
# =========================================

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate models
evaluator = BinaryClassificationEvaluator(labelCol="isFraud")

print("Random Forest AUC: ", evaluator.evaluate(model_rf.transform(test_data)))
print("SVM AUC: ", evaluator.evaluate(model_svm.transform(test_data)))


Random Forest AUC:  0.9996316334753079
SVM AUC:  0.9561989179233338
