# Check for common deployment errors first

This should show three nodes with 500 - 1000 GB each (951 in this case). Ambari -> 'Start All' should be done first.

In [7]:
! hdfs dfsadmin -report | grep "DFS Remaining"

DFS Remaining: 2934991313408 (2.67 TB)
DFS Remaining: 999476915712 (930.84 GB)
DFS Remaining%: 93.06%
DFS Remaining: 1001890696704 (933.08 GB)
DFS Remaining%: 93.28%
DFS Remaining: 933623700992 (869.50 GB)
DFS Remaining%: 86.93%


# Create Spark Context

In [8]:
import spark_setup
spark_setup.setup_pyspark_env()
import spark_utils

In [9]:
%%time
sc = spark_utils.get_spark_context()

Ambari - http://10.0.1.21:8080
All Applications - http://10.0.1.23:8088/cluster
CPU times: user 16 ms, sys: 20 ms, total: 36 ms
Wall time: 19.5 s


In [10]:
import pandas as pd
from pyspark.sql import SparkSession

ss = SparkSession(sc)

In [11]:
from hdfs import InsecureClient
hdfs_client = InsecureClient("http://cluster1:50070", user='hdfs')

# Download task data

In [12]:
# place yours here
student_id = 53

In [7]:
import json
student_region = json.loads(open("../azure/regions.json").read())["student{}".format(student_id)]
print "your region is:", student_region

your region is: southcentralus


In [8]:
task_data_link = "https://lsml1{}.blob.core.windows.net/data/task1.zip".format(student_region)
print task_data_link

https://lsml1southcentralus.blob.core.windows.net/data/task1.zip


In [9]:
%%time
# you can see progress in tmux (tab with running jupyter notebook)
import os
os.system("wget {} -O /data/task1.zip".format(task_data_link))

CPU times: user 56 ms, sys: 20 ms, total: 76 ms
Wall time: 5min 59s


In [10]:
%%time
# unzip task1.zip (many zip files inside)
os.system("unzip /data/task1.zip -d /data/")

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 1.06 s


256

In [13]:
# verify that you're all set
! du -sh /data/*.zip

136M	/data/clicks_test.csv.zip
390M	/data/clicks_train.csv.zip
33M	/data/documents_categories.csv.zip
126M	/data/documents_entities.csv.zip
16M	/data/documents_meta.csv.zip
121M	/data/documents_topics.csv.zip
478M	/data/events.csv.zip
30G	/data/page_views.csv.zip
149M	/data/page_views_sample.csv.zip
2.6M	/data/promoted_content.csv.zip
100M	/data/sample_submission.csv.zip
32G	/data/task1.zip


# Load data to HDFS

https://www.kaggle.com/c/outbrain-click-prediction/data

In [14]:
import time

def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        print '%r (%r, %r) %2.2f sec' % \
              (method.__name__, args, kw, te-ts)
        return result
    return timed

In [15]:
hdfs_client.delete("/task1", recursive=True)

True

In [16]:
%%time
import subprocess

@timeit
def unzip_to_hdfs(fn):
    fn_out = fn.replace(".zip", "")
    print subprocess.check_output("unzip -p /data/{0} | hadoop fs -put - /task1/{1}".format(fn, fn_out), shell=True)
    
fns = [
    "clicks_test.csv.zip",
    "clicks_train.csv.zip",
    "documents_categories.csv.zip",
    "documents_entities.csv.zip",
    "documents_meta.csv.zip",
    "documents_topics.csv.zip",
    "events.csv.zip",
    "page_views.csv.zip",
    "page_views_sample.csv.zip",
    "promoted_content.csv.zip",
    "sample_submission.csv.zip"
]

for fn in fns:
    unzip_to_hdfs(fn)


'unzip_to_hdfs' (('clicks_test.csv.zip',), {}) 9.06 sec

'unzip_to_hdfs' (('clicks_train.csv.zip',), {}) 19.03 sec

'unzip_to_hdfs' (('documents_categories.csv.zip',), {}) 3.59 sec

'unzip_to_hdfs' (('documents_entities.csv.zip',), {}) 6.92 sec

'unzip_to_hdfs' (('documents_meta.csv.zip',), {}) 3.01 sec

'unzip_to_hdfs' (('documents_topics.csv.zip',), {}) 8.65 sec

'unzip_to_hdfs' (('events.csv.zip',), {}) 21.11 sec

'unzip_to_hdfs' (('page_views.csv.zip',), {}) 1321.19 sec

'unzip_to_hdfs' (('page_views_sample.csv.zip',), {}) 10.30 sec

'unzip_to_hdfs' (('promoted_content.csv.zip',), {}) 2.68 sec

'unzip_to_hdfs' (('sample_submission.csv.zip',), {}) 7.36 sec
CPU times: user 268 ms, sys: 96 ms, total: 364 ms
Wall time: 23min 32s


In [17]:
! hadoop fs -du -s -h /task1/*.csv

483.5 M  /task1/clicks_test.csv
1.4 G  /task1/clicks_train.csv
112.5 M  /task1/documents_categories.csv
309.1 M  /task1/documents_entities.csv
85.2 M  /task1/documents_meta.csv
323.7 M  /task1/documents_topics.csv
1.1 G  /task1/events.csv
88.4 G  /task1/page_views.csv
433.3 M  /task1/page_views_sample.csv
13.2 M  /task1/promoted_content.csv
260.5 M  /task1/sample_submission.csv


In [18]:
# files are written on cluster1 node only, need to balance HDFS on cluster

In [19]:
! hdfs dfsadmin -setBalancerBandwidth 1000000000

Balancer bandwidth is set to 1000000000


In [20]:
%%time
! hdfs balancer -threshold 5 > balancer.log 2>&1

CPU times: user 5.44 s, sys: 2.75 s, total: 8.19 s
Wall time: 3min 28s


# Read example

In [21]:
pvdf = ss.read.csv("/task1/page_views.csv", header=True)

In [22]:
pvdf.dtypes

[('uuid', 'string'),
 ('document_id', 'string'),
 ('timestamp', 'string'),
 ('platform', 'string'),
 ('geo_location', 'string'),
 ('traffic_source', 'string')]

In [None]:
pvdf.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



In [22]:
%%time
pvdf.count()

CPU times: user 140 ms, sys: 40 ms, total: 180 ms
Wall time: 12min 47s


2034275448

# Parquet is faster than CSV

http://events.linuxfoundation.org/sites/events/files/slides/ApacheCon%20BigData%20Europe%202016%20-%20Parquet%20in%20Practice%20%26%20Detail_0.pdf

In [None]:
%%time
pvdf.write.parquet("/task1/page_views.parquet")

In [None]:
! hadoop fs -du -s -h /task1/page_views.parquet

In [None]:
pvdf2 = ss.read.parquet("/task1/page_views.parquet")

In [None]:
%%time
from IPython.display import display
boo = pvdf2.groupBy("geo_location").count().collect()
display(boo[:5])

In [None]:
%%time
boo = pvdf.groupBy("geo_location").count().collect()
display(boo[:5])

# Convert all to Parquet

In [31]:
%%time
def convert_all_to_parquet():
    task_dir = "/task1/"
    all_files = hdfs_client.list(task_dir)
    for fn in all_files:
        if fn.endswith(".csv"):
            fn_after = fn.replace(".csv", ".parquet")
            path_before = task_dir + fn
            path_after = task_dir + fn_after
            if fn_after not in all_files:
                # generate parquet
                df = ss.read.csv(path_before, header=True)
                df.write.parquet(path_after)
            # remove csv, we have parquet now
            hdfs_client.delete(path_before)
            print fn_after, "done"

convert_all_to_parquet()

clicks_test.parquet done
clicks_train.parquet done
documents_categories.parquet done
documents_entities.parquet done
documents_meta.parquet done
documents_topics.parquet done
events.parquet done
page_views.parquet done
page_views_sample.parquet done
promoted_content.parquet done
sample_submission.parquet done
CPU times: user 128 ms, sys: 40 ms, total: 168 ms
Wall time: 5min 17s


In [32]:
! hadoop fs -du -s -h /task1/*

133.2 M  /task1/clicks_test.parquet
367.5 M  /task1/clicks_train.parquet
36.5 M  /task1/documents_categories.parquet
184.0 M  /task1/documents_entities.parquet
21.2 M  /task1/documents_meta.parquet
183.3 M  /task1/documents_topics.parquet
669.3 M  /task1/events.parquet
47.3 G  /task1/page_views.parquet
236.9 M  /task1/page_views_sample.parquet
5.0 M  /task1/promoted_content.parquet
184.2 M  /task1/sample_submission.parquet


# Preview all files

In [33]:
%%time
def preview_all_files():
    task_dir = "/task1/"
    all_files = hdfs_client.list(task_dir)
    for fn in all_files:
        df = ss.read.parquet(task_dir + fn)
        print "#" * 15 + " {0} ".format(task_dir + fn) + "#" * 15
        df.show(1)
        
preview_all_files()

############### /task1/clicks_test.parquet ###############
+----------+------+
|display_id| ad_id|
+----------+------+
|  17805143|288388|
+----------+------+
only showing top 1 row

############### /task1/clicks_train.parquet ###############
+----------+-----+-------+
|display_id|ad_id|clicked|
+----------+-----+-------+
|         1|42337|      0|
+----------+-----+-------+
only showing top 1 row

############### /task1/documents_categories.parquet ###############
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1544588|       1513|     0.263546236|
+-----------+-----------+----------------+
only showing top 1 row

############### /task1/documents_entities.parquet ###############
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1539011|e01ed0c4a3e8f8f35...|0.327269624728567|
+-----------+

# Register all tables to be usable in SQL queries

In [34]:
%%time
def register_all_tables():
    task_dir = "/task1/"
    all_files = hdfs_client.list(task_dir)
    for fn in all_files:
        if fn.endswith(".parquet"):
            table_name = fn.replace(".parquet", "")
            df = ss.read.parquet(task_dir + fn)
            df.registerTempTable(table_name)
            print table_name, "done"
        
register_all_tables()

clicks_test done
clicks_train done
documents_categories done
documents_entities done
documents_meta done
documents_topics done
events done
page_views done
page_views_sample done
promoted_content done
sample_submission done
CPU times: user 12 ms, sys: 28 ms, total: 40 ms
Wall time: 2.45 s


# SQL query example

In [35]:
%%time
ss.sql("""
select count(distinct(uuid)) as users_countb
from events
""").collect()

CPU times: user 24 ms, sys: 4 ms, total: 28 ms
Wall time: 1min 20s


[Row(users_countb=19794967)]

In [54]:
# %%time
# ss.sql("""
# select distinct(geo_location)
# from events
# """).show(30)

+------------+
|geo_location|
+------------+
|   US>MT>756|
|       IE>16|
|   US>MS>673|
|       NL>10|
|       AE>05|
|       TH>46|
|       IL>01|
|          LT|
|       MA>57|
|       US>NY|
|       ES>07|
|          DZ|
|       CO>02|
|       CM>09|
|       ZM>03|
|       BG>50|
|   US>MT>764|
|   US>FL>548|
|       SE>26|
|       EC>18|
|   US>MT>881|
|       KZ>12|
|       AS>00|
|       BZ>01|
|       HU>09|
|       HU>18|
|       BR>08|
|       IN>20|
|       CN>01|
|       RU>01|
+------------+
only showing top 30 rows

CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 52.1 s


# 1. Baseline

Simple model using the following features:
- **clicked**
- geo_location features (country, state, dma)
- day_of_week (from timestamp, use *date.isoweekday()*)
- ad_id
- ad_document_id
- campaign_id
- advertiser_id
- display_document_id
- platform

## Calculate features for VW

- Use DataFrame API to join tables (functions in SQL queries: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html)
- Use Python API to calculate features and save them as text for VW (*saveAsTextFile()*)
- Hash features in Spark (24 bits, use *sklearn.utils.murmurhash.murmurhash3_32*)
- Split dataset in Spark into 90% train, 10% test **by display_id**, save the split for further use

In [33]:
from sklearn.utils.murmurhash import murmurhash3_32
def hasher(x, bits):
    return murmurhash3_32(x) % 2**bits

# My code

1. Parse geopoints (as in solution). See 'Events' on [solution](http://dsnotes.com/post/2017-01-27-lessons-learned-from-outbrain-click-prediction-kaggle-competition/) page

In [36]:
# we start with a DataFrame
events_df = ss.sql("select * from events")
# geopoints_parsing_df.show(3)

# # we can make RDD of Rows with *.rdd
from pyspark.sql import Row
# events_df.rdd.take(3)
type(events_df)
# events_df['platform']#.unique()

pyspark.sql.dataframe.DataFrame

In [37]:
ss.sql("""
select distinct(platform)
from events
""").toPandas()

Unnamed: 0,platform
0,3
1,\N
2,1
3,2


In [38]:
ss.sql("""
describe events
""").show()

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|  display_id|   string|   null|
|        uuid|   string|   null|
| document_id|   string|   null|
|   timestamp|   string|   null|
|    platform|   string|   null|
|geo_location|   string|   null|
+------------+---------+-------+



In [48]:
temp = ss.sql("""
select distinct(geo_location)
from events
""")

In [49]:
temp.show(15)

+------------+
|geo_location|
+------------+
|   US>MT>756|
|       IE>16|
|   US>MS>673|
|       NL>10|
|       AE>05|
|       TH>46|
|       IL>01|
|          LT|
|       MA>57|
|       US>NY|
|       ES>07|
|          DZ|
|       CO>02|
|       ZM>03|
|       BG>50|
+------------+
only showing top 15 rows



In [61]:
from sklearn.utils.murmurhash import murmurhash3_32

def geo_to_arr(x):
    if x.geo_location:
        return (x.geo_location.split('>') + ['', '', ''])[:3]
    else:
        return ['', '', '']

def handle_geo_nans(x):
    if not x.geo_location:
        return ''
    return x.geo_location

def handle_platform_nans(x):
    if x['platform'] == '\N':
        return 1
    return x.platform

def hash_murmur(x):
    bits = 28
    return murmurhash3_32(x) % 2**bits

def do_as_selivanov_does(x):
    geo = geo_to_arr(x)
    return Row(uuid=hash_murmur(x['uuid']),
               platform=handle_platform_nans(x),
               country=hash_murmur(geo[0]),
               state=hash_murmur(geo[1]),
               dma=hash_murmur(geo[2]),
               geo_location=hash_murmur(handle_geo_nans(x)),
               display_id=x['display_id'],
               document_id=x['document_id'],
               timestamp=x['timestamp'])

# When it's RDD, we can use Python to create new RDD of Rows

new_events_df = events_df.rdd \
    .map(do_as_selivanov_does)
# new_events_df.take(3)

In [62]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
ss.createDataFrame(new_events_df).write.mode("overwrite").parquet("/task1/new_events")

CPU times: user 84 ms, sys: 24 ms, total: 108 ms
Wall time: 6min


In [63]:
ss.read.parquet("/task1/new_events").registerTempTable("new_events")

In [64]:
new_events = ss.sql("select * from new_events")
new_events.show(3)

+---------+----------+---------+-----------+------------+--------+---------+---------+---------+
|  country|display_id|      dma|document_id|geo_location|platform|    state|timestamp|     uuid|
+---------+----------+---------+-----------+------------+--------+---------+---------+---------+
|175781132|         1| 70963568|     379743|   153819337|       3|193711855|       61|104110024|
|175781132|         2|183315336|    1794259|   251276266|       2|237626922|       81| 13662515|
|175781132|         3| 86670560|    1179111|    26577704|       2|139670629|      182|174690784|
+---------+----------+---------+-----------+------------+--------+---------+---------+---------+
only showing top 3 rows



In [88]:
ss.sql("""
describe promoted_content
""").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|        ad_id|   string|   null|
|  document_id|   string|   null|
|  campaign_id|   string|   null|
|advertiser_id|   string|   null|
+-------------+---------+-------+



In [82]:
events_clicks_joined = ss.sql(
# """
# create TEMPORARY table test
# USING PARQUET
# as"""
"""
select 
    ne.*,
    ct.ad_id, ct.clicked
from 
    new_events as ne
    join clicks_train as ct on ct.display_id = ne.display_id""")

In [83]:
events_clicks_joined.show(3)

+---------+----------+---------+-----------+------------+--------+---------+---------+--------+------+-------+
|  country|display_id|      dma|document_id|geo_location|platform|    state|timestamp|    uuid| ad_id|clicked|
+---------+----------+---------+-----------+------------+--------+---------+---------+--------+------+-------+
|175781132|  10000108|183315336|    1126487|   251276266|       1|237626922|668707895|69455135|132815|      1|
|175781132|  10000108|183315336|    1126487|   251276266|       1|237626922|668707895|69455135|133677|      0|
|175781132|  10000108|183315336|    1126487|   251276266|       1|237626922|668707895|69455135|406686|      0|
+---------+----------+---------+-----------+------------+--------+---------+---------+--------+------+-------+
only showing top 3 rows



In [85]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
events_clicks_joined.write.mode("overwrite").parquet("/task1/events_clicks_joined")

CPU times: user 20 ms, sys: 4 ms, total: 24 ms
Wall time: 3min 33s


In [86]:
ss.read.parquet("/task1/events_clicks_joined").registerTempTable("events_clicks_joined")

In [89]:
ss.sql("""
describe events_clicks_joined
""").show()

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|     country|   bigint|   null|
|  display_id|   string|   null|
|         dma|   bigint|   null|
| document_id|   string|   null|
|geo_location|   bigint|   null|
|    platform|   string|   null|
|       state|   bigint|   null|
|   timestamp|   string|   null|
|        uuid|   bigint|   null|
|       ad_id|   string|   null|
|     clicked|   string|   null|
+------------+---------+-------+



In [107]:
ss.sql("""
describe promoted_content
""").show()
# 

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|        ad_id|   string|   null|
|  document_id|   string|   null|
|  campaign_id|   string|   null|
|advertiser_id|   string|   null|
+-------------+---------+-------+



In [112]:
events_clicks_prom_joined = ss.sql("""
select 
    ecj.*, 
    pc.document_id as promo_document_id, pc.campaign_id, pc.advertiser_id
from 
    events_clicks_joined as ecj
    join promoted_content as pc on pc.ad_id = ecj.ad_id""")

In [113]:
events_clicks_prom_joined.show(3)

+---------+----------+---------+-----------+------------+--------+---------+---------+---------+------+-------+-----------------+-----------+-------------+
|  country|display_id|      dma|document_id|geo_location|platform|    state|timestamp|     uuid| ad_id|clicked|promo_document_id|campaign_id|advertiser_id|
+---------+----------+---------+-----------+------------+--------+---------+---------+---------+------+-------+-----------------+-----------+-------------+
|175781132|  10000022|241243482|       4099|   156524926|       1|158446504|668703976|127285375| 47454|      0|           775716|       3135|          551|
|175781132|  10000022|241243482|       4099|   156524926|       1|158446504|668703976|127285375|120373|      1|          1041662|      10123|           58|
|175781132|  10000084| 54356298|    2394540|   229441745|       1|184152563|668706874|222045033|161336|      0|          1402797|      20307|         2260|
+---------+----------+---------+-----------+------------+-------

In [114]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
events_clicks_prom_joined.write.mode("overwrite").parquet("/task1/events_clicks_prom_joined")

CPU times: user 8 ms, sys: 12 ms, total: 20 ms
Wall time: 2min 59s


In [115]:
ss.read.parquet("/task1/events_clicks_prom_joined").registerTempTable("events_clicks_prom_joined")

In [117]:
ss.sql("""describe events_clicks_prom_joined""").show()

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|          country|   bigint|   null|
|       display_id|   string|   null|
|              dma|   bigint|   null|
|      document_id|   string|   null|
|     geo_location|   bigint|   null|
|         platform|   string|   null|
|            state|   bigint|   null|
|        timestamp|   string|   null|
|             uuid|   bigint|   null|
|            ad_id|   string|   null|
|          clicked|   string|   null|
|promo_document_id|   string|   null|
|      campaign_id|   string|   null|
|    advertiser_id|   string|   null|
+-----------------+---------+-------+



In [119]:
ss.sql("""
select
    display_id, document_id, platform, timestamp, ad_id, clicked,
    promo_document_id, campaign_id, advertiser_id
from events_clicks_prom_joined""").show(3)

+----------+-----------+--------+---------+------+-------+-----------------+-----------+-------------+
|display_id|document_id|platform|timestamp| ad_id|clicked|promo_document_id|campaign_id|advertiser_id|
+----------+-----------+--------+---------+------+-------+-----------------+-----------+-------------+
|  10000022|       4099|       1|668703976| 47454|      0|           775716|       3135|          551|
|  10000022|       4099|       1|668703976|120373|      1|          1041662|      10123|           58|
|  10000084|    2394540|       1|668706874|161336|      0|          1402797|      20307|         2260|
+----------+-----------+--------+---------+------+-------+-----------------+-----------+-------------+
only showing top 3 rows



In [216]:
from sklearn.utils.murmurhash import murmurhash3_32

def h(x):
    bits = 28
    return murmurhash3_32(x) % 2**bits

def h_with_colname(x, y):
    return h(x) + h(y)

def handle_clicked(x):
    if x == 0:
        return -1
    return x

def do_as_selivanov_does_2(x):
    return Row(country=x.country+h('country'),
               dma=x.dma+h('dma'),
               geo_location=x.geo_location+h('geo_location'),
               state=x.state+h('state'),
               uuid=x.uuid+h('uuid'),
               display_id=h_with_colname(int(x.display_id), 'display_id'),
               display_id_no_hash=int(x.display_id),
               document_id=h_with_colname(int(x.document_id), 'document_id'),
               ad_id=h_with_colname(int(x.ad_id), 'ad_id'),
               ad_id_no_hash=int(x.ad_id),
               promo_document_id=h_with_colname(int(x.promo_document_id), 'promo_document_id'),
               campaign_id=h_with_colname(int(x.campaign_id), 'campaign_id'),
               advertiser_id=h_with_colname(int(x.advertiser_id), 'advertiser_id'),
               platform=int(x.platform),
               timestamp=int(x.timestamp),
               clicked=handle_clicked(int(x.clicked)))
               

ecpj_hashed = events_clicks_prom_joined.rdd \
    .map(do_as_selivanov_does_2)

In [217]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
ss.createDataFrame(ecpj_hashed).write.mode("overwrite").parquet("/task1/ecpj_hashed")

CPU times: user 84 ms, sys: 40 ms, total: 124 ms
Wall time: 15min 45s


In [218]:
ss.read.parquet("/task1/ecpj_hashed").registerTempTable("ecpj_hashed")

In [227]:
# events_clicks_prom_joined.describe()
ecpj_hashed.take(1)

[Row(ad_id=225494157, ad_id_no_hash=47454, advertiser_id=333740402, campaign_id=394285681, clicked=-1, country=308936875, display_id=255557651, display_id_no_hash=10000022, dma=435105654, document_id=196905228, geo_location=240564039, platform=1, promo_document_id=354778319, state=344404958, timestamp=668703976, uuid=175984321)]

In [None]:
ecpj_hashed.saveAsTextFile('/task1/ecpj_hashed.txt')

In [230]:
import os
import shutil

def copy_text_to_local(hdfs_path, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    os.mkdir(local_path)
    os.system('hadoop fs -copyToLocal "{0}/*" {1}'.format(hdfs_path, local_path))
    os.system('cat {0}/part-* > {1}'.format(local_path, local_path + "/merged.txt"))
    print("done")

In [None]:
%%time
copy_text_to_local("/task1/ecpj_hashed.txt", "/data/ecpj_hashed.txt")

In [234]:
!tail -n 4 /data/ecpj_hashed.txt/merged.txt

Row(ad_id=248066118, ad_id_no_hash=368978, advertiser_id=329419276, campaign_id=232987658, clicked=1, country=308936875, display_id=264535752, display_id_no_hash=9999904, dma=280532732, document_id=114323303, geo_location=110616817, platform=1, promo_document_id=259186399, state=325629083, timestamp=668697477, uuid=213474776)
Row(ad_id=272092809, ad_id_no_hash=428582, advertiser_id=427254936, campaign_id=395604651, clicked=-1, country=308936875, display_id=264535752, display_id_no_hash=9999904, dma=280532732, document_id=114323303, geo_location=110616817, platform=1, promo_document_id=282840021, state=325629083, timestamp=668697477, uuid=213474776)
Row(ad_id=188901557, ad_id_no_hash=436596, advertiser_id=514696938, campaign_id=218286323, clicked=-1, country=308936875, display_id=264535752, display_id_no_hash=9999904, dma=280532732, document_id=114323303, geo_location=110616817, platform=1, promo_document_id=480084326, state=325629083, timestamp=668697477, uuid=213474776)
Row(ad_id=2

In [244]:
# # !sudo pip install telepyth
# import telepyth
# from __future__ import print_function  

# %telepyth -t 14890519403566776828
# %telepyth 'Very magic, wow!'

In [243]:
# from __future__ import print_function    

# def txt_to_vw(in_file, out_file):
#     with open(in_file, 'r') as in_f, open(out_file, 'w') as out_f:
#         for l in in_f:
#             pairs = (l.split('('))[1].split(')')[0].split(', ')
#             d = dict(p.split('=') for p in pairs)
#             y = d.pop('clicked')
#             s = '{} | '.format(y) + ' '.join('{}:1'.format(v) for k,v in d.items())
#             print(s, file=out_f)

# txt_to_vw('/data/ecpj_hashed.txt/merged.txt', '/data/vw_in.txt')

In [249]:
!head /data/ecpj_hashed.txt/merged.txt > /data/check.txt

In [None]:
from __future__ import print_function    

def txt_to_vw(in_file, out_file, no_hash_file):
    with open(in_file, 'r') as in_f, \
         open(out_file, 'w') as out_f, \
         open(no_hash_file, 'w') as no_hash_f:
        for l in in_f:
            pairs = (l.split('('))[1].split(')')[0].split(', ')
            d = dict(p.split('=') for p in pairs)

            ad_id = d.pop('ad_id_no_hash')
            display_id = d.pop('display_id_no_hash')
            print('{}, {}'.format(display_id, ad_id), file=no_hash_f)

            y = d.pop('clicked')
            s = '{} | '.format(y) + ' '.join('{}:1'.format(v) for k,v in d.items())
            print(s, file=out_f)

txt_to_vw('/data/ecpj_hashed.txt/merged.txt',
          '/data/vw_in.txt',
          '/data/no_hash_rows.txt')

In [258]:
!head -n 15 /data/no_hash_rows.txt

10000022, 47454
10000022, 120373
10000084, 161336
10000084, 180923
10000084, 183234
10000084, 211842
10000084, 224008
10000084, 255506
10000642, 80482
10000642, 117362
10000642, 154918
10000642, 370601
10000656, 116152
10000656, 173560
10000656, 289967


In [259]:
# def parse_to_vw(str_in):
!ls -sh /data/saved_ecpj_hashed.txt/merged.txt

23G /data/saved_ecpj_hashed.txt/merged.txt


In [260]:
# def parse_to_vw(str_in):
!ls -sh /data/vw_in.txt

13G /data/vw_in.txt


# Let's form test samples

In [274]:
ss.sql("""describe clicks_test""").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|display_id|   string|   null|
|     ad_id|   string|   null|
+----------+---------+-------+



In [278]:
events_clicks_joined_test = ss.sql(
"""
SELECT 
    ne.*,
    ct.ad_id
FROM 
    new_events AS ne
    join clicks_test AS ct ON ct.display_id = ne.display_id""")

In [279]:
events_clicks_joined_test.show(3)

+---------+----------+--------+-----------+------------+--------+--------+---------+-------+------+
|  country|display_id|     dma|document_id|geo_location|platform|   state|timestamp|   uuid| ad_id|
+---------+----------+--------+-----------+------------+--------+--------+---------+-------+------+
|175781132|  16874673|44518792|     505235|   168776120|       1|34087916|    34120|1227570| 91797|
|175781132|  16874673|44518792|     505235|   168776120|       1|34087916|    34120|1227570|107055|
|175781132|  16874673|44518792|     505235|   168776120|       1|34087916|    34120|1227570|158923|
+---------+----------+--------+-----------+------------+--------+--------+---------+-------+------+
only showing top 3 rows



In [280]:
%%time
events_clicks_joined_test.write.mode("overwrite").parquet("/task1/events_clicks_joined_test")

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 1min 40s


In [281]:
ss.read.parquet("/task1/events_clicks_joined_test").registerTempTable("events_clicks_joined_test")

In [282]:
ss.sql("""
describe events_clicks_joined_test
""").show()

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|     country|   bigint|   null|
|  display_id|   string|   null|
|         dma|   bigint|   null|
| document_id|   string|   null|
|geo_location|   bigint|   null|
|    platform|   string|   null|
|       state|   bigint|   null|
|   timestamp|   string|   null|
|        uuid|   bigint|   null|
|       ad_id|   string|   null|
+------------+---------+-------+



In [107]:
# ss.sql("""
# describe promoted_content
# """).show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|        ad_id|   string|   null|
|  document_id|   string|   null|
|  campaign_id|   string|   null|
|advertiser_id|   string|   null|
+-------------+---------+-------+



In [283]:
events_clicks_prom_joined_test = ss.sql("""
select 
    ecj.*, 
    pc.document_id as promo_document_id, pc.campaign_id, pc.advertiser_id
from 
    events_clicks_joined_test as ecj
    join promoted_content as pc on pc.ad_id = ecj.ad_id""")

In [284]:
events_clicks_prom_joined_test.show(3)

+---------+----------+---------+-----------+------------+--------+---------+---------+---------+------+-----------------+-----------+-------------+
|  country|display_id|      dma|document_id|geo_location|platform|    state|timestamp|     uuid| ad_id|promo_document_id|campaign_id|advertiser_id|
+---------+----------+---------+-----------+------------+--------+---------+---------+---------+------+-----------------+-----------+-------------+
|175781132|  16874679|262486585|    1761355|   180291562|       2|225872597|    38382|210322864|  7343|           340745|       1444|          172|
|175781132|  16874679|262486585|    1761355|   180291562|       2|225872597|    38382|210322864|137124|          1225364|      17621|         2988|
|175781132|  16874679|262486585|    1761355|   180291562|       2|225872597|    38382|210322864|147706|           954430|      10363|         2603|
+---------+----------+---------+-----------+------------+--------+---------+---------+---------+------+---------

In [285]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
events_clicks_prom_joined_test.write.mode("overwrite").parquet("/task1/events_clicks_prom_joined_test")

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 1min 9s


In [286]:
ss.read.parquet("/task1/events_clicks_prom_joined_test").registerTempTable("events_clicks_prom_joined_test")

In [287]:
ss.sql("""describe events_clicks_prom_joined_test""").show()

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|          country|   bigint|   null|
|       display_id|   string|   null|
|              dma|   bigint|   null|
|      document_id|   string|   null|
|     geo_location|   bigint|   null|
|         platform|   string|   null|
|            state|   bigint|   null|
|        timestamp|   string|   null|
|             uuid|   bigint|   null|
|            ad_id|   string|   null|
|promo_document_id|   string|   null|
|      campaign_id|   string|   null|
|    advertiser_id|   string|   null|
+-----------------+---------+-------+



In [289]:
# CLICKED!!
ss.sql("""
select
    display_id, document_id, platform, timestamp, ad_id,
    promo_document_id, campaign_id, advertiser_id
from events_clicks_prom_joined_test""").show(3)

+----------+-----------+--------+---------+------+-----------------+-----------+-------------+
|display_id|document_id|platform|timestamp| ad_id|promo_document_id|campaign_id|advertiser_id|
+----------+-----------+--------+---------+------+-----------------+-----------+-------------+
|  16874679|    1761355|       2|    38382|  7343|           340745|       1444|          172|
|  16874679|    1761355|       2|    38382|137124|          1225364|      17621|         2988|
|  16874679|    1761355|       2|    38382|147706|           954430|      10363|         2603|
+----------+-----------+--------+---------+------+-----------------+-----------+-------------+
only showing top 3 rows



In [290]:
from sklearn.utils.murmurhash import murmurhash3_32

def h(x):
    bits = 28
    return murmurhash3_32(x) % 2**bits

def h_with_colname(x, y):
    return h(x) + h(y)

def do_as_selivanov_does_2(x):
    return Row(country=x.country+h('country'),
               dma=x.dma+h('dma'),
               geo_location=x.geo_location+h('geo_location'),
               state=x.state+h('state'),
               uuid=x.uuid+h('uuid'),
               display_id=h_with_colname(int(x.display_id), 'display_id'),
               display_id_no_hash=int(x.display_id),
               document_id=h_with_colname(int(x.document_id), 'document_id'),
               ad_id=h_with_colname(int(x.ad_id), 'ad_id'),
               ad_id_no_hash=int(x.ad_id),
               promo_document_id=h_with_colname(int(x.promo_document_id), 'promo_document_id'),
               campaign_id=h_with_colname(int(x.campaign_id), 'campaign_id'),
               advertiser_id=h_with_colname(int(x.advertiser_id), 'advertiser_id'),
               platform=int(x.platform),
               timestamp=int(x.timestamp))
               

ecpj_hashed_test = events_clicks_prom_joined_test.rdd \
    .map(do_as_selivanov_does_2)

In [291]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
ss.createDataFrame(ecpj_hashed_test).write.mode("overwrite").parquet("/task1/ecpj_hashed_test")

CPU times: user 64 ms, sys: 8 ms, total: 72 ms
Wall time: 5min 44s


In [292]:
ss.read.parquet("/task1/ecpj_hashed_test").registerTempTable("ecpj_hashed_test")

In [293]:
ecpj_hashed_test.take(1)

[Row(ad_id=307775314, ad_id_no_hash=7343, advertiser_id=506710073, campaign_id=301792321, country=308936875, display_id=496221867, display_id_no_hash=16874679, dma=456348757, document_id=252442268, geo_location=264330675, platform=2, promo_document_id=462589401, state=411831051, timestamp=38382, uuid=259021810)]

In [294]:
ecpj_hashed_test.saveAsTextFile('/task1/ecpj_hashed_test.txt')

In [295]:
import os
import shutil

def copy_text_to_local(hdfs_path, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    os.mkdir(local_path)
    os.system('hadoop fs -copyToLocal "{0}/*" {1}'.format(hdfs_path, local_path))
    os.system('cat {0}/part-* > {1}'.format(local_path, local_path + "/merged.txt"))
    print("done")

In [296]:
%%time
copy_text_to_local("/task1/ecpj_hashed_test.txt", "/data/ecpj_hashed_test.txt")

done
CPU times: user 32 ms, sys: 8 ms, total: 40 ms
Wall time: 5min 19s


In [297]:
!tail -n 4 /data/ecpj_hashed_test.txt/merged.txt

Row(ad_id=253319144, ad_id_no_hash=174547, advertiser_id=440806606, campaign_id=199403314, country=308936875, display_id=377302092, display_id_no_hash=23119602, dma=379001842, document_id=139141068, geo_location=229219811, platform=2, promo_document_id=511012270, state=205914184, timestamp=1295966561, uuid=78228505)
Row(ad_id=294458570, ad_id_no_hash=192149, advertiser_id=257449846, campaign_id=308214969, country=308936875, display_id=377302092, display_id_no_hash=23119602, dma=379001842, document_id=139141068, geo_location=229219811, platform=2, promo_document_id=393154076, state=205914184, timestamp=1295966561, uuid=78228505)
Row(ad_id=316468025, ad_id_no_hash=539951, advertiser_id=427254936, campaign_id=170294627, country=308936875, display_id=377302092, display_id_no_hash=23119602, dma=379001842, document_id=139141068, geo_location=229219811, platform=2, promo_document_id=271542147, state=205914184, timestamp=1295966561, uuid=78228505)
Row(ad_id=367688901, ad_id_no_hash=566224, 

In [298]:
!head /data/ecpj_hashed_test.txt/merged.txt > /data/check.txt

In [301]:
from __future__ import print_function    

def txt_to_vw(in_file, out_file, no_hash_file):
    with open(in_file, 'r') as in_f, \
         open(out_file, 'w') as out_f, \
         open(no_hash_file, 'w') as no_hash_f:
        for l in in_f:
            pairs = (l.split('('))[1].split(')')[0].split(', ')
            d = dict(p.split('=') for p in pairs)

            ad_id = d.pop('ad_id_no_hash')
            display_id = d.pop('display_id_no_hash')
            print('{}, {}'.format(display_id, ad_id), file=no_hash_f)

            s = '| ' + ' '.join('{}:1'.format(v) for k,v in d.items())
            print(s, file=out_f)

txt_to_vw('/data/check.txt',
          '/data/vw_in_test.txt',
          '/data/no_hash_rows_test.txt')

In [306]:
!head -n 5 /data/vw_in_test.txt

| 496221867:1 307775314:1 259021810:1 38382:1 456348757:1 308936875:1 301792321:1 506710073:1 2:1 411831051:1 264330675:1 462589401:1 252442268:1
| 496221867:1 165121396:1 259021810:1 38382:1 456348757:1 308936875:1 287141130:1 326816900:1 2:1 411831051:1 264330675:1 398544910:1 252442268:1
| 496221867:1 165174384:1 259021810:1 38382:1 456348757:1 308936875:1 188603283:1 364587572:1 2:1 411831051:1 264330675:1 484004916:1 252442268:1
| 496221867:1 287148001:1 259021810:1 38382:1 456348757:1 308936875:1 349360155:1 499834666:1 2:1 411831051:1 264330675:1 517377613:1 252442268:1
| 336716434:1 126561095:1 105049090:1 170597:1 289378491:1 308936875:1 271510132:1 506710073:1 2:1 380123499:1 287315324:1 403937613:1 264569148:1


## Train VW

https://github.com/JohnLangford/vowpal_wabbit/wiki/Command-line-arguments

## Now let's use coeffs from original solution by Selivanov

In [447]:
!LD_LIBRARY_PATH=/usr/local/lib vw -d /data/vw_in.txt -b 24 -c -k --ftrl --ftrl_alpha 0.05 --ftrl_beta 0.5 --l1 1 --l2 1 --passes 1 -f /data/model --holdout_off --loss_function logistic --random_seed 42 --progress 8000000 
!LD_LIBRARY_PATH=/usr/local/lib vw -d /data/vw_in_test.txt -i /data/model -t -k -p /data/test_predictions.txt --progress 1000000 --link=logistic

using l1 regularization = 1
using l2 regularization = 1
final_regressor = /data/model
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.05
ftrl_beta = 0.5
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
creating cache_file = /data/vw_in.txt.cache
Reading datafile = /data/vw_in.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.443453 0.443453      8000000      8000000.0  -1.0000  -1.3975       14
0.441102 0.438750     16000000     16000000.0   1.0000  -2.4376       14
0.439983 0.437745     24000000     24000000.0  -1.0000  -1.8304       14
0.439204 0.436867     32000000     32000000.0  -1.0000  -1.6973       14
0.438689 0.436627     40000000     40000000.0  -1.0000  -2.1245       14
0.438224 0.435901     48000000     48000000.0  -1.0000  -2.8373       14
0.437883 0.435838     56000000     56000000.0  -1.0000  -4.0696      

In [312]:
# !head -n 5 /data/test_predictions.txt

0.190996
0.094548
0.176351
0.185417
0.193987


In [333]:
# !head /data/test_predictions.txt > /data/check.txt

In [448]:
# let's merge two tables columnwise using python script
from itertools import izip
from __future__ import print_function

def merge_pred_no_hash(preds_file, no_hash_file, ans_file):
    with open(preds_file, 'r') as pf, \
         open(no_hash_file, 'r') as nh_f, \
         open(ans_file, 'w') as af:
        for x, y in izip(nh_f, pf):
            print('{}, {}'.format(x.strip(), y.strip()), file=af)

merge_pred_no_hash('/data/test_predictions.txt',
                   '/data/no_hash_rows_test.txt',
                   '/data/no_hash_preds.txt')

In [344]:
!head /data/no_hash_preds.txt

16874679, 7343, 0.190996
16874679, 137124, 0.094548
16874679, 147706, 0.176351
16874679, 304154, 0.185417
16875035, 3784, 0.193987
16875035, 33142, 0.232907
16875035, 89504, 0.348130
16875035, 111980, 0.297726
16875035, 149047, 0.283822
16875035, 171530, 0.337937


In [449]:
!head /data/no_hash_preds.txt

16874679, 7343, 0.167151
16874679, 137124, 0.076552
16874679, 147706, 0.169222
16874679, 304154, 0.176729
16875035, 3784, 0.226063
16875035, 33142, 0.252485
16875035, 89504, 0.419533
16875035, 111980, 0.306840
16875035, 149047, 0.332894
16875035, 171530, 0.347986


In [455]:
# !hadoop fs -rm /task1/no_hash_preds.txt

18/05/23 14:11:12 INFO fs.TrashPolicyDefault: Moved: 'hdfs://cluster1:8020/task1/no_hash_preds.txt' to trash at: hdfs://cluster1:8020/user/ubuntu/.Trash/Current/task1/no_hash_preds.txt


In [456]:
# !hadoop fs -expunge

18/05/23 14:11:33 INFO fs.TrashPolicyDefault: TrashPolicyDefault#deleteCheckpoint for trashRoot: hdfs://cluster1:8020/user/ubuntu/.Trash
18/05/23 14:11:33 INFO fs.TrashPolicyDefault: TrashPolicyDefault#deleteCheckpoint for trashRoot: hdfs://cluster1:8020/user/ubuntu/.Trash
18/05/23 14:11:33 INFO fs.TrashPolicyDefault: TrashPolicyDefault#createCheckpoint for trashRoot: hdfs://cluster1:8020/user/ubuntu/.Trash
18/05/23 14:11:33 INFO fs.TrashPolicyDefault: Created trash checkpoint: /user/ubuntu/.Trash/180523141133


In [457]:
# !hadoop fs -put /data/test_predictions.txt /task1/test_predictions.txt
!hadoop fs -put /data/no_hash_preds.txt /task1/no_hash_preds.txt

In [458]:
predictions = ss.read.csv('/task1/no_hash_preds.txt')

In [459]:
predictions.show(3)

+--------+-------+---------+
|     _c0|    _c1|      _c2|
+--------+-------+---------+
|16874679|   7343| 0.167151|
|16874679| 137124| 0.076552|
|16874679| 147706| 0.169222|
+--------+-------+---------+
only showing top 3 rows



In [460]:
# c0 - display_id
# c1 - ad_id
# c2 - probability of being clicked
from pyspark.sql import functions as F

pred_sorted = predictions.sort(['_c0', '_c2'], ascending=[True, False])

In [461]:
pred_sorted.show(10)

+--------+-------+---------+
|     _c0|    _c1|      _c2|
+--------+-------+---------+
|16874594| 172888| 0.313364|
|16874594| 170392| 0.306634|
|16874594| 162754| 0.208949|
|16874594| 150083| 0.064795|
|16874594|  66758| 0.053696|
|16874594| 180797| 0.027205|
|16874595|   8846| 0.348094|
|16874595| 143982| 0.189634|
|16874595|  30609| 0.075779|
|16874596| 289915| 0.390936|
+--------+-------+---------+
only showing top 10 rows



In [367]:
# pred_sorted.show(50)

+--------+-------+---------+
|     _c0|    _c1|      _c2|
+--------+-------+---------+
|16874594| 170392| 0.325347|
|16874594| 172888| 0.316497|
|16874594| 162754| 0.223360|
|16874594|  66758| 0.077509|
|16874594| 150083| 0.064767|
|16874594| 180797| 0.035961|
|16874595|   8846| 0.290929|
|16874595| 143982| 0.155837|
|16874595|  30609| 0.066543|
|16874596| 289915| 0.369923|
|16874596| 289122| 0.368243|
|16874596|  11430| 0.363113|
|16874596| 132820| 0.341290|
|16874596|  57197| 0.150066|
|16874596| 153260| 0.116894|
|16874596| 173005| 0.072445|
|16874596| 288385| 0.060890|
|16874597| 285834| 0.229594|
|16874597| 305790| 0.166391|
|16874597| 308836| 0.161904|
|16874597| 143981| 0.145204|
|16874597| 182039| 0.116135|
|16874597| 155945| 0.108001|
|16874597| 180965| 0.098141|
|16874597| 137858| 0.067286|
|16874598| 145937| 0.165656|
|16874598| 335632| 0.074875|
|16874598|  67292| 0.057873|
|16874598| 250082| 0.037220|
|16874599| 173130| 0.323628|
|16874599|  91681| 0.308299|
|16874599| 210

In [462]:
pred_grouped = pred_sorted.groupby('_c0').agg(F.collect_list("_c1"))

In [463]:
pred_gr_sorted = pred_grouped.sort(['_c0'], ascending=[True])

In [464]:
pred_gr_sorted.show(3)

+--------+--------------------+
|     _c0|   collect_list(_c1)|
+--------+--------------------+
|16874594|[ 172888,  170392...|
|16874595|[ 8846,  143982, ...|
|16874596|[ 289915,  289122...|
+--------+--------------------+
only showing top 3 rows



In [387]:
pred_gr_sorted.show(3)

+--------+--------------------+
|     _c0|   collect_list(_c1)|
+--------+--------------------+
|16874594|[ 170392,  172888...|
|16874595|[ 8846,  143982, ...|
|16874596|[ 289915,  289122...|
+--------+--------------------+
only showing top 3 rows



In [465]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [466]:
from pyspark.sql import functions as func
from pyspark.sql.types import StringType

def stringify(arr):
    arr = map(str, arr)
    return ' '.join(map(str.strip, arr))

udf_stringify = func.udf(stringify, returnType=StringType())

pred_final = pred_gr_sorted \
    .withColumn('ad_id', udf_stringify('collect_list(_c1)')) \
    .withColumnRenamed('_c0', 'display_id') \
    .drop('collect_list(_c1)')

In [416]:
pred_final.show(3)

+----------+--------------------+
|display_id|               ad_id|
+----------+--------------------+
|  16874594|170392 172888 162...|
|  16874595|   8846 143982 30609|
|  16874596|289915 289122 114...|
+----------+--------------------+
only showing top 3 rows



In [467]:
pred_final.show(3)

+----------+--------------------+
|display_id|               ad_id|
+----------+--------------------+
|  16874594|172888 170392 162...|
|  16874595|   8846 143982 30609|
|  16874596|289915 289122 114...|
+----------+--------------------+
only showing top 3 rows



In [468]:
pred_final.write.mode("overwrite").option("header","true").csv('/task1/submission_1.csv')

In [469]:
import os
import shutil

def copy_text_to_local(hdfs_path, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    os.mkdir(local_path)
    os.system('hadoop fs -copyToLocal "{0}/*" {1}'.format(hdfs_path, local_path))
    os.system('head -n 1 {0}/part-00001* > {1}'.format(local_path, local_path + "/merged.csv"))
    os.system('find {0} -name "part*.csv" | xargs -n 1 tail -n +2 >> {1}' \
              .format(local_path, local_path + "/merged.csv"))
    print("done")

In [470]:
%%time
copy_text_to_local("/task1/submission_1.csv",\
                   "/data/submission_2.csv")

done
CPU times: user 0 ns, sys: 12 ms, total: 12 ms
Wall time: 4.26 s


In [471]:
!sed -n '56394, 56396p; 56397q' /data/submission_2.csv/merged.csv

22235616,30682 155249 183640 542754 153413 153658
22235617,134692 279921 118470 123730
22235618,109018 117524 2820 147014 37981 161989


In [472]:
!head -n 4 /data/submission_2.csv/merged.csv

display_id,ad_id
18929906,463707 117305 158632
18929907,184220 166286 57791 285722 163843 156669 125865 129600 179761
18929908,396884 99515 139376 155251 159539 303433


In [444]:
!head -n 4 /data/submission_1.csv/merged.csv

display_id,ad_id
17960876,155607 319252 304273 141437
17960877,257366 198149 173411 123742
17960878,150609 148954 81742 246879 125218 280782


In [445]:
!head -n 4 /data/sample_submission.csv

display_id,ad_id
16874594,66758 150083 162754 170392 172888 180797
16874595,8846 30609 143982
16874596,11430 57197 132820 153260 173005 288385 289122 289915


In [473]:
# ss.sql('select * from sample_submission').show(5)

In [474]:
# !kaggle competitions submit -c outbrain-click-prediction -f /data/sample_submission.csv -m "Sample submission"
!kaggle competitions submit -c outbrain-click-prediction -f /data/submission_2.csv/merged.csv -m "Submission 2"

Successfully submitted to Outbrain Click Prediction

In [262]:
import numpy as np

def read_vw_predictions(p):
    y_pred = []
    with open(p, "r") as f:
        for line in f:
            y_pred.append(float(line.split()[0]))
    return np.array(y_pred)

y_pred = read_vw_predictions("/data/test_predictions.txt")

In [263]:
def get_vw_y_true(p):
    y_true = []
    with open(p, "r") as f:
        for line in f:
            y_true.append(float(line.partition(" ")[0]))
    return np.array(y_true)

y_true = get_vw_y_true("/data/test.txt/merged.txt")

In [264]:
from sklearn.metrics import log_loss, roc_auc_score

In [265]:
log_loss(y_true, y_pred)

ValueError: Found array with 0 sample(s) (shape=(0,)) while a minimum of 1 is required.

In [266]:
y_pred.shape

(0,)

In [None]:
roc_auc_score(y_true, y_pred)

# My code ends here

--------------------------------------------

## Copy data from HDFS to cluster1 machine

We will run vowpal wabbit **locally**, need to copy data from HDFS

In [34]:
import os
import shutil

def copy_text_to_local(hdfs_path, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    os.mkdir(local_path)
    os.system('hadoop fs -copyToLocal "{0}/*" {1}'.format(hdfs_path, local_path))
    os.system('cat {0}/part-* > {1}'.format(local_path, local_path + "/merged.txt"))
    print "done"

In [35]:
%%time
copy_text_to_local("/task1/train.txt", "/data/train.txt")

done
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 2.55 s


In [143]:
! ls -lh /data/train.txt/merged.txt

-rw-rw-r-- 1 ubuntu ubuntu 0 May 18 13:56 /data/train.txt/merged.txt


In [145]:
%%time
copy_text_to_local("/task1/test.txt", "/data/test.txt")

In [38]:
! ls -lh /data/test.txt/merged.txt

-rw-rw-r-- 1 ubuntu ubuntu 0 May 18 13:56 /data/test.txt/merged.txt


## Install VW

In [39]:
! sudo apt-get install libboost-program-options-dev zlib1g-dev libboost-python-dev libtool m4 automake -y

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following packages were automatically installed and are no longer required:
  python-chardet-whl python-colorama python-colorama-whl python-distlib
  python-distlib-whl python-html5lib python-html5lib-whl python-pip-whl
  python-requests-whl python-setuptools-whl python-six-whl python-urllib3-whl
  python-wheel python3-pkg-resources
Use 'apt-get autoremove' to remove them.
The following extra packages will be installed:
  autoconf autotools-dev libboost-program-options1.54-dev
  libboost-program-options1.54.0 libboost-python1.54-dev libboost-python1.54.0
  libboost1.54-dev libltdl-dev
Suggested packages:
  autoconf2.13 autoconf-archive gnu-standards autoconf-doc gettext
  libboost1.54-doc python-pyste libboost-atomic1.54-dev
  libboost-chrono1.54-dev libboost-context1.54-dev libboost-coroutine.54-dev
  libboost-date-time1.54-dev libboost-exception1.54-dev
  libboost-filesystem1.54-dev li

In [40]:
! wget https://github.com/JohnLangford/vowpal_wabbit/archive/8.2.0.tar.gz

--2018-05-18 13:57:16--  https://github.com/JohnLangford/vowpal_wabbit/archive/8.2.0.tar.gz
Resolving github.com (github.com)... 192.30.253.113, 192.30.253.112
Connecting to github.com (github.com)|192.30.253.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/JohnLangford/vowpal_wabbit/tar.gz/8.2.0 [following]
--2018-05-18 13:57:16--  https://codeload.github.com/JohnLangford/vowpal_wabbit/tar.gz/8.2.0
Resolving codeload.github.com (codeload.github.com)... 192.30.253.121, 192.30.253.120
Connecting to codeload.github.com (codeload.github.com)|192.30.253.121|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/x-gzip]
Saving to: '8.2.0.tar.gz'

    [     <=>                               ] 13,391,191  12.2MB/s   in 1.0s   

2018-05-18 13:57:18 (12.2 MB/s) - '8.2.0.tar.gz' saved [13391191]



In [41]:
! tar -xzf 8.2.0.tar.gz

cd vowpal_wabbit-8.2.0

./autogen.sh

make -j4

make test -j4

sudo make install

## Train VW

https://github.com/JohnLangford/vowpal_wabbit/wiki/Command-line-arguments

In [42]:
! head -n2 /data/train.txt/merged.txt

In [43]:
%%time
! LD_LIBRARY_PATH=/usr/local/lib vw -d /data/train.txt/merged.txt -b 24 -c -k --ftrl --passes 1 -f /data/model --holdout_off --loss_function logistic --random_seed 42 --progress 8000000 

final_regressor = /data/model
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
creating cache_file = /data/train.txt/merged.txt.cache
Reading datafile = /data/train.txt/merged.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features

finished run
number of examples = 0
weighted example sum = 0.000000
weighted label sum = 0.000000
average loss = n.a.
total feature number = 0
CPU times: user 0 ns, sys: 12 ms, total: 12 ms
Wall time: 169 ms


## Check VW test performance

In [44]:
%%time
! LD_LIBRARY_PATH=/usr/local/lib vw -d /data/test.txt/merged.txt -i /data/model -t -k -p /data/test_predictions.txt --progress 1000000 --link=logistic

only testing
predictions = /data/test_predictions.txt
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = /data/test.txt/merged.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features

finished run
number of examples per pass = 0
passes used = 1
weighted example sum = 0.000000
weighted label sum = 0.000000
average loss = n.a.
total feature number = 0
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 115 ms


In [45]:
import numpy as np

def read_vw_predictions(p):
    y_pred = []
    with open(p, "r") as f:
        for line in f:
            y_pred.append(float(line.split()[0]))
    return np.array(y_pred)

y_pred = read_vw_predictions("/data/test_predictions.txt")

In [46]:
def get_vw_y_true(p):
    y_true = []
    with open(p, "r") as f:
        for line in f:
            y_true.append(float(line.partition(" ")[0]))
    return np.array(y_true)

y_true = get_vw_y_true("/data/test.txt/merged.txt")

In [47]:
from sklearn.metrics import log_loss, roc_auc_score

In [50]:
log_loss(y_true, y_pred)

ValueError: Found array with 0 sample(s) (shape=(0,)) while a minimum of 1 is required.

In [51]:
y_pred

array([], dtype=float64)

In [None]:
roc_auc_score(y_true, y_pred)

## Make submission to Kaggle

In [131]:
!sudo pip install kaggle
!kaggle -h

[33mThe directory '/home/ubuntu/.cache/pip/http' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.[0m
[33mThe directory '/home/ubuntu/.cache/pip' or its parent directory is not owned by the current user and caching wheels has been disabled. check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.[0m
[33mYou are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m
Unauthorized: you must download an API key from https://www.kaggle.com/<username>/account
Then put kaggle.json in the folder /home/ubuntu/.kaggle


In [None]:
# usage: kaggle competitions submit [-h] [-c COMPETITION] -f FILE -m MESSAGE
#                                   [-q]

# required arguments:
#   -f FILE, --file FILE  File for upload (full path)
#   -m MESSAGE, --message MESSAGE
#                         Message describing this submission

# optional arguments:
#   -h, --help            show this help message and exit
#   -c COMPETITION, --competition COMPETITION
#                         Competition URL suffix (use "kaggle competitions list" to show options)
#                         If empty, the default competition will be used (use "kaggle config set competition")"
#   -q, --quiet           Suppress printing information about download progress
!kaggle competitions submit -c outbrain-click-prediction -f ... -m "First"

# 2. Better model

This time let's make a personalized recommender using:
- page views information
- document properties

Ideas for features:
- uuid topic, entity, publisher, ... preferences
- document similarities
- ...

## More SQL examples

In [10]:
# we start with a DataFrame
events_df = ss.sql("select * from events")
events_df.show(3)

+----------+--------------+-----------+---------+--------+------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|
+----------+--------------+-----------+---------+--------+------------+
|         1|cb8c55702adb93|     379743|       61|       3|   US>SC>519|
|         2|79a85fa78311b9|    1794259|       81|       2|   US>CA>807|
|         3|822932ce3d8757|    1179111|      182|       2|   US>MI>505|
+----------+--------------+-----------+---------+--------+------------+
only showing top 3 rows



In [23]:
# we can make RDD of Rows with *.rdd
from pyspark.sql import Row
events_df.rdd.take(3)

[Row(display_id=u'1', uuid=u'cb8c55702adb93', document_id=u'379743', timestamp=u'61', platform=u'3', geo_location=u'US>SC>519'),
 Row(display_id=u'2', uuid=u'79a85fa78311b9', document_id=u'1794259', timestamp=u'81', platform=u'2', geo_location=u'US>CA>807'),
 Row(display_id=u'3', uuid=u'822932ce3d8757', document_id=u'1179111', timestamp=u'182', platform=u'2', geo_location=u'US>MI>505')]

In [30]:
# When it's RDD, we can use Python to create new RDD of Rows
(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">"), bar=x.uuid))
).take(3)

[Row(bar=u'cb8c55702adb93', foo=[u'US', u'SC', u'519']),
 Row(bar=u'79a85fa78311b9', foo=[u'US', u'CA', u'807']),
 Row(bar=u'822932ce3d8757', foo=[u'US', u'MI', u'505'])]

In [31]:
# we can convert it back to DataFrame if it's still a table that can be converted to Java types
ss.createDataFrame(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">"), bar=x.uuid))
).show(3)

+--------------+-------------+
|           bar|          foo|
+--------------+-------------+
|cb8c55702adb93|[US, SC, 519]|
|79a85fa78311b9|[US, CA, 807]|
|822932ce3d8757|[US, MI, 505]|
+--------------+-------------+
only showing top 3 rows



In [36]:
%%time
# we can save it to HDFS as parquet (if it's a DataFrame)
ss.createDataFrame(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">") if x.geo_location else [], bar=x.uuid))
).write.mode("overwrite").parquet("/task1/example1")

CPU times: user 36 ms, sys: 8 ms, total: 44 ms
Wall time: 3min 4s


In [39]:
ss.read.parquet("/task1/example1").printSchema()
ss.read.parquet("/task1/example1").show(3)

root
 |-- bar: string (nullable = true)
 |-- foo: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+-------------+
|           bar|          foo|
+--------------+-------------+
|cb8c55702adb93|[US, SC, 519]|
|79a85fa78311b9|[US, CA, 807]|
|822932ce3d8757|[US, MI, 505]|
+--------------+-------------+
only showing top 3 rows



In [41]:
%%time
# or we can skip DataFrame API if we use Python functions (there will be no speed increase)
(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">") if x.geo_location else [], bar=x.uuid))
).saveAsPickleFile("/task1/example2")

CPU times: user 36 ms, sys: 0 ns, total: 36 ms
Wall time: 2min 41s


In [43]:
sc.pickleFile("/task1/example2").take(3)

[Row(bar=u'cb8c55702adb93', foo=[u'US', u'SC', u'519']),
 Row(bar=u'79a85fa78311b9', foo=[u'US', u'CA', u'807']),
 Row(bar=u'822932ce3d8757', foo=[u'US', u'MI', u'505'])]

In [49]:
# sometimes we cannot make a DataFrame
import numpy as np
rdd = (
    events_df.rdd
    .map(lambda x: Row(x=np.array(x.geo_location.split(">") if x.geo_location else [])))
)
rdd.take(2)

[Row(x=array([u'US', u'SC', u'519'], 
      dtype='<U3')),
 Row(x=array([u'US', u'CA', u'807'], 
      dtype='<U3'))]

In [None]:
# throws TypeError: not supported type: <type 'numpy.ndarray'>
ss.createDataFrame(rdd)

In [52]:
%%time
# but we can save as RDD in pickle file just fine
rdd.saveAsPickleFile("/task1/example3")

CPU times: user 28 ms, sys: 8 ms, total: 36 ms
Wall time: 3min 31s


Takeaways:
- use DataFrames when you can (simple join's, select's, groupby's), it will be faster
- use RDD and Python when you can't use DataFrame API
- convert it back to DataFrame if needed
- or save to pickles (can save almost any Python object as pickle)

# Built-in SQL functions

You can find more at https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html

In [53]:
# sql version
df = ss.sql("""
select
    document_id,
    collect_list(struct(category_id, confidence_level)) as categories
from
    documents_categories
group by document_id
""")
df.show(3)

+-----------+--------------------+
|document_id|          categories|
+-----------+--------------------+
|     100010|[[1513,0.79842798...|
|    1000240|[[1505,0.92], [15...|
|    1000280|[[1909,0.92], [19...|
+-----------+--------------------+
only showing top 3 rows



In [55]:
%%time
df.write.mode("overwrite").parquet("/task1/example4")

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 12.5 s


In [57]:
# or we can use RDD and Python if we are not aware of those SQL functions
rdd = (
    ss.sql("select * from documents_categories")
    .rdd
    .map(lambda x: (x.document_id, (x.category_id, x.confidence_level)))
    .groupByKey()
    .map(lambda (k, vs): (k, list(vs)))
)
rdd.take(3)

[(u'2797448', [(u'1503', u'0.275620316'), (u'1210', u'0.020971111')]),
 (u'736183', [(u'1808', u'0.92'), (u'1513', u'0.07')]),
 (u'616955', [(u'1403', u'0.311842556'), (u'1210', u'0.023727151')])]

In [58]:
%%time
# it's much slower, but we can do almost everything in Python
rdd.saveAsPickleFile("/task1/example5")

CPU times: user 28 ms, sys: 12 ms, total: 40 ms
Wall time: 3min 31s


In [63]:
# but sometimes with Python we can do more
rdd = (
    ss.sql("select * from documents_categories")
    .rdd
    .map(lambda x: (x.document_id, (x.category_id, x.confidence_level)))
    .groupByKey()
    .map(lambda (k, vs): Row(document_id=k, categories={a: float(b) for a, b in vs}))
)

In [60]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
ss.createDataFrame(rdd).write.parquet("/task1/example6")

CPU times: user 32 ms, sys: 4 ms, total: 36 ms
Wall time: 25.8 s


In [64]:
ss.read.parquet("/task1/example6").show(3)

+--------------------+-----------+
|          categories|document_id|
+--------------------+-----------+
|Map(1510 -> 0.887...|    1059269|
|Map(1408 -> 0.92,...|    1050604|
|Map(1903 -> 0.92,...|    1472688|
+--------------------+-----------+
only showing top 3 rows



In [65]:
# now we can join this table with events for instance
ss.read.parquet("/task1/example6").registerTempTable("doc_categories_ready")

In [68]:
ss.sql("""
select 
    e.*, 
    dc.categories
from 
    events as e
    join doc_categories_ready as dc on dc.document_id = e.document_id
""").show(3)

+----------+--------------+-----------+---------+--------+------------+--------------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|          categories|
+----------+--------------+-----------+---------+--------+------------+--------------------+
|  18242074|e703634e3dfa39|    1000240|536236046|       2|          NG|Map(1503 -> 0.07,...|
|  18694427|5b023d28c0a9f3|    1000240|687121504|       2|   US>MA>521|Map(1503 -> 0.07,...|
|   3436070|55e1db49ff4eef|    1000240|223783698|       1|   US>CA>803|Map(1503 -> 0.07,...|
+----------+--------------+-----------+---------+--------+------------+--------------------+
only showing top 3 rows

