In [1]:
# Checklist:
# AWS emr-5.29.0
# MASTER r5d.8xlarge 1x, no EBS
# CORE r5d.8xlarge 4x, no EBS
# Custom bootstrap action: s3://ydatazian/bootstrap.sh
# Allow ssh in master node security group

In [2]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn
import matplotlib.pyplot as plt

# SparkSession

https://spark.apache.org/docs/2.4.4/api/python/pyspark.html

https://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html

In [3]:
import findspark
findspark.init()

import spark_utils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext("yarn", "My App", conf=spark_utils.get_spark_conf())
se = SparkSession(sc)
spark_utils.print_ui_links()

NameNode: http://ec2-52-73-202-8.compute-1.amazonaws.com:50070
YARN: http://ec2-52-73-202-8.compute-1.amazonaws.com:8088
Spark UI: http://ec2-52-73-202-8.compute-1.amazonaws.com:20888/proxy/application_1589891955781_0001


# Register all tables for sql queries

In [4]:
from IPython.display import display
tables = ["clicks_test", "clicks_train", 
          "documents_categories", "documents_entities", "documents_meta", "documents_topics", 
          "events", "page_views", "page_views_sample", "promoted_content"]
for name in tqdm.tqdm(tables):
    df = se.read.parquet("s3://ydatazian/{}.parquet".format(name))
    df.registerTempTable(name)
    print(name)
    display(df.limit(3).toPandas())

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

clicks_test


Unnamed: 0,display_id,ad_id
0,16874594,66758
1,16874594,150083
2,16874594,162754


clicks_train


Unnamed: 0,display_id,ad_id,clicked
0,1,42337,0
1,1,139684,0
2,1,144739,1


documents_categories


Unnamed: 0,document_id,category_id,confidence_level
0,1595802,1611,0.92
1,1595802,1610,0.07
2,1524246,1807,0.92


documents_entities


Unnamed: 0,document_id,entity_id,confidence_level
0,1524246,f9eec25663db4cd83183f5c805186f16,0.672865314504701
1,1524246,55ebcfbdaff1d6f60b3907151f38527a,0.399113728441297
2,1524246,839907a972930b17b125eb0247898412,0.392095749652966


documents_meta


Unnamed: 0,document_id,source_id,publisher_id,publish_time
0,1595802,1,603,2016-06-05 00:00:00
1,1524246,1,603,2016-05-26 11:00:00
2,1617787,1,603,2016-05-27 00:00:00


documents_topics


Unnamed: 0,document_id,topic_id,confidence_level
0,1595802,140,0.0731131601068925
1,1595802,16,0.0594164867373976
2,1595802,143,0.0454207537554526


events


Unnamed: 0,display_id,uuid,document_id,timestamp,platform,geo_location
0,1,cb8c55702adb93,379743,61,3,US>SC>519
1,2,79a85fa78311b9,1794259,81,2,US>CA>807
2,3,822932ce3d8757,1179111,182,2,US>MI>505


page_views


Unnamed: 0,uuid,document_id,timestamp,platform,geo_location,traffic_source
0,1fd5f051fba643,120,31905835,1,RS,2
1,8557aa9004be3b,120,32053104,1,VN>44,2
2,c351b277a358f0,120,54013023,1,KR>12,1


page_views_sample


Unnamed: 0,uuid,document_id,timestamp,platform,geo_location,traffic_source
0,1fd5f051fba643,120,31905835,1,RS,2
1,8557aa9004be3b,120,32053104,1,VN>44,2
2,c351b277a358f0,120,54013023,1,KR>12,1


promoted_content


Unnamed: 0,ad_id,document_id,campaign_id,advertiser_id
0,1,6614,1,7
1,2,471467,2,7
2,3,7692,3,7





# Prepare dataset for VW

We will predict a *click* based on:
- ad_id
- document_id
- campaign_id
- advertiser_id

In [21]:
%%time
se.sql("""
select 
    clicks_train.clicked,
    clicks_train.display_id,
    clicks_train.ad_id,
    promoted_content.document_id,
    promoted_content.campaign_id,
    promoted_content.advertiser_id
from clicks_train join promoted_content on clicks_train.ad_id = promoted_content.ad_id
""").write.parquet("/train_features.parquet", mode='overwrite')

CPU times: user 2.97 ms, sys: 1.33 ms, total: 4.29 ms
Wall time: 54.2 s


In [22]:
se.read.parquet("/train_features.parquet").show(5)

+-------+----------+------+-----------+-----------+-------------+
|clicked|display_id| ad_id|document_id|campaign_id|advertiser_id|
+-------+----------+------+-----------+-----------+-------------+
|      0|         1| 42337|     938164|       5969|         1499|
|      0|         1|139684|    1085937|      17527|         2563|
|      1|         1|144739|    1337362|      18488|         2909|
|      0|         1|156824|     992370|       7283|         1919|
|      0|         1|279295|    1670176|      27524|         1820|
+-------+----------+------+-----------+-----------+-------------+
only showing top 5 rows



In [23]:
# Format: [Label] [Importance] [Base] [Tag]|Namespace Features |Namespace Features ... |Namespace Features
# https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format
def vw_row_mapper(row):
    clicked = None
    features = []
    for k, v in row.asDict().items():
        if k == 'clicked':
            clicked = '1' if v == '1' else '-1'
        else:
            features.append(k + "_" + v)
    tag = row.display_id + "_" + row.ad_id
    return "{} {}| {}".format(clicked, tag, " ".join(features))

r = se.read.parquet("/train_features.parquet").take(1)[0]
print(r)
print(vw_row_mapper(r))

Row(clicked='0', display_id='1', ad_id='42337', document_id='938164', campaign_id='5969', advertiser_id='1499')
-1 1_42337| display_id_1 ad_id_42337 document_id_938164 campaign_id_5969 advertiser_id_1499


In [24]:
%%time
! hdfs dfs -rm -r /train_features.txt
(
    se.read.parquet("/train_features.parquet")
    .rdd
    .map(vw_row_mapper)
    .saveAsTextFile("/train_features.txt")
)

Deleted /train_features.txt
CPU times: user 27.4 ms, sys: 19 ms, total: 46.5 ms
Wall time: 2min 43s


In [9]:
# copy file to local master node
! rm /mnt/train.txt
! hdfs dfs -getmerge /train_features.txt /mnt/train.txt
# preview local file
! head -n 5 /mnt/train.txt

rm: cannot remove ‘/mnt/train.txt’: No such file or directory
-1 | ad_id_42337 document_id_938164 campaign_id_5969 advertiser_id_1499
-1 | ad_id_139684 document_id_1085937 campaign_id_17527 advertiser_id_2563
1 | ad_id_144739 document_id_1337362 campaign_id_18488 advertiser_id_2909
-1 | ad_id_156824 document_id_992370 campaign_id_7283 advertiser_id_1919
-1 | ad_id_279295 document_id_1670176 campaign_id_27524 advertiser_id_1820


# Train VW
https://vowpalwabbit.org/tutorials/getting_started.html

https://github.com/JohnLangford/vowpal_wabbit/wiki/Command-line-arguments

In [10]:
! ./vw -d /mnt/train.txt -b 24 -c -k --ftrl --passes 1 -f model --holdout_off --loss_function logistic --random_seed 42 --progress 8000000

final_regressor = model
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
creating cache_file = /mnt/train.txt.cache
Reading datafile = /mnt/train.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.459683 0.459683      8000000      8000000.0  -1.0000  -1.3025        5
0.455707 0.451731     16000000     16000000.0  -1.0000  -1.4668        5
0.452884 0.447239     24000000     24000000.0  -1.0000  -2.3240        5
0.452193 0.450119     32000000     32000000.0   1.0000  -0.5717        5
0.452457 0.453513     40000000     40000000.0  -1.0000  -1.2560        5
0.451587 0.447235     48000000     48000000.0  -1.0000  -2.6222        5
0.450909 0.446843     56000000     56000000.0  -1.0000  -3.4498        5
0.450235 0.445520     64000000     64000000.0  -1.0000  -1.

In [25]:
# make prediction with VW
! echo "? tag1| ad_id_144739 document_id_1337362 campaign_id_18488 advertiser_id_2909" > /mnt/test.txt
! echo "? tag2| ad_id_156824 document_id_992370 campaign_id_7283 advertiser_id_1919" >> /mnt/test.txt
! ./vw -d /mnt/test.txt -i model -t -k -p /mnt/predictions.txt --progress 1000000 --link=logistic
# predicted probabilities of "1" class
! cat /mnt/predictions.txt

only testing
predictions = /mnt/predictions.txt
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = /mnt/test.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features

finished run
number of examples = 2
weighted example sum = 2.000000
weighted label sum = 0.000000
average loss = 5.685139
total feature number = 10
0.318580 tag1
0.036084 tag2


# Homework 2: Baseline VW model

Train a baseline model using the following features:
- **clicked**
- geo_location features (country, state, dma)
- day_of_week (from timestamp, use *date.isoweekday()*)
- ad_id
- campaign_id
- advertiser_id
- ad_document_id
- display_document_id
- platform

Make submission to Kaggle to know your leaderboard score

If you want to create a dev set, make a 90%/10% split of training data by display_id

In [12]:
# YOUR CODE HERE

# Submitting to Kaggle

Obtain Kaggle API token: https://github.com/Kaggle/kaggle-api#api-credentials

Making a submission: https://github.com/Kaggle/kaggle-api#submit-to-a-competition

In [None]:
! mkdir ~/.kaggle
! touch ~/.kaggle/kaggle.json
! echo '{"username":"?","key":"?"}' > ~/.kaggle/kaggle.json
! cat ~/.kaggle/kaggle.json
! chmod 600 /home/hadoop/.kaggle/kaggle.json

In [18]:
! aws s3 cp s3://ydatazian/sample_submission.csv .

download: s3://ydatazian/sample_submission.csv to ./sample_submission.csv


In [20]:
# https://www.kaggle.com/c/outbrain-click-prediction/overview/evaluation
# For each display_id in the test set, you must predict a space-delimited list of ad_ids, 
# ordered by decreasing likelihood of being clicked.
! head -n 5 ./sample_submission.csv

display_id,ad_id
16874594,66758 150083 162754 170392 172888 180797
16874595,8846 30609 143982
16874596,11430 57197 132820 153260 173005 288385 289122 289915
16874597,137858 143981 155945 180965 182039 285834 305790 308836


In [26]:
%%time
se.sql("""
select 
    "0" as clicked,
    clicks_test.display_id,
    clicks_test.ad_id,
    promoted_content.document_id,
    promoted_content.campaign_id,
    promoted_content.advertiser_id
from clicks_test join promoted_content on clicks_test.ad_id = promoted_content.ad_id
""").write.parquet("/test_features.parquet", mode='overwrite')

CPU times: user 4.35 ms, sys: 0 ns, total: 4.35 ms
Wall time: 45.8 s


In [27]:
%%time
! hdfs dfs -rm -r /test_features.txt
(
    se.read.parquet("/test_features.parquet")
    .rdd
    .map(vw_row_mapper)
    .saveAsTextFile("/test_features.txt")
)

rm: `/test_features.txt': No such file or directory
CPU times: user 31 ms, sys: 8.89 ms, total: 39.9 ms
Wall time: 2min 21s


In [28]:
# copy file to local master node
! rm /mnt/test.txt
! hdfs dfs -getmerge /test_features.txt /mnt/test.txt
# preview local file
! head -n 5 /mnt/test.txt

-1 16874594_66758| display_id_16874594 ad_id_66758 document_id_1051283 campaign_id_8949 advertiser_id_555
-1 16874594_150083| display_id_16874594 ad_id_150083 document_id_1358132 campaign_id_19045 advertiser_id_1913
-1 16874594_162754| display_id_16874594 ad_id_162754 document_id_1292723 campaign_id_17770 advertiser_id_2391
-1 16874594_170392| display_id_16874594 ad_id_170392 document_id_1083829 campaign_id_20943 advertiser_id_1731
-1 16874594_172888| display_id_16874594 ad_id_172888 document_id_1433954 campaign_id_1384 advertiser_id_16


In [30]:
! ./vw -d /mnt/test.txt -i model -t -k -p /mnt/predictions.txt --progress 1000000 --link=logistic
# predicted probabilities of "1" class
! head -n 5 /mnt/predictions.txt

only testing
predictions = /mnt/predictions.txt
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = /mnt/test.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.883141 0.883141      1000000      1000000.0  -1.0000   0.0993        6
0.893463 0.903786      2000000      2000000.0  -1.0000   0.2037        6
0.910530 0.944664      3000000      3000000.0  -1.0000   0.1901        6
0.931353 0.993823      4000000      4000000.0  -1.0000   0.1295        6
0.941262 0.980896      5000000      5000000.0  -1.0000   0.2322        6
0.936796 0.914465      6000000      6000000.0  -1.0000   0.2522        6
0.932178 0.904468      7000000      7000000.0  -1.0000   0.1488        6
0.935481 0.958609      8000000      8000000.0  -1.0000   0.1201 

In [34]:
! wc -l /mnt/predictions.txt

32225162 /mnt/predictions.txt


In [40]:
from collections import defaultdict
scores_by_display_id = defaultdict(dict)
for line in tqdm.tqdm(open('/mnt/predictions.txt')):
    score, tag = line.strip().split(" ")
    score = float(score)
    display_id, ad_id = tag.split("_")
    scores_by_display_id[display_id][ad_id] = score

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




In [46]:
with open("submission.txt", "w") as f:
    f.write("display_id,ad_id\n")
    for k, vs in tqdm.tqdm_notebook(scores_by_display_id.items()):
        f.write("{},{}\n".format(
            k, 
            " ".join([v[0] for v in sorted(vs.items(), key=lambda x: -x[1])])
        ))

HBox(children=(FloatProgress(value=0.0, max=6245533.0), HTML(value='')))




In [57]:
! kaggle competitions submit -f submission.txt outbrain-click-prediction -m "baseline"

100%|████████████████████████████████████████| 260M/260M [00:03<00:00, 81.9MB/s]
Successfully submitted to Outbrain Click Prediction