# Check for common deployment errors first

This should show three nodes with 500 - 1000 GB each (951 in this case). Ambari -> 'Start All' should be done first.

In [2]:
! hdfs dfsadmin -report | grep "DFS Remaining"

DFS Remaining: 3037451872768 (2.76 TB)
DFS Remaining: 1012518645248 (942.98 GB)
DFS Remaining%: 94.00%
DFS Remaining: 1011719310848 (942.24 GB)
DFS Remaining%: 93.92%
DFS Remaining: 1013213916672 (943.63 GB)
DFS Remaining%: 94.06%


# Create Spark Context

In [3]:
import spark_setup
spark_setup.setup_pyspark_env()
import spark_utils

In [4]:
%%time
sc = spark_utils.get_spark_context()

Ambari - http://10.0.1.21:8080
All Applications - http://10.0.1.23:8088/cluster
CPU times: user 20 ms, sys: 8 ms, total: 28 ms
Wall time: 17.2 s


In [68]:
import pandas as pd
from pyspark.sql import SparkSession

ss = SparkSession(sc)

In [6]:
from hdfs import InsecureClient
hdfs_client = InsecureClient("http://cluster1:50070", user='hdfs')

# Download task data

In [25]:
# place yours here
student_id = 74

In [26]:
import json
student_region = json.loads(open("../azure/regions.json").read())["student{}".format(student_id)]
print "your region is:", student_region

your region is: westeurope


In [27]:
task_data_link = "https://lsml1{}.blob.core.windows.net/data/task1.zip".format(student_region)
print task_data_link

https://lsml1westeurope.blob.core.windows.net/data/task1.zip


In [28]:
%%time
# you can see progress in tmux (tab with running jupyter notebook)
import os
os.system("wget {} -O /data/task1.zip".format(task_data_link))

CPU times: user 92 ms, sys: 12 ms, total: 104 ms
Wall time: 11min 4s


In [29]:
%%time
# unzip task1.zip (many zip files inside)
os.system("unzip /data/task1.zip -d /data/")

CPU times: user 112 ms, sys: 64 ms, total: 176 ms
Wall time: 18min 20s


0

In [30]:
# verify that you're all set
! du -sh /data/*.zip

136M	/data/clicks_test.csv.zip
390M	/data/clicks_train.csv.zip
33M	/data/documents_categories.csv.zip
126M	/data/documents_entities.csv.zip
16M	/data/documents_meta.csv.zip
121M	/data/documents_topics.csv.zip
478M	/data/events.csv.zip
30G	/data/page_views.csv.zip
149M	/data/page_views_sample.csv.zip
2.6M	/data/promoted_content.csv.zip
100M	/data/sample_submission.csv.zip
32G	/data/task1.zip


# Load data to HDFS

https://www.kaggle.com/c/outbrain-click-prediction/data

In [31]:
import time

def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        print '%r (%r, %r) %2.2f sec' % \
              (method.__name__, args, kw, te-ts)
        return result
    return timed

In [32]:
hdfs_client.delete("/task1", recursive=True)

True

In [33]:
%%time
import subprocess

@timeit
def unzip_to_hdfs(fn):
    fn_out = fn.replace(".zip", "")
    print subprocess.check_output("unzip -p /data/{0} | hadoop fs -put - /task1/{1}".format(fn, fn_out), shell=True)
    
fns = [
    "clicks_test.csv.zip",
    "clicks_train.csv.zip",
    "documents_categories.csv.zip",
    "documents_entities.csv.zip",
    "documents_meta.csv.zip",
    "documents_topics.csv.zip",
    "events.csv.zip",
    "page_views.csv.zip",
    "page_views_sample.csv.zip",
    "promoted_content.csv.zip",
    "sample_submission.csv.zip"
]

for fn in fns:
    unzip_to_hdfs(fn)


'unzip_to_hdfs' (('clicks_test.csv.zip',), {}) 9.13 sec

'unzip_to_hdfs' (('clicks_train.csv.zip',), {}) 19.54 sec

'unzip_to_hdfs' (('documents_categories.csv.zip',), {}) 3.58 sec

'unzip_to_hdfs' (('documents_entities.csv.zip',), {}) 14.93 sec

'unzip_to_hdfs' (('documents_meta.csv.zip',), {}) 3.01 sec

'unzip_to_hdfs' (('documents_topics.csv.zip',), {}) 7.02 sec

'unzip_to_hdfs' (('events.csv.zip',), {}) 21.23 sec

'unzip_to_hdfs' (('page_views.csv.zip',), {}) 1604.36 sec

'unzip_to_hdfs' (('page_views_sample.csv.zip',), {}) 8.96 sec

'unzip_to_hdfs' (('promoted_content.csv.zip',), {}) 2.36 sec

'unzip_to_hdfs' (('sample_submission.csv.zip',), {}) 6.11 sec
CPU times: user 204 ms, sys: 112 ms, total: 316 ms
Wall time: 28min 20s


In [34]:
! hadoop fs -du -s -h /task1/*.csv

483.5 M  /task1/clicks_test.csv
1.4 G  /task1/clicks_train.csv
112.5 M  /task1/documents_categories.csv
309.1 M  /task1/documents_entities.csv
85.2 M  /task1/documents_meta.csv
323.7 M  /task1/documents_topics.csv
1.1 G  /task1/events.csv
88.4 G  /task1/page_views.csv
433.3 M  /task1/page_views_sample.csv
13.2 M  /task1/promoted_content.csv
260.5 M  /task1/sample_submission.csv


In [35]:
# files are written on cluster1 node only, need to balance HDFS on cluster

In [36]:
! hdfs dfsadmin -setBalancerBandwidth 1000000000

Balancer bandwidth is set to 1000000000


In [37]:
%%time
! hdfs balancer -threshold 5 > balancer.log 2>&1

CPU times: user 2.09 s, sys: 732 ms, total: 2.82 s
Wall time: 1min 55s


# Read example

In [38]:
pvdf = ss.read.csv("/task1/page_views.csv", header=True)

In [39]:
pvdf.dtypes

[('uuid', 'string'),
 ('document_id', 'string'),
 ('timestamp', 'string'),
 ('platform', 'string'),
 ('geo_location', 'string'),
 ('traffic_source', 'string')]

In [40]:
pvdf.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



In [41]:
%%time
pvdf.count()

CPU times: user 56 ms, sys: 32 ms, total: 88 ms
Wall time: 8min 3s


2034275448

# Parquet is faster than CSV

http://events.linuxfoundation.org/sites/events/files/slides/ApacheCon%20BigData%20Europe%202016%20-%20Parquet%20in%20Practice%20%26%20Detail_0.pdf

In [42]:
%%time
pvdf.write.parquet("/task1/page_views.parquet")

CPU times: user 136 ms, sys: 28 ms, total: 164 ms
Wall time: 14min 36s


In [43]:
! hadoop fs -du -s -h /task1/page_views.parquet

47.3 G  /task1/page_views.parquet


In [44]:
pvdf2 = ss.read.parquet("/task1/page_views.parquet")

In [45]:
%%time
from IPython.display import display
boo = pvdf2.groupBy("geo_location").count().collect()
display(boo[:5])

[Row(geo_location=u'ES>07', count=139257),
 Row(geo_location=u'US>MT>756', count=676540),
 Row(geo_location=u'LT', count=145441),
 Row(geo_location=u'IL>01', count=21174),
 Row(geo_location=u'DZ', count=141209)]

CPU times: user 20 ms, sys: 4 ms, total: 24 ms
Wall time: 38.4 s


In [46]:
%%time
boo = pvdf.groupBy("geo_location").count().collect()
display(boo[:5])

[Row(geo_location=u'US>MS>673', count=849299),
 Row(geo_location=u'DZ', count=141209),
 Row(geo_location=u'US>MT>756', count=676540),
 Row(geo_location=u'US>NY', count=420207),
 Row(geo_location=u'CO>02', count=274301)]

CPU times: user 84 ms, sys: 16 ms, total: 100 ms
Wall time: 8min 19s


# Convert all to Parquet

In [48]:
%%time
def convert_all_to_parquet():
    task_dir = "/task1/"
    all_files = hdfs_client.list(task_dir)
    for fn in all_files:
        if fn.endswith(".csv"):
            fn_after = fn.replace(".csv", ".parquet")
            path_before = task_dir + fn
            path_after = task_dir + fn_after
            if fn_after not in all_files:
                # generate parquet
                df = ss.read.csv(path_before, header=True)
                df.write.parquet(path_after)
            # remove csv, we have parquet now
            hdfs_client.delete(path_before)
            print fn_after, "done"

convert_all_to_parquet()

clicks_test.parquet done
clicks_train.parquet done
documents_categories.parquet done
documents_entities.parquet done
documents_meta.parquet done
documents_topics.parquet done
events.parquet done
page_views.parquet done
page_views_sample.parquet done
promoted_content.parquet done
sample_submission.parquet done
CPU times: user 84 ms, sys: 40 ms, total: 124 ms
Wall time: 6min 40s


In [49]:
! hadoop fs -du -s -h /task1/*

133.2 M  /task1/clicks_test.parquet
367.5 M  /task1/clicks_train.parquet
36.5 M  /task1/documents_categories.parquet
184.0 M  /task1/documents_entities.parquet
21.2 M  /task1/documents_meta.parquet
183.3 M  /task1/documents_topics.parquet
669.3 M  /task1/events.parquet
47.3 G  /task1/page_views.parquet
236.9 M  /task1/page_views_sample.parquet
5.0 M  /task1/promoted_content.parquet
184.2 M  /task1/sample_submission.parquet


# Preview all files

In [50]:
%%time
def preview_all_files():
    task_dir = "/task1/"
    all_files = hdfs_client.list(task_dir)
    for fn in all_files:
        df = ss.read.parquet(task_dir + fn)
        print "#" * 15 + " {0} ".format(task_dir + fn) + "#" * 15
        df.show(1)
        
preview_all_files()

############### /task1/clicks_test.parquet ###############
+----------+------+
|display_id| ad_id|
+----------+------+
|  17805143|288388|
+----------+------+
only showing top 1 row

############### /task1/clicks_train.parquet ###############
+----------+-----+-------+
|display_id|ad_id|clicked|
+----------+-----+-------+
|         1|42337|      0|
+----------+-----+-------+
only showing top 1 row

############### /task1/documents_categories.parquet ###############
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1544588|       1513|     0.263546236|
+-----------+-----------+----------------+
only showing top 1 row

############### /task1/documents_entities.parquet ###############
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1539011|e01ed0c4a3e8f8f35...|0.327269624728567|
+-----------+

# Register all tables to be usable in SQL queries

In [70]:
%%time
def register_all_tables():
    task_dir = "/task1/"
    all_files = hdfs_client.list(task_dir)
    for fn in all_files:
        if fn.endswith(".parquet"):
            table_name = fn.replace(".parquet", "")
            df = ss.read.parquet(task_dir + fn)
            df.registerTempTable(table_name)
            print table_name, "done"
        
register_all_tables()

clicks_test done
clicks_train done
documents_categories done
documents_entities done
documents_meta done
documents_topics done
events done
page_views done
page_views_sample done
promoted_content done
sample_submission done
CPU times: user 28 ms, sys: 4 ms, total: 32 ms
Wall time: 1.9 s


# SQL query example

In [66]:
%%time
ss.sql("""
select count(distinct(uuid)) as users_count
from events
""").collect()

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 17.5 s


[Row(users_count=19794967)]

# 1. Baseline

Simple model using the following features:
- **clicked**
- geo_location features (country, state, dma)
- day_of_week (from timestamp, use *date.isoweekday()*)
- ad_id
- ad_document_id
- campaign_id
- advertiser_id
- display_document_id
- platform

In [235]:
ss.sql("""
show tables
""").collect()

[Row(database=u'', tableName=u'clicks_test', isTemporary=True),
 Row(database=u'', tableName=u'clicks_train', isTemporary=True),
 Row(database=u'', tableName=u'documents_categories', isTemporary=True),
 Row(database=u'', tableName=u'documents_entities', isTemporary=True),
 Row(database=u'', tableName=u'documents_meta', isTemporary=True),
 Row(database=u'', tableName=u'documents_topics', isTemporary=True),
 Row(database=u'', tableName=u'events', isTemporary=True),
 Row(database=u'', tableName=u'geo', isTemporary=True),
 Row(database=u'', tableName=u'merged', isTemporary=True),
 Row(database=u'', tableName=u'page_views', isTemporary=True),
 Row(database=u'', tableName=u'page_views_sample', isTemporary=True),
 Row(database=u'', tableName=u'promoted_content', isTemporary=True),
 Row(database=u'', tableName=u'sample_submission', isTemporary=True)]

In [236]:
field_to_table = {}

task_dir = "/task1/"
all_files = hdfs_client.list(task_dir)
for table_name in all_files:
    if ".parquet" in table_name:
        table_name = table_name.replace(".parquet", "")
        query = "DESCRIBE %s" % table_name
        #print(query)
        res = ss.sql(query)
        for i in res.collect():
            if i['col_name'] in field_to_table:
                field_to_table[i['col_name']] += [table_name]
            else:
                field_to_table[i['col_name']] = [table_name]
        fields = [i['col_name'] for i in res.collect()]
        print('%s: %s' % (table_name, str(fields)))
        #print(merged.collect())

clicks_test: [u'display_id', u'ad_id']
clicks_train: [u'display_id', u'ad_id', u'clicked']
documents_categories: [u'document_id', u'category_id', u'confidence_level']
documents_entities: [u'document_id', u'entity_id', u'confidence_level']
documents_meta: [u'document_id', u'source_id', u'publisher_id', u'publish_time']
documents_topics: [u'document_id', u'topic_id', u'confidence_level']
events: [u'display_id', u'uuid', u'document_id', u'timestamp', u'platform', u'geo_location']
page_views: [u'uuid', u'document_id', u'timestamp', u'platform', u'geo_location', u'traffic_source']
page_views_sample: [u'uuid', u'document_id', u'timestamp', u'platform', u'geo_location', u'traffic_source']
promoted_content: [u'ad_id', u'document_id', u'campaign_id', u'advertiser_id']
sample_submission: [u'display_id', u'ad_id']


In [237]:
['display_document_id', 'ad_document_id']

fields = [
    'clicked',
    'geo_location',
    'timestamp',
    'ad_id',
    'document_id',
    'campaign_id',
    'advertiser_id',
    'platform'
]
for i in fields:
    print(i + " : " + str(field_to_table[i]))


clicked : [u'clicks_train']
geo_location : [u'events', u'page_views', u'page_views_sample']
timestamp : [u'events', u'page_views', u'page_views_sample']
ad_id : [u'clicks_test', u'clicks_train', u'promoted_content', u'sample_submission']
document_id : [u'documents_categories', u'documents_entities', u'documents_meta', u'documents_topics', u'events', u'page_views', u'page_views_sample', u'promoted_content']
campaign_id : [u'promoted_content']
advertiser_id : [u'promoted_content']
platform : [u'events', u'page_views', u'page_views_sample']


In [238]:
field_to_table

{u'ad_id': [u'clicks_test',
  u'clicks_train',
  u'promoted_content',
  u'sample_submission'],
 u'advertiser_id': [u'promoted_content'],
 u'campaign_id': [u'promoted_content'],
 u'category_id': [u'documents_categories'],
 u'clicked': [u'clicks_train'],
 u'confidence_level': [u'documents_categories',
  u'documents_entities',
  u'documents_topics'],
 u'display_id': [u'clicks_test',
  u'clicks_train',
  u'events',
  u'sample_submission'],
 u'document_id': [u'documents_categories',
  u'documents_entities',
  u'documents_meta',
  u'documents_topics',
  u'events',
  u'page_views',
  u'page_views_sample',
  u'promoted_content'],
 u'entity_id': [u'documents_entities'],
 u'geo_location': [u'events', u'page_views', u'page_views_sample'],
 u'platform': [u'events', u'page_views', u'page_views_sample'],
 u'publish_time': [u'documents_meta'],
 u'publisher_id': [u'documents_meta'],
 u'source_id': [u'documents_meta'],
 u'timestamp': [u'events', u'page_views', u'page_views_sample'],
 u'topic_id': [u'do

In [239]:
merged = ss.sql('''
    select 
        clicked,
        clicks_train.ad_id,
        document_id, 
        campaign_id,
        advertiser_id
    from 
        clicks_train
    join 
        promoted_content 
    on 
        clicks_train.ad_id = promoted_content.ad_id
''')

In [240]:
merged.registerTempTable("merged")

In [None]:
merged.show(5)

In [220]:
['display_document_id', 'ad_document_id']

fields = [
    'geo_location',
    'timestamp',
    'document_id',
    'platform',
    'display_id'
]
for i in fields:
    print(i + " : " + str(field_to_table[i]))


geo_location : [u'events', u'page_views', u'page_views_sample']
timestamp : [u'events', u'page_views', u'page_views_sample']
document_id : [u'documents_categories', u'documents_entities', u'documents_meta', u'documents_topics', u'events', u'page_views', u'page_views_sample', u'promoted_content']
platform : [u'events', u'page_views', u'page_views_sample']
display_id : [u'clicks_test', u'clicks_train', u'events', u'sample_submission']


In [221]:
merged = ss.sql('''
    select 
        merged.clicked,
        merged.ad_id,
        merged.document_id, 
        merged.campaign_id,
        merged.advertiser_id,
        events.platform,
        events.display_id,
        events.uuid
    from 
        merged
    join 
        events 
    on 
        merged.document_id = events.document_id
''')
merged.registerTempTable("merged")

In [222]:
merged.show(5)

+-------+-----+-----------+-----------+-------------+--------+----------+--------------+
|clicked|ad_id|document_id|campaign_id|advertiser_id|platform|display_id|          uuid|
+-------+-----+-----------+-----------+-------------+--------+----------+--------------+
|      0|85515|    1084599|       1633|          617|       1|    682390|7592dfa44f29b6|
|      0|85515|    1084599|       1633|          617|       1|  16985153|c6389efa0eedc1|
|      0|85515|    1084599|       1633|          617|       1|  18648077|2c4e6c8e09ff70|
|      0|85515|    1084599|       1633|          617|       1|   8823719|6df7e3724b72c5|
|      0|85515|    1084599|       1633|          617|       3|  14515033|89b6e9b40a406d|
+-------+-----+-----------+-----------+-------------+--------+----------+--------------+
only showing top 5 rows



In [223]:
geo_df = ss.sql("select uuid, geo_location from events")
geo_df.show(5)

+--------------+------------+
|          uuid|geo_location|
+--------------+------------+
|cb8c55702adb93|   US>SC>519|
|79a85fa78311b9|   US>CA>807|
|822932ce3d8757|   US>MI>505|
|85281d0a49f7ac|   US>WV>564|
|8d0daef4bf5b56|       SG>00|
+--------------+------------+
only showing top 5 rows



In [224]:
from pyspark.sql import Row

geo_df = ss.createDataFrame(geo_df.rdd.map(lambda x: Row(
    geo_location=x.geo_location.split(">") + ['', ''] if x.geo_location else ['', '', ''],
    uuid=x.uuid)
))

In [225]:
geo_df.show(10)

+-----------------+--------------+
|     geo_location|          uuid|
+-----------------+--------------+
|[US, SC, 519, , ]|cb8c55702adb93|
|[US, CA, 807, , ]|79a85fa78311b9|
|[US, MI, 505, , ]|822932ce3d8757|
|[US, WV, 564, , ]|85281d0a49f7ac|
|     [SG, 00, , ]|8d0daef4bf5b56|
|[US, OH, 510, , ]|7765b4faae4ad4|
|[US, MT, 762, , ]|2cc3f6457d16da|
|[US, PA, 566, , ]|166fc654d73c98|
|[US, FL, 528, , ]|9dddccf70f6067|
|         [US, , ]|b09a0e92aa4d17|
+-----------------+--------------+
only showing top 10 rows



In [226]:
geo_df = geo_df.rdd.map(lambda x: Row(
    country=x.geo_location[0],
    state=x.geo_location[1],
    dma=x.geo_location[2],
    uuid=x.uuid)
)

In [227]:
df = ss.createDataFrame(geo_df)
df.registerTempTable("geo")

In [228]:
df.show(10)

+-------+---+-----+--------------+
|country|dma|state|          uuid|
+-------+---+-----+--------------+
|     US|519|   SC|cb8c55702adb93|
|     US|807|   CA|79a85fa78311b9|
|     US|505|   MI|822932ce3d8757|
|     US|564|   WV|85281d0a49f7ac|
|     SG|   |   00|8d0daef4bf5b56|
|     US|510|   OH|7765b4faae4ad4|
|     US|762|   MT|2cc3f6457d16da|
|     US|566|   PA|166fc654d73c98|
|     US|528|   FL|9dddccf70f6067|
|     US|   |     |b09a0e92aa4d17|
+-------+---+-----+--------------+
only showing top 10 rows



In [229]:
merged.show(5)

+-------+-----+-----------+-----------+-------------+--------+----------+--------------+
|clicked|ad_id|document_id|campaign_id|advertiser_id|platform|display_id|          uuid|
+-------+-----+-----------+-----------+-------------+--------+----------+--------------+
|      0|85515|    1084599|       1633|          617|       1|    682390|7592dfa44f29b6|
|      0|85515|    1084599|       1633|          617|       1|  16985153|c6389efa0eedc1|
|      0|85515|    1084599|       1633|          617|       1|   3438587|bba6c1624943ed|
|      0|85515|    1084599|       1633|          617|       1|  12669272|7b3fa7dbbd2869|
|      0|85515|    1084599|       1633|          617|       1|   8823719|6df7e3724b72c5|
+-------+-----+-----------+-----------+-------------+--------+----------+--------------+
only showing top 5 rows



In [230]:
ss.sql('''
DESCRIBE merged
''').collect()

[Row(col_name=u'clicked', data_type=u'string', comment=None),
 Row(col_name=u'ad_id', data_type=u'string', comment=None),
 Row(col_name=u'document_id', data_type=u'string', comment=None),
 Row(col_name=u'campaign_id', data_type=u'string', comment=None),
 Row(col_name=u'advertiser_id', data_type=u'string', comment=None),
 Row(col_name=u'platform', data_type=u'string', comment=None),
 Row(col_name=u'display_id', data_type=u'string', comment=None),
 Row(col_name=u'uuid', data_type=u'string', comment=None)]

In [231]:
ss.sql('''
DESCRIBE geo
''').collect()

[Row(col_name=u'country', data_type=u'string', comment=None),
 Row(col_name=u'dma', data_type=u'string', comment=None),
 Row(col_name=u'state', data_type=u'string', comment=None),
 Row(col_name=u'uuid', data_type=u'string', comment=None)]

In [234]:
merged2 = ss.sql('''
    select 
        merged.clicked,
        merged.ad_id,
        merged.document_id, 
        merged.campaign_id,
        merged.advertiser_id,
        merged.platform,
        merged.display_id,
        geo.country,
        geo.dma,
        geo.state
    from 
        merged
    join 
        geo 
    on 
        merged.uuid = geo.uuid
''')
merged2.registerTempTable("merged")

In [233]:
merged2.show(5)

KeyboardInterrupt: 

## Calculate features for VW

- Use DataFrame API to join tables (functions in SQL queries: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html)
- Use Python API to calculate features and save them as text for VW (*saveAsTextFile()*)
- Hash features in Spark (24 bits, use *sklearn.utils.murmurhash.murmurhash3_32*)
- Split dataset in Spark into 90% train, 10% test **by display_id**, save the split for further use

In [53]:
from sklearn.utils.murmurhash import murmurhash3_32
def hasher(x, bits):
    return murmurhash3_32(x) % 2**bits

## Copy data from HDFS to cluster1 machine

We will run vowpal wabbit **locally**, need to copy data from HDFS

In [54]:
import os
import shutil

def copy_text_to_local(hdfs_path, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    os.mkdir(local_path)
    os.system('hadoop fs -copyToLocal "{0}/*" {1}'.format(hdfs_path, local_path))
    os.system('cat {0}/part-* > {1}'.format(local_path, local_path + "/merged.txt"))
    print "done"

In [55]:
%%time
copy_text_to_local("/task1/train.txt", "/data/train.txt")

done
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 2.19 s


In [56]:
! ls -lh /data/train.txt/merged.txt

-rw-rw-r-- 1 ubuntu ubuntu 0 May 13 12:19 /data/train.txt/merged.txt


In [57]:
%%time
copy_text_to_local("/task1/test.txt", "/data/test.txt")

done
CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 2.06 s


In [58]:
! ls -lh /data/test.txt/merged.txt

-rw-rw-r-- 1 ubuntu ubuntu 0 May 13 12:19 /data/test.txt/merged.txt


## Install VW

In [59]:
! sudo apt-get install libboost-program-options-dev zlib1g-dev libboost-python-dev libtool m4 automake -y

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following packages were automatically installed and are no longer required:
  python-chardet-whl python-colorama python-colorama-whl python-distlib
  python-distlib-whl python-html5lib python-html5lib-whl python-pip-whl
  python-requests-whl python-setuptools-whl python-six-whl python-urllib3-whl
  python-wheel python3-pkg-resources
Use 'apt-get autoremove' to remove them.
The following extra packages will be installed:
  autoconf autotools-dev libboost-program-options1.54-dev
  libboost-program-options1.54.0 libboost-python1.54-dev libboost-python1.54.0
  libboost1.54-dev libltdl-dev libltdl7
Suggested packages:
  autoconf2.13 autoconf-archive gnu-standards autoconf-doc gettext
  libboost1.54-doc python-pyste libboost-atomic1.54-dev
  libboost-chrono1.54-dev libboost-context1.54-dev libboost-coroutine.54-dev
  libboost-date-time1.54-dev libboost-exception1.54-dev
  libboost-filesystem1.

In [60]:
! wget https://github.com/JohnLangford/vowpal_wabbit/archive/8.2.0.tar.gz

--2017-05-13 12:19:50--  https://github.com/JohnLangford/vowpal_wabbit/archive/8.2.0.tar.gz
Resolving github.com (github.com)... 192.30.253.112, 192.30.253.113
Connecting to github.com (github.com)|192.30.253.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/JohnLangford/vowpal_wabbit/tar.gz/8.2.0 [following]
--2017-05-13 12:19:51--  https://codeload.github.com/JohnLangford/vowpal_wabbit/tar.gz/8.2.0
Resolving codeload.github.com (codeload.github.com)... 192.30.253.120, 192.30.253.121
Connecting to codeload.github.com (codeload.github.com)|192.30.253.120|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/x-gzip]
Saving to: '8.2.0.tar.gz'

    [     <=>                               ] 13,391,191  11.9MB/s   in 1.1s   

2017-05-13 12:19:52 (11.9 MB/s) - '8.2.0.tar.gz' saved [13391191]



In [61]:
! tar -xzf 8.2.0.tar.gz

cd vowpal_wabbit-8.2.0

./autogen.sh

make -j4

make test -j4

sudo make install

## Train VW

https://github.com/JohnLangford/vowpal_wabbit/wiki/Command-line-arguments

In [62]:
! head -n2 /data/train.txt/merged.txt

In [63]:
%%time
! LD_LIBRARY_PATH=/usr/local/lib vw -d /data/train.txt/merged.txt -b 24 -c -k --ftrl --passes 1 -f /data/model --holdout_off --loss_function logistic --random_seed 42 --progress 8000000 

/bin/sh: 1: vw: not found
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 109 ms


## Check VW test performance

In [64]:
%%time
! LD_LIBRARY_PATH=/usr/local/lib vw -d /data/test.txt/merged.txt -i /data/model -t -k -p /data/test_predictions.txt --progress 1000000 --link=logistic

/bin/sh: 1: vw: not found
CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 107 ms


In [65]:
import numpy as np

def read_vw_predictions(p):
    y_pred = []
    with open(p, "r") as f:
        for line in f:
            y_pred.append(float(line.split()[0]))
    return np.array(y_pred)

y_pred = read_vw_predictions("/data/test_predictions.txt")

IOError: [Errno 2] No such file or directory: '/data/test_predictions.txt'

In [None]:
def get_vw_y_true(p):
    y_true = []
    with open(p, "r") as f:
        for line in f:
            y_true.append(float(line.partition(" ")[0]))
    return np.array(y_true)

y_true = get_vw_y_true("/data/test.txt/merged.txt")

In [None]:
from sklearn.metrics import log_loss, roc_auc_score

In [None]:
log_loss(y_true, y_pred)

In [None]:
roc_auc_score(y_true, y_pred)

## Make submission to Kaggle

# 2. Better model

This time let's make a personalized recommender using:
- page views information
- document properties

Ideas for features:
- uuid topic, entity, publisher, ... preferences
- document similarities
- ...

## More SQL examples

In [None]:
# we start with a DataFrame
events_df = ss.sql("select * from events")
events_df.show(3)

In [None]:
# we can make RDD of Rows with *.rdd
from pyspark.sql import Row
events_df.rdd.take(3)

In [None]:
# When it's RDD, we can use Python to create new RDD of Rows
(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">"), bar=x.uuid))
).take(3)

In [None]:
# we can convert it back to DataFrame if it's still a table that can be converted to Java types
ss.createDataFrame(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">"), bar=x.uuid))
).show(3)

In [None]:
%%time
# we can save it to HDFS as parquet (if it's a DataFrame)
ss.createDataFrame(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">") if x.geo_location else [], bar=x.uuid))
).write.mode("overwrite").parquet("/task1/example1")

In [None]:
ss.read.parquet("/task1/example1").printSchema()
ss.read.parquet("/task1/example1").show(3)

In [None]:
%%time
# or we can skip DataFrame API if we use Python functions (there will be no speed increase)
(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">") if x.geo_location else [], bar=x.uuid))
).saveAsPickleFile("/task1/example2")

In [None]:
sc.pickleFile("/task1/example2").take(3)

In [None]:
# sometimes we cannot make a DataFrame
import numpy as np
rdd = (
    events_df.rdd
    .map(lambda x: Row(x=np.array(x.geo_location.split(">") if x.geo_location else [])))
)
rdd.take(2)

In [None]:
# throws TypeError: not supported type: <type 'numpy.ndarray'>
ss.createDataFrame(rdd)

In [None]:
%%time
# but we can save as RDD in pickle file just fine
rdd.saveAsPickleFile("/task1/example3")

Takeaways:
- use DataFrames when you can (simple join's, select's, groupby's), it will be faster
- use RDD and Python when you can't use DataFrame API
- convert it back to DataFrame if needed
- or save to pickles (can save almost any Python object as pickle)

# Built-in SQL functions

You can find more at https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html

In [None]:
# sql version
df = ss.sql("""
select
    document_id,
    collect_list(struct(category_id, confidence_level)) as categories
from
    documents_categories
group by document_id
""")
df.show(3)

In [None]:
%%time
df.write.mode("overwrite").parquet("/task1/example4")

In [None]:
# or we can use RDD and Python if we are not aware of those SQL functions
rdd = (
    ss.sql("select * from documents_categories")
    .rdd
    .map(lambda x: (x.document_id, (x.category_id, x.confidence_level)))
    .groupByKey()
    .map(lambda (k, vs): (k, list(vs)))
)
rdd.take(3)

In [None]:
%%time
# it's much slower, but we can do almost everything in Python
rdd.saveAsPickleFile("/task1/example5")

In [None]:
# but sometimes with Python we can do more
rdd = (
    ss.sql("select * from documents_categories")
    .rdd
    .map(lambda x: (x.document_id, (x.category_id, x.confidence_level)))
    .groupByKey()
    .map(lambda (k, vs): Row(document_id=k, categories={a: float(b) for a, b in vs}))
)

In [None]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
ss.createDataFrame(rdd).write.parquet("/task1/example6")

In [None]:
ss.read.parquet("/task1/example6").show(3)

In [None]:
# now we can join this table with events for instance
ss.read.parquet("/task1/example6").registerTempTable("doc_categories_ready")

In [None]:
ss.sql("""
select 
    e.*, 
    dc.categories
from 
    events as e
    join doc_categories_ready as dc on dc.document_id = e.document_id
""").show(3)