### Starting a Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

In [2]:
spark = SparkSession.builder.appName('FinancialFraudDetection').getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/30 00:05:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Creating a Dataframe

In [3]:
# Create a dataframe
df_pyspark = spark.read.csv(r'/Users/aarjavsanghvi/Documents/Dataset/Financial Data for Fraud Detection.csv',header=True,inferSchema=True)

                                                                                

### Exploratory Data Analysis

In [4]:
# Print the dataset
df_pyspark.show()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M123070170

In [5]:
# Print specific columns in the dataset
df_pyspark.select(['type','amount']).show()

+--------+---------+
|    type|   amount|
+--------+---------+
| PAYMENT|  9839.64|
| PAYMENT|  1864.28|
|TRANSFER|    181.0|
|CASH_OUT|    181.0|
| PAYMENT| 11668.14|
| PAYMENT|  7817.71|
| PAYMENT|  7107.77|
| PAYMENT|  7861.64|
| PAYMENT|  4024.36|
|   DEBIT|  5337.77|
|   DEBIT|  9644.94|
| PAYMENT|  3099.97|
| PAYMENT|  2560.74|
| PAYMENT| 11633.76|
| PAYMENT|  4098.78|
|CASH_OUT|229133.94|
| PAYMENT|  1563.82|
| PAYMENT|  1157.86|
| PAYMENT|   671.64|
|TRANSFER| 215310.3|
+--------+---------+
only showing top 20 rows



In [6]:
# Schema
df_pyspark.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [7]:
# Datatype
df_pyspark.dtypes

[('step', 'int'),
 ('type', 'string'),
 ('amount', 'double'),
 ('nameOrig', 'string'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('nameDest', 'string'),
 ('oldbalanceDest', 'double'),
 ('newbalanceDest', 'double'),
 ('isFraud', 'int'),
 ('isFlaggedFraud', 'int')]

In [8]:
# Describe the dataset
df_pyspark.describe().show()

24/01/30 00:05:25 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+--------+-----------------+-----------+-----------------+-----------------+-----------+-----------------+------------------+--------------------+--------------------+
|summary|              step|    type|           amount|   nameOrig|    oldbalanceOrg|   newbalanceOrig|   nameDest|   oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+-----------------+-----------+-----------------+-----------------+-----------+-----------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|          6362620|    6362620|          6362620|          6362620|    6362620|          6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.9035491309|       NULL|833883.1040744886|855113.6685785917|       NULL|1100701.666519643| 1224996.398201916|0.001290820448180152| 2.51468734577894E-6|
| stddev| 142.33197104912

                                                                                

In [9]:
# Printing the shape of the dataframe
row_count = df_pyspark.count()
column_count = len(df_pyspark.columns)

print("Number of Rows:", row_count)
print("Number of Columns:", column_count)

Number of Rows: 6362620
Number of Columns: 11


In [10]:
# Count nulls for each column
null_counts = df_pyspark.select([sum(col(c).isNull().cast('int')).alias(c) for c in df_pyspark.columns])

# Show the results
null_counts.show()

[Stage 10:>                                                       (0 + 10) / 10]

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+





In [11]:
# Dropping columns
df = df_pyspark.drop('nameOrig','nameDest','isFlaggedFraud')

In [12]:
# List distinct values for each categorical variable
categorical_columns = [col_name for col_name, data_type in df.dtypes if data_type == 'string']

distinct_values = {}
for col_name in categorical_columns:
    distinct_values[col_name] = [row[col_name] for row in df.select(col_name).distinct().collect()]

# Print the distinct values
for col_name, values in distinct_values.items():
    print(f"Distinct values for '{col_name}':")
    for value in values:
        print(value)
    print()

[Stage 13:>                                                       (0 + 10) / 10]

Distinct values for 'type':
TRANSFER
CASH_IN
CASH_OUT
PAYMENT
DEBIT





In [13]:
df.groupby('type').count().show()

[Stage 16:>                                                       (0 + 10) / 10]

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



                                                                                

In [14]:
df.groupby('isfraud').count().show()

[Stage 19:>                                                       (0 + 10) / 10]

+-------+-------+
|isfraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+



                                                                                

In [15]:
#Preprocessing steps
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# create object of StringIndexer class and specify input and output column
SI_type = StringIndexer(inputCol='type',outputCol='type_Index')

# transform the data
df = SI_type.fit(df).transform(df)

                                                                                

In [16]:
df.select('type','type_Index').show()

+--------+----------+
|    type|type_Index|
+--------+----------+
| PAYMENT|       1.0|
| PAYMENT|       1.0|
|TRANSFER|       3.0|
|CASH_OUT|       0.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
|   DEBIT|       4.0|
|   DEBIT|       4.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
|CASH_OUT|       0.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
| PAYMENT|       1.0|
|TRANSFER|       3.0|
+--------+----------+
only showing top 20 rows



In [17]:
# create object and specify input and output column
OHE = OneHotEncoder(inputCols=['type_Index'],outputCols=['type_OHE'])

# transform the data
df = OHE.fit(df).transform(df)

df.select('type','type_Index','type_OHE').show()

CodeCache: size=131072Kb used=36178Kb max_used=36202Kb free=94893Kb
 bounds [0x00000001020e8000, 0x0000000104478000, 0x000000010a0e8000]
 total_blobs=13369 nmethods=12378 adapters=903
 compilation: disabled (not enough contiguous free space left)
+--------+----------+-------------+
|    type|type_Index|     type_OHE|
+--------+----------+-------------+
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
|TRANSFER|       3.0|(4,[3],[1.0])|
|CASH_OUT|       0.0|(4,[0],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
|   DEBIT|       4.0|    (4,[],[])|
|   DEBIT|       4.0|    (4,[],[])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
|CASH_OUT|       0.0|(4,[0],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0])|
| PAYMENT|       1.0|(4,[1],[1.0]



In [18]:
df.dtypes

[('step', 'int'),
 ('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('oldbalanceDest', 'double'),
 ('newbalanceDest', 'double'),
 ('isFraud', 'int'),
 ('type_Index', 'double'),
 ('type_OHE', 'vector')]

### Creating a model

In [19]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import lit, when

In [20]:
# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['amount',
                                      'oldbalanceOrg',
                                      'newbalanceOrig',
                                      'oldbalanceDest',
                                      'newbalanceDest',
                                      'type_OHE'],
                           outputCol='features')

# fill the null values
df = df.fillna(0)

# transform the data
df = assembler.transform(df)

In [21]:
# view the transformed vector
df.select('features','isfraud').show()

+--------------------+-------+
|            features|isfraud|
+--------------------+-------+
|(9,[0,1,2,6],[983...|      0|
|(9,[0,1,2,6],[186...|      0|
|(9,[0,1,8],[181.0...|      1|
|(9,[0,1,3,5],[181...|      1|
|(9,[0,1,2,6],[116...|      0|
|(9,[0,1,2,6],[781...|      0|
|(9,[0,1,2,6],[710...|      0|
|(9,[0,1,2,6],[786...|      0|
|(9,[0,1,6],[4024....|      0|
|[5337.77,41720.0,...|      0|
|(9,[0,1,3,4],[964...|      0|
|(9,[0,1,2,6],[309...|      0|
|(9,[0,1,2,6],[256...|      0|
|(9,[0,1,6],[11633...|      0|
|(9,[0,1,2,6],[409...|      0|
|[229133.94,15325....|      0|
|(9,[0,1,6],[1563....|      0|
|(9,[0,1,2,6],[115...|      0|
|(9,[0,1,2,6],[671...|      0|
|(9,[0,1,3,8],[215...|      0|
+--------------------+-------+
only showing top 20 rows



In [22]:
#Model_Dataframe
model_df = df.select(['features','isfraud'])
model_df = model_df.withColumnRenamed("isfraud","label")

In [23]:
# Calculate class weights
class_counts = model_df.groupBy('label').count().collect()
total_count = model_df.count()
class_weights = {row['label']: total_count / row['count'] for row in class_counts}

                                                                                

In [24]:
#Split into training & testing Dataframe
training_df,test_df = model_df.randomSplit([0.75,0.25])

In [25]:
# Add a new column with class weights
df_weighted = training_df.withColumn('classWeight', lit(1.0))  # default weight for majority class
for label, weight in class_weights.items():
    df_weighted = df_weighted.withColumn('classWeight', when(df_weighted['label'] == label, weight).otherwise(df_weighted['classWeight']))

In [26]:
# Assuming 'df_weighted' is the DataFrame with added class weights
df_weighted.select('label', 'classWeight').distinct().show()

24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:05:42 WARN RowBasedKeyValueBatch: Calling spill() on

+-----+-----------------+
|label|      classWeight|
+-----+-----------------+
|    0|1.001292488819177|
|    1|774.7010836478753|
+-----+-----------------+





In [27]:
# Model Training with Class Weights
lr = LogisticRegression(labelCol='label', featuresCol='features', weightCol='classWeight')

# Train the model
model = lr.fit(df_weighted)

24/01/30 00:05:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/30 00:06:06 WARN MemoryStore: Not enough space to cache rdd_119_2 in memory! (computed 17.0 MiB so far)
24/01/30 00:06:06 WARN BlockManager: Persisting block rdd_119_2 to disk instead.
24/01/30 00:06:06 WARN MemoryStore: Not enough space to cache rdd_119_3 in memory! (computed 17.0 MiB so far)
24/01/30 00:06:06 WARN BlockManager: Persisting block rdd_119_3 to disk instead.
24/01/30 00:06:06 WARN MemoryStore: Not enough space to cache rdd_119_6 in memory! (computed 17.0 MiB so far)
24/01/30 00:06:06 WARN BlockManager: Persisting block rdd_119_6 to disk instead.
24/01/30 00:06:06 WARN MemoryStore: Not enough space to cache rdd_119_0 in memory! (computed 17.0 MiB so far)
24/01/30 00:06:06 WARN BlockManager: Persisting block rdd_119_0 to disk instead.
                                                                                

### Model evaluation

In [28]:
lr_summary=model.summary

In [29]:
#Overall accuracy of the classification model
lr_summary.accuracy

                                                                                

0.942042962515077

In [30]:
#Area under ROC
lr_summary.areaUnderROC

                                                                                

0.98886127698232

In [31]:
#Precision of both classes
print(lr_summary.precisionByLabel)

[0.9396133371106759, 0.9445057300204436]


In [32]:
#Recall of both classes
print(lr_summary.recallByLabel)

[0.9449420483487211, 0.9391375101708698]


In [33]:
#Get Preditions
predictions = model.transform(test_df)

In [34]:
predictions.select('label','prediction').show(50)

[Stage 164:>                                                        (0 + 1) / 1]

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|     

                                                                                

In [35]:
predictions.groupby('label').count().show()

24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/30 00:07:04 WARN RowBasedKeyValueBatch: Calling spill() on

+-----+-------+
|label|  count|
+-----+-------+
|    1|   2068|
|    0|1589573|
+-----+-------+



