In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, ByteType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


from data_preprocess import (
    cast_df_values,
    encode_string_column,
    get_data_shape,
    count_nans,
    count_col_values,
    assemble_cols
)

In [2]:
spark = SparkSession.builder.master("local").appName("model").getOrCreate()
spark

your 131072x1 screen size is bogus. expect trouble
24/06/24 15:48:12 WARN Utils: Your hostname, DESKTOP-NTRA1ID resolves to a loopback address: 127.0.1.1; using 172.22.18.100 instead (on interface eth0)
24/06/24 15:48:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/24 15:48:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data = spark.read.csv("../dataset/PS_20174392719_1491204439457_log.csv", header=True)
data.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [4]:
columns = data.columns
print("Columns:", "\n".join(columns), sep="\n\n")

Columns:

step
type
amount
nameOrig
oldbalanceOrg
newbalanceOrig
nameDest
oldbalanceDest
newbalanceDest
isFraud
isFlaggedFraud


In [5]:
rows, cols = get_data_shape(data)
print(f"Number of rows: {rows},\nNumber of features: {cols}")



Number of rows: 6362620,
Number of features: 11


                                                                                

In [6]:
from pyspark.sql.functions import isnan, when, count

data.select([count(when(isnan(c), c)).alias(c) for c in data.columns])

DataFrame[step: bigint, type: bigint, amount: bigint, nameOrig: bigint, oldbalanceOrg: bigint, newbalanceOrig: bigint, nameDest: bigint, oldbalanceDest: bigint, newbalanceDest: bigint, isFraud: bigint, isFlaggedFraud: bigint]

In [7]:
count_nans(data).show()



+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



                                                                                

In [8]:
float_columns = [
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
]
data = cast_df_values(data, float_columns, newType=FloatType)

int_columns = ["step", "isFraud", "isFlaggedFraud"]
data = cast_df_values(data, int_columns, newType=IntegerType)

byte_columns = [
    "nameOrig",
    "nameDest"
]
data = cast_df_values(data, byte_columns, newType=ByteType)

In [9]:
data

DataFrame[step: int, type: string, amount: float, nameOrig: tinyint, oldbalanceOrg: float, newbalanceOrig: float, nameDest: tinyint, oldbalanceDest: float, newbalanceDest: float, isFraud: int, isFlaggedFraud: int]

In [10]:
count_col_values(data, "type").show()



+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



                                                                                

In [11]:
count_col_values(data, "isFraud").show()



+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+



                                                                                

In [12]:
data = encode_string_column(data, "type", "type_indx")

data.show(5)



+----+--------+--------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+---------+
|step|    type|  amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|type_indx|
+----+--------+--------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+---------+
|   1| PAYMENT| 9839.64|    NULL|     170136.0|     160296.36|    NULL|           0.0|           0.0|      0|             0|      1.0|
|   1| PAYMENT| 1864.28|    NULL|      21249.0|      19384.72|    NULL|           0.0|           0.0|      0|             0|      1.0|
|   1|TRANSFER|   181.0|    NULL|        181.0|           0.0|    NULL|           0.0|           0.0|      1|             0|      3.0|
|   1|CASH_OUT|   181.0|    NULL|        181.0|           0.0|    NULL|       21182.0|           0.0|      1|             0|      0.0|
|   1| PAYMENT|11668.14|    NULL|      41554.0|      29

                                                                                

In [13]:
features = [
    "type_indx",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "newbalanceDest",
    "oldbalanceDest",
]

target = "isFraud"

In [14]:
data = assemble_cols(data, features, "features")

In [15]:
train_df, test_df = data.randomSplit(weights=[0.8,0.2], seed=101)

In [16]:
count_col_values(train_df, "isFraud").show()

24/06/24 15:48:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/06/24 15:49:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/06/24 15:49:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   6614|
|      0|5082990|
+-------+-------+



                                                                                

In [17]:
count_col_values(test_df, "isFraud").show()

24/06/24 15:49:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/06/24 15:49:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/06/24 15:49:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   1599|
|      0|1271417|
+-------+-------+



                                                                                

In [18]:
from pyspark.ml.classification import RandomForestClassifier

clf = RandomForestClassifier(featuresCol="features", labelCol="isFraud")
model = clf.fit(train_df)

24/06/24 15:50:55 WARN MemoryStore: Not enough space to cache rdd_95_3 in memory! (computed 144.3 MiB so far)
24/06/24 15:50:55 WARN BlockManager: Persisting block rdd_95_3 to disk instead.
24/06/24 15:50:58 WARN MemoryStore: Not enough space to cache rdd_95_0 in memory! (computed 228.0 MiB so far)
24/06/24 15:50:59 WARN MemoryStore: Not enough space to cache rdd_95_1 in memory! (computed 228.0 MiB so far)
24/06/24 15:51:01 WARN MemoryStore: Not enough space to cache rdd_95_2 in memory! (computed 228.0 MiB so far)
24/06/24 15:51:03 WARN MemoryStore: Not enough space to cache rdd_95_0 in memory! (computed 228.0 MiB so far)
24/06/24 15:51:05 WARN MemoryStore: Not enough space to cache rdd_95_1 in memory! (computed 228.0 MiB so far)
24/06/24 15:51:07 WARN MemoryStore: Not enough space to cache rdd_95_2 in memory! (computed 228.0 MiB so far)
24/06/24 15:51:10 WARN MemoryStore: Not enough space to cache rdd_95_0 in memory! (computed 228.0 MiB so far)
24/06/24 15:51:13 WARN MemoryStore: Not 

In [19]:

predictions = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)



0.9994831172585419


                                                                                

In [None]:
model_path = "../model/random_forest_classifier"
model.save(model_path)

In [22]:
spark.stop()