In [None]:
# Checklist:
# AWS emr-5.29.0
# MASTER r5d.8xlarge 1x, no EBS
# CORE r5d.8xlarge 4x, no EBS
# Custom bootstrap action: s3://ydatazian/bootstrap.sh
# Allow ssh in master node security group

In [1]:
import tqdm.notebook as tqdm
import numpy as np
import scipy
import sklearn
import matplotlib.pyplot as plt

# SparkSession

https://spark.apache.org/docs/2.4.4/api/python/pyspark.html

https://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html

In [2]:
import findspark
findspark.init()

import spark_utils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext("yarn", "My App", conf=spark_utils.get_spark_conf())
se = SparkSession(sc)
spark_utils.print_ui_links()

NameNode: http://ec2-54-234-223-228.compute-1.amazonaws.com:50070
YARN: http://ec2-54-234-223-228.compute-1.amazonaws.com:8088
Spark UI: http://ec2-54-234-223-228.compute-1.amazonaws.com:20888/proxy/application_1590856126964_0002


# Register all tables for sql queries

# Prepare dataset for VW

We will predict a *click* based on:
- ad_id
- document_id
- campaign_id
- advertiser_id

In [None]:
%%time
se.sql("""
select 
    clicks_train.clicked,
    clicks_train.display_id,
    clicks_train.ad_id,
    promoted_content.document_id,
    promoted_content.campaign_id,
    promoted_content.advertiser_id
from clicks_train join promoted_content on clicks_train.ad_id = promoted_content.ad_id
""").write.parquet("/train_features.parquet", mode='overwrite')

In [None]:
se.read.parquet("/train_features.parquet").show(20)

In [170]:
# Format: [Label] [Importance] [Base] [Tag]|Namespace Features |Namespace Features ... |Namespace Features
# https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format
def vw_row_mapper(row):
    clicked = None
    user_features = []
    user_features.append("|user")
    doc_features = []
    doc_features.append("|doc")
    for k, v in row.asDict().items():          
        if k == 'clicked':
            if v is not None:
                clicked = '1' if v == '1' else '-1'
            else:
                clicked = '0'
        elif "NSUU" in k:
            if v is None:
                v = 0
            if "CAT" in k:
                if (("geo" in k) & (v!=0)):
                    location = v.split(">")
                    for loc in location:
                        user_features.append(k+"_"+str(loc))
                else:
                    user_features.append(k+"_"+str(v))
            else:
                user_features.append(k+":"+str(v))
        else:
            if v is None:
                v = 0
            if "CAT" in k:
                doc_features.append(k+"_"+str(v))
            elif "CON" in k:
                doc_features.append(k+":"+str(v))
            else:
                if v != 0:
                    for val in v:
                        a,b = val
                        doc_features.append(k+"_"+str(a)+":"+str(b))
    tag = row.NSD_CAT_dis_id + "_" + row.NSD_CAT_ad_id
    return "{} {}{} {}".format(clicked, tag, " ".join(user_features), " ".join(doc_features))

r = se.read.parquet("/vw_feat_train.parquet").take(1)[0]
#print(r)
vw_row_mapper(r);

In [171]:
%%time
! hdfs dfs -rm -r /vw_feat_train.txt
(
    se.read.parquet("/vw_feat_train.parquet")
    .rdd
    .map(vw_row_mapper)
    .saveAsTextFile("/vw_feat_train.txt")
)

Deleted /vw_feat_train.txt
CPU times: user 49.8 ms, sys: 18.8 ms, total: 68.6 ms
Wall time: 3min 43s


In [172]:
# copy file to local master node
! rm /mnt/train.txt
! hdfs dfs -getmerge /vw_feat_train.txt /mnt/train.txt
# preview local file
! head -n 5 /mnt/train.txt

-1 12873428_466073|user NSUU_CAT_uuid_53b567c20a8eb2 NSUU_CAT_flag_ad_clicked_1 NSUU_CAT_flag_ad_seen_1 NSUU_CON_freq:0 NSUU_CAT_geo_GB NSUU_CAT_platform_3 |doc NDS_CAT_doc_id_1000240 NSD_CAT_dis_id_12873428 NSD_CAT_ad_id_466073 NSD_CAT_ad_doc_id_2288665 NSD_CAT_pub_id_0 NSD_CON_jac_score:0.2 NSD_CAT_campaign_id_31899 NSD_CAT_advertiser_id_1108 NSD_LIST_topics_138:0.0894683491081469 NSD_LIST_topics_143:0.0881824742656303 NSD_LIST_topics_89:0.0675975072009047 NSD_LIST_topics_20:0.060568950330729 NSD_LIST_topics_194:0.045461627205345 NSD_LIST_topics_285:0.0356309213536497 NSD_LIST_topics_238:0.024729739952217 NSD_LIST_categories_1505:0.92 NSD_LIST_categories_1503:0.07
1 12873428_134263|user NSUU_CAT_uuid_53b567c20a8eb2 NSUU_CAT_flag_ad_clicked_1 NSUU_CAT_flag_ad_seen_1 NSUU_CON_freq:0 NSUU_CAT_geo_GB NSUU_CAT_platform_3 |doc NDS_CAT_doc_id_1000240 NSD_CAT_dis_id_12873428 NSD_CAT_ad_id_134263 NSD_CAT_ad_doc_id_1300757 NSD_CAT_pub_id_145 NSD_CON_jac_score:0.0 NSD_CAT_campaign_id_16162 NSD

In [173]:
! ./vw -d /mnt/train.txt -b 24 -c -k --ftrl --passes 1 -f model --holdout_off --loss_function logistic --random_seed 42 --progress 8000000

final_regressor = model
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
creating cache_file = /mnt/train.txt.cache
Reading datafile = /mnt/train.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.454929 0.454929      8000000      8000000.0  -1.0000  -1.8055       18
0.449997 0.445065     16000000     16000000.0  -1.0000  -1.1351       19
0.449668 0.449010     24000000     24000000.0  -1.0000  -2.6820       19
0.446191 0.435759     32000000     32000000.0  -1.0000  -0.9945       20
0.446549 0.447979     40000000     40000000.0  -1.0000  -2.8013       21
0.446154 0.444183     48000000     48000000.0  -1.0000  -2.6226       23
0.445516 0.441687     56000000     56000000.0   1.0000  -0.5397       29
0.444271 0.435552     64000000     64000000.0  -1.0000  -1.

In [85]:
# make prediction with VW
! echo "? tag1| ad_id_144739 document_id_1337362 campaign_id_18488 advertiser_id_2909" > /mnt/test.txt
! echo "? tag2| ad_id_156824 document_id_992370 campaign_id_7283 advertiser_id_1919" >> /mnt/test.txt
! ./vw -d /mnt/test.txt -i model -t -k -p /mnt/predictions.txt --progress 1000000 --link=logistic
# predicted probabilities of "1" class
! cat /mnt/predictions.txt

only testing
predictions = /mnt/predictions.txt
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = /mnt/test.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features

finished run
number of examples = 2
weighted example sum = 2.000000
weighted label sum = 0.000000
average loss = 0.007602
total feature number = 10
0.521783 tag1
0.521783 tag2


# Homework 2: Baseline VW model

Train a baseline model using the following features:
- **clicked**
- geo_location features (country, state, dma)
- day_of_week (from timestamp, use *date.isoweekday()*)
- ad_id
- campaign_id
- advertiser_id
- ad_document_id
- display_document_id
- platform

Make submission to Kaggle to know your leaderboard score

If you want to create a dev set, make a 90%/10% split of training data by display_id

In [None]:
# YOUR CODE HERE

# Submitting to Kaggle

Obtain Kaggle API token: https://github.com/Kaggle/kaggle-api#api-credentials

Making a submission: https://github.com/Kaggle/kaggle-api#submit-to-a-competition

In [62]:
! mkdir ~/.kaggle
! touch ~/.kaggle/kaggle.json
! echo '{"username":"shaulsolomon","key":"4910adeb28b48a95f93dcfd9abd4a69f"}' > ~/.kaggle/kaggle.json
! cat ~/.kaggle/kaggle.json
! chmod 600 /home/hadoop/.kaggle/kaggle.json

mkdir: cannot create directory '/home/hadoop/.kaggle': File exists
{"username":"shaulsolomon","key":"4910adeb28b48a95f93dcfd9abd4a69f"}


In [174]:
! aws s3 cp s3://ydatazian/sample_submission.csv .

download: s3://ydatazian/sample_submission.csv to ./sample_submission.csv


In [175]:
# https://www.kaggle.com/c/outbrain-click-prediction/overview/evaluation
# For each display_id in the test set, you must predict a space-delimited list of ad_ids, 
# ordered by decreasing likelihood of being clicked.
! head -n 5 ./sample_submission.csv

display_id,ad_id
16874594,66758 150083 162754 170392 172888 180797
16874595,8846 30609 143982
16874596,11430 57197 132820 153260 173005 288385 289122 289915
16874597,137858 143981 155945 180965 182039 285834 305790 308836


In [176]:
%%time
! hdfs dfs -rm -r /test_features.txt
(
    se.read.parquet("/vw_feat_test.parquet")
    .rdd
    .map(vw_row_mapper)
    .saveAsTextFile("/test_features.txt")
)

Deleted /test_features.txt
CPU times: user 34.5 ms, sys: 21 ms, total: 55.5 ms
Wall time: 1min 34s


In [177]:
# copy file to local master node
! rm /mnt/test.txt
! hdfs dfs -getmerge /test_features.txt /mnt/test.txt
# preview local file
! head -n 5 /mnt/test.txt

0 20285030_68743|user NSUU_CAT_uuid_f3e881f6f85843 NSUU_CAT_flag_ad_clicked_0 NSUU_CAT_flag_ad_seen_0 NSUU_CON_freq:0 NSUU_CAT_geo_NL NSUU_CAT_geo_11 NSUU_CAT_platform_1 |doc NDS_CAT_doc_id_1000495 NSD_CAT_dis_id_20285030 NSD_CAT_ad_id_68743 NSD_CAT_ad_doc_id_874654 NSD_CAT_pub_id_0 NSD_CON_jac_score:0 NSD_CAT_campaign_id_5043 NSD_CAT_advertiser_id_1726 NSD_LIST_entities_cf466fba71c45fffb360f9654866b8e2:0.389884279844755 NSD_LIST_entities_69a142b83f000df5a6521c509ebef37a:0.305385452499218 NSD_LIST_entities_d802e8cbdc8c491484f544a72250ddcc:0.257393343586711 NSD_LIST_entities_cd128dbe890472bf279f077bae47fe25:0.249998898433788 NSD_LIST_entities_265a8f9448cde40296e5612761423a5e:0.247187787620092 NSD_LIST_entities_d68bd9ef6753326a2a4dfbe54cca7693:0.246142299113605 NSD_LIST_topics_174:0.068081067406863 NSD_LIST_topics_26:0.0661115173841483 NSD_LIST_topics_250:0.0573127014565049 NSD_LIST_topics_196:0.0504604325430469 NSD_LIST_topics_129:0.0422208359689696 NSD_LIST_topics_168:0.040747922702203

In [178]:
! ./vw -d /mnt/test.txt -i model -t -k -p /mnt/predictions.txt --progress 1000000 --link=logistic
# predicted probabilities of "1" class
! head -n 5 /mnt/predictions.txt

only testing
predictions = /mnt/predictions.txt
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = /mnt/test.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
1.435940 1.435940      1000000      1000000.0   0.0000   0.0712       15
1.501673 1.567405      2000000      2000000.0   0.0000   0.2344       23
1.551366 1.650753      3000000      3000000.0   0.0000   0.1570       25
1.522551 1.436103      4000000      4000000.0   0.0000   0.6691       22
1.545302 1.636306      5000000      5000000.0   0.0000   0.2747       20
1.706138 2.510317      6000000      6000000.0   0.0000   0.1565       18
1.714504 1.764702      7000000      7000000.0   0.0000   0.4935       25
1.689249 1.512463      8000000      8000000.0   0.0000   0.4682 

In [179]:
! wc -l /mnt/predictions.txt

33965229 /mnt/predictions.txt


In [180]:
from collections import defaultdict
scores_by_display_id = defaultdict(dict)
for line in tqdm.tqdm(open('/mnt/predictions.txt')):
    score, tag = line.strip().split(" ")
    score = float(score)
    display_id, ad_id = tag.split("_")
    scores_by_display_id[display_id][ad_id] = score

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




In [181]:
with open("submission.txt", "w") as f:
    f.write("display_id,ad_id\n")
    for k, vs in tqdm.tqdm_notebook(scores_by_display_id.items()):
        f.write("{},{}\n".format(
            k, 
            " ".join([v[0] for v in sorted(vs.items(), key=lambda x: -x[1])])
        ))

HBox(children=(FloatProgress(value=0.0, max=6245533.0), HTML(value='')))




In [182]:
! kaggle competitions submit -f submission.txt outbrain-click-prediction -m "baseline"

100%|########################################| 260M/260M [00:03<00:00, 80.9MB/s]
Successfully submitted to Outbrain Click Prediction

# Feature engeneering

In [133]:
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [4]:
from IPython.display import display
tables = ["clicks_test", "clicks_train", 
          "documents_categories", "documents_entities", "documents_meta", "documents_topics", 
          "events", "page_views", "page_views_sample", "promoted_content"]
for name in tqdm.tqdm(tables):
    df = se.read.parquet("s3://ydatazian/{}.parquet".format(name))
    df.registerTempTable(name)
    print(name)
    display(df.limit(3).toPandas())

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

clicks_test


Unnamed: 0,display_id,ad_id
0,16874594,66758
1,16874594,150083
2,16874594,162754


clicks_train


Unnamed: 0,display_id,ad_id,clicked
0,1,42337,0
1,1,139684,0
2,1,144739,1


documents_categories


Unnamed: 0,document_id,category_id,confidence_level
0,1595802,1611,0.92
1,1595802,1610,0.07
2,1524246,1807,0.92


documents_entities


Unnamed: 0,document_id,entity_id,confidence_level
0,1524246,f9eec25663db4cd83183f5c805186f16,0.672865314504701
1,1524246,55ebcfbdaff1d6f60b3907151f38527a,0.399113728441297
2,1524246,839907a972930b17b125eb0247898412,0.392095749652966


documents_meta


Unnamed: 0,document_id,source_id,publisher_id,publish_time
0,1595802,1,603,2016-06-05 00:00:00
1,1524246,1,603,2016-05-26 11:00:00
2,1617787,1,603,2016-05-27 00:00:00


documents_topics


Unnamed: 0,document_id,topic_id,confidence_level
0,1595802,140,0.0731131601068925
1,1595802,16,0.0594164867373976
2,1595802,143,0.0454207537554526


events


Unnamed: 0,display_id,uuid,document_id,timestamp,platform,geo_location
0,1,cb8c55702adb93,379743,61,3,US>SC>519
1,2,79a85fa78311b9,1794259,81,2,US>CA>807
2,3,822932ce3d8757,1179111,182,2,US>MI>505


page_views


Unnamed: 0,uuid,document_id,timestamp,platform,geo_location,traffic_source
0,1fd5f051fba643,120,31905835,1,RS,2
1,8557aa9004be3b,120,32053104,1,VN>44,2
2,c351b277a358f0,120,54013023,1,KR>12,1


page_views_sample


Unnamed: 0,uuid,document_id,timestamp,platform,geo_location,traffic_source
0,1fd5f051fba643,120,31905835,1,RS,2
1,8557aa9004be3b,120,32053104,1,VN>44,2
2,c351b277a358f0,120,54013023,1,KR>12,1


promoted_content


Unnamed: 0,ad_id,document_id,campaign_id,advertiser_id
0,1,6614,1,7
1,2,471467,2,7
2,3,7692,3,7





## ESTABLISH BASE TABLES

In [104]:
# create a table with uuid, doc_id, display_id, ad_id (from EVENTS and CLICKS_TEST)

se.sql(
"""
SELECT
    BASE.uuid,
    BASE.document_id,
    BASE.display_id,
    BASE.ad_id,
    BASE.timestamp,
    PC.document_id as ad_doc_id
    
    
FROM
(SELECT
    events.uuid,
    events.document_id,
    events.display_id,
    events.timestamp,
    CT.ad_id

FROM clicks_test CT
        JOIN events
        ON CT.display_id = events.display_id) BASE
        JOIN promoted_content PC
        ON BASE.ad_id = PC.ad_id

""").write.parquet("/baseline_test.parquet",mode="overwrite")


se.sql(
"""
SELECT
    BASE.uuid,
    BASE.document_id,
    BASE.display_id,
    BASE.ad_id,
    BASE.timestamp,
    PC.document_id as ad_doc_id,
    BASE.clicked
    
    
FROM
(SELECT
    events.uuid,
    events.document_id,
    events.display_id,
    events.timestamp,
    CT.ad_id,
    CT.clicked

FROM clicks_train CT
        JOIN events
        ON CT.display_id = events.display_id) BASE
        JOIN promoted_content PC
        ON BASE.ad_id = PC.ad_id
        
""").write.parquet("/baseline_train.parquet",mode="overwrite")

In [105]:
se.read.parquet("/baseline_train.parquet").show(5)

+--------------+-----------+----------+------+---------+---------+-------+
|          uuid|document_id|display_id| ad_id|timestamp|ad_doc_id|clicked|
+--------------+-----------+----------+------+---------+---------+-------+
|66d17a9ccd0e15|    1126487|  10000108|132815|668707895|  1227645|      1|
|66d17a9ccd0e15|    1126487|  10000108|133677|668707895|  1297868|      0|
|66d17a9ccd0e15|    1126487|  10000108|406686|668707895|  1671134|      0|
|66d17a9ccd0e15|    1126487|  10000108|406704|668707895|  1678714|      0|
|66d17a9ccd0e15|    1126487|  10000108|449087|668707895|  2135836|      0|
+--------------+-----------+----------+------+---------+---------+-------+
only showing top 5 rows



In [106]:
df = se.read.parquet("/baseline_train.parquet")
df.registerTempTable("baseline_train")
df = se.read.parquet("/baseline_test.parquet")
df.registerTempTable("baseline_test")

In [107]:
df = se.read.parquet("/baseline_test.parquet")
df.count()

32225162

#### FEAT 1 - FLAG IF AD WAS CLICKED ON BEFORE BY USER

In [108]:
# create a flag feature to determine if a user has seen this ad in the past

se.sql(
"""
SELECT
T.uuid,
T.ad_id,
IF (ad_clicked > 0, 1, 0) as flag_ad_clicked
FROM  (SELECT
        events.uuid,
        CT.ad_id,
        COUNT(CT.clicked) as ad_clicked
    FROM clicks_train CT JOIN events
        ON CT.display_id =  events.display_id
    GROUP BY events.uuid, CT.ad_id) T
""").write.parquet("/feat_ad_clicked.parquet",mode="overwrite")

In [109]:
se.read.parquet("/feat_ad_clicked.parquet").show(5)
df = se.read.parquet("/feat_ad_clicked.parquet")
df.registerTempTable("feat_ad_clicked")

+--------------+------+---------------+
|          uuid| ad_id|flag_ad_clicked|
+--------------+------+---------------+
|95974bc792347d| 67281|              1|
|18270f4a2b53ce|311759|              1|
|2e29c469dba6ed| 26723|              1|
|9f9405eaa19d5f|141268|              1|
|20e401e14e9d92|365175|              1|
+--------------+------+---------------+
only showing top 5 rows



In [110]:
se.sql("""
SELECT
    btr.uuid,
    btr.document_id,
    btr.display_id,
    btr.ad_id,
    btr.timestamp,
    btr.ad_doc_id,
    btr.clicked,
    fac.flag_ad_clicked
FROM baseline_train btr LEFT JOIN feat_ad_clicked fac
ON (btr.uuid = fac.uuid) AND (btr.ad_id = fac.ad_id) 
""").write.parquet("/baseline_train_2.parquet",mode="overwrite");

se.sql("""
SELECT
    bte.uuid,
    bte.document_id,
    bte.display_id,
    bte.ad_id,
    bte.timestamp,
    bte.ad_doc_id,
    fac.flag_ad_clicked
FROM baseline_test bte LEFT JOIN feat_ad_clicked fac
ON (bte.uuid = fac.uuid) AND (bte.ad_id = fac.ad_id) 
""").write.parquet("/baseline_test_2.parquet",mode="overwrite")

In [111]:
df = se.read.parquet("/baseline_train_2.parquet")
df.registerTempTable("baseline_train_2")   
df = se.read.parquet("/baseline_test_2.parquet")
df.registerTempTable("baseline_test_2")    

In [112]:
#32225162
df = se.read.parquet("/baseline_test.parquet")
df.count()

32225162

#### FEAT 2 - FLAG IF AD WAS SEEN BEFORE BY USER

In [113]:
se.sql(
"""
SELECT 
uuid, 
ad_id,
1 as ad_seen
FROM (SELECT
    events.uuid,
    events.display_id,
    CT.ad_id
    FROM clicks_train CT
        JOIN events
            ON CT.display_id =  events.display_id)
GROUP BY uuid, ad_id

""").write.parquet("/feat_flag_ad_seen.parquet",mode="overwrite")

In [114]:
df = se.read.parquet("/feat_flag_ad_seen.parquet")
df.registerTempTable("feat_ad_seen")

In [115]:
se.sql("""
SELECT
    btr.uuid,
    btr.document_id,
    btr.display_id,
    btr.ad_id,
    btr.timestamp,
    btr.ad_doc_id,
    btr.clicked,
    btr.flag_ad_clicked,
    fas.ad_seen as flag_ad_seen
FROM baseline_train_2 btr LEFT JOIN  feat_ad_seen fas
ON (btr.uuid = fas.uuid) AND (btr.ad_id = fas.ad_id) 
""").write.parquet("/baseline_train.parquet",mode="overwrite")

se.sql("""
SELECT
    bte.uuid,
    bte.document_id,
    bte.display_id,
    bte.ad_id,
    bte.timestamp,
    bte.ad_doc_id,
    bte.flag_ad_clicked,
    fas.ad_seen as flag_ad_seen
FROM baseline_test_2 bte LEFT JOIN feat_ad_seen fas
ON (bte.uuid = fas.uuid) AND (bte.ad_id = fas.ad_id) 
""").write.parquet("/baseline_test.parquet",mode="overwrite")

In [116]:
se.read.parquet("/baseline_test.parquet").show(3)

+--------------+-----------+----------+------+----------+---------+---------------+------------+
|          uuid|document_id|display_id| ad_id| timestamp|ad_doc_id|flag_ad_clicked|flag_ad_seen|
+--------------+-----------+----------+------+----------+---------+---------------+------------+
|100073c2e19605|    2880276|  21908895|468817|1239445590|  2310110|           null|        null|
|10013d303c2ba1|    2321683|  18439464| 74845| 596040874|   331537|           null|        null|
|10020e2d1802e9|    1788470|  16933750|180923|  30463831|  1151028|           null|        null|
+--------------+-----------+----------+------+----------+---------+---------------+------------+
only showing top 3 rows



In [117]:
df = se.read.parquet("/baseline_train.parquet")
df.registerTempTable("baseline_train")
df = se.read.parquet("/baseline_test.parquet")
df.registerTempTable("baseline_test")

In [118]:
#32225162
df = se.read.parquet("/baseline_test.parquet")
df.count()

32225162

#### FEAT 3 - FREQ OF PAPERS USER READS FROM PUBLISHER

In [119]:
se.sql(
"""
SELECT TC.uuid,
        PC.publisher_id,
        PC.pub_counts / TC.total_counts as freq
       
FROM
(SELECT COUNT (*) as total_counts,
        uuid
FROM page_views
GROUP BY uuid) TC

JOIN  (SELECT   
        PV.uuid,
        DM.publisher_id,
        COUNT (*) as pub_counts

    FROM
        page_views PV
            JOIN documents_meta DM
            ON PV.document_id = DM.document_id
    GROUP BY PV.uuid, DM.publisher_id) PC
ON TC.uuid = PC.uuid

""").write.parquet("/feat_publisher_freq.parquet",mode="overwrite") 

In [120]:
df = se.read.parquet("/feat_publisher_freq.parquet")
df.registerTempTable("pub_freq")

In [121]:
se.read.parquet("/feat_publisher_freq.parquet").show(5)

+--------------+------------+-------------------+
|          uuid|publisher_id|               freq|
+--------------+------------+-------------------+
|1000022084d5a9|          89|                1.0|
|100008e35c47a0|         867|0.09090909090909091|
|100008e35c47a0|         255| 0.6363636363636364|
|100008e35c47a0|         636| 0.2727272727272727|
|1000105ca6b2b1|         435|                1.0|
+--------------+------------+-------------------+
only showing top 5 rows



In [122]:
se.sql("""
SELECT
T.*,
pf.freq
FROM   (SELECT
        bt.*,
        dm.publisher_id
        FROM baseline_train bt LEFT JOIN documents_meta dm
        ON bt.ad_doc_id = dm.document_id) T LEFT JOIN pub_freq pf
            ON (T.publisher_id = pf.publisher_id) AND (T.uuid = pf.uuid)
""").write.parquet("/baseline_train_2.parquet",mode="overwrite")
                   
se.sql("""
SELECT
T.*,
pf.freq
FROM   (SELECT
        bt.*,
        dm.publisher_id
        FROM baseline_test bt LEFT JOIN documents_meta dm
        ON bt.ad_doc_id = dm.document_id) T LEFT JOIN pub_freq pf
            ON (T.publisher_id = pf.publisher_id) AND (T.uuid = pf.uuid)
""").write.parquet("/baseline_test_2.parquet",mode="overwrite")                   

In [123]:
df = se.read.parquet("/baseline_train_2.parquet")
df.registerTempTable("baseline_train")
df = se.read.parquet("/baseline_test_2.parquet")
df.registerTempTable("baseline_test")

In [124]:
df.show()

+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+
|          uuid|document_id|display_id| ad_id| timestamp|ad_doc_id|flag_ad_clicked|flag_ad_seen|publisher_id|freq|
+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+
|1000615e760786|    2959725|  22991705|153060|1288970662|  1376367|           null|        null|        null|null|
|1000615e760786|    2959725|  22991705|118016|1288970662|  1193086|           null|        null|        null|null|
|1000615e760786|    2959725|  22991705|287742|1288970662|  1429646|           null|        null|        null|null|
|1000615e760786|    2959725|  22991705| 85274|1288970662|   961909|           null|        null|        null|null|
|1000615e760786|    2959725|  22991705|124564|1288970662|  1201265|           null|        null|        null|null|
|10042103b7ff2b|    1314190|  20171775|277690|1149159940|  1666912|           nu

In [125]:
#32225162
df = se.read.parquet("/baseline_test_2.parquet")
df.count()

32225162

#### feat 4 - jaccard score of doc and ad_doc topics

In [134]:
from pyspark.sql import Row

In [135]:
doc_topic = se.read.parquet("s3://ydatazian/documents_topics.parquet")
page_view = se.read.parquet("s3://ydatazian/page_views_sample.parquet")

In [136]:
join_df = doc_topic.filter(doc_topic.confidence_level > 0.048) \
                   .select(doc_topic.document_id,doc_topic.topic_id)

In [137]:
# set_df = join_df.groupBy(join_df.document_id).agg(f.concat_ws(",", f.collect_set(join_df.topic_id)).alias("set_list"))

In [138]:
set_df = join_df.groupBy(join_df.document_id).agg(f.collect_set(join_df.topic_id).alias("set_list"))
set_df.show(3)

+-----------+------------------+
|document_id|          set_list|
+-----------+------------------+
|     100010|              [16]|
|    1000240|[20, 89, 143, 138]|
|    1000280|        [199, 183]|
+-----------+------------------+
only showing top 3 rows



In [139]:
set_df.write.parquet("/set_topics.parquet",mode="overwrite")
set_df.registerTempTable("set_topics")

In [140]:
def jaccard(x):
    if((x.doc_set_list is None) | (x.ad_doc_set_list is None) | (x is None)):
        return {'display_id':x.display_id,'ad_id':x.ad_id,'jac_score':0.0}
    top_1 = set(x.doc_set_list)
    top_2 = set(x.ad_doc_set_list)
    score = (len(top_1.intersection(top_2)) / len(top_1.union(top_2)))
    mydict = {'display_id':x.display_id,'ad_id':x.ad_id,'jac_score':score}
    return(mydict)

In [143]:
se.sql("""
SELECT
BASE.*,
st2.set_list as ad_doc_set_list
FROM (SELECT
    bt.*,
    st1.set_list as doc_set_list
    FROM
        baseline_train bt LEFT JOIN set_topics st1
        on bt.document_id = st1.document_id) BASE
        LEFT JOIN set_topics st2
        on BASE.ad_doc_id = st2.document_id
""").write.parquet("/baseline_train_with_topics.parquet",mode="overwrite")
baseline_train = se.read.parquet("/baseline_train_with_topics.parquet")
baseline_train.registerTempTable("baseline_train_topics")

In [144]:
baseline_train.rdd.map(jaccard).saveAsPickleFile("/pickle_jaccard_train10.pickle")
rdd = sc.pickleFile("/pickle_jaccard_train10.pickle")
rdd_of_rows = rdd.map(lambda x: Row(**x))
df = rdd_of_rows.toDF()
df.write.parquet("\train_jac_score.parquet",mode="overwrite")
df.registerTempTable("train_jac_score")

In [145]:
se.sql("""
SELECT
BASE.*,
st2.set_list as ad_doc_set_list
FROM (SELECT
    bt.*,
    st1.set_list as doc_set_list
    FROM
        baseline_test bt LEFT JOIN set_topics st1
        on bt.document_id = st1.document_id) BASE
        LEFT JOIN set_topics st2
        on BASE.ad_doc_id = st2.document_id
""").write.parquet("/baseline_test_with_topics.parquet",mode="overwrite")
baseline_test = se.read.parquet("/baseline_test_with_topics.parquet")
baseline_test.registerTempTable("baseline_test_topics")

In [146]:
baseline_test.rdd.map(jaccard).saveAsPickleFile("/pickle_jaccard_test6.pickle")
rdd = sc.pickleFile("/pickle_jaccard_test6.pickle")
rdd_of_rows = rdd.map(lambda x: Row(**x))
df = rdd_of_rows.toDF()
df.write.parquet("\test_jac_score.parquet",mode="overwrite")
df.registerTempTable("test_jac_score")

In [147]:
se.sql("""
select *
from train_jac_score""").show(3)

+-----+----------+---------+
|ad_id|display_id|jac_score|
+-----+----------+---------+
|59937|    593656|      0.0|
|59937|    753683|      0.0|
|59937|    661032|      0.0|
+-----+----------+---------+
only showing top 3 rows



In [148]:
se.sql("""
SELECT 
bt.*,
tjc.jac_score as jac_score
FROM baseline_train bt LEFT JOIN train_jac_score tjc
ON (bt.ad_id = tjc.ad_id) AND (bt.display_id = tjc.display_id)
""").write.parquet("/baseline_train.parquet",mode="overwrite")

se.sql("""
SELECT 
bt.*,
tjc.jac_score as jac_score
FROM baseline_test bt LEFT JOIN train_jac_score tjc
ON (bt.ad_id = tjc.ad_id) AND (bt.display_id = tjc.display_id)
""").write.parquet("/baseline_test.parquet",mode="overwrite")

In [149]:
df = se.read.parquet("/baseline_train.parquet")
df.registerTempTable("baseline_train")
df = se.read.parquet("/baseline_test.parquet")
df.registerTempTable("baseline_test")

In [150]:
df.show(3)

+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+---------+
|          uuid|document_id|display_id| ad_id| timestamp|ad_doc_id|flag_ad_clicked|flag_ad_seen|publisher_id|freq|jac_score|
+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+---------+
|ba1a1c1ca4eda7|     279789|  22287839|100023|1255800522|  1134055|           null|        null|        null|null|     null|
|dadec92e81475e|    1874495|  17113957|100074|  80862164|  1149622|           null|        null|        1142|null|     null|
|2c27508d68df25|    1363126|  22157897|100114|1250429171|  1029185|           null|        null|         866|null|     null|
+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+---------+
only showing top 3 rows



In [151]:
#32225162
df = se.read.parquet("/baseline_test.parquet")
df.count()

32225162

### ADD ALL OTHER DETAILS OF KNOWN CELLS

#### GEO LOCATION AND PLATFORM

In [152]:
df = se.sql("""
SELECT
    bt.*,
    e.geo_location,
    e.platform
from baseline_train bt LEFT JOIN events e
ON (e.uuid = bt.uuid) AND (e.document_id = bt.document_id)
""")
df.withColumn("geo_country", f.split("geo_location",">")[0])\
  .withColumn("geo_state", f.split("geo_location",">")[1])\
  .withColumn("geo_dist", f.split("geo_location",">")[2]).write.parquet("\baseline_train.parquet",mode="overwrite")
df.registerTempTable("baseline_train")

In [153]:
df = se.sql("""
SELECT
    bt.*,
    e.geo_location,
    e.platform
from baseline_test bt LEFT JOIN events e
ON (e.uuid = bt.uuid) AND (e.document_id = bt.document_id)
""")
df.withColumn("geo_country", f.split("geo_location",">")[0])\
  .withColumn("geo_state", f.split("geo_location",">")[1])\
  .withColumn("geo_dist", f.split("geo_location",">")[2]).write.parquet("\baseline_test.parquet",mode="overwrite")
df.registerTempTable("baseline_test")


#### AD CAMPAIGN AND ADVERTISER ID

In [154]:
df = se.sql("""
SELECT 
bt.*,
pc.campaign_id,
pc.advertiser_id
FROM baseline_train bt JOIN promoted_content pc
    ON bt.ad_id = pc.ad_id
""")
df.write.parquet("\baseline_train.parquet",mode="overwrite")
df.registerTempTable("baseline_train")

df = se.sql("""
SELECT 
bt.*,
pc.campaign_id,
pc.advertiser_id
FROM baseline_test bt JOIN promoted_content pc
    ON bt.ad_id = pc.ad_id
""")
df.write.parquet("\baseline_test.parquet",mode="overwrite")
df.registerTempTable("baseline_test")


In [155]:
df.show(1)

+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+---------+------------+--------+-----------+-------------+
|          uuid|document_id|display_id| ad_id| timestamp|ad_doc_id|flag_ad_clicked|flag_ad_seen|publisher_id|freq|jac_score|geo_location|platform|campaign_id|advertiser_id|
+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+---------+------------+--------+-----------+-------------+
|1002c599dbe8da|    2851078|  20493995|156936|1163149548|  1389098|           null|        null|        null|null|     null|       US>TX|       2|      19873|         1048|
+--------------+-----------+----------+------+----------+---------+---------------+------------+------------+----+---------+------------+--------+-----------+-------------+
only showing top 1 row



#### all entities

In [156]:
df_topics = se.sql("""select * from documents_topics""")
df_topics = df_topics.groupby("document_id").agg(f.collect_list(f.struct("topic_id","confidence_level")).alias("list_topics"))

In [157]:
df_entities = se.sql("""select * from documents_entities""")
df_entities = df_entities.groupby("document_id").agg(f.collect_list(f.struct("entity_id","confidence_level")).alias("list_entities"))

In [158]:
df_categories = se.sql("""select * from documents_categories""")
df_categories = df_categories.groupby("document_id").agg(f.collect_list(f.struct("category_id","confidence_level")).alias("list_categories"))

In [159]:
df = df_topics.join(df_entities,df_topics.document_id == df_entities.document_id,'full_outer').select("documents_topics.document_id","list_topics","list_entities")
df = df.join(df_categories, df.document_id == df_categories.document_id,'full_outer').select("documents_topics.document_id","list_topics","list_entities","list_categories")

In [160]:
df.write.parquet("\feat_lists_tuple.parquet",mode="overwrite")
df.registerTempTable("feat_lists")

In [161]:
df.show(1)

+-----------+--------------------+--------------------+--------------------+
|document_id|         list_topics|       list_entities|     list_categories|
+-----------+--------------------+--------------------+--------------------+
|     100010|[[16, 0.079073048...|[[e02abf90d2468f5...|[[1513, 0.7984279...|
+-----------+--------------------+--------------------+--------------------+
only showing top 1 row



In [162]:
df = se.sql("""
SELECT
    bt.*,
    fl.list_entities,
    fl.list_topics,
    fl.list_categories
from baseline_train bt LEFT JOIN feat_lists fl
ON bt.document_id = fl.document_id
""")
df.write.parquet("\baseline_train.parquet",mode="overwrite")
df.registerTempTable("baseline_train")

df = se.sql("""
SELECT
    bt.*,
    fl.list_entities,
    fl.list_topics,
    fl.list_categories
from baseline_test bt LEFT JOIN feat_lists fl
ON bt.document_id = fl.document_id
""")
df.write.parquet("\baseline_test.parquet",mode="overwrite")
df.registerTempTable("baseline_test")


In [163]:
from pyspark.sql.types import StringType

In [164]:
df = se.sql("""select * from baseline_test""")
df2 = df.withColumn('clicked', f.lit(None).cast(StringType()))
df2.write.parquet("\baseline_test.parquet",mode="overwrite")

In [165]:
df2.registerTempTable("baseline_test")

## VW MAPPER

In [166]:
# NSUU for namespace User // NSD for namespace Document
# CAT for categorial // CON for continous // LIST for list
se.sql("""
SELECT 

uuid as NSUU_CAT_uuid,
document_id as NDS_CAT_doc_id,
display_id as NSD_CAT_dis_id,
ad_id as NSD_CAT_ad_id,
ad_doc_id as NSD_CAT_ad_doc_id,
flag_ad_clicked as NSUU_CAT_flag_ad_clicked,
flag_ad_seen as NSUU_CAT_flag_ad_seen,
publisher_id as NSD_CAT_pub_id,
freq as NSUU_CON_freq,
jac_score as NSD_CON_jac_score,
campaign_id as NSD_CAT_campaign_id,
advertiser_id as NSD_CAT_advertiser_id,
list_entities as NSD_LIST_entities,
list_topics as NSD_LIST_topics,
list_categories as NSD_LIST_categories,
geo_location as NSUU_CAT_geo,
platform as NSUU_CAT_platform,
clicked

FROM baseline_test
""").write.parquet("/vw_feat_test.parquet",mode="overwrite")

se.sql("""
SELECT 

uuid as NSUU_CAT_uuid,
document_id as NDS_CAT_doc_id,
display_id as NSD_CAT_dis_id,
ad_id as NSD_CAT_ad_id,
ad_doc_id as NSD_CAT_ad_doc_id,
flag_ad_clicked as NSUU_CAT_flag_ad_clicked,
flag_ad_seen as NSUU_CAT_flag_ad_seen,
publisher_id as NSD_CAT_pub_id,
freq as NSUU_CON_freq,
jac_score as NSD_CON_jac_score,
campaign_id as NSD_CAT_campaign_id,
advertiser_id as NSD_CAT_advertiser_id,
list_entities as NSD_LIST_entities,
list_topics as NSD_LIST_topics,
list_categories as NSD_LIST_categories,
geo_location as NSUU_CAT_geo,
platform as NSUU_CAT_platform,
clicked

FROM baseline_train
""").write.parquet("/vw_feat_train.parquet",mode="overwrite")

In [167]:
se.read.parquet("/vw_feat_train.parquet").show(1)

+--------------+--------------+--------------+-------------+-----------------+------------------------+---------------------+--------------+-------------+-----------------+-------------------+---------------------+-----------------+--------------------+--------------------+------------+-----------------+-------+
| NSUU_CAT_uuid|NDS_CAT_doc_id|NSD_CAT_dis_id|NSD_CAT_ad_id|NSD_CAT_ad_doc_id|NSUU_CAT_flag_ad_clicked|NSUU_CAT_flag_ad_seen|NSD_CAT_pub_id|NSUU_CON_freq|NSD_CON_jac_score|NSD_CAT_campaign_id|NSD_CAT_advertiser_id|NSD_LIST_entities|     NSD_LIST_topics| NSD_LIST_categories|NSUU_CAT_geo|NSUU_CAT_platform|clicked|
+--------------+--------------+--------------+-------------+-----------------+------------------------+---------------------+--------------+-------------+-----------------+-------------------+---------------------+-----------------+--------------------+--------------------+------------+-----------------+-------+
|53b567c20a8eb2|       1000240|      12873428|       46607

In [169]:
#32225162
df = se.read.parquet("/vw_feat_train.parquet")
df.count()

92274377

In [None]:
651346 HAVE
32225162 TOTAL
6245533 NEED