In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import RobustScaler
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import udf, col, round
from pyspark.sql.functions import *
from time import time

from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
class FraudDetection():
    """Owns the SparkSession plus the raw and duplicated input DataFrames."""

    def __init__(self):
        self.spark = None     # SparkSession, set by create_spark_context
        self.data = None      # DataFrame loaded by read_file
        self.rep_data = None  # `data` unioned with itself by data_duplicator (repeated data)

    def create_spark_context(self, ram, rpt=False, ret=False,
                             master="spark://spark-master:7077", app_name="Fraud Detector"):
        """Create (or reuse) a SparkSession with `ram` GB of executor memory.

        Args:
            ram: executor memory in gigabytes, passed as "<ram>g".
            rpt: if True, print the resulting Spark configuration.
            ret: if True, return the session.
            master: cluster master URL (default keeps the previous hard-coded value).
            app_name: Spark application name.

        Returns:
            The SparkSession when `ret` is True, otherwise None.
        """
        self.spark = (SparkSession.builder
                      .appName(app_name)
                      .master(master)
                      .config("spark.executor.memory", "{}g".format(ram))
                      .getOrCreate())
        if rpt:
            print(self.spark.sparkContext.getConf().getAll())
        if ret:
            return self.spark

    def read_file(self, path, rpt=False, ret=False):
        """Read a headered CSV into ``self.data`` with schema inference.

        With a cluster master, `path` must be readable at the same location on
        every executor (e.g. a shared volume). A file present only on the driver
        fails at the first action with ``FileNotFoundException``.

        Args:
            path: CSV file path or URI.
            rpt: if True, print the number of partitions.
            ret: if True, return the DataFrame.
        """
        self.data = self.spark.read.csv(path, header=True, inferSchema=True)
        if rpt:
            print('number of partitions: {}'.format(self.data.rdd.getNumPartitions()))
        if ret:
            return self.data

    def data_duplicator(self, number, rpt=False, ret=False):
        """Stack `number` copies of ``self.data`` into ``self.rep_data``.

        Values of `number` below 2 leave ``rep_data`` equal to ``data``.

        Args:
            number: total number of copies to union together.
            rpt: if True, print row and column counts (triggers a Spark job).
            ret: if True, return the stacked DataFrame.
        """
        self.rep_data = self.data
        for _ in range(number - 1):
            self.rep_data = self.data.union(self.rep_data)
        if rpt:
            print("Created df with: {}, {}".format(self.rep_data.count(), len(self.rep_data.columns)))
        if ret:
            return self.rep_data

        
class Preprocess():
    """Spark feature preprocessing for a DataFrame with a binary ``Class`` label.

    Provides per-column robust scaling, IQR-fence outlier removal on a
    class-balanced subsample, and assembly of V1..V28 into a feature vector.
    """

    def __init__(self, data):
        self.spark = None       # not used by this class; kept for interface compatibility
        self.sub_sample = None  # balanced subsample built by outlier_removal
        self.data = data        # working DataFrame; methods below reassign it

    def scale_column(self, feature):
        """Robust-scale one column of ``self.data`` and return the updated DataFrame.

        The column is divided by its 25th-75th percentile range (no centering).
        It keeps its name, ends up as a double column, and is moved to the end
        of the schema.
        """
        vec_col = 'f' + feature
        # Cast to double rather than integer: an integer cast silently truncated
        # the fractional part of continuous features before they were scaled.
        self.data = self.data.withColumn(feature, self.data[feature].cast(DoubleType()))
        assembler = VectorAssembler().setInputCols([feature]).setOutputCol(vec_col)
        self.data = assembler.transform(self.data).drop(feature)
        scaler = RobustScaler(inputCol=vec_col, outputCol=feature,
                              withScaling=True, withCentering=False,
                              lower=0.25, upper=0.75)
        self.data = scaler.fit(self.data).transform(self.data).drop(vec_col)
        # The scaler emits a one-element vector; unwrap it back to a plain double.
        unlist = udf(lambda v: float(v[0]), DoubleType())
        self.data = self.data.withColumn(feature, unlist(feature))
        return self.data

    def robust_scale(self, scale_columns):
        """Apply scale_column to each name in `scale_columns`; return ``self.data``."""
        for column in scale_columns:
            self.data = self.scale_column(column)
        return self.data

    def calculate_iqr_bound(self, feature, q1, q3, k, rpt=False):
        """Return ``[lower, upper]`` Tukey fences for `feature`.

        Quantiles are computed exactly (relativeError=0) on the fraud rows
        (``Class == 1``) of ``self.sub_sample``. The fences are
        ``Q1 - k*IQR`` and ``Q3 + k*IQR``.
        """
        # Reference the label by name on sub_sample itself instead of through a
        # Column object that belongs to a different DataFrame (self.data).
        bound = self.sub_sample.filter(col('Class') == 1).approxQuantile(feature, [q1, q3], 0)
        if rpt: print(f'Feature: {feature}, Lower bound: {bound[0]}, Upper bound: {bound[1]}')
        iqr = bound[1] - bound[0]
        if rpt: print(f'Feature: {feature}, IQR: {iqr}')
        bound[0] = bound[0] - (iqr * k)
        bound[1] = bound[1] + (iqr * k)
        if rpt: print(f'Feature: {feature}, Cut-off Lower bound: {bound[0]}, Cut-off Upper bound: {bound[1]}')
        return bound

    def outlier_removal(self, features, q1=0.25, q3=0.75, k=1.5, rpt=False, n_normal=492):
        """Build a class-balanced subsample and drop rows outside the IQR fences.

        The subsample is all ``Class == 1`` rows plus the first `n_normal`
        ``Class == 0`` rows. For each feature, rows outside the fences (computed
        on the fraud rows, see calculate_iqr_bound) are removed.

        Args:
            features: column names to filter on, applied in order.
            q1, q3: quantiles that define the IQR.
            k: fence multiplier.
            rpt: if True, print bounds and before/after row counts.
            n_normal: number of non-fraud rows to keep (default preserves the
                original hard-coded 492).

        Returns:
            The filtered subsample, also stored on ``self.sub_sample``.
        """
        frauds = self.data.filter(col('Class') == 1)
        # NOTE(review): limit() without an ordering is not guaranteed to pick the
        # same rows on every re-evaluation; consider ordering or caching if
        # reproducibility matters.
        normals = self.data.filter(col('Class') == 0).limit(n_normal)
        self.sub_sample = frauds.union(normals)
        for feature in features:
            # count() is a full Spark job, so only pay for it when reporting.
            before_removal_count = self.sub_sample.count() if rpt else None
            bound = self.calculate_iqr_bound(feature, q1, q3, k, rpt=rpt)
            self.sub_sample = self.sub_sample.filter((col(feature) >= bound[0]) & (col(feature) <= bound[1]))
            if rpt:
                after_removal_count = self.sub_sample.count()
                print(f'before removal count: {before_removal_count}, after removal count: {after_removal_count}')
        return self.sub_sample

    def assemble_features(self, input_cols=None, output_col='features'):
        """Assemble `input_cols` (default V1..V28) into a vector column `output_col`."""
        if input_cols is None:
            input_cols = ['V{}'.format(i) for i in range(1, 29)]
        assembler = VectorAssembler(inputCols=list(input_cols), outputCol=output_col)
        self.data = assembler.transform(self.data)
        return self.data

    
class Evaluator():
    """Print and return Spark ML classification metrics for a predictions DataFrame."""

    def __init__(self, label_col="class", prediction_col="prediction"):
        # label_col: ground-truth label column; prediction_col: model output column.
        self.label_col = label_col
        self.prediction_col = prediction_col

    def _evaluate(self, data, metric_name, **extra):
        """Evaluate `metric_name` on `data` with the configured label/prediction columns."""
        # labelCol must be the ground truth and predictionCol the model output;
        # the previous version had them swapped, which turns recallByLabel into
        # precision.
        evaluator = MulticlassClassificationEvaluator(
            labelCol=self.label_col, predictionCol=self.prediction_col,
            metricName=metric_name, **extra)
        return evaluator.evaluate(data)

    def accuracy(self, data):
        """Print and return overall accuracy."""
        value = self._evaluate(data, "accuracy")
        print('accuracy: {}'.format(value))
        return value

    def recall(self, data, metric_label=1.0):
        """Print and return recall for class `metric_label` (default 1.0, the positive class)."""
        value = self._evaluate(data, "recallByLabel", metricLabel=metric_label)
        print('recall: {}'.format(value))
        return value

In [3]:
# Cluster / experiment configuration.
ram = 16                       # executor memory in GB
duplicate = 1                  # number of stacked copies of the dataset (1 = original only)
splitation = [0.7, 0.1, 0.2]   # train / validation / test ratios

detector = FraudDetection()
detector.create_spark_context(ram=ram)
# NOTE: the CSV must be readable at this path on every executor, not only on the driver.
detector.read_file("/opt/workspace/bank_sim.csv", rpt=True)
detector.data_duplicator(duplicate, rpt=True)

number of partitions: 12


Py4JJavaError: An error occurred while calling o38.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 2.0 failed 4 times, most recent failure: Lost task 6.3 in stage 2.0 (TID 48, 10.0.1.16, executor 0): java.io.FileNotFoundException: File file:/opt/workspace/bank_sim.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:385)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2979)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2978)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2978)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/opt/workspace/bank_sim.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
# Downstream cells work on the single-copy frame loaded by read_file
# (detector.rep_data, built by data_duplicator, is not used below).
df = detector.data

In [None]:
# Peek at a handful of rows to sanity-check the inferred schema.
preview = df.limit(5)
preview.show()

In [None]:
def index_column(df, column):
    """Replace categorical `column` with its StringIndexer code, cast to integer.

    A temporary "<column>Index" column is created and dropped; the returned
    DataFrame keeps `column` under its original name.
    """
    index_col = column + "Index"
    fitted = StringIndexer(inputCol=column, outputCol=index_col).fit(df)
    indexed = fitted.transform(df)
    return indexed.withColumn(column, indexed[index_col].cast(IntegerType())).drop(index_col)

In [None]:
# Encode every categorical column as integer indices, in the same order as before.
for column_name in ['merchant', 'category', 'customer', 'age', 'gender']:
    df = index_column(df, column_name)

In [None]:
def merchant_fraud_probablity(merchant):
    """Return the fraction of rows in the global ``df`` for `merchant` with ``fraud == 1``.

    Raises ZeroDivisionError when `merchant` has no rows.
    """
    # One aggregation job instead of two separate count() jobs over the same filter.
    total, frauds = (
        df.filter(df.merchant == merchant)
          .agg(count(lit(1)), count(when(col('fraud') == 1, True)))
          .first()
    )
    return frauds / total

In [None]:
# Fraud rate per merchant in a single Spark aggregation. This avoids collecting the
# whole dataset to the driver with toPandas() and running two jobs per merchant.
# Rows whose fraud flag is not 1 (including nulls) count toward the denominator,
# matching count(fraud == 1) / count(*).
_fraud_rate_rows = (
    df.groupBy('merchant')
      .agg(avg(when(col('fraud') == 1, 1.0).otherwise(0.0)).alias('fraud_rate'))
      .collect()
)
merchants_fraud_probablity = {row['merchant']: row['fraud_rate'] for row in _fraud_rate_rows}

In [None]:
def merchant_probablity(merchant):
    """Look up the precomputed fraud rate for `merchant`.

    Reads the global ``merchants_fraud_probablity`` dict; raises KeyError for an
    unknown merchant.
    """
    rate = merchants_fraud_probablity[merchant]
    return rate

In [None]:
# Attach each merchant's fraud rate as a new column without a Python round-trip
# through the RDD API. That round-trip serialized every row and re-inferred the
# schema (int columns became bigint). The lookup table is small, so it is inlined
# as a literal map; keys and values are converted to plain Python int/float first.
_rate_map = create_map(*[
    lit(value)
    for merchant_id, rate in merchants_fraud_probablity.items()
    for value in (int(merchant_id), float(rate))
])
df = df.withColumn("merchanttProbablity", _rate_map[col("merchant")])

In [None]:
# Inspect the frame after adding the merchant fraud-rate column (default 20 rows, truncated).
df.show(n=20, truncate=True)

In [None]:
# Indexed categoricals, then the raw amount and the per-merchant fraud rate.
_categorical_features = ['customer', 'age', 'gender', 'merchant', 'category']
inputCols = _categorical_features + ['amount', 'merchanttProbablity']

In [None]:
# Pack the model inputs into a single vector column named "features".
assembler = VectorAssembler(outputCol='features', inputCols=inputCols)

In [None]:
# Append the assembled "features" vector column to the frame.
df = assembler.transform(dataset=df)

In [None]:
# Inspect the frame with the assembled feature vector (default 20 rows, truncated).
df.show(n=20, truncate=True)

In [None]:
# Use the ratios from the configuration cell (previously defined but ignored) and
# a fixed seed so the split is reproducible across runs.
train, validation, test = df.randomSplit(splitation, seed=42)

In [None]:
# With 1 Worker

start = time()
from pyspark.ml.classification import RandomForestClassifier

# Fixed seed so repeated runs train the same forest.
rf = RandomForestClassifier(labelCol="fraud", featuresCol="features", numTrees=10, seed=42)
pipeline = Pipeline(stages=[rf])

# Train the model (the pipeline has a single stage: the random forest).
model = pipeline.fit(train)

# Score the held-out test split.
predictions = model.transform(test)

# Give the evaluators the label as a double column.
predictions = predictions.withColumn("fraud", predictions["fraud"].cast(DoubleType()))

# ROC AUC needs continuous scores. Use the classifier's rawPrediction column,
# not the hard 0/1 prediction, which collapses the ROC curve to a single point.
roc_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="fraud", metricName="areaUnderROC")
print('ROC: {}'.format(roc_evaluator.evaluate(predictions)))

accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="fraud", predictionCol="prediction", metricName="accuracy")
print('accuracy: {}'.format(accuracy_evaluator.evaluate(predictions)))
print('Elapsed time is: {}'.format(time()-start))