In [1]:
## Import the libraries required for the project

In [1]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt 
# Matplotlib 3.6 renamed the bundled seaborn styles to "seaborn-v0_8-*" and
# later releases removed the old names, so use whichever name this install
# actually provides instead of failing on newer versions.
plt.style.use(
    "seaborn-v0_8-whitegrid"
    if "seaborn-v0_8-whitegrid" in plt.style.available
    else "seaborn-whitegrid"
)

import seaborn as sns 
from collections import Counter

import warnings 
warnings.filterwarnings("ignore")

In [2]:
import findspark
# Initialise findspark before the pyspark imports in the following cells.
# NOTE(review): presumably this is what makes the local Spark install
# importable from this kernel — confirm for the target environment.
findspark.init()

In [3]:
import os

# Maven coordinate of the Kafka source package passed to Spark via --packages.
# The variable is set here, before the SparkSession is created further down.
KAFKA_SQL_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages " + KAFKA_SQL_PACKAGE + " pyspark-shell"

In [4]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql.functions import *

In [2]:
# Build the Spark session and read the file

In [5]:
# Get the SparkSession used by every cell below (created if none exists yet).
spark = (
    SparkSession.builder
    .appName("Spark ,Fraud Detection")
    .getOrCreate()
)

In [6]:
# Location of the fraud transactions CSV. Set the FRAUD_CSV_PATH environment
# variable to point at a copy on another machine; the original local path is
# kept as the default so existing runs behave exactly as before.
Erdem_data = os.environ.get("FRAUD_CSV_PATH", "/Users/erdemsmac/Desktop/fraud.csv")

In [7]:
# Load the CSV as a batch DataFrame: the first line supplies the column names
# and the column types are inferred from the data.
fraud_df = (
    spark.read
    .option("header", 'True')
    .option("inferSchema", 'True')
    .csv(Erdem_data)
)

In [3]:
# Basic Analysis and Visualization

In [8]:
# Displaying the Spark DataFrame object shows only its column names and types
# (see the output below), not any rows.
display(fraud_df)

DataFrame[step: int, type: string, amount: double, nameOrig: string, oldbalanceOrg: double, newbalanceOrig: double, nameDest: string, oldbalanceDest: double, newbalanceDest: double, isFraud: int, isFlaggedFraud: int]

In [9]:
# Print each column's name, type and nullability as read from the CSV.
fraud_df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [10]:
# Count the rows of the full dataset and print the total.
action_count = fraud_df.count()
print(action_count)

6362620


In [11]:
# Preview the first 5 rows of the dataset.
fraud_df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [12]:
# Number of rows for each distinct value of the "type" column.
fraud_df.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



In [13]:
# Subset of the transactions whose amount is above 5000.
fraud_df2 = fraud_df.filter(col("amount") > 5000)

In [10]:
# Transactions sent to one specific destination account.
fraud_df3 = fraud_df.filter(col("nameDest") == 'M1979787155')

In [11]:
# Number of rows whose nameDest matches the value filtered on when building fraud_df3.
fraud_df3.count()

1

In [14]:
# Preview rows of fraud_df2 (transactions with amount > 5000).
fraud_df2.show()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  7817.71|  C90045638|      53860.0|      46042.29| M573487274|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  7107.77| C154988899|     183195.0|     176087.23| M408069119|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  7861.64|C1912850431|    176087.23|     168225.59| M63332633

In [15]:
# Number of transactions with amount > 5000.
fraud_df2.count()

5660119

In [4]:
# The code below performs real-time analysis with Structured Streaming

In [16]:
# Streaming DataFrame over the "fraud" Kafka topic on the local broker.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "fraud")
    .load()
)

In [17]:
# Keep only the message payload, converted to a string column named "value".
df2 = (
    df.select(col("value").cast("string"))
    .alias("csv")
    .select("csv.*")
)

In [18]:
# Column order of one comma-separated record in the Kafka message value,
# matching the column order of the batch file's schema printed earlier.
# NOTE(review): assumes each message is a single CSV row in this order with no
# embedded commas — confirm against the producer.
FRAUD_CSV_COLUMNS = [
    "step",
    "type",
    "amount",
    "nameOrig",
    "oldbalanceOrg",
    "newbalanceOrig",
    "nameDest",
    "oldbalanceDest",
    "newbalanceDest",
    "isFraud",
    "isFlaggedFraud",
]

# Split the payload into one string column per field. The field index is taken
# from the column's position in the list, so each column reads its own field;
# previously the last five columns re-read fields 4 and 5 by mistake.
df3 = df2.selectExpr(
    *[
        f"split(value,',')[{position}] as {column}"
        for position, column in enumerate(FRAUD_CSV_COLUMNS)
    ]
)

In [19]:
# df3's columns are produced by split() and are therefore strings. Cast amount
# to double explicitly so the threshold test is a true numeric comparison and
# does not depend on Spark's implicit string-to-number coercion.
df4 = df3.filter(df3.amount.cast("double") > 10000)

In [20]:
# Count the filtered streaming records per "type" value; this aggregate is
# what the streaming query started below writes out.
Stream=df4.groupBy("type").count()

In [21]:
# Start the streaming query: write the per-type counts to the console in
# "complete" output mode, with a processing-time trigger of 1 second.
query = (
    Stream.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(processingTime='1 seconds')
    .start()
)

In [None]:
# Wait on the streaming query started above.
# NOTE(review): this presumably blocks the cell until the query stops or fails
# (the cell has no execution count) — confirm before adding cells after it.
query.awaitTermination()