In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=50afd5d885fc010c086eee2c87392f6cedc10f281396991f152bdc626fb00d2b
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

import os

In [4]:
spark = SparkSession.builder.appName('Project').getOrCreate()
spark

In [9]:
df = spark.read.csv('moneysim.csv', inferSchema=True, header=True)
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [10]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [13]:
df = df.select('type', 'amount', 'oldbalanceOrg', 'newbalanceOrig','isFraud')
df.show(5)

+--------+--------+-------------+--------------+-------+
|    type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+--------+-------------+--------------+-------+
| PAYMENT| 9839.64|     170136.0|     160296.36|      0|
| PAYMENT| 1864.28|      21249.0|      19384.72|      0|
|TRANSFER|   181.0|        181.0|           0.0|      1|
|CASH_OUT|   181.0|        181.0|           0.0|      1|
| PAYMENT|11668.14|      41554.0|      29885.86|      0|
+--------+--------+-------------+--------------+-------+
only showing top 5 rows



In [16]:
train, test = df.randomSplit([0.7,0.3], seed=137)
print(f"Training set size: {train.count()} records")
print(f"Testing set size: {test.count()} records")

Training set size: 4452314 records
Testing set size: 1910306 records


In [15]:
train.dtypes

[('step', 'int'),
 ('type', 'string'),
 ('amount', 'double'),
 ('nameOrig', 'string'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('nameDest', 'string'),
 ('oldbalanceDest', 'double'),
 ('newbalanceDest', 'double'),
 ('isFraud', 'int'),
 ('isFlaggedFraud', 'int')]

In [17]:
categorical = [x for (x, dataType) in train.dtypes if dataType == 'string']
numerical = [x for (x, dataType) in train.dtypes if ((dataType == 'double') & (x != 'isFraud'))]

print(numerical)
print(categorical)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


In [18]:
### Using OneHotEncoder to encode categorical features into indexed ones

train.agg(F.countDistinct('type')).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



In [19]:
train.groupBy ('type').count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372692|
| CASH_IN| 978862|
|CASH_OUT|1566782|
| PAYMENT|1505046|
|   DEBIT|  28932|
+--------+-------+



In [22]:
from pyspark.ml.feature import (OneHotEncoder, StringIndexer)

In [24]:
string_indexer = [StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip") for x in categorical]
onehot = [OneHotEncoder(inputCols=[f"{x}_StringIndexer" for x in categorical], outputCols=[f"{x}_OneHotEncoder" for x in categorical])]

In [26]:
from pyspark.ml.feature import VectorAssembler

In [29]:
assemblerInput = [x for x in numerical]
assemblerInput += [f"{x}_OneHotEncoder" for x in categorical]

In [30]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

In [31]:
vector_assembler = VectorAssembler(inputCols=assemblerInput, outputCol="VectorAssembler_features")

In [32]:
stages = []
stages += string_indexer
stages += onehot
stages += [vector_assembler]
stages

[StringIndexer_2c4557167c13,
 OneHotEncoder_ec80d48bf722,
 VectorAssembler_dd0fd75a47d1]

In [33]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)
pp_df = model.transform(test)

CPU times: user 372 ms, sys: 57.8 ms, total: 430 ms
Wall time: 58.8 s


In [34]:
from pyspark.ml.classification import LogisticRegression

data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"),
    F.col("isFraud").alias("label"),
    )

data.show(5, truncate=False)


+--------------------------------------------------+-----+
|features                                          |label|
+--------------------------------------------------+-----+
|[1.42,1270713.41,1270714.83,0.0,0.0,1.0,0.0]      |0    |
|[4.58,94241.0,94245.58,0.0,0.0,1.0,0.0]           |0    |
|[9.04,99971.0,99980.04,0.0,0.0,1.0,0.0]           |0    |
|[13.2,106204.0,106217.2,0.0,0.0,1.0,0.0]          |0    |
|[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0]|0    |
+--------------------------------------------------+-----+
only showing top 5 rows



In [35]:
%%time
model = LogisticRegression().fit(data)

CPU times: user 815 ms, sys: 89.1 ms, total: 904 ms
Wall time: 2min 13s


In [36]:
model.summary.areaUnderROC

0.9936772936803748

In [37]:
model.summary.pr.show()

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9304979253112033|
|0.36552567237163813| 0.9304979253112033|
| 0.4930725346373268| 0.7006369426751592|
| 0.5570497147514263|  0.548995983935743|
| 0.6075794621026895|0.45820528580208975|
| 0.6438467807660961|0.39332835449340303|
| 0.6719641401792991|0.34497907949790796|
| 0.6951915240423798| 0.3077755727945156|
| 0.7155664221678891|  0.278464954012052|
| 0.7359413202933985| 0.2554816805771679|
| 0.7571312143439283|0.23723186925434117|
| 0.7762836185819071|0.22164048865619546|
| 0.7897310513447433|0.20709553323359692|
| 0.8019559902200489| 0.1944471890129434|
| 0.8064384678076609| 0.1818265343623668|
| 0.8105134474327629| 0.1707735897656049|
| 0.8149959250203749|0.16116035455278002|
| 0.8198859005704972|0.15273665831625294|
|  0.823960880195599| 0.1450918484500574|
| 0.8259983700081499|0.13789115646258504|
+-------------------+-------------