# Fix all jupyter notebook problems

<sub> 0. Pray </sub> 
1. Connect to head machine via SSH
2. Open `/usr/bin/anaconda/lib/python2.7/site-packages/nbformat/_version.py` and change 5 to 4.
3. Fix anaconda installation via official fix script. 
```
curl https://gregorysfixes.blob.core.windows.net/public/fix-conda.sh | sudo sh
```
4. Install all necessary python packages. At least kaggle - 
```
sudo /usr/bin/anaconda/bin/conda install -c conda-forge kaggle --yes
```
5. Open Ambari and restart jupyter service.
6. Open azure jupyter notebook and upload this notebook
7. Check, that cells below can be executed correctly

# Create Spark Context

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark/"

In [2]:
import findspark
findspark.init('/opt/spark')
from pyspark import SparkContext
from pyspark.conf import SparkConf
conf = SparkConf().set("spark.ui.port", 5050).set("spark.driver.memory", "32g").set("spark.executor.memory", "32g").set("spark.local.dir", "datadrive/spark-temp")
# sc.stop()
sc = SparkContext("local[8]", "my app", conf=conf)  # spark with 4 cores

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
sc = spark.sparkContext

In [5]:
sc

In [6]:
import pandas as pd
from pyspark.sql import SparkSession

ss = SparkSession(sc)

In [7]:
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration() 
path = hadoop.fs.Path('/')
    
def hdfs_ls(path):
    result = []
    for f in fs.get(conf).listStatus(hadoop.fs.Path(path)):
        result.append(str(f.getPath()))
    return result

In [8]:
from pprint import pprint

pprint(hdfs_ls('/'))

['file:/etc',
 'file:/tmp',
 'file:/lib64',
 'file:/bin',
 'file:/boot',
 'file:/sys',
 'file:/vmlinuz.old',
 'file:/opt',
 'file:/mnt',
 'file:/.rnd',
 'file:/home',
 'file:/lib',
 'file:/srv',
 'file:/usr',
 'file:/datadrive',
 'file:/run',
 'file:/lost+found',
 'file:/snap',
 'file:/var',
 'file:/proc',
 'file:/root',
 'file:/initrd.img',
 'file:/vmlinuz',
 'file:/media',
 'file:/sbin',
 'file:/dev',
 'file:/initrd.img.old']


# Download task data

Download data directly from kaggle. Read this to understand how: https://github.com/Kaggle/kaggle-api

In [9]:
#%%local
! cat ~/.kaggle/kaggle.json

{"username":"hukutoc46","key":"225711d8186c94c45fa2e360a70419c4"}


In [10]:
# %%local
! kaggle competitions files outbrain-click-prediction

name                           size  creationDate         
----------------------------  -----  -------------------  
documents_meta.csv.zip         16MB  2018-06-22 05:33:10  
documents_entities.csv.zip    126MB  2018-06-22 05:33:10  
clicks_train.csv.zip          390MB  2018-06-22 05:33:10  
documents_topics.csv.zip      121MB  2018-06-22 05:33:10  
promoted_content.csv.zip        3MB  2018-06-22 05:33:10  
clicks_test.csv.zip           135MB  2018-06-22 05:33:10  
events.csv.zip                478MB  2018-06-22 05:33:10  
page_views_sample.csv.zip     149MB  2018-06-22 05:33:10  
documents_categories.csv.zip   32MB  2018-06-22 05:33:10  
page_views.csv.zip             35GB  2018-06-22 05:33:10  
sample_submission.csv.zip     100MB  2018-06-22 05:33:10  


In [11]:
#%%local
! kaggle competitions download -c outbrain-click-prediction

404 - Not Found


In [12]:
! for f in $(kaggle competitions files outbrain-click-prediction | cut -d ' ' -f 1 | tail -n+3); do kaggle competitions download outbrain-click-prediction -f $f -p ./data/; done

documents_meta.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
events.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
documents_entities.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
documents_topics.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
promoted_content.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
documents_categories.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
page_views_sample.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
sample_submission.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
page_views.csv.zip: Skipping, found more recently modified local copy (use --force to force download)
clicks_test.csv.zip: Skipping, found m

# Load data to HDFS

https://www.kaggle.com/c/outbrain-click-prediction/data

%%local
! hdfs dfs -rm -r /task1
! hdfs dfs -mkdir /task1

%%local
! for i in `ls *.zip`; do unzip -p $i | tqdm | hadoop fs -put - /task1/${i//\.zip/}; done

%%local
! hadoop fs -du -s -h /task1/*.csv

In [13]:
! for i in `ls data/*.zip`; do unzip -n -d ./data/ $i; done

Archive:  data/clicks_test.csv.zip
Archive:  data/clicks_train.csv.zip
Archive:  data/documents_categories.csv.zip
Archive:  data/documents_entities.csv.zip
Archive:  data/documents_meta.csv.zip
Archive:  data/documents_topics.csv.zip
Archive:  data/events.csv.zip
Archive:  data/page_views.csv.zip
Archive:  data/page_views_sample.csv.zip
Archive:  data/promoted_content.csv.zip
Archive:  data/sample_submission.csv.zip


# Read example

In [14]:
data_path = '/datadrive/LSML2020/data/'

In [15]:
pvdf = ss.read.csv(data_path + "page_views.csv", header=True)

In [16]:
pvdf.dtypes

[('uuid', 'string'),
 ('document_id', 'string'),
 ('timestamp', 'string'),
 ('platform', 'string'),
 ('geo_location', 'string'),
 ('traffic_source', 'string')]

In [17]:
pvdf.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



# Convert all to Parquet

%%time

def convert_all_to_parquet():
    task_dir = data_path
    all_files = hdfs_ls(task_dir)
    for fn in all_files:
        if fn.endswith(".csv"):
            fn_after = fn.replace(".csv", ".parquet")
            path_before = fn
            path_after = fn_after
            if fn_after not in all_files:
                # generate parquet
                df = ss.read.csv(path_before, header=True)
                df.write.parquet(path_after)
            print(fn_after, "done")

convert_all_to_parquet()

Remove csv, we have parquet now

%%local
! hdfs dfs -rm /data/*.csv

%%local
! hadoop fs -du -s -h /data/*

# Preview all files

In [18]:
%%time
def preview_all_files():
    task_dir = data_path
    all_files = hdfs_ls(task_dir)
    for fn in all_files:
        if '.zip' in fn:
            continue
        if '.csv' not in fn:
            continue
        df = ss.read.csv(fn, header=True)
        print("#" * 15 + " {0} ".format(fn) + "#" * 15)
        df.show(3)
        
preview_all_files()

############### file:/datadrive/LSML2020/data/documents_topics.csv ###############
+-----------+--------+------------------+
|document_id|topic_id|  confidence_level|
+-----------+--------+------------------+
|    1595802|     140|0.0731131601068925|
|    1595802|      16|0.0594164867373976|
|    1595802|     143|0.0454207537554526|
+-----------+--------+------------------+
only showing top 3 rows

############### file:/datadrive/LSML2020/data/promoted_content.csv ###############
+-----+-----------+-----------+-------------+
|ad_id|document_id|campaign_id|advertiser_id|
+-----+-----------+-----------+-------------+
|    1|       6614|          1|            7|
|    2|     471467|          2|            7|
|    3|       7692|          3|            7|
+-----+-----------+-----------+-------------+
only showing top 3 rows

############### file:/datadrive/LSML2020/data/documents_meta.csv ###############
+-----------+---------+------------+-------------------+
|document_id|source_id|publish

# Register all tables to be usable in SQL queries

In [19]:
%%time
def register_all_tables():
    task_dir = data_path
    all_files = hdfs_ls(task_dir)
    for fn in all_files:
        if fn.endswith(".csv"):
            table_name = os.path.basename(fn).replace(".csv", "")
            
            df = ss.read.csv(fn, header=True)
            df.registerTempTable(table_name)
            print(table_name, "done")
        
register_all_tables()

documents_topics done
promoted_content done
documents_meta done
clicks_test done
page_views_sample done
page_views done
pred_data done
documents_categories done
events done
documents_entities done
sample_submission done
clicks_train done
CPU times: user 61 ms, sys: 6.63 ms, total: 67.6 ms
Wall time: 6.09 s


# 1. Baseline

Simple model using the following features:
- **clicked**
- geo_location features (country, state, dma)
- day_of_week (from timestamp, use *date.isoweekday()*)
- ad_id
- ad_document_id
- campaign_id
- advertiser_id
- display_document_id
- platform

In [20]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [21]:
def get_country(row):
    if row is None:
        return ''
    geo_locations = row.split('>')
    return geo_locations[0] 

def get_state(row):
    if row is None:
        return ''
    geo_locations = row.split('>')
    return geo_locations[1] if len(geo_locations) == 3 else ''

def get_dma(row):
    if row is None:
        return ''
    geo_locations = row.split('>')
    return geo_locations[2] if len(geo_locations) == 3 else ''

sqlContext.udf.register("get_country", get_country)
sqlContext.udf.register("get_state", get_state)
sqlContext.udf.register("get_dma", get_dma)

<function __main__.get_dma(row)>

In [22]:
from datetime import datetime

def get_weekday(timestamp): 
    if timestamp is None:
        return ''
    dt = datetime.fromtimestamp((int(timestamp) + 1465876799998) / 1000)
    return str(dt.weekday())

def get_day(timestamp): 
    if timestamp is None:
        return ''
    dt = datetime.fromtimestamp((int(timestamp) + 1465876799998) / 1000)
    return str(dt.day)

def get_month(timestamp): 
    if timestamp is None:
        return ''
    dt = datetime.fromtimestamp((int(timestamp) + 1465876799998) / 1000)
    return str(dt.month)

def get_hour(timestamp):
    if timestamp is None:
        return ''
    dt = datetime.fromtimestamp((int(timestamp) + 1465876799998) / 1000)
    return str(dt.hour)

sqlContext.udf.register("get_weekday", get_weekday)
sqlContext.udf.register("get_day", get_day)
sqlContext.udf.register("get_month", get_month)
sqlContext.udf.register("get_hour", get_hour)

<function __main__.get_hour(timestamp)>

In [23]:
def get_train_table():
    return ss.sql("""
SELECT
    clicks_train.clicked,
    clicks_train.display_id,
    get_country(events.geo_location) as country,
    get_state(events.geo_location) as state,
    get_dma(events.geo_location) as dma,
    get_weekday(events.timestamp) as weekday,
    clicks_train.ad_id,
    promoted_content.document_id as ad_document_id,
    promoted_content.campaign_id,
    promoted_content.advertiser_id,
    events.document_id as display_document_id,
    events.platform
FROM clicks_train
LEFT JOIN events ON clicks_train.display_id = events.display_id
LEFT JOIN promoted_content ON clicks_train.ad_id = promoted_content.ad_id
""")

In [24]:
def make_vw_train_test(row, target=True):
    clicked = ''
    if target:
        clicked = 1 if int(row.clicked) == 1 else -1,
    return (
        "{clicked} |f"
        " display_id_{display_id}"
        " country_{country}"
        " state_{state}"
        " dma_{dma}"
        " weekday_{weekday}"
        " ad_id_{ad_id}"
        " ad_document_id_{ad_document_id}"
        " campaign_id_{campaign_id}"
        " advertiser_id_{advertiser_id}"
        " display_document_id_{display_document_id}"
        " platform_{platform}"
    ).format(
        clicked=clicked,
        display_id=row.display_id,
        country=row.country,
        state=row.state,
        dma=row.dma,
        #month=row.month,
        #day=row.day if row.day else 0,
        weekday=row.weekday,
        #hour=row.hour if row.hour else 0,
        ad_id=row.ad_id,
        ad_document_id=row.ad_document_id,
        campaign_id=row.campaign_id,
        advertiser_id=row.advertiser_id,
        display_document_id=row.display_document_id,
        platform=row.platform
    )
def make_vw_train(row):
    return make_vw_train_test(row, target=True)

def make_vw_test(row):
    return make_vw_train_test(row, target=False)

In [25]:
%%time
dataset_name = "data/dataset-v4.data"
if not(os.path.exists(dataset_name)):
    train_table = get_train_table()
    train_table.rdd.map(make_vw_train).saveAsTextFile(dataset_name)
else:
    print('already exists')

CPU times: user 326 ms, sys: 88.7 ms, total: 415 ms
Wall time: 46min 31s


In [58]:
def merge_parted_file(parted_file, full_file, use_replacement=True):
    if os.path.exists(full_file):
        print('Already exists')
        return
    if parted_file[-1] == '/':
        parted_file = parted_file[:-1]
    os.system('cat {0}/part-* > {1}'.format(parted_file, full_file))
    print('file merged')
    if use_replacement:
        os.system('sed -i \'s/(-1,)/-1/\' {0}'.format(full_file))
        print('(-1,) replaced with -1')
        os.system('sed -i \'s/(1,)/1/\' {0}'.format(full_file))
        print('(1,) replaced with 1')
    print("done")

In [40]:
%%time
merge_parted_file('data/dataset-v4.data', 'data/dataset-v4.txt')

file merged
(-1,) replaced with -1
(1,) replaced with 1
done
CPU times: user 184 ms, sys: 94 ms, total: 278 ms
Wall time: 34min 37s


## Train VW

https://github.com/JohnLangford/vowpal_wabbit/wiki/Command-line-arguments

In [41]:
! head -n2 data/dataset-v4.txt

-1 |f display_id_15514599 country_US state_ dma_ weekday_6 ad_id_100010 ad_document_id_1132761 campaign_id_13049 advertiser_id_2848 display_document_id_1914050 platform_1
-1 |f display_id_15434742 country_US state_FL dma_539 weekday_6 ad_id_100010 ad_document_id_1132761 campaign_id_13049 advertiser_id_2848 display_document_id_339764 platform_1


In [42]:
! vw -d data/dataset-v4.txt -b 28 -c -k --ftrl --passes 3 -f model -l 0.01 --l1 1 --ftrl_alpha 0.05 --ftrl_beta 0.5 --holdout_off --loss_function logistic --random_seed 42 --progress 1000000

using l1 regularization = 1
final_regressor = model
Enabling FTRL based optimization
Algorithm used: Proximal-FTRL
ftrl_alpha = 0.05
ftrl_beta = 0.5
Num weight bits = 28
learning rate = 0.01
initial_t = 0
power_t = 0.5
decay_learning_rate = 1
creating cache_file = data/dataset-v4.txt.cache
Reading datafile = data/dataset-v4.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.462493 0.462493      1000000      1000000.0   1.0000  -1.3345       12
0.456527 0.450562      2000000      2000000.0  -1.0000  -1.8718       12
0.447067 0.428145      3000000      3000000.0  -1.0000  -1.1681       12
0.446389 0.444357      4000000      4000000.0  -1.0000  -2.4393       12
0.445750 0.443193      5000000      5000000.0  -1.0000  -0.8910       12
0.445471 0.444073      6000000      6000000.0   1.0000  -2.4542       12
0.443786 0.433678      7000000      7000000.0  -1.0000  -2.0833       

0.435591 0.438660    107000000    107000000.0  -1.0000  -2.3664       12
0.435578 0.434170    108000000    108000000.0  -1.0000  -1.5457       12
0.435579 0.435619    109000000    109000000.0   1.0000  -1.5220       12
0.435943 0.475659    110000000    110000000.0  -1.0000  -1.3827       12
0.436036 0.446236    111000000    111000000.0  -1.0000  -2.9987       12
0.436070 0.439864    112000000    112000000.0  -1.0000  -1.2434       12
0.436153 0.445406    113000000    113000000.0  -1.0000  -3.2465       12
0.436018 0.420763    114000000    114000000.0  -1.0000  -1.0974       12
0.436340 0.473070    115000000    115000000.0  -1.0000  -1.5146       12
0.436669 0.474510    116000000    116000000.0  -1.0000  -1.2266       12
0.436423 0.407852    117000000    117000000.0  -1.0000  -2.1233       12
0.436057 0.393345    118000000    118000000.0  -1.0000  -2.3823       12
0.436013 0.430818    119000000    119000000.0  -1.0000  -1.4112       12
0.436000 0.434454    120000000    120000000.0  -1.0

0.434340 0.411476    220000000    220000000.0   1.0000  -2.2523       12
0.434231 0.410232    221000000    221000000.0   1.0000  -1.6791       12
0.434353 0.461327    222000000    222000000.0   1.0000  -1.8026       12
0.434222 0.405203    223000000    223000000.0  -1.0000  -2.3069       12
0.434123 0.412035    224000000    224000000.0   1.0000   0.1425       12
0.434256 0.464018    225000000    225000000.0  -1.0000  -3.0008       12
0.434192 0.419867    226000000    226000000.0  -1.0000  -2.1408       12
0.434214 0.439187    227000000    227000000.0  -1.0000  -3.1588       12
0.434139 0.417127    228000000    228000000.0  -1.0000  -2.8219       12
0.434024 0.407620    229000000    229000000.0  -1.0000  -3.1854       12
0.434087 0.448551    230000000    230000000.0  -1.0000  -3.2437       12
0.434034 0.421863    231000000    231000000.0   1.0000  -1.1286       12
0.433874 0.396839    232000000    232000000.0  -1.0000  -1.7584       12
0.433872 0.433520    233000000    233000000.0  -1.0

## Check VW test performance

In [47]:
def get_test_table():
    return ss.sql("""
SELECT
    clicks_test.display_id,
    get_country(events.geo_location) as country,
    get_state(events.geo_location) as state,
    get_dma(events.geo_location) as dma,
    get_weekday(events.timestamp) as weekday,
    clicks_test.ad_id,
    promoted_content.document_id as ad_document_id,
    promoted_content.campaign_id,
    promoted_content.advertiser_id,
    events.document_id as display_document_id,
    events.platform
FROM clicks_test
LEFT JOIN events ON clicks_test.display_id = events.display_id
LEFT JOIN promoted_content ON clicks_test.ad_id = promoted_content.ad_id
""")

In [48]:
%%time
test_table = None
dataset_test_name = "data/dataset-test-v4.data"
if not(os.path.exists(dataset_test_name)):
    test_table = get_test_table()
    test_table.rdd.map(make_vw_test).saveAsTextFile(dataset_test_name)
    test_table.registerTempTable('test_table')
else:
    print('already exists')

CPU times: user 102 ms, sys: 69.9 ms, total: 172 ms
Wall time: 16min 50s


In [50]:
%%time
merge_parted_file(dataset_test_name, 'data/dataset-test-v4.txt')

file merged
(-1,) replaced with -1
(1,) replaced with 1
done
CPU times: user 39.4 ms, sys: 52.8 ms, total: 92.2 ms
Wall time: 8min 8s


In [51]:
! vw -d data/dataset-test-v4.txt -i model -t -k -p test_predictions.txt --progress 1000000 --link=logistic

only testing
predictions = test_predictions.txt
Num weight bits = 28
learning rate = 0.5
initial_t = 0
power_t = 0.5
using no cache
Reading datafile = data/dataset-test-v4.txt
num sources = 1
average  since         example        example  current  current  current
loss     last          counter         weight    label  predict features
0.000000 0.000000      1000000      1000000.0  unknown   0.0471       12
0.000000 0.000000      2000000      2000000.0  unknown   0.1998       12
0.000000 0.000000      3000000      3000000.0  unknown   0.0843       12
0.000000 0.000000      4000000      4000000.0  unknown   0.2207       12
0.000000 0.000000      5000000      5000000.0  unknown   0.0162       12
0.000000 0.000000      6000000      6000000.0  unknown   0.1677       12
0.000000 0.000000      7000000      7000000.0  unknown   0.1037       12
0.000000 0.000000      8000000      8000000.0  unknown   0.5450       12
0.000000 0.000000      9000000      9000000.0  unknown   0.0974       12
0.000

In [52]:
! cat data/test_predictions.txt | head -n2

0.221685
0.149993
cat: write error: Broken pipe


In [1]:
import numpy as np

def read_vw_predictions(p):
    y_pred = []
    with open(p, "r") as f:
        for line in f:
            y_pred.append(float(line.split()[0]))
    return np.array(y_pred)

y_pred = read_vw_predictions("data/test_predictions.txt")

In [2]:
def read_pred_data(file_name):
    display_ids = []
    ad_ids = []
    with open(file_name, 'r') as f:
        for line in f:
            #print(line)
            elems = list(line.split(','))
            
            try:
                display_ids.append(int(elems[0]))
                ad_ids.append(int(elems[5]))
            except:
                print(line)
    return display_ids, ad_ids

In [79]:
test_table.write.csv('data/test_table.folder')

In [81]:
merge_parted_file('data/test_table.folder', 'data/test_table.csv', False)

file merged
done


In [3]:
display_ids, ad_ids = read_pred_data('data/test_table.csv')

In [4]:
from tqdm import tqdm
import numpy as np

In [5]:
display_ids = np.array(display_ids)
ad_ids = np.array(ad_ids)
display_to_set_of_ads = dict()
for dis_id, ad_id in tqdm(zip(display_ids, ad_ids), total=len(display_ids)):
    if dis_id not in display_to_set_of_ads:
        display_to_set_of_ads[dis_id] = list()
        
    display_to_set_of_ads[dis_id].append(ad_id)

100%|██████████| 32225162/32225162 [02:01<00:00, 265127.77it/s]


In [6]:
display_to_set_of_ads[16874594]

[170392, 150083, 66758, 162754, 180797, 172888]

In [7]:
id2score = {(dis_id, ad_id): score for ad_id, dis_id, score in zip(ad_ids, display_ids, y_pred)}

In [8]:
ordered_display_to_set_of_ads = list(sorted(display_to_set_of_ads.items(), key=lambda x: x[0]))

In [12]:
with open('submission.csv', 'w') as f:
    print('display_id,ad_id', file=f)
    for display, potential_ids in tqdm(ordered_display_to_set_of_ads):
        print(str(display) + ',' + ' '.join(map(str, sorted(potential_ids, key=lambda x: id2score[(display,x)], reverse=True))), file=f)

100%|██████████| 6245533/6245533 [03:27<00:00, 30051.84it/s]


In [13]:
! kaggle competitions submit -f submission.csv -m "kek" outbrain-click-prediction

100%|████████████████████████████████████████| 260M/260M [00:03<00:00, 80.6MB/s]
Successfully submitted to Outbrain Click Prediction

In [14]:
! head submission.csv

display_id,ad_id
16874594,170392 172888 162754 150083 66758 180797
16874595,8846 143982 30609
16874596,289915 289122 132820 11430 153260 57197 173005 288385
16874597,285834 305790 143981 182039 308836 155945 137858 180965
16874598,145937 335632 67292 250082
16874599,91681 173130 210516 296295 213116 163776
16874600,57591 70529 30682 133050 2150 114836
16874601,190713 92003 129490 14082 140942 118470
16874602,154918 281563 269017 131316 268548
