# Fix all jupyter notebook problems

<sub><sub>0. Pray </sub></sub>
1. Connect to head machine via SSH
2. Open `/usr/bin/anaconda/lib/python2.7/site-packages/nbformat/_version.py` and change 5 to 4.
3. Fix anaconda installation via official fix script. 
```
curl https://gregorysfixes.blob.core.windows.net/public/fix-conda.sh | sudo sh
```
4. Install all necessary python packages. At least hdfs and kaggle - 
```
sudo /usr/bin/anaconda/bin/conda install -c conda-forge python-hdfs kaggle --yes
```
5. Open Ambari and restart jupyter service.
6. Open azure jupyter notebook and upload this notebook
7. Check, that cells below can be executed correctly

In [1]:
%%local
import hdfs

# Create Spark Context

In [3]:
sc = spark.sparkContext

In [4]:
import pandas as pd
from pyspark.sql import SparkSession

ss = SparkSession(sc)

In [16]:
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration() 
path = hadoop.fs.Path('/')
    
def hdfs_ls(path):
    result = []
    for f in fs.get(conf).listStatus(hadoop.fs.Path(path)):
        result.append(str(f.getPath()))
    return result

In [12]:
%%local
from hdfs import InsecureClient
hdfs_client = InsecureClient("http://cluster1:50070", user='hdfs')

In [17]:
hdfs_ls('/')

['wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/custom-scriptaction-logs', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/app-logs', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/tmp', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/warehouse', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/mapred', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/HDInsight_TestAccessiblityBlobName', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/hive', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/example', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/apps', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/HdiSamples', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/yarn', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/HdiNotebooks', 'wasb://spark2-fix-hw@hadoop2hdistorage2.blob.core.windows.net/amshbase', 'wasb://

# Download task data

Download data directly from kaggle. Read this to understand how: https://github.com/Kaggle/kaggle-api

In [49]:
%%local
! cat ~/.kaggle/kaggle.json

{"username": "alexeykosmos", "key": "750a605c53f61211eeea14909ed129b6"}

In [65]:
%%local
! kaggle competitions files outbrain-click-prediction

name                           size  creationDate         
----------------------------  -----  -------------------  
clicks_test.csv.zip           135MB  2018-06-22 05:33:10  
documents_entities.csv.zip    126MB  2018-06-22 05:33:10  
documents_topics.csv.zip      121MB  2018-06-22 05:33:10  
documents_categories.csv.zip   32MB  2018-06-22 05:33:10  
page_views_sample.csv.zip     149MB  2018-06-22 05:33:10  
clicks_train.csv.zip          390MB  2018-06-22 05:33:10  
page_views.csv.zip             35GB  2018-06-22 05:33:10  
events.csv.zip                478MB  2018-06-22 05:33:10  
sample_submission.csv.zip     100MB  2018-06-22 05:33:10  
promoted_content.csv.zip        3MB  2018-06-22 05:33:10  
documents_meta.csv.zip         16MB  2018-06-22 05:33:10  


In [None]:
%%local
! kaggle competitions download outbrain-click-prediction

# Load data to HDFS

https://www.kaggle.com/c/outbrain-click-prediction/data

In [None]:
%%local
! hdfs dfs -rm -r /task1

In [None]:
%%local
! for i in `ls *.zip`; do unzip -p $i | tqdm | hdfs dfs -put - /task1/$i; done

In [39]:
%%local
! hdfs dfs -du -s -h /task1/*.csv

483.5 M  /task1/clicks_test.csv
1.4 G  /task1/clicks_train.csv
112.5 M  /task1/documents_categories.csv
309.1 M  /task1/documents_entities.csv
85.2 M  /task1/documents_meta.csv
323.7 M  /task1/documents_topics.csv
1.1 G  /task1/events.csv
88.4 G  /task1/page_views.csv
433.3 M  /task1/page_views_sample.csv
13.2 M  /task1/promoted_content.csv
260.5 M  /task1/sample_submission.csv


# Read example

In [None]:
pvdf = ss.read.csv("/task1/page_views.csv", header=True)

In [None]:
pvdf.dtypes

In [None]:
pvdf.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



In [None]:
%%time
pvdf.count()

CPU times: user 76 ms, sys: 20 ms, total: 96 ms
Wall time: 9min 16s


# Parquet is faster than CSV

http://events.linuxfoundation.org/sites/events/files/slides/ApacheCon%20BigData%20Europe%202016%20-%20Parquet%20in%20Practice%20%26%20Detail_0.pdf

In [None]:
%%time
pvdf.write.parquet("/task1/page_views.parquet")

In [31]:
%%local
! hdfs dfs -du -s -h /task1/page_views.parquet

47.3 G  /task1/page_views.parquet


In [33]:
pvdf2 = ss.read.parquet("/task1/page_views.parquet")

In [35]:
%%time
from IPython.display import display
boo = pvdf2.groupBy("geo_location").count().collect()
display(boo[:5])

CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 22.2 s


In [36]:
%%time
boo = pvdf.groupBy("geo_location").count().collect()
display(boo[:5])

CPU times: user 96 ms, sys: 20 ms, total: 116 ms
Wall time: 10min 34s


# Convert all to Parquet

In [58]:
%%time
def convert_all_to_parquet():
    task_dir = "/task1/"
    all_files = hdfs_ls(task_dir)
    for fn in all_files:
        if fn.endswith(".csv"):
            fn_after = fn.replace(".csv", ".parquet")
            path_before = task_dir + fn
            path_after = task_dir + fn_after
            if fn_after not in all_files:
                # generate parquet
                df = ss.read.csv(path_before, header=True)
                df.write.parquet(path_after)
            # remove csv, we have parquet now
            print(fn_after, "done")

convert_all_to_parquet()

clicks_test.parquet done
clicks_train.parquet done
documents_categories.parquet done
documents_entities.parquet done
documents_meta.parquet done
documents_topics.parquet done
events.parquet done
page_views.parquet done
page_views_sample.parquet done
promoted_content.parquet done
sample_submission.parquet done
CPU times: user 96 ms, sys: 0 ns, total: 96 ms
Wall time: 4min 37s


In [None]:
%%local
! hdfs dfs -rm /task1/*.csv

In [60]:
%%local
! hdfs dfs -du -s -h /task1/*

133.2 M  /task1/clicks_test.parquet
367.5 M  /task1/clicks_train.parquet
36.5 M  /task1/documents_categories.parquet
184.0 M  /task1/documents_entities.parquet
21.2 M  /task1/documents_meta.parquet
183.3 M  /task1/documents_topics.parquet
669.3 M  /task1/events.parquet
47.3 G  /task1/page_views.parquet
236.9 M  /task1/page_views_sample.parquet
5.0 M  /task1/promoted_content.parquet
184.2 M  /task1/sample_submission.parquet


# Preview all files

In [65]:
%%time
def preview_all_files():
    task_dir = "/task1/"
    all_files = hdfs_ls(task_dir)
    for fn in all_files:
        df = ss.read.parquet(task_dir + fn)
        print()"#" * 15 + " {0} ".format(task_dir + fn) + "#" * 15)
        df.show(1)
        
preview_all_files()

############### /task1/clicks_test.parquet ###############
+----------+------+
|display_id| ad_id|
+----------+------+
|  17805143|288388|
+----------+------+
only showing top 1 row

############### /task1/clicks_train.parquet ###############
+----------+-----+-------+
|display_id|ad_id|clicked|
+----------+-----+-------+
|         1|42337|      0|
+----------+-----+-------+
only showing top 1 row

############### /task1/documents_categories.parquet ###############
+-----------+-----------+----------------+
|document_id|category_id|confidence_level|
+-----------+-----------+----------------+
|    1544588|       1513|     0.263546236|
+-----------+-----------+----------------+
only showing top 1 row

############### /task1/documents_entities.parquet ###############
+-----------+--------------------+-----------------+
|document_id|           entity_id| confidence_level|
+-----------+--------------------+-----------------+
|    1539011|e01ed0c4a3e8f8f35...|0.327269624728567|
+-----------+

# Register all tables to be usable in SQL queries

In [5]:
%%time
def register_all_tables():
    task_dir = "/task1/"
    all_files = hdfs_ls(task_dir)
    for fn in all_files:
        if fn.endswith(".parquet"):
            table_name = fn.replace(".parquet", "")
            df = ss.read.parquet(task_dir + fn)
            df.registerTempTable(table_name)
            print table_name, "done"
        
register_all_tables()

clicks_test done
clicks_train done
documents_categories done
documents_categories2 done
documents_entities done
documents_meta done
documents_topics done
events done
events_test done
events_train done
joined_events done
page_views done
page_views_sample done
promoted_content done
sample_submission done
CPU times: user 36 ms, sys: 8 ms, total: 44 ms
Wall time: 18.2 s


# SQL query example

In [72]:
%%time
ss.sql("""
select count(distinct(uuid)) as users_count
from events
""").collect()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 16.1 s


# 1. Baseline

Simple model using the following features:
- **clicked**
- geo_location features (country, state, dma)
- day_of_week (from timestamp, use *date.isoweekday()*)
- ad_id
- ad_document_id
- campaign_id
- advertiser_id
- display_document_id
- platform

## Calculate features for VW

- Use DataFrame API to join tables (functions in SQL queries: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html)
- Use Python API to calculate features and save them as text for VW (*saveAsTextFile()*)
- Hash features in Spark (24 bits, use *sklearn.utils.murmurhash.murmurhash3_32*)
- Split dataset in Spark into 90% train, 10% test **by display_id**, save the split for further use

In [5]:
from sklearn.utils.murmurhash import murmurhash3_32
def hasher(x, bits):
    return murmurhash3_32(x) % 2**bits

## Copy data from HDFS to cluster1 machine

We will run vowpal wabbit **locally**, need to copy data from HDFS

In [2]:
import os
import shutil

def copy_text_to_local(hdfs_path, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    os.mkdir(local_path)
    os.system('hdfs dfs -getmerge "{0}/*" {1}'.format(hdfs_path, local_path))
    os.system('cat {0}/part-* > {1}'.format(local_path, local_path + "/merged.txt"))
    print "done"

In [None]:
%%time
copy_text_to_local("/task1/train.txt", "/data/train.txt")

In [3]:
! ls -lh /data/train.txt/merged.txt

-rw-rw-r-- 1 ubuntu ubuntu 9.2G Apr 23 11:31 /data/train.txt/merged.txt


In [None]:
%%time
copy_text_to_local("/task1/test.txt", "/data/test.txt")

In [4]:
! ls -lh /data/test.txt/merged.txt

-rw-rw-r-- 1 ubuntu ubuntu 1.1G Apr 23 11:32 /data/test.txt/merged.txt


## Install VW

In [None]:
! sudo apt-get install libboost-program-options-dev zlib1g-dev libboost-python-dev libtool m4 automake -y

In [None]:
! wget https://github.com/JohnLangford/vowpal_wabbit/archive/8.2.0.tar.gz

In [None]:
! tar -xzf 8.2.0.tar.gz

cd vowpal_wabbit-8.2.0

./autogen.sh

make -j4

make test -j4

sudo make install

## Train VW

https://github.com/JohnLangford/vowpal_wabbit/wiki/Command-line-arguments

In [10]:
! head -n2 /data/train.txt/merged.txt

-1 |f 5611969:1.0 11418299:1.0 11602085:1.0 15617356:1.0 15750989:1.0 786735:1.0 3083696:1.0 4204498:1.0 5728439:1.0 1376507:1.0
1 |f 1748258:1.0 1376507:1.0 11602085:1.0 14125547:1.0 15617356:1.0 16621393:1.0 4204498:1.0 9609462:1.0 5728439:1.0 11418299:1.0


In [12]:
%%time
! LD_LIBRARY_PATH=/usr/local/lib vw -d /data/train.txt/merged.txt -b 24 -c -k --ftrl --passes 1 -f /data/model --holdout_off --loss_function logistic --random_seed 42 --progress 8000000 

final_regressor = /data/model
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.005
ftrl_beta = 0.1
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
creating cache_file = /data/train.txt/merged.txt.cache
Reading datafile = /data/train.txt/merged.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.459597 0.459597      8000000      8000000.0  -1.0000  -1.4509       11
0.454955 0.450313     16000000     16000000.0  -1.0000   0.2849       11
0.452520 0.447649     24000000     24000000.0  -1.0000  -1.0289       11
0.450893 0.446015     32000000     32000000.0  -1.0000  -1.5837       11
0.449772 0.445288     40000000     40000000.0  -1.0000  -1.2465       11
0.448888 0.444469     48000000     48000000.0  -1.0000  -1.6476       11
0.448170 0.443861     56000000     56000000.0  -1.0000  -1.0652        9
0.447529 0.443043     6400000

## Check VW test performance

In [13]:
%%time
! LD_LIBRARY_PATH=/usr/local/lib vw -d /data/test.txt/merged.txt -i /data/model -t -k -p /data/test_predictions.txt --progress 1000000 --link=logistic

only testing
predictions = /data/test_predictions.txt
Num weight bits = 24
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = /data/test.txt/merged.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
1.833906 1.833906      1000000      1000000.0  -1.0000   0.1692       11
1.834238 1.834571      2000000      2000000.0  -1.0000   0.1054       11
1.834516 1.835071      3000000      3000000.0  -1.0000   0.1948       11
1.834247 1.833440      4000000      4000000.0  -1.0000   0.0511       11
1.833725 1.831640      5000000      5000000.0  -1.0000   0.4039       11
1.834064 1.835758      6000000      6000000.0  -1.0000   0.3104       10
1.833605 1.830850      7000000      7000000.0   1.0000   0.2100       11
1.833540 1.833085      8000000      8000000.0   1.0000   0.2825       11

finished run
number of examples per pass = 8706436
passes used = 1
wei

In [15]:
import numpy as np

def read_vw_predictions(p):
    y_pred = []
    with open(p, "r") as f:
        for line in f:
            y_pred.append(float(line.split()[0]))
    return np.array(y_pred)

y_pred = read_vw_predictions("/data/test_predictions.txt")

In [16]:
def get_vw_y_true(p):
    y_true = []
    with open(p, "r") as f:
        for line in f:
            y_true.append(float(line.partition(" ")[0]))
    return np.array(y_true)

y_true = get_vw_y_true("/data/test.txt/merged.txt")

In [17]:
from sklearn.metrics import log_loss, roc_auc_score

In [18]:
log_loss(y_true, y_pred)

In [19]:
roc_auc_score(y_true, y_pred)

## Make submission to Kaggle

# 2. Better model

This time let's make a personalized recommender using:
- page views information
- document properties

Ideas for features:
- uuid topic, entity, publisher, ... preferences
- document similarities
- ...

## More SQL examples

In [10]:
# we start with a DataFrame
events_df = ss.sql("select * from events")
events_df.show(3)

+----------+--------------+-----------+---------+--------+------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|
+----------+--------------+-----------+---------+--------+------------+
|         1|cb8c55702adb93|     379743|       61|       3|   US>SC>519|
|         2|79a85fa78311b9|    1794259|       81|       2|   US>CA>807|
|         3|822932ce3d8757|    1179111|      182|       2|   US>MI>505|
+----------+--------------+-----------+---------+--------+------------+
only showing top 3 rows



In [23]:
# we can make RDD of Rows with *.rdd
from pyspark.sql import Row
events_df.rdd.take(3)

In [30]:
# When it's RDD, we can use Python to create new RDD of Rows
(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">"), bar=x.uuid))
).take(3)

In [31]:
# we can convert it back to DataFrame if it's still a table that can be converted to Java types
ss.createDataFrame(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">"), bar=x.uuid))
).show(3)

+--------------+-------------+
|           bar|          foo|
+--------------+-------------+
|cb8c55702adb93|[US, SC, 519]|
|79a85fa78311b9|[US, CA, 807]|
|822932ce3d8757|[US, MI, 505]|
+--------------+-------------+
only showing top 3 rows



In [36]:
%%time
# we can save it to HDFS as parquet (if it's a DataFrame)
ss.createDataFrame(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">") if x.geo_location else [], bar=x.uuid))
).write.mode("overwrite").parquet("/task1/example1")

CPU times: user 36 ms, sys: 8 ms, total: 44 ms
Wall time: 3min 4s


In [39]:
ss.read.parquet("/task1/example1").printSchema()
ss.read.parquet("/task1/example1").show(3)

root
 |-- bar: string (nullable = true)
 |-- foo: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+-------------+
|           bar|          foo|
+--------------+-------------+
|cb8c55702adb93|[US, SC, 519]|
|79a85fa78311b9|[US, CA, 807]|
|822932ce3d8757|[US, MI, 505]|
+--------------+-------------+
only showing top 3 rows



In [41]:
%%time
# or we can skip DataFrame API if we use Python functions (there will be no speed increase)
(
    events_df.rdd
    .map(lambda x: Row(foo=x.geo_location.split(">") if x.geo_location else [], bar=x.uuid))
).saveAsPickleFile("/task1/example2")

CPU times: user 36 ms, sys: 0 ns, total: 36 ms
Wall time: 2min 41s


In [43]:
sc.pickleFile("/task1/example2").take(3)

In [49]:
# sometimes we cannot make a DataFrame
import numpy as np
rdd = (
    events_df.rdd
    .map(lambda x: Row(x=np.array(x.geo_location.split(">") if x.geo_location else [])))
)
rdd.take(2)

In [None]:
# throws TypeError: not supported type: <type 'numpy.ndarray'>
ss.createDataFrame(rdd)

In [52]:
%%time
# but we can save as RDD in pickle file just fine
rdd.saveAsPickleFile("/task1/example3")

CPU times: user 28 ms, sys: 8 ms, total: 36 ms
Wall time: 3min 31s


Takeaways:
- use DataFrames when you can (simple join's, select's, groupby's), it will be faster
- use RDD and Python when you can't use DataFrame API
- convert it back to DataFrame if needed
- or save to pickles (can save almost any Python object as pickle)

# Built-in SQL functions

You can find more at https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html

In [53]:
# sql version
df = ss.sql("""
select
    document_id,
    collect_list(struct(category_id, confidence_level)) as categories
from
    documents_categories
group by document_id
""")
df.show(3)

+-----------+--------------------+
|document_id|          categories|
+-----------+--------------------+
|     100010|[[1513,0.79842798...|
|    1000240|[[1505,0.92], [15...|
|    1000280|[[1909,0.92], [19...|
+-----------+--------------------+
only showing top 3 rows



In [55]:
%%time
df.write.mode("overwrite").parquet("/task1/example4")

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 12.5 s


In [57]:
# or we can use RDD and Python if we are not aware of those SQL functions
rdd = (
    ss.sql("select * from documents_categories")
    .rdd
    .map(lambda x: (x.document_id, (x.category_id, x.confidence_level)))
    .groupByKey()
    .map(lambda (k, vs): (k, list(vs)))
)
rdd.take(3)

In [58]:
%%time
# it's much slower, but we can do almost everything in Python
rdd.saveAsPickleFile("/task1/example5")

CPU times: user 28 ms, sys: 12 ms, total: 40 ms
Wall time: 3min 31s


In [63]:
# but sometimes with Python we can do more
rdd = (
    ss.sql("select * from documents_categories")
    .rdd
    .map(lambda x: (x.document_id, (x.category_id, x.confidence_level)))
    .groupByKey()
    .map(lambda (k, vs): Row(document_id=k, categories={a: float(b) for a, b in vs}))
)

In [60]:
%%time
# much faster thanks to conversion back to DataFrame (works for simple python collections in columns)
ss.createDataFrame(rdd).write.parquet("/task1/example6")

CPU times: user 32 ms, sys: 4 ms, total: 36 ms
Wall time: 25.8 s


In [64]:
ss.read.parquet("/task1/example6").show(3)

+--------------------+-----------+
|          categories|document_id|
+--------------------+-----------+
|Map(1510 -> 0.887...|    1059269|
|Map(1408 -> 0.92,...|    1050604|
|Map(1903 -> 0.92,...|    1472688|
+--------------------+-----------+
only showing top 3 rows



In [65]:
# now we can join this table with events for instance
ss.read.parquet("/task1/example6").registerTempTable("doc_categories_ready")

In [68]:
ss.sql("""
select 
    e.*, 
    dc.categories
from 
    events as e
    join doc_categories_ready as dc on dc.document_id = e.document_id
""").show(3)

+----------+--------------+-----------+---------+--------+------------+--------------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|          categories|
+----------+--------------+-----------+---------+--------+------------+--------------------+
|  18242074|e703634e3dfa39|    1000240|536236046|       2|          NG|Map(1503 -> 0.07,...|
|  18694427|5b023d28c0a9f3|    1000240|687121504|       2|   US>MA>521|Map(1503 -> 0.07,...|
|   3436070|55e1db49ff4eef|    1000240|223783698|       1|   US>CA>803|Map(1503 -> 0.07,...|
+----------+--------------+-----------+---------+--------+------------+--------------------+
only showing top 3 rows

