In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.driver.memory", "64g").appName('Fraud Detection').getOrCreate()



In [2]:
data = spark.read.option("inferSchema","true").option("header","true").csv("archive/bank_data.csv")
data.show(3)

data.printSchema()
print(data.count())
data.cache()

data.show(5)



+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 3 rows

root
 |-- step: integer (nullable = true)
 |-- type: string (null

In [3]:
from pyspark.sql.functions import col
data.groupBy("isFraud").count().orderBy(col("count").desc()).show()
     
# parsed.createOrReplaceTempView("linkage")

+-------+-------+
|isFraud|  count|
+-------+-------+
|      0|6354407|
|      1|   8213|
+-------+-------+



In [4]:
from pyspark.sql.functions import col
data.groupBy("isFlaggedFraud").count().orderBy(col("count").desc()).show()
     
# parsed.createOrReplaceTempView("linkage")

+--------------+-------+
|isFlaggedFraud|  count|
+--------------+-------+
|             0|6362604|
|             1|     16|
+--------------+-------+



In [5]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [6]:
#important: need to cast to float type, and order by prediction, else it won't work

#select only prediction and label columns
preds_and_labels = data.select(data['isFlaggedFraud'].cast('Double'), data['isFraud'].cast('Double'))
preds_and_labels.show()
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
# dataColl=metrics.collect()
# for row in dataColl:
#     print(row[0] + "," +str(row[1]))
print(metrics.confusionMatrix().toArray())

+--------------+-------+
|isFlaggedFraud|isFraud|
+--------------+-------+
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    1.0|
|           0.0|    1.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
|           0.0|    0.0|
+--------------+-------+
only showing top 20 rows





[[6.354407e+06 0.000000e+00]
 [8.197000e+03 1.600000e+01]]


In [7]:
import numpy as np

# Given numbers in scientific notation
numbers = [
    [6.354407e+06, 0.000000e+00],
    [8.197000e+03, 1.600000e+01]
]

# Convert to normal numbers
normal_numbers = [[f'{x:.2f}' for x in row] for row in numbers]

# Print normal numbers
for row in normal_numbers:
    print(row)


['6354407.00', '0.00']
['8197.00', '16.00']


In [8]:
data.select('amount','oldbalanceOrg','newbalanceOrig','isFraud').describe().show()

+-------+------------------+------------------+-----------------+--------------------+
|summary|            amount|     oldbalanceOrg|   newbalanceOrig|             isFraud|
+-------+------------------+------------------+-----------------+--------------------+
|  count|           6362620|           6362620|          6362620|             6362620|
|   mean|179861.90354913118| 833883.1040744853|855113.6685785885|0.001290820448180152|
| stddev| 603858.2314629366|2888242.6730375616|2924048.502954269| 0.03590479680160424|
|    min|               0.0|               0.0|              0.0|                   0|
|    max|     9.244551664E7|     5.958504037E7|    4.958504037E7|                   1|
+-------+------------------+------------------+-----------------+--------------------+



In [9]:
import pyspark.sql.functions as F

data_agg = data.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in data.columns])
data_agg.show()
data.groupBy('type').count().show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import matplotlib.pyplot as plt

# Assuming you have a SparkSession named spark
fraud_counts = data.groupBy('isFraud').count().toPandas()

import seaborn as sns
sns.barplot(x='count', y='isFraud', data=fraud_counts)
plt.show()

flagged_fraud_counts = data.groupBy('isFlaggedFraud').count().toPandas()
sns.barplot(x='count', y='isFlaggedFraud', data=flagged_fraud_counts)
plt.show()


<Figure size 640x480 with 1 Axes>

<Figure size 640x480 with 1 Axes>

In [11]:
sns.lineplot(x=list(range(1,744)),y=data.groupby("step")["isFraud"].sum())
plt.xlabel("Hour of the month")
plt.ylabel("Number of transactions per hour")
plt.show()

TypeError: 'GroupedData' object is not subscriptable

In [17]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

import plotly.plotly as py
import plotly.graph_objs as go
import pandas as pd
import requests
requests.packages.urllib3.disable_warnings()

NameError: name 'sc' is not defined