**BDA Project - Financial Fraud Detection**

**Roll No.: P23DS022 and P23DS023**

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=93fce98a1b26d703480a800ef6639aaa3be857f2c12f04bc69da1c79a35d27ea
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [None]:
spark

In [None]:
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
df = spark.read.csv('/content/gdrive/MyDrive/PS.csv', inferSchema=True, header=True)

In [None]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [None]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



In [None]:
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")

In [None]:
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



In [None]:
df.printSchema()

root
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- isFraud: integer (nullable = true)



In [None]:
(df.count() , len(df.columns))

(6362620, 5)

In [None]:
df.select('amount','oldbalanceOrg','newbalanceOrig','isFraud').describe().show()

+-------+-----------------+-----------------+------------------+--------------------+
|summary|           amount|    oldbalanceOrg|    newbalanceOrig|             isFraud|
+-------+-----------------+-----------------+------------------+--------------------+
|  count|          6362620|          6362620|           6362620|             6362620|
|   mean|179861.9035491287|833883.1040744764| 855113.6685785812|0.001290820448180152|
| stddev|603858.2314629209|2888242.673037527|2924048.5029542595|0.035904796801604424|
|    min|              0.0|              0.0|               0.0|                   0|
|    max|    9.244551664E7|    5.958504037E7|     4.958504037E7|                   1|
+-------+-----------------+-----------------+------------------+--------------------+



In [None]:
data_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])
data_agg.show()

+----+------+-------------+--------------+-------+
|type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+----+------+-------------+--------------+-------+
|   0|     0|            0|             0|      0|
+----+------+-------------+--------------+-------+



In [None]:
# value counts of Type column
df.groupBy('type').count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



In [None]:
train, test = df.randomSplit([0.7, 0.3], seed=7)

In [None]:
print(f"Train set length: {train.count()} records")
print(f"Test set length: {test.count()} records")

Train set length: 4451490 records
Test set length: 1911130 records


In [None]:
train.show(2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



In [None]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [None]:
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [ x for (x, dataType) in train.dtypes if (dataType == "double") ]

In [None]:
print(numCols)
print(catCols)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


In [None]:
train.agg(F.countDistinct("type")).show()

+--------------------+
|count(DISTINCT type)|
+--------------------+
|                   5|
+--------------------+



In [None]:
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 373084|
| CASH_IN| 979536|
|CASH_OUT|1566112|
| PAYMENT|1503731|
|   DEBIT|  29027|
+--------+-------+



In [None]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [None]:
#catCols are the cols with string
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

In [None]:
string_indexe=string_indexer[0].fit(df).transform(df)
string_indexe.show()

+--------+---------+-------------+--------------+-------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|
+--------+---------+-------------+--------------+-------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|
| PAYMENT|  7861.64|    176087.23|     168225.59|      0|               1.0|
| PAYMENT|  4024.36|       2671.0|           0.0|      0|               1.0|
|   DEBIT|  5337.77|      41720.0|      36382.23|      0|               4.0|

In [None]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

In [None]:
one_hot_encoder_df=one_hot_encoder[0].fit(string_indexe).transform(string_indexe)
one_hot_encoder_df.show()

+--------+---------+-------------+--------------+-------+------------------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|
+--------+---------+-------------+--------------+-------+------------------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|     (4,[1],[1.0])|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|     (4,[3],[1.0])|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|     (4,[0],[1.0])|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7861.64|    176087.23|     1

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

In [None]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [None]:
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

In [None]:
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

In [None]:
stages

[StringIndexer_8baeb40af947,
 OneHotEncoder_ea8d294b134a,
 VectorAssembler_f44658556149]

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(train)

In [None]:
pp_df.select(
    "type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features",
).show(truncate=False)

+-------+------+-------------+--------------+--------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                          |
+-------+------+-------------+--------------+--------------------------------------------------+
|CASH_IN|1.42  |1270713.41   |1270714.83    |[1.42,1270713.41,1270714.83,0.0,0.0,1.0,0.0]      |
|CASH_IN|4.35  |4136277.22   |4136281.57    |[4.35,4136277.22,4136281.57,0.0,0.0,1.0,0.0]      |
|CASH_IN|4.71  |50198.0      |50202.71      |[4.71,50198.0,50202.71,0.0,0.0,1.0,0.0]           |
|CASH_IN|5.19  |18104.0      |18109.19      |[5.19,18104.0,18109.19,0.0,0.0,1.0,0.0]           |
|CASH_IN|5.66  |5061561.06   |5061566.72    |[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0]      |
|CASH_IN|6.5   |1696433.45   |1696439.95    |[6.5,1696433.45,1696439.95,0.0,0.0,1.0,0.0]       |
|CASH_IN|8.29  |20392.0      |20400.29      |[8.29,20392.0,20400.29,0.0,0.0,1.0,0.0]           |
|CASH_IN|9.02  |2416260.59   |

In [None]:
pp_df.show()

+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_features|
+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|               2.0|     (4,[2],[1.0])|    [1.42,1270713.41,...|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|               2.0|     (4,[2],[1.0])|    [4.35,4136277.22,...|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|               2.0|     (4,[2],[1.0])|    [4.71,50198.0,502...|
|CASH_IN|  5.19|      18104.0|      18109.19|      0|               2.0|     (4,[2],[1.0])|    [5.19,18104.0,181...|
|CASH_IN|  5.66|   5061561.06|    5061566.72|      0|               2.0|     (4,[2],[1.0])|    [5.66,5061561.06,...|
|CASH_IN|   6.5|   1696433.45|    1696439.95|      0|           

In [None]:
test.count()

1911130

In [None]:
df_test=test.where(test.isFraud == 1)

In [None]:
df_test.show()

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
|CASH_OUT|   119.0|        119.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
|CASH_OUT|  215.83|       215.83|           0.0|      1|
|CASH_OUT|   636.0|        636.0|           0.0|      1|
|CASH_OUT|  1055.0|       1055.0|           0.0|      1|
|CASH_OUT|  1842.0|       1842.0|           0.0|      1|
|CASH_OUT|  2007.0|       2007.0|           0.0|      1|
|CASH_OUT|  2806.0|       2806.0|           0.0|      1|
|CASH_OUT| 4120.14|      4120.14|           0.0|      1|
|CASH_OUT| 4289.18|      4289.18|           0.0|      1|
|CASH_OUT| 7927.06|      7927.06|           0.0|      1|
|CASH_OUT| 8055.06|      8055.06|           0.0|      1|
|CASH_OUT| 8383.17|      8383.17|           0.0|      1|
|CASH_OUT|  9131.0|       9131.0|           0.0|      1|
|CASH_OUT| 9217.19|      9217.1

Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [None]:
data.show(5, truncate=False)

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|[1.42,1270713.41,1270714.83,0.0,0.0,1.0,0.0]|0    |
|[4.35,4136277.22,4136281.57,0.0,0.0,1.0,0.0]|0    |
|[4.71,50198.0,50202.71,0.0,0.0,1.0,0.0]     |0    |
|[5.19,18104.0,18109.19,0.0,0.0,1.0,0.0]     |0    |
|[5.66,5061561.06,5061566.72,0.0,0.0,1.0,0.0]|0    |
+--------------------------------------------+-----+
only showing top 5 rows



In [None]:
model = LogisticRegression().fit(data)
data=model.transform(data)

In [None]:
data.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[1.42,1270713.41,...|    0|[197.882123184313...|  [1.0,0.0]|       0.0|
|[4.35,4136277.22,...|    0|[203.123961049005...|  [1.0,0.0]|       0.0|
|[4.71,50198.0,502...|    0|[195.649719310453...|  [1.0,0.0]|       0.0|
|[5.19,18104.0,181...|    0|[195.591036861907...|  [1.0,0.0]|       0.0|
|[5.66,5061561.06,...|    0|[204.816556351575...|  [1.0,0.0]|       0.0|
|[6.5,1696433.45,1...|    0|[198.661103852696...|  [1.0,0.0]|       0.0|
|[8.29,20392.0,204...|    0|[195.595376584706...|  [1.0,0.0]|       0.0|
|[9.02,2416260.59,...|    0|[199.977937889199...|  [1.0,0.0]|       0.0|
|[9.22,7730148.9,7...|    0|[209.698117000385...|  [1.0,0.0]|       0.0|
|[12.18,299322.0,2...|    0|[196.105789471215...|  [1.0,0.0]|       0.0|
|[13.2,106204.0,10...|    0|[195.752588695936...|  

Model Testing

In [None]:
model = pipeline.fit(df_test)

pp_df_test = model.transform(df_test)

In [None]:
data_test = pp_df_test.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [None]:
data_test.show(5, truncate=False)

+-----------------------+-----+
|features               |label|
+-----------------------+-----+
|[119.0,119.0,0.0,1.0]  |1    |
|[181.0,181.0,0.0,1.0]  |1    |
|[215.83,215.83,0.0,1.0]|1    |
|[636.0,636.0,0.0,1.0]  |1    |
|[1055.0,1055.0,0.0,1.0]|1    |
+-----------------------+-----+
only showing top 5 rows



In [None]:
model = LogisticRegression().fit(data_test)
data=model.transform(data_test)
data.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[119.0,119.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[181.0,181.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[215.83,215.83,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[636.0,636.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[1055.0,1055.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[1842.0,1842.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[2007.0,2007.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[2806.0,2806.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[4120.14,4120.14,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[4289.18,4289.18,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[7927.06,7927.06,...|    1|[-Infinity,Infinity]|  

In [None]:
df.limit

In [None]:
model.summary.areaUnderROC

1.0

In [None]:
model.summary.pr.show()

+------+---------+
|recall|precision|
+------+---------+
|   0.0|      1.0|
|   1.0|      1.0|
+------+---------+

