# Objective

This notebook is used to concatenate all the created features and prepare the modeling and test dataset for Xgboost.

# Libraries

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
% matplotlib inline

import os

import seaborn as sns
sns.set(rc={'figure.figsize':(12, 5)},
        font_scale=1.5,
        style="whitegrid")


from pyspark.sql.functions import monotonically_increasing_id, col
import pyspark.sql.functions as func

# Path

In [3]:
project_folder = "/user/a008495/ADT/"
datapath = project_folder + "data/"
plotpath = project_folder + "graphs/"
mungepath = project_folder + "munge/"
configpath = project_folder + "config/"
diagnostic = project_folder + "diagnostic/"
modelpath = project_folder + "model/"
output = project_folder + "output/"

# Reading Train Data

In [4]:
train = spark.read.csv(os.path.join(datapath,"train.csv"), header=True)
print('Found %d observations in training set.' %train.count())

Found 184903890 observations in training set.


In [5]:
train.show(3)

+-----+---+------+---+-------+-------------------+---------------+-------------+
|   ip|app|device| os|channel|         click_time|attributed_time|is_attributed|
+-----+---+------+---+-------+-------------------+---------------+-------------+
|83230|  3|     1| 13|    379|2017-11-06 14:32:21|           null|            0|
|17357|  3|     1| 19|    379|2017-11-06 14:33:34|           null|            0|
|35810|  3|     1| 13|    379|2017-11-06 14:34:12|           null|            0|
+-----+---+------+---+-------+-------------------+---------------+-------------+
only showing top 3 rows



In [6]:
train.printSchema()

root
 |-- ip: string (nullable = true)
 |-- app: string (nullable = true)
 |-- device: string (nullable = true)
 |-- os: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- click_time: string (nullable = true)
 |-- attributed_time: string (nullable = true)
 |-- is_attributed: string (nullable = true)



In [7]:
test = spark.read.csv(os.path.join(datapath,"test.csv"), header=True)
print('Found %d observations in test set.' %test.count())

Found 18790469 observations in test set.


# Reading Features

## Datetime features

In [8]:
# day function
def get_day(date):
    """
    Returns the hour based on a string date
        
    Args:
        date (String): A String containing the click datetime

    Returns:
        day: A String containing the day of click ("06", "07", .."10")
    """
    
    return date[8:10]

# hour function
def get_hour(date):
    """
    Returns the hour based on a string date
        
    Args:
        date (String): A String containing the click datetime

    Returns:
        hour: A String containing the hour interval ("01", "02", .."23")
    """
    
    return date[11:13]

# minute function
def get_minute(date):
    """
    Returns the minute based on a string date
        
    Args:
        date (String): A String containing the click datetime

    Returns:
        hour: A String containing the minute interval ("01", "02", .."59")
    """
    
    return date[14:16]

# minute function
def get_second(date):
    """
    Returns the second based on a string date
        
    Args:
        date (String): A String containing the click datetime

    Returns:
        hour: A String containing the minute interval ("01", "02", .."59")
    """
    
    return date[17:19]

In [9]:
# Get datetime features
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

day_udf = udf(get_day, StringType())
hour_udf = udf(get_hour, StringType())
minute_udf = udf(get_minute, StringType())
second_udf = udf(get_second, StringType())

train = train.withColumn('day', day_udf(train.click_time))
train = train.withColumn('hour', hour_udf(train.click_time))
train = train.withColumn('minute', minute_udf(train.click_time))
train = train.withColumn('second', second_udf(train.click_time))

test = test.withColumn('day', day_udf(test.click_time))
test = test.withColumn('hour', hour_udf(test.click_time))
test = test.withColumn('minute', minute_udf(test.click_time))
test = test.withColumn('second', second_udf(test.click_time))

In [10]:
train.show(3)

+-----+---+------+---+-------+-------------------+---------------+-------------+---+----+------+------+
|   ip|app|device| os|channel|         click_time|attributed_time|is_attributed|day|hour|minute|second|
+-----+---+------+---+-------+-------------------+---------------+-------------+---+----+------+------+
|83230|  3|     1| 13|    379|2017-11-06 14:32:21|           null|            0| 06|  14|    32|    21|
|17357|  3|     1| 19|    379|2017-11-06 14:33:34|           null|            0| 06|  14|    33|    34|
|35810|  3|     1| 13|    379|2017-11-06 14:34:12|           null|            0| 06|  14|    34|    12|
+-----+---+------+---+-------+-------------------+---------------+-------------+---+----+------+------+
only showing top 3 rows



## Logistic regression score as a feature

In [11]:
f_lr_hash_modeling2 = spark.read.csv(os.path.join(mungepath, "logisitc_regression/lasso/f_lr_hash_inter2_2p18_noip_modeling2_split"), header=True)
f_lr_hash_test = spark.read.csv(os.path.join(mungepath, "logisitc_regression/lasso/f_lr_hash_inter2_2p18_noip_test"), header=True)
print('Found %d observations in f_lr_hash_modeling2.' %f_lr_hash_modeling2.count())
print('Found %d observations in f_lr_hash_test.' %f_lr_hash_test.count())

Found 92458650 observations in f_lr_hash_modeling2.
Found 18790469 observations in f_lr_hash_test.


In [12]:
# convert id to long instead of string
f_lr_hash_modeling2 = f_lr_hash_modeling2.withColumn('id', col('id').cast('long'))
f_lr_hash_test = f_lr_hash_test.withColumn('id', col('id').cast('long'))

In [13]:
f_lr_hash_modeling2.show(3)

+------------+--------------------------+
|          id|f_lr_hash_inter2_2p18_noip|
+------------+--------------------------+
|403727430145|                    7.0E-5|
|403727812258|                    9.1E-5|
|403727812631|                    9.1E-5|
+------------+--------------------------+
only showing top 3 rows



## Features built for direct concatenation

In [14]:
f_close_clicks_train = spark.read.csv(os.path.join(mungepath, "f_close_clicks_train"), header=True)
f_close_clicks_test = spark.read.csv(os.path.join(mungepath, "f_close_clicks_test"), header=True)
print('Found %d observations in train.' %f_close_clicks_train.count())
print('Found %d observations in test.' %f_close_clicks_test.count())

Found 184903890 observations in train.
Found 18790469 observations in test.


In [15]:
f_close_clicks_train.show(3)

+---+----------------+----------------+
| id|lapse_prev_click|lapse_next_click|
+---+----------------+----------------+
|  0|              -1|            3600|
|  1|              -1|            3600|
|  2|              -1|            3600|
+---+----------------+----------------+
only showing top 3 rows



In [16]:
f_close_clicks_app_train = spark.read.csv(os.path.join(mungepath, "f_close_clicks_app_train"), header=True)
f_close_clicks_app_test = spark.read.csv(os.path.join(mungepath, "f_close_clicks_app_test"), header=True)
print('Found %d observations in train.' %f_close_clicks_app_train.count())
print('Found %d observations in test.' %f_close_clicks_app_test.count())

Found 184903890 observations in train.
Found 18790469 observations in test.


In [17]:
f_close_clicks_app_train.show(3)

+---+--------------------+--------------------+
| id|lapse_prev_click_app|lapse_next_click_app|
+---+--------------------+--------------------+
|  0|                  -1|                3600|
|  1|                  -1|                3600|
|  2|                  -1|                3600|
+---+--------------------+--------------------+
only showing top 3 rows



In [18]:
f_close_clicks_device_train = spark.read.csv(os.path.join(mungepath, "f_close_clicks_device_train"), header=True)
f_close_clicks_device_test = spark.read.csv(os.path.join(mungepath, "f_close_clicks_device_test"), header=True)
print('Found %d observations in train.' %f_close_clicks_device_train.count())
print('Found %d observations in test.' %f_close_clicks_device_test.count())

Found 184903890 observations in train.
Found 18790469 observations in test.


In [19]:
f_close_clicks_device_train.show(3)

+---+-----------------------+-----------------------+
| id|lapse_prev_click_device|lapse_next_click_device|
+---+-----------------------+-----------------------+
|  0|                     -1|                   3600|
|  1|                     -1|                   3600|
|  2|                     -1|                   3600|
+---+-----------------------+-----------------------+
only showing top 3 rows



In [20]:
f_close_clicks_os_train = spark.read.csv(os.path.join(mungepath, "f_close_clicks_os_train"), header=True)
f_close_clicks_os_test = spark.read.csv(os.path.join(mungepath, "f_close_clicks_os_test"), header=True)
print('Found %d observations in train.' %f_close_clicks_os_train.count())
print('Found %d observations in test.' %f_close_clicks_os_test.count())

Found 184903890 observations in train.
Found 18790469 observations in test.


In [21]:
f_close_clicks_os_train.show(3)

+---+-------------------+-------------------+
| id|lapse_prev_click_os|lapse_next_click_os|
+---+-------------------+-------------------+
|  0|                 -1|               3600|
|  1|                 -1|               3600|
|  2|                 -1|               3600|
+---+-------------------+-------------------+
only showing top 3 rows



## Features built for join operation

In [22]:
f_counts_hour_ip = spark.read.csv(os.path.join(mungepath, "f_counts_hour_ip"), header=True)
print('Found %d observations.' %f_counts_hour_ip.count())

Found 4633405 observations.


In [23]:
f_counts_hour_ip.show(1)

+-----+---+----+-----------------+---------------+------------------+-------------+-------------------+------------------+------------------+
|   ip|day|hour|ip_hour_nb_clicks|ip_hour_nb_apps|ip_hour_nb_devices|ip_hour_nb_os|ip_hour_nb_channels|ip_hour_std_minute|ip_hour_avg_minute|
+-----+---+----+-----------------+---------------+------------------+-------------+-------------------+------------------+------------------+
|75595| 06|  22|              330|             22|                 3|           29|                 56|             17.05|              36.4|
+-----+---+----+-----------------+---------------+------------------+-------------+-------------------+------------------+------------------+
only showing top 1 row



In [24]:
f_counts_minute_ip = spark.read.csv(os.path.join(mungepath, "f_counts_minute_ip"), header=True)
print('Found %d observations.' %f_counts_minute_ip.count())

Found 57437935 observations.


In [25]:
f_counts_minute_ip.show(1)

+------+---+----+------+-------------------+-----------------+--------------------+---------------+---------------------+--------------------+--------------------+
|    ip|day|hour|minute|ip_minute_nb_clicks|ip_minute_nb_apps|ip_minute_nb_devices|ip_minute_nb_os|ip_minute_nb_channels|ip_minute_std_second|ip_minute_avg_second|
+------+---+----+------+-------------------+-----------------+--------------------+---------------+---------------------+--------------------+--------------------+
|100009| 07|  03|    31|                  6|                5|                   1|              1|                    6|               11.23|               23.83|
+------+---+----+------+-------------------+-----------------+--------------------+---------------+---------------------+--------------------+--------------------+
only showing top 1 row



In [26]:
f_counts_week_ip = spark.read.csv(os.path.join(mungepath, "f_counts_week_ip"), header=True)
print('Found %d observations.' %f_counts_week_ip.count())

Found 364779 observations.


In [27]:
f_counts_week_ip.show(1)

+------+------------+----------+-------------+--------+--------------+-----------------+-----------+-------------+-----------+-------------+
|    ip|ip_nb_clicks|ip_nb_apps|ip_nb_devices|ip_nb_os|ip_nb_channels|ip_std_click_time|ip_std_hour|ip_std_minute|ip_avg_hour|ip_avg_minute|
+------+------------+----------+-------------+--------+--------------+-----------------+-----------+-------------+-----------+-------------+
|121867|        1915|        41|            4|      38|           103|         118201.0|       5.55|        16.63|       9.22|        29.31|
+------+------------+----------+-------------+--------+--------------+-----------------+-----------+-------------+-----------+-------------+
only showing top 1 row



In [28]:
f_counts_week_app = spark.read.csv(os.path.join(mungepath, "f_counts_week_app"), header=True)
print('Found %d observations.' %f_counts_week_app.count())

Found 769 observations.


In [29]:
f_counts_week_app.show(1)

+---+-------------+----------+--------------+---------+---------------+------------------+------------+--------------+------------+--------------+
|app|app_nb_clicks|app_nb_ips|app_nb_devices|app_nb_os|app_nb_channels|app_std_click_time|app_std_hour|app_std_minute|app_avg_hour|app_avg_minute|
+---+-------------+----------+--------------+---------+---------------+------------------+------------+--------------+------------+--------------+
|296|          497|       392|             5|       10|              1|           71876.0|        4.79|          16.3|        8.64|         30.16|
+---+-------------+----------+--------------+---------+---------------+------------------+------------+--------------+------------+--------------+
only showing top 1 row



In [30]:
f_counts_week_device = spark.read.csv(os.path.join(mungepath, "f_counts_week_device"), header=True)
print('Found %d observations.' %f_counts_week_device.count())

Found 4228 observations.


In [31]:
f_counts_week_device.show(1)

+------+----------------+--------------+-------------+------------+------------------+
|device|device_nb_clicks|device_nb_apps|device_nb_ips|device_nb_os|device_nb_channels|
+------+----------------+--------------+-------------+------------+------------------+
|   296|             143|             2|           91|           1|                 7|
+------+----------------+--------------+-------------+------------+------------------+
only showing top 1 row



In [32]:
f_counts_week_os = spark.read.csv(os.path.join(mungepath, "f_counts_week_os"), header=True)
print('Found %d observations.' %f_counts_week_os.count())

Found 957 observations.


In [33]:
f_counts_week_os.show(1)

+---+------------+----------+-------------+---------+--------------+
| os|os_nb_clicks|os_nb_apps|os_nb_devices|os_nb_ips|os_nb_channels|
+---+------------+----------+-------------+---------+--------------+
|675|           7|         3|            1|        5|             3|
+---+------------+----------+-------------+---------+--------------+
only showing top 1 row



In [34]:
f_counts_week_channel = spark.read.csv(os.path.join(mungepath, "f_counts_week_channel"), header=True)
print('Found %d observations.' %f_counts_week_channel.count())

Found 204 observations.


In [35]:
f_counts_week_channel.show(1)

+-------+-----------------+---------------+------------------+-------------+--------------+
|channel|channel_nb_clicks|channel_nb_apps|channel_nb_devices|channel_nb_os|channel_nb_ips|
+-------+-----------------+---------------+------------------+-------------+--------------+
|    467|           373151|              2|                 6|          114|         54297|
+-------+-----------------+---------------+------------------+-------------+--------------+
only showing top 1 row



In [36]:
f_ip_app_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_app_clicks"), header=True)
print('Found %d observations.' %f_ip_app_clicks.count())

Found 4533337 observations.


In [37]:
f_ip_app_clicks.show(1)

+------+---+-------------+---------------------+---------------+-----------------+---------------+-----------------+
|    ip|app|ip_app_clicks|ip_app_std_click_time|ip_app_std_hour|ip_app_std_minute|ip_app_avg_hour|ip_app_avg_minute|
+------+---+-------------+---------------------+---------------+-----------------+---------------+-----------------+
|105388| 64|           45|              60882.0|           6.34|            17.09|           9.29|            32.44|
+------+---+-------------+---------------------+---------------+-----------------+---------------+-----------------+
only showing top 1 row



In [38]:
f_ip_device_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_device_clicks"), header=True)
print('Found %d observations.' %f_ip_device_clicks.count())

Found 911450 observations.


In [39]:
f_ip_device_clicks.show(1)

+-----+------+----------------+------------------------+------------------+--------------------+------------------+--------------------+
|   ip|device|ip_device_clicks|ip_device_std_click_time|ip_device_std_hour|ip_device_std_minute|ip_device_avg_hour|ip_device_avg_minute|
+-----+------+----------------+------------------------+------------------+--------------------+------------------+--------------------+
|84942|     1|           16261|                103078.0|              6.11|               17.22|             11.06|               28.86|
+-----+------+----------------+------------------------+------------------+--------------------+------------------+--------------------+
only showing top 1 row



In [40]:
f_ip_os_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_os_clicks"), header=True)
print('Found %d observations.' %f_ip_os_clicks.count())

Found 3578758 observations.


In [41]:
f_ip_os_clicks.show(1)

+-----+---+------------+--------------------+--------------+----------------+--------------+----------------+
|   ip| os|ip_os_clicks|ip_os_std_click_time|ip_os_std_hour|ip_os_std_minute|ip_os_avg_hour|ip_os_avg_minute|
+-----+---+------------+--------------------+--------------+----------------+--------------+----------------+
|59395| 19|       23982|             99425.0|          6.12|           17.49|         10.77|           29.35|
+-----+---+------------+--------------------+--------------+----------------+--------------+----------------+
only showing top 1 row



In [42]:
f_ip_app_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_app_hour_clicks"), header=True)
print('Found %d observations.' %f_ip_app_hour_clicks.count())

Found 46317015 observations.


In [43]:
f_ip_app_hour_clicks.show(1)

+------+---+---+----+------------------+----------------------+----------------------+
|    ip|app|day|hour|ip_app_hour_clicks|ip_app_hour_std_minute|ip_app_hour_avg_minute|
+------+---+---+----+------------------+----------------------+----------------------+
|172522|  3| 06|  15|                 1|                   0.0|                  58.0|
+------+---+---+----+------------------+----------------------+----------------------+
only showing top 1 row



In [44]:
f_ip_device_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_device_hour_clicks"), header=True)
print('Found %d observations.' %f_ip_device_hour_clicks.count())

Found 6259633 observations.


In [45]:
f_ip_device_hour_clicks.show(1)

+------+------+---+----+---------------------+-------------------------+-------------------------+
|    ip|device|day|hour|ip_device_hour_clicks|ip_device_hour_std_minute|ip_device_hour_avg_minute|
+------+------+---+----+---------------------+-------------------------+-------------------------+
|111189|     1| 06|  17|                   56|                    13.64|                    28.63|
+------+------+---+----+---------------------+-------------------------+-------------------------+
only showing top 1 row



In [46]:
f_ip_os_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_os_hour_clicks"), header=True)
print('Found %d observations.' %f_ip_os_hour_clicks.count())

Found 26491410 observations.


In [47]:
f_ip_os_clicks.show(1)

+-----+---+------------+--------------------+--------------+----------------+--------------+----------------+
|   ip| os|ip_os_clicks|ip_os_std_click_time|ip_os_std_hour|ip_os_std_minute|ip_os_avg_hour|ip_os_avg_minute|
+-----+---+------------+--------------------+--------------+----------------+--------------+----------------+
|59395| 19|       23982|             99425.0|          6.12|           17.49|         10.77|           29.35|
+-----+---+------------+--------------------+--------------+----------------+--------------+----------------+
only showing top 1 row



In [48]:
f_ip_app_minute_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_app_minute_clicks"), header=True)
print('Found %d observations.' %f_ip_app_minute_clicks.count())

Found 175594102 observations.


In [49]:
f_ip_app_minute_clicks.show(1)

+-----+---+---+----+------+--------------------+------------------------+------------------------+
|   ip|app|day|hour|minute|ip_app_minute_clicks|ip_app_minute_std_second|ip_app_minute_avg_second|
+-----+---+---+----+------+--------------------+------------------------+------------------------+
|29045| 64| 06|  14|    54|                   1|                     0.0|                    59.0|
+-----+---+---+----+------+--------------------+------------------------+------------------------+
only showing top 1 row



In [50]:
f_ip_device_minute_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_device_minute_clicks"), header=True)
print('Found %d observations.' %f_ip_device_minute_clicks.count())

Found 60890812 observations.


In [51]:
f_ip_device_minute_clicks.show(1)

+-----+------+---+----+------+-----------------------+---------------------------+---------------------------+
|   ip|device|day|hour|minute|ip_device_minute_clicks|ip_device_minute_std_second|ip_device_minute_avg_second|
+-----+------+---+----+------+-----------------------+---------------------------+---------------------------+
|80447|     1| 06|  14|    40|                      1|                        0.0|                       51.0|
+-----+------+---+----+------+-----------------------+---------------------------+---------------------------+
only showing top 1 row



In [52]:
f_ip_os_minute_clicks = spark.read.csv(os.path.join(mungepath, "f_ip_os_minute_clicks"), header=True)
print('Found %d observations.' %f_ip_os_minute_clicks.count())

Found 85228220 observations.


In [53]:
f_ip_os_minute_clicks.show(1)

+----+---+---+----+------+-------------------+-----------------------+-----------------------+
|  ip| os|day|hour|minute|ip_os_minute_clicks|ip_os_minute_std_second|ip_os_minute_avg_second|
+----+---+---+----+------+-------------------+-----------------------+-----------------------+
|5348|  9| 06|  22|    46|                 11|                  14.08|                  33.18|
+----+---+---+----+------+-------------------+-----------------------+-----------------------+
only showing top 1 row



In [54]:
f_app_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_app_hour_clicks"), header=True)
print('Found %d observations.' %f_app_hour_clicks.count())

Found 25199 observations.


In [55]:
f_app_hour_clicks.show(1)

+---+---+----+---------------+-------------------+-------------------+
|app|day|hour|app_hour_clicks|app_hour_std_minute|app_hour_avg_minute|
+---+---+----+---------------+-------------------+-------------------+
|183| 07|  03|            894|               17.5|              37.56|
+---+---+----+---------------+-------------------+-------------------+
only showing top 1 row



In [56]:
f_device_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_device_hour_clicks"), header=True)
print('Found %d observations.' %f_device_hour_clicks.count())

Found 53535 observations.


In [57]:
f_device_hour_clicks.show(1)

+------+---+----+------------------+----------------------+----------------------+
|device|day|hour|device_hour_clicks|device_hour_std_minute|device_hour_avg_minute|
+------+---+----+------------------+----------------------+----------------------+
|   128| 07|  00|                 1|                   0.0|                  11.0|
+------+---+----+------------------+----------------------+----------------------+
only showing top 1 row



In [58]:
f_os_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_os_hour_clicks"), header=True)
print('Found %d observations.' %f_os_hour_clicks.count())

Found 19600 observations.


In [59]:
f_os_hour_clicks.show(1)

+---+---+----+--------------+------------------+------------------+
| os|day|hour|os_hour_clicks|os_hour_std_minute|os_hour_avg_minute|
+---+---+----+--------------+------------------+------------------+
|174| 07|  10|            19|             18.98|             22.42|
+---+---+----+--------------+------------------+------------------+
only showing top 1 row



In [60]:
f_channel_hour_clicks = spark.read.csv(os.path.join(mungepath, "f_channel_hour_clicks"), header=True)
print('Found %d observations.' %f_channel_hour_clicks.count())

Found 15659 observations.


In [61]:
f_channel_hour_clicks.show(1)

+-------+---+----+-------------------+-----------------------+-----------------------+
|channel|day|hour|channel_hour_clicks|channel_hour_std_minute|channel_hour_avg_minute|
+-------+---+----+-------------------+-----------------------+-----------------------+
|    278| 06|  23|              10066|                  17.14|                  31.61|
+-------+---+----+-------------------+-----------------------+-----------------------+
only showing top 1 row



# Modeling data

In [62]:
train.show(3)

+-----+---+------+---+-------+-------------------+---------------+-------------+---+----+------+------+
|   ip|app|device| os|channel|         click_time|attributed_time|is_attributed|day|hour|minute|second|
+-----+---+------+---+-------+-------------------+---------------+-------------+---+----+------+------+
|83230|  3|     1| 13|    379|2017-11-06 14:32:21|           null|            0| 06|  14|    32|    21|
|17357|  3|     1| 19|    379|2017-11-06 14:33:34|           null|            0| 06|  14|    33|    34|
|35810|  3|     1| 13|    379|2017-11-06 14:34:12|           null|            0| 06|  14|    34|    12|
+-----+---+------+---+-------+-------------------+---------------+-------------+---+----+------+------+
only showing top 3 rows



## Row number as ID variable

In [63]:
from pyspark.sql.functions import monotonically_increasing_id

modeling1 = (train
             .withColumn('id', monotonically_increasing_id())
             .drop('click_time', 'attributed_time'))
print("modeling1 size: ", modeling1.count())

modeling1 size:  184903890


In [64]:
modeling1.show(5)

+------+---+------+---+-------+-------------+---+----+------+------+---+
|    ip|app|device| os|channel|is_attributed|day|hour|minute|second| id|
+------+---+------+---+-------+-------------+---+----+------+------+---+
| 83230|  3|     1| 13|    379|            0| 06|  14|    32|    21|  0|
| 17357|  3|     1| 19|    379|            0| 06|  14|    33|    34|  1|
| 35810|  3|     1| 13|    379|            0| 06|  14|    34|    12|  2|
| 45745| 14|     1| 13|    478|            0| 06|  14|    34|    52|  3|
|161007|  3|     1| 13|    379|            0| 06|  14|    35|    08|  4|
+------+---+------+---+-------+-------------+---+----+------+------+---+
only showing top 5 rows



In [65]:
from pyspark.sql.functions import monotonically_increasing_id

test = (test
        .withColumn('id', monotonically_increasing_id())
        .drop('click_time'))
print("test size: ", test.count())

test size:  18790469


In [66]:
test.show(5)

+--------+------+---+------+---+-------+---+----+------+------+---+
|click_id|    ip|app|device| os|channel|day|hour|minute|second| id|
+--------+------+---+------+---+-------+---+----+------+------+---+
|       0|  5744|  9|     1|  3|    107| 10|  04|    00|    00|  0|
|       1|119901|  9|     1|  3|    466| 10|  04|    00|    00|  1|
|       2| 72287| 21|     1| 19|    128| 10|  04|    00|    00|  2|
|       3| 78477| 15|     1| 13|    111| 10|  04|    00|    00|  3|
|       4|123080| 12|     1| 13|    328| 10|  04|    00|    00|  4|
+--------+------+---+------+---+-------+---+----+------+------+---+
only showing top 5 rows



## Stage 2 modeling data set
This is the data set that will serve to create the random forest model.  
It contains only observations that we scored out of bag with a previous logisitic regression model.  
This dataset will be split later into train and 

In [67]:
modeling2 = modeling1.join(f_lr_hash_modeling2.select('id'), on='id', how='inner')
print("modeling2 size: ", modeling2.count())

modeling2 size:  92458650


In [68]:
modeling2.show(3)

+----+------+---+------+---+-------+-------------+---+----+------+------+
|  id|    ip|app|device| os|channel|is_attributed|day|hour|minute|second|
+----+------+---+------+---+-------+-------------+---+----+------+------+
|  26| 47902|  3|     1| 17|    379|            0| 06|  14|    48|    07|
|1677|191053|  3|     1| 22|    409|            0| 06|  16|    00|    02|
|1806|  9587|  3|     1| 53|    480|            0| 06|  16|    00|    02|
+----+------+---+------+---+-------+-------------+---+----+------+------+
only showing top 3 rows



## Data Splitting

We are now ready to start working with the actual click data, and our first task involves splitting it into training, validation sets.  Use the [randomSplit method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) with the specified weights and seed to create DFs storing each of these datasets, and then [cache](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cache) each of these DFs, as we will be accessing them multiple times in the remainder of this lab. Finally, compute the size of each dataset.

In [69]:
# We'll hold out 10% of our data for validation and leave 90% for training
seed = 2210
(train_sample, validation) = modeling2.randomSplit([0.9,0.1], seed=seed)

In [70]:
# dataframe of positive examples
train_pos = train_sample.filter(col('is_attributed')==1)
n_pos = train_pos.count()
print("number of positive examples:", n_pos)

number of positive examples: 205784


In [71]:
# dataframe of negative examples
n_neg = n_pos * 10
train_neg = train_sample.filter(col('is_attributed')==0).orderBy(func.rand(seed=seed)).limit(n_neg).cache()
print("number of negative examples:", n_neg)

number of negative examples: 2057840


In [72]:
# concat train_pos and train_neg dataframes
train_reduced = train_pos.union(train_neg).orderBy(func.rand(seed=seed))

# Let's cache these datasets for performance
train_reduced.cache()
validation = validation.cache()

print('reduced training set size:', train_reduced.count())
print('validation set size:', validation.count())

reduced training set size: 2263624
validation set size: 9246570


In [73]:
train_reduced.show(3)

+------------+------+---+------+---+-------+-------------+---+----+------+------+
|          id|    ip|app|device| os|channel|is_attributed|day|hour|minute|second|
+------------+------+---+------+---+-------+-------------+---+----+------+------+
| 60132345023|155147|  2|     1| 18|    477|            0| 07|  04|    53|    51|
|360777256570|  4710|  3|     1| 17|    173|            0| 09|  01|    58|    37|
|283470834119| 24506| 18|     1| 17|    134|            0| 08|  13|    01|    32|
+------------+------+---+------+---+-------+-------------+---+----+------+------+
only showing top 3 rows



## Data binding

In [74]:
def bind_features(inputDF, train_test=None):
    
    if train_test == "train":
        
        # add f_lr_hash_modeling
        dataset = inputDF.join(f_lr_hash_modeling2, on='id', how='inner')
        
        # add f_close_clicks_train
        dataset = dataset.join(f_close_clicks_train, on='id', how='inner')
        
        # add f_close_clicks_app_train
        dataset = dataset.join(f_close_clicks_app_train, on='id', how='inner')
        
        # add f_close_clicks_device_train
        dataset = dataset.join(f_close_clicks_device_train, on='id', how='inner')
        
        # add f_close_clicks_os_train
        dataset = dataset.join(f_close_clicks_os_train, on='id', how='inner')
    
    if train_test == "test":
        
        # add f_lr_hash_test
        dataset = inputDF.join(f_lr_hash_test, on='id', how='inner')
        
        # add f_close_clicks_test
        dataset = dataset.join(f_close_clicks_test, on='id', how='inner')
        
        # add f_close_clicks_app_test
        dataset = dataset.join(f_close_clicks_app_test, on='id', how='inner')
        
        # add f_close_clicks_device_test
        dataset = dataset.join(f_close_clicks_device_test, on='id', how='inner')
        
        # add f_close_clicks_os_test
        dataset = dataset.join(f_close_clicks_os_test, on='id', how='inner')
        
    
    # add f_counts_hour_ip
    dataset = dataset.join(f_counts_hour_ip, on=['ip', 'day', 'hour'], how='left_outer')
    
    # add f_counts_minute_ip
    dataset = dataset.join(f_counts_minute_ip, on=['ip', 'day', 'hour', 'minute'], how='left_outer')
    
    # add f_counts_week_ip
    dataset = dataset.join(f_counts_week_ip, on=['ip'], how='left_outer')
    
    # add f_counts_week_app
    dataset = dataset.join(f_counts_week_app, on=['app'], how='left_outer')
    
    # add f_counts_week_device
    dataset = dataset.join(f_counts_week_device, on=['device'], how='left_outer')
    
    # add f_counts_week_os
    dataset = dataset.join(f_counts_week_os, on=['os'], how='left_outer')
    
    # add f_counts_week_channel
    dataset = dataset.join(f_counts_week_channel, on=['channel'], how='left_outer')
    
    # add f_ip_app_clicks
    dataset = dataset.join(f_ip_app_clicks, on=['ip', 'app'], how='left_outer')
    
    # add f_ip_device_clicks
    dataset = dataset.join(f_ip_device_clicks, on=['ip', 'device'], how='left_outer')        

    # add f_ip_os_clicks
    dataset = dataset.join(f_ip_os_clicks, on=['ip', 'os'], how='left_outer')
    
    # add f_ip_app_hour_clicks
    dataset = dataset.join(f_ip_app_hour_clicks, on=['ip', 'app', 'day', 'hour'], how='left_outer')
    
    # add f_ip_device_hour_clicks
    dataset = dataset.join(f_ip_device_hour_clicks, on=['ip', 'device', 'day', 'hour'], how='left_outer')        

    # add f_ip_os_hour_clicks
    dataset = dataset.join(f_ip_os_hour_clicks, on=['ip', 'os', 'day', 'hour'], how='left_outer')
    
    # add f_ip_app_minute_clicks
    dataset = dataset.join(f_ip_app_minute_clicks, on=['ip', 'app', 'day', 'hour', 'minute'], how='left_outer')
    
    # add f_ip_device_minute_clicks
    dataset = dataset.join(f_ip_device_minute_clicks, on=['ip', 'device', 'day', 'hour', 'minute'], how='left_outer')        

    # add f_ip_os_minute_clicks
    dataset = dataset.join(f_ip_os_minute_clicks, on=['ip', 'os', 'day', 'hour', 'minute'], how='left_outer')
      
    # add f_app_hour_clicks
    dataset = dataset.join(f_app_hour_clicks, on=['app', 'day', 'hour'], how='left_outer')
    
    # add f_device_hour_clicks
    dataset = dataset.join(f_device_hour_clicks, on=['device', 'day', 'hour'], how='left_outer')
    
    # add f_os_hour_clicks
    dataset = dataset.join(f_os_hour_clicks, on=['os', 'day', 'hour'], how='left_outer')
    
    # add f_channel_hour_clicks
    dataset = dataset.join(f_channel_hour_clicks, on=['channel', 'day', 'hour'], how='left_outer')
    
    # apply the original chronological order
    dataset = dataset.orderBy(col('id'))
    
    return dataset

In [75]:
# add features to train_reduced
train_features = bind_features(train_reduced, train_test="train").cache()

# check wether the join operations where right
train_features.count()

2263624

In [76]:
# add features to validation
validation_features = bind_features(validation, train_test="train").cache()

# check wether the join operations where right
validation_features.count()

9246570

In [77]:
# add features to test
test_features = bind_features(test, train_test="test").cache()

# check wether the join operations where right
test_features.count()

18790469

## Data Storage

In [78]:
# save training data to disk
(train_features
 .coalesce(1)
 .write
 .format("com.databricks.spark.csv")
 .option("header", "true")
 .mode("overwrite")
 .save(mungepath+"model_data/20180504/rf_lr_lasso_inter2_noip/train_features", compression="None"))

In [79]:
# save validation data to disk
(validation_features
 .coalesce(1)
 .write
 .format("com.databricks.spark.csv")
 .option("header", "true")
 .mode("overwrite")
 .save(mungepath+"model_data/20180504/rf_lr_lasso_inter2_noip/validation_features", compression="None"))

In [80]:
# save test data to disk
(test_features
 .coalesce(1)
 .write
 .format("com.databricks.spark.csv")
 .option("header", "true")
 .mode("overwrite")
 .save(mungepath+"model_data/20180504/rf_lr_lasso_inter2_noip/test_features", compression="None"))

# ML Data Preparation

## Load pre-computed modeling data

In [69]:
train_features = spark.read.csv(os.path.join(mungepath,"model_data/20180504/rf_lr_lasso_inter2_noip/train_features/*"), header=True)
print("Number of observations in train :", train_features.count())

Number of observations in train : 2263624


In [70]:
validation_features = spark.read.csv(os.path.join(mungepath,"model_data/20180504/rf_lr_lasso_inter2_noip/validation_features/*"), header=True)
print("Number of observations in validation :", validation_features.count())

Number of observations in validation : 9246570


In [71]:
test_features = spark.read.csv(os.path.join(mungepath,"model_data/20180504/rf_lr_lasso_inter2_noip/test_features/*"), header=True)
print("Number of observations in test :", test_features.count())

Number of observations in test : 18790469


## Defining features

In [72]:
feature_names = list(train_features.limit(1).toPandas().columns)
feature_names = [f for f in feature_names if f not in ['ip','os','device','app','channel',
                                                       'day',
                                                       'id','is_attributed']]

In [73]:
print("number of features:", len(feature_names))

number of features: 109


In [74]:
feature_names

['hour',
 'minute',
 'second',
 'f_lr_hash_inter2_2p18_noip',
 'lapse_prev_click',
 'lapse_next_click',
 'lapse_prev_click_app',
 'lapse_next_click_app',
 'lapse_prev_click_device',
 'lapse_next_click_device',
 'lapse_prev_click_os',
 'lapse_next_click_os',
 'ip_hour_nb_clicks',
 'ip_hour_nb_apps',
 'ip_hour_nb_devices',
 'ip_hour_nb_os',
 'ip_hour_nb_channels',
 'ip_hour_std_minute',
 'ip_hour_avg_minute',
 'ip_minute_nb_clicks',
 'ip_minute_nb_apps',
 'ip_minute_nb_devices',
 'ip_minute_nb_os',
 'ip_minute_nb_channels',
 'ip_minute_std_second',
 'ip_minute_avg_second',
 'ip_nb_clicks',
 'ip_nb_apps',
 'ip_nb_devices',
 'ip_nb_os',
 'ip_nb_channels',
 'ip_std_click_time',
 'ip_std_hour',
 'ip_std_minute',
 'ip_avg_hour',
 'ip_avg_minute',
 'app_nb_clicks',
 'app_nb_ips',
 'app_nb_devices',
 'app_nb_os',
 'app_nb_channels',
 'app_std_click_time',
 'app_std_hour',
 'app_std_minute',
 'app_avg_hour',
 'app_avg_minute',
 'device_nb_clicks',
 'device_nb_apps',
 'device_nb_ips',
 'device_nb

## Assembling features

In [75]:
# change column types
train_ml = train_features.withColumn('label', col('is_attributed').cast('integer'))
validation_ml = validation_features.withColumn('label', col('is_attributed').cast('integer'))

for f in feature_names:
    train_ml = train_ml.withColumn(f, col(f).cast('float'))
    validation_ml = validation_ml.withColumn(f, col(f).cast('float'))

# keep only target and feature variable
train_ml = train_ml.select(['id', 'label']+feature_names)
validation_ml = validation_ml.select(['id', 'label']+feature_names)

The first step in building our ML pipeline is to convert the predictor features from DataFrame columns to Feature Vectors using the pyspark.ml.feature.VectorAssembler() method.

The VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler takes a list of input column names (each is a string) and the name of the output column (as a string).

In [76]:
from pyspark.ml.feature import VectorAssembler

vectorizer = VectorAssembler()
vectorizer.setInputCols(feature_names)
vectorizer.setOutputCol("features")

VectorAssembler_44db84b530b572911319

In [77]:
train_ml = (vectorizer
            .transform(train_ml)
            .select('id', 'label', 'features')
            .cache())
validation_ml = (vectorizer
                 .transform(validation_ml)
                 .select('id', 'label', 'features')
                 .cache())

train_ml.show(5)

+---+-----+--------------------+
| id|label|            features|
+---+-----+--------------------+
|  1|    0|[14.0,33.0,34.0,3...|
| 53|    0|[15.0,5.0,5.0,3.3...|
|153|    0|[15.0,42.0,55.0,3...|
|186|    0|[15.0,45.0,5.0,9....|
|223|    0|[15.0,46.0,45.0,3...|
+---+-----+--------------------+
only showing top 5 rows



# Random Forest

In [78]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# tuning grid
grid_maxBins = [32]
grid_numTrees = [100, 300, 500]
grid_maxDepth = [9, 11, 13, 15]

# Create an AUC ROC evaluator using the label and predicted columns
bcEval = BinaryClassificationEvaluator(metricName='areaUnderROC')

# initialize storage for cross validation results and models
cross_validation = pd.DataFrame(columns=['maxBins', 'numTrees', 'maxDepth', 'auc_train', 'auc_val'])
model_dict = dict()

# loop over tuning parameters
for mb in grid_maxBins:
    for nt in grid_numTrees:
        for md in grid_maxDepth:
            rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=22,
                                        maxDepth=md, numTrees=nt, maxBins=mb)
            model = rf.fit(train_ml)
            model_dict[str((mb, nt, md))] = model                                     
            auc_train = round(bcEval.evaluate(model.transform(train_ml)),6)
            auc_val = round(bcEval.evaluate(model.transform(validation_ml)),6)
            cross_validation = cross_validation.append({'maxBins':mb,
                                                        'maxDepth':md,
                                                        'numTrees':nt,
                                                        'auc_train':auc_train,
                                                        'auc_val':auc_val},
                                                       ignore_index=True)
            print('maxBins:', mb, '\tnumTrees:', nt, '\tmaxDepth:', md, '\tauc_train:', auc_train, '\tauc_val:', auc_val)

maxBins: 32 	numTrees: 100 	maxDepth: 9 	auc_train: 0.971886 	auc_val: 0.972128
maxBins: 32 	numTrees: 100 	maxDepth: 11 	auc_train: 0.975454 	auc_val: 0.97482
maxBins: 32 	numTrees: 100 	maxDepth: 13 	auc_train: 0.979628 	auc_val: 0.976586
maxBins: 32 	numTrees: 100 	maxDepth: 15 	auc_train: 0.984863 	auc_val: 0.977687
maxBins: 32 	numTrees: 300 	maxDepth: 9 	auc_train: 0.971739 	auc_val: 0.971915
maxBins: 32 	numTrees: 300 	maxDepth: 11 	auc_train: 0.975458 	auc_val: 0.974685
maxBins: 32 	numTrees: 300 	maxDepth: 13 	auc_train: 0.979892 	auc_val: 0.976621
maxBins: 32 	numTrees: 300 	maxDepth: 15 	auc_train: 0.985273 	auc_val: 0.97779
maxBins: 32 	numTrees: 500 	maxDepth: 9 	auc_train: 0.971876 	auc_val: 0.97208
maxBins: 32 	numTrees: 500 	maxDepth: 11 	auc_train: 0.975549 	auc_val: 0.974782
maxBins: 32 	numTrees: 500 	maxDepth: 13 	auc_train: 0.979954 	auc_val: 0.976672
maxBins: 32 	numTrees: 500 	maxDepth: 15 	auc_train: 0.985402 	auc_val: 0.977909
