### Machine Learning Library (MLlib)

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T


In [3]:
spark = SparkSession.builder.getOrCreate()

22/11/20 12:50:52 WARN Utils: Your hostname, dell-5570 resolves to a loopback address: 127.0.1.1; using 10.189.123.35 instead (on interface wlp0s20f3)
22/11/20 12:50:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/20 12:50:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [36]:
import os
from os.path import isfile, join

df = spark.read.csv("data/paysim.csv", inferSchema = True, header = True)

                                                                                

In [37]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [38]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



In [39]:
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")

In [40]:
train, test = df.randomSplit([0.7,0.3], seed=7)

In [41]:
print(f"train set lenght: {train.count()} records")
print(f"test set lenght: {test.count()} records")

                                                                                

train set lenght: 4453616 records




test set lenght: 1909004 records


                                                                                

In [42]:
train.show(2)

[Stage 28:>                                                         (0 + 1) / 1]

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  5.66|   5061561.06|    5061566.72|      0|
|CASH_IN|  14.4|1.143460813E7| 1.143462253E7|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



                                                                                

## Dtypes

* in this dataset, any columns of type string as a categotical feature, but sometimes we might have numeric features we want treated as categotical or vice versa. We'll need to carefully identify which columns are numeric and which are categotical

In [70]:
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

In [71]:
catCols = [x for (x, dataType) in train.dtypes if dataType =="string"]
numCols = [x for (x, dataType) in train.dtypes if ((dataType =="double") & (x != "isFraud"))]

In [72]:
print(numCols)
print(catCols)

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


In [73]:
train.agg(F.countDistinct("type")).show()


[Stage 44:=====>                                                  (2 + 18) / 20]

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



                                                                                

In [74]:
train.groupBy("type").count().show()

[Stage 50:>                                                       (0 + 20) / 20]

22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/11/20 15:49:03 WARN RowBasedKeyValueBatch: Calling spill() on

[Stage 50:==>                                                     (1 + 19) / 20]

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372954|
| CASH_IN| 979563|
|CASH_OUT|1566092|
| PAYMENT|1506065|
|   DEBIT|  28942|
+--------+-------+





In [76]:
from pyspark.ml.feature import (OneHotEncoder, StringIndexer,)

In [77]:
string_indexer = [StringIndexer(inputCol = x, outputCol = x + "_StringIndexer", handleInvalid = "skip") for x in catCols]

In [78]:
string_indexer

[StringIndexer_09cfc459d60f]

In [79]:
one_hot_encoder = [OneHotEncoder(
inputCols = [f"{x}_StringIndexer" for x in catCols],
outputCols = [f"{x}_OneHotEncoder" for x in catCols],
)
                  ]

In [80]:
one_hot_encoder

[OneHotEncoder_fbae5d1b0f57]

In [81]:
from pyspark.ml.feature import VectorAssembler

In [82]:
assemblerInput  = [x for x in numCols]

In [83]:
assemblerInput += [f"{x}_oneHotEncoder" for x in catCols]

In [84]:
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_oneHotEncoder']

In [87]:
vector_assembler = VectorAssembler(
inputCols = assemblerInput, outputCol = "VectorAssembmler_features")

In [91]:
stages = []
stages +=string_indexer
stages +=one_hot_encoder
stages +=[vector_assembler]

In [92]:
stages

[StringIndexer_09cfc459d60f,
 OneHotEncoder_fbae5d1b0f57,
 VectorAssembler_b440cd865c97]

In [93]:
%%time
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

#pp_df =  model.transform(test)



CPU times: user 20.5 ms, sys: 0 ns, total: 20.5 ms
Wall time: 2.67 s


                                                                                

22/11/20 18:04:40 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2934359 ms exceeds timeout 120000 ms
22/11/20 18:04:40 WARN SparkContext: Killing executors is not supported by current scheduler.
