# Part 02 - Pyspark

Gradient Boosted Trees applied to Fraud detection

#### Pyspark libraries

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, countDistinct
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, array, lit
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.sql.functions import pow, col
import datetime
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import col, countDistinct

#### Python libraries

In [6]:
import pandas as pd
import matplotlib.pyplot as plt
import sys
%config InlineBackend.figure_format = 'retina'
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter(action='ignore', category=FutureWarning)

In [4]:
spark = SparkSession.builder.appName('FraudTreeMethods').getOrCreate()

## Read Data

In [18]:
# inserting the parent directory into current path
sys.path.insert(1, '../work/data_set')

In [20]:
data_name = 'train_sample.csv'
dataset_address = '../work/data_set/'
path = dataset_address + data_name
RDD = spark.read.csv(path, inferSchema=True, header=True)

In [21]:
RDD.show(5)

+------+---+------+---+-------+-------------------+---------------+-------------+
|    ip|app|device| os|channel|         click_time|attributed_time|is_attributed|
+------+---+------+---+-------+-------------------+---------------+-------------+
| 87540| 12|     1| 13|    497|2017-11-07 09:30:38|           null|            0|
|105560| 25|     1| 17|    259|2017-11-07 13:40:27|           null|            0|
|101424| 12|     1| 19|    212|2017-11-07 18:05:24|           null|            0|
| 94584| 13|     1| 13|    477|2017-11-07 04:58:08|           null|            0|
| 68413| 12|     1|  1|    178|2017-11-09 09:00:09|           null|            0|
+------+---+------+---+-------+-------------------+---------------+-------------+
only showing top 5 rows



In [6]:
print('RDD.printSchema is \n')
RDD.printSchema()

RDD.printSchema is 

root
 |-- ip: integer (nullable = true)
 |-- app: integer (nullable = true)
 |-- device: integer (nullable = true)
 |-- os: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- click_time: string (nullable = true)
 |-- attributed_time: string (nullable = true)
 |-- is_attributed: integer (nullable = true)



In [7]:
def fix_time_function(RDD):
    
    from pyspark.sql.functions import hour, minute, dayofmonth
    RDD = RDD.withColumn('hour',hour(RDD.click_time)).\
                 withColumn('day',dayofmonth(RDD.click_time))

    return RDD

In [8]:
def grouping_function(RDD):

    gp = RDD.select("ip","day","hour", "channel")\
                   .groupBy("ip","day","hour")\
                   .agg({"channel":"count"})\
                   .withColumnRenamed("count(channel)", "*ip_day_hour_count_channel")\
                   .sort(col("ip"))
    RDD = RDD.join(gp, ["ip","day","hour"])\
             .sort(col("ip"))
    
    
    gp = RDD.select("ip","app", "channel")\
               .groupBy("ip","app")\
               .agg({"channel":"count"})\
               .withColumnRenamed("count(channel)", "*ip_app_count_channel")\
               .sort(col("ip"))
    RDD = RDD.join(gp, ["ip","app"])\
             .sort(col("ip"))
    
    
    gp = RDD.select('ip','app', 'os', 'channel')\
               .groupBy('ip', 'app', 'os')\
               .agg({"channel":"count"})\
               .withColumnRenamed("count(channel)", "*ip_app_os_count_channel")\
               .sort(col("ip"))
    RDD = RDD.join(gp, ['ip','app', 'os'])\
             .sort(col("ip"))
    
    
    gp = RDD.select('ip','app', 'channel','hour')\
               .groupBy('ip', 'app', 'channel')\
               .agg({"hour":"mean"})\
               .withColumnRenamed("avg(hour)", "*ip_app_channel_mean_hour")\
               .sort(col("ip"))
    RDD = RDD.join(gp, ['ip', 'app', 'channel'])\
             .sort(col("ip"))

    return RDD

In [9]:
def oversampling_functions(RDD):
 
    # over sampling
    major_df = RDD.filter(col("is_attributed") == 0)
    minor_df = RDD.filter(col("is_attributed") == 1)
    ratio = int(major_df.count()/minor_df.count())
#     print("ratio: {}".format(ratio))
    a = range(ratio)
    
    # duplicate the minority rows
    oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')
    
    # combine both oversampled minority rows and previous majority rows combined_df = major_df.unionAll(oversampled_df)
    RDD = major_df.unionAll(oversampled_df)

    return RDD

In [10]:
def transfering_functions(RDD):

    def transformer(x):
        x = pow(x, (0.05))
        return x
    
    RDD = RDD.withColumn("app", transformer('app'))
    RDD = RDD.withColumn("device", transformer('device'))
    RDD = RDD.withColumn("os", transformer('os'))
    RDD = RDD.withColumn("day", transformer('day'))

    RDD = RDD.withColumn("*ip_day_hour_count_channel", transformer('*ip_day_hour_count_channel'))

    RDD = RDD.withColumn("*ip_app_count_channel", transformer('*ip_app_count_channel'))

    RDD = RDD.withColumn("*ip_app_os_count_channel", transformer('*ip_app_os_count_channel'))


    return RDD

In [11]:
def prepare_data_train(RDD):
    
    RDD = RDD.drop('click_time','attributed_time')
    # Split the data into training and test sets (30% held out for testing)
    (trainingData, testData) = RDD.randomSplit([0.7, 0.3])

    cols = ['ip',
     'app',
     'channel',
     'os',
     'day',
     'hour',
     'device',
     'is_attributed',
     '*ip_day_hour_count_channel',
     '*ip_app_count_channel',
     '*ip_app_os_count_channel',
     '*ip_app_channel_mean_hour']

    assembler = VectorAssembler(inputCols = cols,outputCol="features")
    trainingData = assembler.transform(trainingData)
    testData = assembler.transform(testData)
    
    return assembler, trainingData, testData

In [12]:
def train_XGB_on_data(assembler, trainingData, testData):
    # Train a GBT model.
    gbt = GBTClassifier(labelCol="is_attributed", featuresCol="features", maxIter=20, maxDepth=4)

    # Train model.  This also runs the indexers.
    model = gbt.fit(trainingData)

    # Make predictions.
    predictions = model.transform(testData)

    # Select example rows to display.
    predictions.select("prediction", "is_attributed", "features")
    
    return predictions

In [13]:
def evaluation(predictions):
    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(labelCol="is_attributed", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g" % (1.0 - accuracy))
    print("Test accuracy = %g" % (accuracy))
    predictions.groupBy('prediction').count().show()

In [14]:
def run(RDD):
    
    RDD = fix_time_function(RDD); print('fix_time_function Done!')
    RDD = grouping_function(RDD); print('grouping_function Done!')
    RDD = oversampling_functions(RDD); print('oversampling_functions Done!')
    RDD = transfering_functions(RDD); print('transfering_functions Done!')
    assembler, trainingData, testData = prepare_data_train(RDD); print('prepare_data_train Done!')
    predictions = train_XGB_on_data(assembler, trainingData, testData); print('train_XGB_on_data Done!')
    evaluation(predictions); print('evaluation Done!')

In [15]:
run(RDD)

fix_time_function Done!
grouping_function Done!
oversampling_functions Done!
transfering_functions Done!
prepare_data_train Done!
train_XGB_on_data Done!
Test Error = 0
Test accuracy = 1
+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|29802|
|       1.0|29996|
+----------+-----+

evaluation Done!


In [22]:
data_name = 'train.csv'
dataset_address = '../work/data_set/'
path = dataset_address + data_name
RDD = spark.read.csv(path, inferSchema=True, header=True)

In [19]:
run(RDD)

fix_time_function Done!
grouping_function Done!


Py4JJavaError: An error occurred while calling o964.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 1426.0 failed 1 times, most recent failure: Lost task 7.0 in stage 1426.0 (TID 62500, c796cf86906c, executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:385)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2979)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2978)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2978)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.OutOfMemoryError: Java heap space


In [16]:
# pipeline = Pipeline(fix_time_function, 
#                             grouping_function, 
#                             oversampling_functions, 
#                             transfering_functions)

In [17]:
# RDD = pipeline.fit(RDD)