# Predicting clicks on log streams using Elastic MapReduce (EMR)

This data is a sample of Adform's ad traffic.

Each record corresponds to an ad impression served by Adform, and consists of a single binary label (clicked/not-clicked) and a selected subset of features (c0-c9). The positives and negatives are downsampled at different rates. The data is chronologically ordered.

The file is gzipped and each line corresponds to a single record, serialized as JSON. The JSON has the following fields:

- "l": The binary label indicating whether the ad was clicked (1) or not (0).
- "c0" - "c9": Categorical features, each hashed into a 32-bit integer.

The semantics of the features are not disclosed. The values are stored in an array, because some of the features have multiple values per record. When a key is missing, the field is empty.
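
To make the record layout concrete, here is a minimal sketch that parses one line with Python's standard library. The sample line is invented for illustration and is not taken from the dataset.

```python
import json

# Hypothetical record in the format described above (values are invented).
sample_line = '{"l": 0, "c0": [-1664374510], "c6": [-1841755489, 123456789]}'

record = json.loads(sample_line)

label = record["l"]               # binary label: 1 = clicked, 0 = not clicked
c0_values = record.get("c0", [])  # single-valued feature, still stored as a list
c6_values = record.get("c6", [])  # multi-valued feature
c5_values = record.get("c5", [])  # key absent from this record -> empty list

print(label, c0_values, c6_values, c5_values)
```

To read the gzipped file line by line, `gzip.open(path, 'rt')` can be combined with the same `json.loads` call.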

The dataset is fairly large (5 GB), so we need multiple machines to perform the training. We will therefore use AWS EMR (Elastic MapReduce) to train a classifier capable of predicting whether a user will click on an advertisement given certain conditions.

#### Starting Spark application and loading the dataset

In [1]:
#run in aws-emr
#s3_train_path = 's3://mastering-ml-aws/chapter4'

#run local
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext('local', 'Pred_clicks')
sql = SQLContext(sc)

In [2]:
#run in aws-emr
#ctr_df = spark.read.json('adform.click.2017.01.json')

#run local
ctr_df = sql.read.json('adform.click.2017.01.json')
ctr_df.show(5)

+-------------+------------+-------------+-------------+------------+-------------+--------------------+-------------+------------+--------------------+---+
|           c0|          c1|           c2|           c3|          c4|           c5|                  c6|           c7|          c8|                  c9|  l|
+-------------+------------+-------------+-------------+------------+-------------+--------------------+-------------+------------+--------------------+---+
|[-1664374510]|[1292560685]| [1963151207]| [-113426919]|[1024827180]|         null|[-1841755489, -20...|  [781804810]| [677061876]|[-2054476127, 128...|  0|
| [1566608579]|[-248982458]|  [336746857]|[-1629610286]| [244157766]|         null|[-574085389, 1869...| [1065163157]| [332083152]|        [-614983515]|  0|
| [1935105702]|[1292560685]|[-1389162932]| [-113426919]|  [-8361123]|  [839761088]|                null|[-1708330775]|[1856995055]|[-1954958362, 157...|  0|
| [1718276659]| [630920017]| [1171414431]| [-113426919]| [

Our dataset has 10 features and one column holding the label we are trying to predict: whether the user clicked (1) or did not click (0) on the advertisement.


To simplify the code, we will build a dataset from the first five features and rename them f0, f1, f2, f3 and f4. We will also replace null features with the value 0 and take only the first value of multivalued features.
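
For a single record, the rule "first value, or 0 when the feature is missing" can be sketched in plain Python as follows. The helper name `first_or_zero` and the sample record are hypothetical; the Spark cell that follows expresses the same idea with `coalesce` over features c0 to c4.

```python
def first_or_zero(values):
    """Return the first element of a feature list, or 0 if it is missing or empty."""
    return values[0] if values else 0


# Invented record: c1 is null, c2 is multivalued, c3 and c4 are absent.
record = {"c0": [1566608579], "c1": None, "c2": [7, 8, 9], "l": 1}

row = {f"f{i}": first_or_zero(record.get(f"c{i}")) for i in range(5)}
row["click"] = record["l"]
print(row)
```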

In [3]:
df = ctr_df.selectExpr("coalesce(c0[0],0) as f0",
                       "coalesce(c1[0],0) as f1",
                       "coalesce(c2[0],0) as f2",
                       "coalesce(c3[0],0) as f3",
                       "coalesce(c4[0],0) as f4",
                       "l as click")

df.show()

+-----------+-----------+-----------+-----------+-----------+-----+
|         f0|         f1|         f2|         f3|         f4|click|
+-----------+-----------+-----------+-----------+-----------+-----+
|-1664374510| 1292560685| 1963151207| -113426919| 1024827180|    0|
| 1566608579| -248982458|  336746857|-1629610286|  244157766|    0|
| 1935105702| 1292560685|-1389162932| -113426919|   -8361123|    0|
| 1718276659|  630920017| 1171414431| -113426919|  640993460|    1|
| 1562430026|  630920017| 1639152385| 1781226914| 1493440023|    0|
|-1322326169| 1338259132|-1389162932|          0|-1658448827|    0|
| 1059882129| 2000378252|-1389162932|          0|-1530435578|    0|
|-1322326169| -642801039|-1389162932|          0| -457281581|    0|
|-1410788805| -409082697|-1389162932|          0| 2002120008|    0|
|  539933552|  630920017|-1389162932| -113426919|  326407952|    0|
|  539933552|  630920017|-1389162932| -113426919|  326407952|    0|
|-1296787258| 2060859103|-2081862133| -113426919

Repartition the dataset so that its portions are spread across different machines, and cache them in memory.

In [4]:
#run in aws-emr
#df = df.repartition(100).cache()

In [5]:
df.describe().show()

+-------+--------------------+--------------------+--------------------+-------------------+--------------------+------------------+
|summary|                  f0|                  f1|                  f2|                 f3|                  f4|             click|
+-------+--------------------+--------------------+--------------------+-------------------+--------------------+------------------+
|  count|            12000000|            12000000|            12000000|           12000000|            12000000|          12000000|
|   mean|-6.610412663970825E7|2.5049429668800482E8|-2.915904354482062E8|5.459869260236725E7|-6.716129061083934E7|        0.18310175|
| stddev|1.2294656059145827E9| 1.287445524252861E9|1.2580392622053537E9|8.234483651283174E8| 1.242913446913509E9|0.3867499342101615|
|    min|         -2145952914|         -2125813709|         -2145112401|        -2134594413|         -2147400218|                 0|
|    max|          2146734164|          2136145316|          21455299

The mean value of the click column tells us that there is a certain degree of label imbalance (about 18% of the instances are clicks). Additionally, the count row tells us that there is a total of 12,000,000 rows in our dataset.
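
Because the label is 0 or 1, its mean is simply the fraction of clicked rows, so the two numbers from `describe()` are enough to estimate the class sizes. A small sketch of that arithmetic (keep in mind that positives and negatives were downsampled at different rates, so this is not the real click-through rate):

```python
total_rows = 12_000_000
click_rate = 0.18310175  # mean of the click column reported by describe()

clicks = round(total_rows * click_rate)  # rows with click == 1
non_clicks = total_rows - clicks         # rows with click == 0
ratio = non_clicks / clicks              # negatives per positive

print(clicks, non_clicks, round(ratio, 2))
```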

#### Unique values for each feature

In [6]:
df.select('f0').distinct().count()

2497

In [7]:
df.select('f1').distinct().count()

178

In [8]:
df.select('f2').distinct().count()

377

In [9]:
df.select('f3').distinct().count()

68

In [10]:
df.select('f4').distinct().count()

17572
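
Adding up the five counts above gives the total number of distinct categorical values. This is only a rough indication of how many columns a one-hot encoding of these features would need, since the exact width depends on the encoder settings and on which values appear in the training split.

```python
# Distinct-value counts copied from the cells above.
distinct_counts = {"f0": 2497, "f1": 178, "f2": 377, "f3": 68, "f4": 17572}

total_distinct = sum(distinct_counts.values())
print(total_distinct)
```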

#### Categorical encoder

In [11]:
from pyspark.ml.feature import StringIndexer


indexer = StringIndexer(inputCol = 'f0', outputCol = 'f0_index')
ctr_df_indexed = indexer.fit(df)
df_indexed = ctr_df_indexed.transform(df).select('f0','f0_index')
df_indexed.show(4)

+-----------+--------+
|         f0|f0_index|
+-----------+--------+
|-1664374510|   140.0|
| 1566608579|   455.0|
| 1935105702|    44.0|
| 1718276659|  1540.0|
+-----------+--------+
only showing top 4 rows
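
By default, `StringIndexer` orders labels by descending frequency, so the most frequent value gets index 0. The following pure-Python sketch illustrates the idea on a toy list. It is not Spark's implementation, and the alphabetical tie-breaking is an assumption made for this example.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(str(v) for v in values)
    # Sort by descending count; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


toy_values = ["b", "a", "b", "c", "b", "a"]
mapping = fit_string_index(toy_values)
print(mapping)
```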



#### One-hot encoding

In [12]:
from pyspark.ml.feature import OneHotEncoder

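encoder = OneHotEncoder(inputCol="f0_index", outputCol="f0_encoded")
# Spark 2.x: OneHotEncoder is a Transformer, so transform() can be called directly.
# In Spark 3.x it is an Estimator: use encoder.fit(df_indexed).transform(df_indexed).
encoder.transform(df_indexed).distinct().show(4)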

+-----------+--------+------------------+
|         f0|f0_index|        f0_encoded|
+-----------+--------+------------------+
|-1156005499|   337.0|(2496,[337],[1.0])|
|-1713169383|   242.0|(2496,[242],[1.0])|
|-1577432220|   408.0|(2496,[408],[1.0])|
| -293118980|   401.0|(2496,[401],[1.0])|
+-----------+--------+------------------+
only showing top 4 rows
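
Each encoded value is printed as a sparse vector `(size, [indices], [values])`. The size is 2496 even though f0 has 2497 distinct values because, with the default `dropLast=True`, the last category is represented by an all-zeros vector. A minimal sketch of that representation (the function name is hypothetical):

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return a (size, indices, values) triple mimicking a sparse one-hot vector."""
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    # The dropped last category is encoded as a vector with no non-zero entries.
    return (size, [], [])


print(one_hot_sparse(337, 2497))
print(one_hot_sparse(2496, 2497))
```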



## Processing and Training a Model 

In [13]:
def categorical_one_hot_encoding(columns):
    indexers = [StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid='keep') for column in columns]
    encoders = [OneHotEncoder(inputCol=column + "_index", outputCol=column + "_encoded") for column in columns]

    return indexers + encoders
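
The `handleInvalid='keep'` option matters because the test split may contain values that never appeared in the training split. With this setting, unseen values are placed in one extra bucket whose index equals the number of known labels, instead of raising an error. A pure-Python sketch of that lookup (the helper name and toy mapping are hypothetical):

```python
def index_with_keep(value, label_to_index):
    """Look up a label; unseen labels go to one extra bucket at the end."""
    return label_to_index.get(value, float(len(label_to_index)))


known = {"a": 0.0, "b": 1.0}          # mapping learned on the training data
print(index_with_keep("b", known))    # known label keeps its index
print(index_with_keep("z", known))    # unseen label falls into the extra bucket
```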

In [14]:
categorical_columns = ['f0','f1','f2','f3','f4']
encoded_columns = [column + '_encoded' for column in categorical_columns] 

Given the high number of features produced by the one-hot encoder, the feature vector can be huge, which can make training very slow or require massive amounts of memory. One way to mitigate this is to use a **Chi-squared** feature selector. We will select the best 100 features.

[Chi-Squared Test for Feature Selection with implementation in Python](https://towardsdatascience.com/chi-squared-test-for-feature-selection-with-implementation-in-python-65b4ae7696db)
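
The selector ranks features with a chi-squared test of independence between each feature and the label. As a rough sketch of the statistic itself, the function below computes it from a contingency table of observed counts. The table is invented, and the code assumes every row and column has at least one observation.

```python
def chi_squared(table):
    """Chi-squared statistic for a contingency table given as a list of rows."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    n = sum(row_totals)
    stat = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            # Count expected in this cell if feature and label were independent.
            expected = row_totals[i] * col_totals[j] / n
            stat += (observed - expected) ** 2 / expected
    return stat


# Rows: feature present / absent. Columns: click / no click (invented counts).
toy_table = [[10, 20], [30, 40]]
print(chi_squared(toy_table))
```

A larger statistic means the feature and the label are less likely to be independent, which is why the highest-scoring features are the ones kept.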


In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import ChiSqSelector



categorical_stages = categorical_one_hot_encoding(categorical_columns)
vector_assembler = VectorAssembler(inputCols=encoded_columns, outputCol="features")

selector = ChiSqSelector(numTopFeatures=100, featuresCol="features",
                         outputCol="selected_features", labelCol="click")

decision_tree = DecisionTreeClassifier(labelCol="click", featuresCol="selected_features")

encoding_pipeline = Pipeline(stages=categorical_stages + [vector_assembler, selector, decision_tree])

#
#encoding_pipeline = Pipeline(stages=categorical_stages)


In [23]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=17)
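
Note that a random split assigns each row to a side independently, so the 80/20 proportions are approximate rather than exact, and the seed makes the assignment reproducible. The sketch below illustrates that behaviour in plain Python. It is only an analogy and does not reproduce Spark's per-partition sampling.

```python
import random


def random_split(rows, train_fraction=0.8, seed=17):
    """Assign each row independently to train or test with a fixed seed."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


train, test = random_split(list(range(10_000)))
print(len(train), len(test))
```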

In [None]:
pipeline_model = encoding_pipeline.fit(train_df)

In [None]:
pipeline_model.transform(train_df).limit(5).toPandas()

#### AUC - area under the ROC curve


[The 5 Classification Evaluation metrics every Data Scientist must know](https://towardsdatascience.com/the-5-classification-evaluation-metrics-you-must-know-aa97784ff226)



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="click")
evaluator.evaluate(pipeline_model.transform(test_df), {evaluator.metricName: "areaUnderROC"})
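
One way to read the AUC is as the probability that a randomly chosen clicked example receives a higher score than a randomly chosen non-clicked one. The sketch below computes it that way on a toy example by comparing every positive/negative pair. This is for intuition only: Spark derives the metric from the ROC curve, and the pairwise loop would be far too slow for the full dataset.

```python
def auc(labels, scores):
    """AUC as the fraction of positive/negative pairs ranked correctly."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(auc(toy_labels, toy_scores))
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.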

### Training Tree Ensembles on EMR

In [None]:
from pyspark.ml.classification import RandomForestClassifier

random_forest = RandomForestClassifier(labelCol="click", featuresCol="features")
pipeline_rf = Pipeline(stages=categorical_stages + [vector_assembler, random_forest])


rf_pipeline_model = pipeline_rf.fit(train_df)
evaluator.evaluate(rf_pipeline_model.transform(test_df), {evaluator.metricName: "areaUnderROC"})
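
Conceptually, a random forest trains many decision trees on different samples of the data and combines their outputs. A minimal sketch of the combining step, assuming each tree has already produced a click probability for one row (the numbers are invented):

```python
def forest_probability(tree_probabilities):
    """Average per-tree click probabilities into one ensemble probability."""
    return sum(tree_probabilities) / len(tree_probabilities)


tree_outputs = [0.10, 0.40, 0.25, 0.05]  # hypothetical outputs of four trees
p_click = forest_probability(tree_outputs)
predicted_label = 1 if p_click >= 0.5 else 0
print(p_click, predicted_label)
```

Averaging over many trees tends to smooth out the errors of individual trees, which is the usual motivation for trying an ensemble after a single decision tree.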