In [1]:
# Configure MLflow Experiment
mlflow_experiment_id = 866112

# Including MLflow
import mlflow
import mlflow.spark
import os
print("MLflow Version: %s" % mlflow.__version__)

MLflow Version: 2.1.1


In [8]:
import pandas as pd
import numpy as np

### Source Data

In [3]:
#Data set is synthetic dataset is scaled down 1/4 of the original dataset and it is created just for Kaggle
data_urls = "https://media.githubusercontent.com/media/FelixQLe/Detect_Financial_Fraud_at_Scale_with_decision_Trees/main/Synthetic_Financial_datasets_log.csv"

In [4]:
fin_fraud_dataset = pd.read_csv(data_urls, delimiter=',', header = 0)

In [5]:
#large dataset takes longer to load, so i make a copy to reuse in case
fin_fraud_copy = fin_fraud_dataset.copy()

In [6]:
fin_fraud_copy.shape

(6362620, 11)

### Create SQL database using PySpark

In [14]:
#import pyspark
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
print(f"PySpark Version : {pyspark.__version__}")

PySpark Version : 3.3.0


In [15]:
SET_GLOBAL_VERBOSE = False #set False if working on a Large Data

In [7]:
#Create a spark Context class
sc = SparkContext.getOrCreate(SparkConf())
## Create spark session
spark = SparkSession(sc).builder.master('local[*]').\
                config('saprk.sql.shuffle.partitions', 200).\
                config('spark.sql.debug.maxToStringFields', '100').\
                config('spark.default.parallelism', 300).\
                config('spark.driver.maxResultsSize', '0').\
                config('spark.driver.memory', 15000).\
                appName("Python Spark Dataframes Financial Fruad").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/04 01:03:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/04 01:04:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [8]:
fin_fraud_copy.head(5)

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
3,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0


In [9]:
#Loading data into Spark, take long
spark_df = spark.createDataFrame(fin_fraud_copy)

In [10]:
spark_df.printSchema()

root
 |-- step: long (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: long (nullable = true)
 |-- isFlaggedFraud: long (nullable = true)



In [11]:
#create table view fin_fraud_table, we can treat it as sql table
spark_df.createTempView("fin_fraud_table")

In [12]:
# Create df DataFrame which contains our simulated financial fraud detection dataset
fin_fraud_df = spark.sql("select step, type, amount, nameOrig, oldbalanceOrg, newbalanceOrig, nameDest, oldbalanceDest, newbalanceDest from fin_fraud_table")

In [13]:
fin_fraud_df.printSchema()

root
 |-- step: long (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)



In [14]:
# Review the schema of your data 
fin_fraud_df.show()

[Stage 0:>                                                          (0 + 0) / 1]

23/02/04 01:12:30 WARN TaskSetManager: Stage 0 contains a task of very large size (64404 KiB). The maximum recommended task size is 1000 KiB.


[Stage 0:>                                                          (0 + 1) / 1]

23/02/04 01:12:36 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|          

                                                                                

In [15]:
#add column orgDiff and destDiff based on the difference between Originating and Destination
fin_fraud_df = fin_fraud_df.withColumn("orgDiff",
                            fin_fraud_df.newbalanceOrig - 
                                       fin_fraud_df.oldbalanceOrg).withColumn("destDiff",
                            fin_fraud_df.newbalanceDest - fin_fraud_df.oldbalanceDest)
    
#create temporary view
fin_fraud_df.createOrReplaceTempView("financials")

In [28]:
#review the new table
fin_fraud_df.show()

23/02/04 01:17:23 ERROR Inbox: An error happened while processing message in the inbox for LocalSchedulerBackendEndpoint
java.lang.OutOfMemoryError: Java heap space


Exception in thread "dispatcher-event-loop-2" java.lang.OutOfMemoryError: Java heap space
ERROR:root:KeyboardInterrupt while sending command.                 (0 + 0) / 1]
Traceback (most recent call last):
  File "/opt/miniconda3/envs/lighthouse/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/miniconda3/envs/lighthouse/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/miniconda3/envs/lighthouse/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [27]:
fin_fraud_df.printSchema()

root
 |-- step: long (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- orgDiff: double (nullable = true)
 |-- destDiff: double (nullable = true)



### Exploring Data Analysis

#### What are the type of transactions?

In [23]:
%load_ext sparksql_magic

In [25]:
%config SparkSql.limit=20

In [None]:
%%sparksql
select type, count(1) from financials group by type limit 20