In [None]:
from itertools import chain

import numpy
import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
                                  MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, \
                               RobustScaler, \
                               StringIndexer, \
                               VectorAssembler
from pyspark.ml.tuning import TrainValidationSplit, \
                              ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, \
                                  col, \
                                  count, \
                                  create_map, \
                                  lit, \
                                  when, \
                                  udf
from pyspark.sql.types import FloatType

# Load Dataset

In [None]:
# Obtain the Spark session for this notebook (application name 'ads-ml').
spark = (
    SparkSession.builder
    .appName('ads-ml')
    .getOrCreate()
)

In [None]:
# Read the semicolon-delimited training CSV; the first row is the header and
# column types are inferred from the data.
csv_reader = spark.read.options(delimiter=';', header=True, inferSchema=True)
df = csv_reader.csv('../data/training_data.csv')
df.printSchema()
# Preview the first five rows, transposed so every column shows up as a row.
pd.DataFrame(df.take(5), columns=df.columns).T

# Preprocess Dataset

In [None]:
def time_diff_in_minutes(dt_0, dt_1):
    """Return the minutes elapsed from ``dt_0`` to ``dt_1``, rounded to one decimal.

    Args:
        dt_0: Start ``datetime``, or ``None`` when the value is missing.
        dt_1: End ``datetime``, or ``None`` when the value is missing.

    Returns:
        float: ``(dt_1 - dt_0)`` expressed in minutes, or ``0.0`` when either
        argument is ``None``.
    """
    # A missing value on either side is treated as "no elapsed time"; without
    # the dt_1 check a null end value would raise TypeError inside the UDF.
    if dt_0 is None or dt_1 is None:
        return 0.0
    return round((dt_1 - dt_0).total_seconds() / 60.0, 1)
    
# Expose the helper as a Spark UDF that returns FloatType.
time_diff_in_min_udf = udf(time_diff_in_minutes, FloatType())

# Add the minutes between 'lastStart' and 'timestamp' as a feature, then drop
# the identifier and the two raw timestamp columns.
preprocessed_df = (
    df.withColumn('timeSinceLastStart',
                  time_diff_in_min_udf(df['lastStart'], df['timestamp']))
      .drop("id", "timestamp", "lastStart")
)

# Count the number of missing values (NULL, plus NaN for floating-point columns) per column.
# `when(..., lit(1))` emits a non-null marker for every missing row so that `count`
# actually counts it; emitting the column value itself would yield NULL for null rows,
# which `count` skips.
missing_value_counts = []
for column_name, column_type in preprocessed_df.dtypes:
    is_missing = col(column_name).isNull()
    if column_type in ('float', 'double'):
        # NaN only exists for floating-point columns.
        is_missing = is_missing | isnan(col(column_name))
    missing_value_counts.append(count(when(is_missing, lit(1))).alias(column_name))
preprocessed_df.select(missing_value_counts).show()

In [None]:
# Continue from the preprocessed frame rather than the raw one: it carries the
# derived 'timeSinceLastStart' feature used later in `numerical_feats` and no
# longer contains the id / raw timestamp columns. 'install' is renamed to 'label'.
df = preprocessed_df.withColumnRenamed('install', 'label')

In [None]:
# Count the rows of each class and bring the (tiny) result to the driver with a
# single Spark action instead of one filter + collect per class.
class_count_df = df.groupby('label').agg({'label': 'count'})
class_counts = {
    int(row['label']): row['count(label)']
    for row in class_count_df.collect()
    if row['label'] is not None
}

# Fail with a clear message when a class is absent, rather than with an opaque
# IndexError on an empty result.
missing_classes = {0, 1} - set(class_counts)
if missing_classes:
    raise ValueError(
        "Cannot compute class weights; no rows found for label(s): {}".format(
            sorted(missing_classes)))

n_1 = class_counts[1]
n_0 = class_counts[0]

# Balanced class weights: total / (2 * class count), so the rarer class gets
# the proportionally larger weight.
w_1 = (n_0 + n_1) / (2.0 * n_1)
w_0 = (n_0 + n_1) / (2.0 * n_0)

class_weights = {0: w_0, 1: w_1}

# create_map takes alternating key/value literals: 0, w_0, 1, w_1.
mapping_expr = create_map([lit(x) for x in chain(*class_weights.items())])
# Look up each row's weight by its label.
df = df.withColumn("weights", mapping_expr.getItem(col("label")))

In [None]:
# Show (at most) five rows with one line per column.
preview_rows = df.take(5)
pd.DataFrame(preview_rows, columns=df.columns).head().T

# Vectorize Features

In [None]:
# It is prudent to store the split data in train/test folders for the sake of reproducibility.
# Until then, a fixed seed at least keeps the 80/20 split from changing on every run.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df.count(), test_df.count()

In [None]:
# Columns that are string-indexed and one-hot encoded.
categorical_feats = [
    'campaignId',
    'platform',
    'softwareVersion',
    'sourceGameId',
    'country',
    'connectionType',
    'deviceType',
]
# Columns that are assembled into a numeric vector and scaled.
numerical_feats = [
    'startCount',
    'viewCount',
    'clickCount',
    'installCount',
    'startCount1d',
    'startCount7d',
    'timeSinceLastStart',
]

In [None]:
# Start the pipeline stage list: every categorical column gets a StringIndexer
# ("<col>Index") followed by a OneHotEncoder ("<col>Vec"); both use
# handleInvalid='keep'.
stages = []
for cat_col in categorical_feats:
    index_col = cat_col + "Index"
    indexer = StringIndexer(inputCol=cat_col,
                            outputCol=index_col,
                            handleInvalid='keep')
    one_hot = OneHotEncoder(inputCols=[index_col],
                            outputCols=[cat_col + "Vec"],
                            handleInvalid='keep')
    stages.extend([indexer, one_hot])

In [None]:
# Pack the numeric columns into one vector column, then scale that vector
# with RobustScaler.
assembler1 = VectorAssembler(inputCols=numerical_feats, outputCol="num_features")
scaler = RobustScaler(inputCol=assembler1.getOutputCol(), outputCol='scaled')
stages.extend([assembler1, scaler])

In [None]:
# Final feature vector: all one-hot encoded categorical columns followed by
# the scaled numeric vector.
assembler_inputs = ["{}Vec".format(name) for name in categorical_feats]
assembler_inputs.append("scaled")
assembler2 = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler2)

In [None]:
# Logistic regression that weighs each row by the 'weights' column; it is the
# last stage of the pipeline.
lr = LogisticRegression(weightCol='weights')
stages += [lr]

In [None]:
# Chain all preprocessing stages and the classifier into one estimator.
pipeline = Pipeline().setStages(stages)

In [None]:
# Hyper-parameter grid: 4 regularization strengths x 3 elastic-net mixing
# values = 12 candidate settings.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [1.0, 0.1, 0.01, 0.001])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

In [None]:
# Model selection over `param_grid`: each candidate is fit on 80% of the data it
# is given and scored by area under the PR curve on the remaining 20%.
# The seed fixes that internal train/validation split so the selected
# hyper-parameters are repeatable across runs.
train_val = TrainValidationSplit(estimator=pipeline,
                                 estimatorParamMaps=param_grid,
                                 evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'),
                                 trainRatio=0.8,
                                 seed=42)

In [None]:
# Run the grid search on the training split.
model = train_val.fit(dataset=train_df)

In [None]:
# The last stage of the best pipeline is the fitted logistic regression;
# print the description and value of the two tuned parameters.
# (For every parameter at once: best_lr_model.extractParamMap())
best_lr_model = model.bestModel.stages[-1]
for tuned_param in ('regParam', 'elasticNetParam'):
    print(best_lr_model.explainParam(tuned_param))

In [None]:
# Apply the selected model to the held-out test split.
predictions = model.transform(dataset=test_df)

In [None]:
# Both evaluator classes are imported in the notebook's top import cell.

# Let's use the run-of-the-mill evaluator
evaluator = BinaryClassificationEvaluator()

# We have only two choices: area under ROC and PR curves :-(
# The metric is overridden per call through the param map.
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
auprc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))

In [None]:
# Recall and precision for the positive class (label 1).
# Both results are captured and printed: a notebook cell only displays its last
# expression, so a bare first `evaluate(...)` call would be silently discarded.
evaluator = MulticlassClassificationEvaluator()
positive_label = 1.0
recall = evaluator.evaluate(predictions, {evaluator.metricName: "recallByLabel",
                                          evaluator.metricLabel: positive_label})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "precisionByLabel",
                                             evaluator.metricLabel: positive_label})
print("Recall (label 1): {:.4f}".format(recall))
print("Precision (label 1): {:.4f}".format(precision))