In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Start a Spark session, or reuse the one already running in this kernel.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [3]:
# Display the active SparkSession to confirm it started.
spark

In [4]:
!ls # check that the dataset CSV is present in the working directory

PS_20174392719_1491204439457_log.csv  sample_data


In [5]:

# File location and type
path = '/content/PS_20174392719_1491204439457_log.csv'
file_type = "csv"

# CSV options
first_row_is_header = "true"
delimiter = ","

# Declare the schema instead of using inferSchema: inference makes Spark read
# the whole ~6.3M-row CSV one extra time just to guess the column types.
# These are the types inference produced for this file, and later cells rely
# on them (numeric features are selected by dtype == "double", the label is int).
schema = T.StructType([
    T.StructField("step", T.IntegerType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("amount", T.DoubleType(), True),
    T.StructField("nameOrig", T.StringType(), True),
    T.StructField("oldbalanceOrg", T.DoubleType(), True),
    T.StructField("newbalanceOrig", T.DoubleType(), True),
    T.StructField("nameDest", T.StringType(), True),
    T.StructField("oldbalanceDest", T.DoubleType(), True),
    T.StructField("newbalanceDest", T.DoubleType(), True),
    T.StructField("isFraud", T.IntegerType(), True),
    T.StructField("isFlaggedFraud", T.IntegerType(), True),
])

# Import csv
df = (
    spark.read.format(file_type)
    .schema(schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(path)
)

In [6]:
df.show(n=10)  # preview the first 10 rows as a text table

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [7]:
df.printSchema() # print each column's name, type and nullability

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [8]:
# Keep only the columns used for modelling; every other column is dropped.
keep_cols = ["type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud"]
df = df.select(*keep_cols)

In [9]:
df.show(n=5)  # preview after dropping the unused columns

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
| PAYMENT| 9839.64|     170136.0|     160296.36|      0|
| PAYMENT| 1864.28|      21249.0|      19384.72|      0|
|TRANSFER|   181.0|        181.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
| PAYMENT|11668.14|      41554.0|      29885.86|      0|
+--------+--------+-------------+--------------+-------+
only showing top 5 rows



**Data Exploration**

In [10]:
# (rows, columns) — the Spark equivalent of a pandas .shape
n_rows = df.count()
n_cols = len(df.columns)
(n_rows, n_cols)

(6362620, 5)

In [11]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns and the label
summary_cols = ['amount', 'oldbalanceOrg', 'newbalanceOrig', 'isFraud']
df.select(*summary_cols).describe().show()

+-------+-----------------+-----------------+------------------+--------------------+
|summary|           amount|    oldbalanceOrg|    newbalanceOrig|             isFraud|
+-------+-----------------+-----------------+------------------+--------------------+
|  count|          6362620|          6362620|           6362620|             6362620|
|   mean|179861.9035491287|833883.1040744764| 855113.6685785812|0.001290820448180152|
| stddev|603858.2314629209|2888242.673037527|2924048.5029542595|0.035904796801604424|
|    min|              0.0|              0.0|               0.0|                   0|
|    max|    9.244551664E7|    5.958504037E7|     4.958504037E7|                   1|
+-------+-----------------+-----------------+------------------+--------------------+



In [12]:

from pyspark.sql.functions import isnan, when, count, col

# Count missing values per column. isnan() only detects floating-point NaN,
# not SQL NULL (what Spark produces for empty CSV fields), so check both.
# isnan() is only meaningful for float/double columns; other columns
# (string `type`, integer `isFraud`) are checked for NULL alone.
float_types = {"float", "double"}
missing_counts = [
    count(when(col(c).isNull() | isnan(c), 1)).alias(c)
    if dtype in float_types
    else count(when(col(c).isNull(), 1)).alias(c)
    for c, dtype in df.dtypes
]
df.select(missing_counts).toPandas().head()

Unnamed: 0,type,amount,oldbalanceOrg,newbalanceOrig,isFraud
0,0,0,0,0,0


In [13]:
# Number of transactions for each value of the `type` column
type_counts = df.groupBy('type').count()
type_counts.show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



**Train/test split**

In [14]:
# 75/25 random split; the fixed seed makes the split repeatable for the same input.
SPLIT_SEED = 42
train, test = df.randomSplit(weights=[0.75, 0.25], seed=SPLIT_SEED)

In [15]:
# Row counts of the two splits
n_train = train.count()
n_test = test.count()
print(f"Train set count: {n_train}")
print(f"Test set count: {n_test} ")

Train set count: 4771652
Test set count: 1590968 


In [16]:
train.show(n=20)  # first 20 rows of the training split

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|
|CASH_IN|  5.19|      18104.0|      18109.19|      0|
|CASH_IN|  5.44|          0.0|          5.44|      0|
|CASH_IN|  6.07|     400680.0|     400686.07|      0|
|CASH_IN|  8.27|   8428410.94|    8428419.21|      0|
|CASH_IN|  8.29|      20392.0|      20400.29|      0|
|CASH_IN|  8.44|      39384.0|      39392.44|      0|
|CASH_IN|  9.04|      99971.0|      99980.04|      0|
|CASH_IN|  9.22|    7730148.9|    7730158.12|      0|
|CASH_IN| 11.13|   4116439.74|    4116450.87|      0|
|CASH_IN| 12.18|     299322.0|     299334.18|      0|
|CASH_IN| 12.79|     601743.0|     601755.79|      0|
|CASH_IN| 13.86|   6868100.18|    6868114.04|      0|
|CASH_IN| 14.36|    613030.4

**Column data types**

In [17]:
train.dtypes # (column, type) pairs; used below to split numeric vs. categorical columns

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [18]:
# Separate the categorical (string) columns from the numeric (double) ones.
# Integer columns such as the isFraud label fall into neither list.
catCols, numCols = [], []
for column_name, column_type in train.dtypes:
    if column_type == "string":
        catCols.append(column_name)
    elif column_type == "double":
        numCols.append(column_name)

In [19]:
print(numCols, catCols, sep="\n")  # numeric columns, then categorical columns

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


**One-hot encoding**

In [20]:
# Number of distinct transaction types in the training split
train.agg(F.countDistinct(F.col("type"))).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [21]:
# Transactions per type in the training split
train_type_counts = train.groupBy("type").count()
train_type_counts.show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 399382|
| CASH_IN|1049380|
|CASH_OUT|1678718|
| PAYMENT|1613038|
|   DEBIT|  31134|
+--------+-------+



In [22]:
# StringIndexer converts each categorical column into numeric category indices.
# OneHotEncoder (named OneHotEncoderEstimator before Spark 3.0) one-hot encodes several index columns at once.

from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

In [23]:
# One StringIndexer per categorical column, writing to "<col>_StringIndexer".
# handleInvalid="skip" drops rows whose category was not seen during fit.
string_indexer = []
for cat_name in catCols:
    string_indexer.append(
        StringIndexer(
            inputCol=cat_name,
            outputCol=cat_name + "_StringIndexer",
            handleInvalid="skip",
        )
    )

In [24]:
# Preview the StringIndexer on its own. Fit it on the training split only, the
# same data the pipeline below is fitted on, so the category -> index mapping
# shown here is the one the model uses and no test rows influence it.
indexer_model = string_indexer[0].fit(train)
string_index = indexer_model.transform(df)
string_index.show()

+--------+---------+-------------+--------------+-------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|
+--------+---------+-------------+--------------+-------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|
| PAYMENT|  7861.64|    176087.23|     168225.59|      0|               1.0|
| PAYMENT|  4024.36|       2671.0|           0.0|      0|               1.0|
|   DEBIT|  5337.77|      41720.0|      36382.23|      0|               4.0|

In [25]:
# A single multi-column OneHotEncoder turning every "<col>_StringIndexer"
# index column into a "<col>_OneHotEncoder" vector column.
index_cols = [f"{name}_StringIndexer" for name in catCols]
onehot_cols = [f"{name}_OneHotEncoder" for name in catCols]
one_hot_encoder = [OneHotEncoder(inputCols=index_cols, outputCols=onehot_cols)]

In [26]:
# Preview the one-hot encoding on top of the indexed preview frame
ohe_model = one_hot_encoder[0].fit(string_index)
one_hot_encode = ohe_model.transform(string_index)
one_hot_encode.show()

+--------+---------+-------------+--------------+-------+------------------+------------------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|
+--------+---------+-------------+--------------+-------+------------------+------------------+
| PAYMENT|  9839.64|     170136.0|     160296.36|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  1864.28|      21249.0|      19384.72|      0|               1.0|     (4,[1],[1.0])|
|TRANSFER|    181.0|        181.0|           0.0|      1|               3.0|     (4,[3],[1.0])|
|CASH_OUT|    181.0|        181.0|           0.0|      1|               0.0|     (4,[0],[1.0])|
| PAYMENT| 11668.14|      41554.0|      29885.86|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7817.71|      53860.0|      46042.29|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7107.77|     183195.0|     176087.23|      0|               1.0|     (4,[1],[1.0])|
| PAYMENT|  7861.64|    176087.23|     1

**Vector assembling**

In [27]:
# VectorAssembler combines the values of input columns into a single vector.
from pyspark.ml.feature import VectorAssembler

In [28]:
# Feature columns for VectorAssembler: numeric columns first, then the one-hot vectors
assembler = list(numCols) + [f"{name}_OneHotEncoder" for name in catCols]


In [29]:
assembler # input columns for VectorAssembler, in assembly order


['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [30]:
# Concatenate all feature columns into one "VectorAssembler_features" vector
vector_assembler = VectorAssembler(outputCol="VectorAssembler_features", inputCols=assembler)


In [31]:
# Pipeline stages in execution order: index the categorical columns,
# one-hot encode the indices, then assemble the feature vector.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]

In [32]:
stages # pipeline stages, applied in this order

[StringIndexer_930a77c42108,
 OneHotEncoder_1f15537e6e4f,
 VectorAssembler_67e6e9803714]

In [33]:
# A Pipeline chains the indexer, encoder and assembler so that the same fitted
# transformations can be applied to any split of the data.
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)
model = pipeline.fit(train)  # fitted on the training split only
pipe_df = model.transform(train)

In [34]:
# Raw inputs next to the assembled feature vector they produce
preview_cols = ["type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features"]
pipe_df.select(*preview_cols).show(truncate=False)

+-------+------+-------------+--------------+--------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                          |
+-------+------+-------------+--------------+--------------------------------------------------+
|CASH_IN|1.42  |1270713.41   |1270714.83    |[1.42,1270713.41,1270714.83,0.0,0.0,1.0,0.0]      |
|CASH_IN|4.35  |4136277.22   |4136281.57    |[4.35,4136277.22,4136281.57,0.0,0.0,1.0,0.0]      |
|CASH_IN|4.71  |50198.0      |50202.71      |[4.71,50198.0,50202.71,0.0,0.0,1.0,0.0]           |
|CASH_IN|5.19  |18104.0      |18109.19      |[5.19,18104.0,18109.19,0.0,0.0,1.0,0.0]           |
|CASH_IN|5.44  |0.0          |5.44          |(7,[0,2,5],[5.44,5.44,1.0])                       |
|CASH_IN|6.07  |400680.0     |400686.07     |[6.07,400680.0,400686.07,0.0,0.0,1.0,0.0]         |
|CASH_IN|8.27  |8428410.94   |8428419.21    |[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]      |
|CASH_IN|8.29  |20392.0      |

In [35]:
pipe_df.show(n=20)  # all intermediate pipeline columns for the first 20 rows

+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|type_StringIndexer|type_OneHotEncoder|VectorAssembler_features|
+-------+------+-------------+--------------+-------+------------------+------------------+------------------------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|               2.0|     (4,[2],[1.0])|    [1.42,1270713.41,...|
|CASH_IN|  4.35|   4136277.22|    4136281.57|      0|               2.0|     (4,[2],[1.0])|    [4.35,4136277.22,...|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|               2.0|     (4,[2],[1.0])|    [4.71,50198.0,502...|
|CASH_IN|  5.19|      18104.0|      18109.19|      0|               2.0|     (4,[2],[1.0])|    [5.19,18104.0,181...|
|CASH_IN|  5.44|          0.0|          5.44|      0|               2.0|     (4,[2],[1.0])|    (7,[0,2,5],[5.44,...|
|CASH_IN|  6.07|     400680.0|     400686.07|      0|           

In [36]:
test.count() # number of rows in the held-out test split

1590968

In [37]:
# Fraud-only subset of the test split, for inspection
df_test = test.filter(F.col("isFraud") == 1)

In [38]:
df_test.show(n=20)  # first 20 fraudulent test transactions

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
|CASH_OUT|   170.0|        170.0|           0.0|      1|
|CASH_OUT| 6215.44|      6215.44|           0.0|      1|
|CASH_OUT|  9131.0|       9131.0|           0.0|      1|
|CASH_OUT|10119.47|     10119.47|           0.0|      1|
|CASH_OUT| 10565.0|      10565.0|           0.0|      1|
|CASH_OUT| 11308.0|      11308.0|           0.0|      1|
|CASH_OUT| 11385.0|      11385.0|           0.0|      1|
|CASH_OUT|16274.51|     16274.51|           0.0|      1|
|CASH_OUT|16671.29|     16671.29|           0.0|      1|
|CASH_OUT|19742.71|     19742.71|           0.0|      1|
|CASH_OUT|21516.38|     21516.38|           0.0|      1|
|CASH_OUT|21574.55|     21574.55|           0.0|      1|
|CASH_OUT|21829.31|     21829.31|           0.0|      1|
|CASH_OUT| 21922.0|      21922.0|           0.0|      1|
|CASH_OUT| 21987.0|      21987.

**Logistic Regression**

In [39]:
from pyspark.ml.classification import LogisticRegression

In [40]:
# Rename to "features"/"label", the column names LogisticRegression() reads by default
data = pipe_df.select(
    pipe_df["VectorAssembler_features"].alias("features"),
    pipe_df["isFraud"].alias("label"),
)

In [41]:
data.show(n=5, truncate=False)  # full (untruncated) feature vectors

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|[1.42,1270713.41,1270714.83,0.0,0.0,1.0,0.0]|0    |
|[4.35,4136277.22,4136281.57,0.0,0.0,1.0,0.0]|0    |
|[4.71,50198.0,50202.71,0.0,0.0,1.0,0.0]     |0    |
|[5.19,18104.0,18109.19,0.0,0.0,1.0,0.0]     |0    |
|(7,[0,2,5],[5.44,5.44,1.0])                 |0    |
+--------------------------------------------+-----+
only showing top 5 rows



In [42]:
%%time
model = LogisticRegression().fit(data)
data=model.transform(data)

CPU times: user 842 ms, sys: 86.2 ms, total: 929 ms
Wall time: 2min 10s


In [43]:
data.show(n=20)  # training-set predictions (rawPrediction, probability, prediction)

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[1.42,1270713.41,...|    0|[238.517757628337...|  [1.0,0.0]|       0.0|
|[4.35,4136277.22,...|    0|[244.429064104133...|  [1.0,0.0]|       0.0|
|[4.71,50198.0,502...|    0|[236.000249874816...|  [1.0,0.0]|       0.0|
|[5.19,18104.0,181...|    0|[235.934073923229...|  [1.0,0.0]|       0.0|
|(7,[0,2,5],[5.44,...|    0|[235.896743317671...|  [1.0,0.0]|       0.0|
|[6.07,400680.0,40...|    0|[236.723309887840...|  [1.0,0.0]|       0.0|
|[8.27,8428410.94,...|    0|[253.283181789225...|  [1.0,0.0]|       0.0|
|[8.29,20392.0,204...|    0|[235.938975193461...|  [1.0,0.0]|       0.0|
|[8.44,39384.0,393...|    0|[235.978161005906...|  [1.0,0.0]|       0.0|
|[9.04,99971.0,999...|    0|[236.103176061258...|  [1.0,0.0]|       0.0|
|[9.22,7730148.9,7...|    0|[251.842850337973...|  

**Model Testing**

In [44]:
#passing test data into the pipeline
model = pipeline.fit(df_test)

pipe_df_test = model.transform(df_test)

In [45]:
data_test = pipe_df_test.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
)

In [46]:
data_test.show(5, truncate=False)

+---------------------------+-----+
|features                   |label|
+---------------------------+-----+
|[170.0,170.0,0.0,1.0]      |1    |
|[6215.44,6215.44,0.0,1.0]  |1    |
|[9131.0,9131.0,0.0,1.0]    |1    |
|[10119.47,10119.47,0.0,1.0]|1    |
|[10565.0,10565.0,0.0,1.0]  |1    |
+---------------------------+-----+
only showing top 5 rows



In [47]:
model = LogisticRegression().fit(data_test)
data=model.transform(data_test)
data.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[170.0,170.0,0.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[6215.44,6215.44,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[9131.0,9131.0,0....|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[10119.47,10119.4...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[10565.0,10565.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[11308.0,11308.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[11385.0,11385.0,...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[16274.51,16274.5...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[16671.29,16671.2...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[19742.71,19742.7...|    1|[-Infinity,Infinity]|  [0.0,1.0]|       1.0|
|[21516.38,21516.3...|    1|[-Infinity,Infinity]|  

In [48]:
# Preview a few rows (the bare `df.limit` only echoed the bound method object)
df.limit(5).toPandas()

In [49]:
# ROC AUC on the held-out test features. `model.summary` describes the data the
# model was fitted on, so evaluate the test features explicitly instead.
model.evaluate(data_test).areaUnderROC

1.0

In [50]:
# Precision/recall curve on the held-out test features (not the training summary)
model.evaluate(data_test).pr.show()

+------+---------+
|recall|precision|
+------+---------+
|   0.0|      1.0|
|   1.0|      1.0|
+------+---------+

