In [1]:
# Render matplotlib figures inline, directly in the notebook output.
%matplotlib inline

In [2]:
# Imports for the whole notebook: pandas/numpy for in-memory work, dask
# (dataframe, bag, diagnostics) for chunked/parallel work, bokeh for the
# profiler plots.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask.dataframe as dd
import dask.bag as db
import dask.diagnostics as dg
import dask
from dask.multiprocessing import get as mp_scheduler
# Fix: this alias used to re-import dask.multiprocessing.get, making it an
# exact duplicate of mp_scheduler. The threaded scheduler is the intended
# target (assumed from the `mt_` prefix).
from dask.threaded import get as mt_scheduler
from bokeh.io import output_notebook
from typing import List

# Send bokeh output to the notebook.
output_notebook()

# matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8-*' and
# 3.8 removed the old names. Use whichever spelling this installation ships;
# if neither is listed, fall back to the original name so the failure mode
# is unchanged.
_WHITEGRID_STYLES = ('seaborn-whitegrid', 'seaborn-v0_8-whitegrid')
plt.style.use(next(
    (name for name in _WHITEGRID_STYLES if name in plt.style.available),
    _WHITEGRID_STYLES[0],
))

In [3]:
# Number of bytes in one megabyte (binary, i.e. 1024 * 1024); used both to
# report sizes and to express dask block sizes.
MEGABYTES = 1024 * 1024

# Kaggle

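The data comes from the Kaggle competition `web-traffic-time-series-forecasting`. It is downloaded with the Kaggle API command-line client: https://github.com/Kaggle/kaggle-api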

```bash
# Install the Kaggle command-line client used by the download step below.
pip install kaggle
# Create the client's configuration directory if it does not exist yet.
mkdir -p ~/.kaggle/
# Copy kaggle.json into it (presumably the API token file downloaded from
# the Kaggle account page - confirm).
cp ~/Downloads/kaggle.json ~/.kaggle/
# Make the file readable and writable by its owner only.
chmod 600 ~/.kaggle/kaggle.json
# Download the files of the web-traffic-time-series-forecasting competition.
kaggle competitions download -c web-traffic-time-series-forecasting
```

```bash
# Folder where the Kaggle client stored the competition archives.
DATA_HOME=~/.kaggle/competitions/web-traffic-time-series-forecasting
# Glob for the archives instead of parsing `ls` output: this only touches
# .zip files (so a re-run does not try to unzip already-extracted CSVs) and,
# with the quoting, is safe for paths that contain spaces.
for archive in "$DATA_HOME"/*.zip; do
    # An unmatched glob stays literal; skip it instead of calling unzip on it.
    [ -e "$archive" ] || continue
    unzip "$archive" -d "$DATA_HOME/"
done
```

In [4]:
import os  # stdlib; imported here so this cell runs on its own

# Resolve the competition folder relative to the current user's home
# directory instead of hardcoding one user's absolute path. The trailing
# slash is kept so `home` is still a str prefix that can be concatenated.
home = os.path.expanduser('~/.kaggle/competitions/web-traffic-time-series-forecasting/')

# Plain-Python directory listing (sorted for a stable order) instead of
# shelling out to `ls`.
files = sorted(os.listdir(home))

# Map each CSV's base name (e.g. 'train_1') to its full path; other files
# such as the downloaded archives are ignored.
datasets = {
    os.path.splitext(f)[0]: os.path.join(home, f)
    for f in files
    if f.endswith('.csv')
}
datasets

{'key_1': '/home/severo/.kaggle/competitions/web-traffic-time-series-forecasting/key_1.csv',
 'key_2': '/home/severo/.kaggle/competitions/web-traffic-time-series-forecasting/key_2.csv',
 'sample_submission_1': '/home/severo/.kaggle/competitions/web-traffic-time-series-forecasting/sample_submission_1.csv',
 'sample_submission_2': '/home/severo/.kaggle/competitions/web-traffic-time-series-forecasting/sample_submission_2.csv',
 'train_1': '/home/severo/.kaggle/competitions/web-traffic-time-series-forecasting/train_1.csv',
 'train_2': '/home/severo/.kaggle/competitions/web-traffic-time-series-forecasting/train_2.csv'}

# Pandas

In [5]:
# Load every CSV completely with pandas, one file after another, and add up
# the deep memory footprint (object/string contents included) of the
# resulting frames. The total is reported in megabytes.
total_bytes = 0
for csv_path in datasets.values():
    total_bytes += pd.read_csv(csv_path).memory_usage(deep=True).sum()

total_bytes / MEGABYTES

6605.351316452026

In [6]:
# Preview the first 5 rows of train_1 without loading the whole file.
# `nrows=5` returns the same frame as taking one chunk from a
# `chunksize=5` reader, but pandas closes the file itself; the chunked
# reader was never closed and leaked the open file handle.
pd.read_csv(datasets['train_1'], nrows=5)

Unnamed: 0,Page,2015-07-01,2015-07-02,2015-07-03,2015-07-04,2015-07-05,2015-07-06,2015-07-07,2015-07-08,2015-07-09,...,2016-12-22,2016-12-23,2016-12-24,2016-12-25,2016-12-26,2016-12-27,2016-12-28,2016-12-29,2016-12-30,2016-12-31
0,2NE1_zh.wikipedia.org_all-access_spider,18.0,11.0,5.0,13.0,14.0,9.0,9.0,22.0,26.0,...,32,63,15,26,14,20,22,19,18,20
1,2PM_zh.wikipedia.org_all-access_spider,11.0,14.0,15.0,18.0,11.0,13.0,22.0,11.0,10.0,...,17,42,28,15,9,30,52,45,26,20
2,3C_zh.wikipedia.org_all-access_spider,1.0,0.0,1.0,1.0,0.0,4.0,0.0,3.0,4.0,...,3,1,1,7,4,4,6,3,4,17
3,4minute_zh.wikipedia.org_all-access_spider,35.0,13.0,10.0,94.0,4.0,26.0,14.0,9.0,11.0,...,32,10,26,27,16,11,17,19,10,11
4,52_Hz_I_Love_You_zh.wikipedia.org_all-access_s...,,,,,,,,,,...,48,9,25,13,3,11,27,13,36,10


In [7]:
# Preview the first 5 rows of key_1 without loading the whole file.
# `nrows=5` returns the same frame as taking one chunk from a
# `chunksize=5` reader, but pandas closes the file itself; the chunked
# reader was never closed and leaked the open file handle.
pd.read_csv(datasets['key_1'], nrows=5)

Unnamed: 0,Page,Id
0,!vote_en.wikipedia.org_all-access_all-agents_2...,bf4edcf969af
1,!vote_en.wikipedia.org_all-access_all-agents_2...,929ed2bf52b9
2,!vote_en.wikipedia.org_all-access_all-agents_2...,ff29d0f51d5c
3,!vote_en.wikipedia.org_all-access_all-agents_2...,e98873359be6
4,!vote_en.wikipedia.org_all-access_all-agents_2...,fa012434263a


# Dask

```bash
# Install the jupyterlab_bokeh extension into JupyterLab (presumably needed
# for the bokeh profiler plots below to render in JupyterLab - confirm).
jupyter labextension install jupyterlab_bokeh
```

In [8]:
# Same preview as the pandas one, but through dask: build a lazy dask
# dataframe over key_1 and show its first rows.
dd.read_csv(datasets['key_1']).head()

Unnamed: 0,Page,Id
0,!vote_en.wikipedia.org_all-access_all-agents_2...,bf4edcf969af
1,!vote_en.wikipedia.org_all-access_all-agents_2...,929ed2bf52b9
2,!vote_en.wikipedia.org_all-access_all-agents_2...,ff29d0f51d5c
3,!vote_en.wikipedia.org_all-access_all-agents_2...,e98873359be6
4,!vote_en.wikipedia.org_all-access_all-agents_2...,fa012434263a


In [9]:
# Column-wise sum of train_1 with dask's default partitioning (no explicit
# blocksize). A progress bar is shown, and resource usage (sampled every
# 0.1 s) and per-task timings are recorded while the graph runs.
with dg.ProgressBar(), dg.ResourceProfiler(dt=0.1) as rprof, dg.Profiler() as prof:
    # Lazy graph only: nothing is read until compute() is called.
    dag = dd.read_csv(datasets['train_1']).sum()
    # Execute inside the profilers' context so the run is recorded.
    result = dag.compute()

# Plot the resource and task profiles collected above.
dg.visualize([rprof, prof])

[########################################] | 100% Completed | 39.8s


In [10]:
# Column-wise sum of train_1 with the file split into 10 MB blocks, so the
# work is spread over many partitions. A progress bar is shown, and
# resource usage (sampled every 0.1 s) and per-task timings are recorded.
with dg.ProgressBar(), dg.ResourceProfiler(dt=0.1) as rprof, dg.Profiler() as prof:
    # Lazy graph only: nothing is read until compute() is called.
    dag = dd.read_csv(datasets['train_1'], blocksize=10*MEGABYTES).sum()
    # Execute inside the profilers' context so the run is recorded.
    result = dag.compute()

# Plot the resource and task profiles collected above.
dg.visualize([rprof, prof])

[########################################] | 100% Completed | 18.8s


In [11]:
# Read only the first line of train_1 (the CSV header). The `with` block
# closes the file right away; previously the handle was left open. UTF-8 is
# requested explicitly, the same default pandas.read_csv and
# dask.bag.read_text use, so the result does not depend on the platform's
# locale encoding.
with open(datasets['train_1'], 'r', encoding='utf-8') as train_1_file:
    header_line = train_1_file.readline()

# Turn the raw header line into a list of column names.
headers = (
    header_line
    .strip()             # remove the trailing newline
    .replace('"', '')    # drop the quotes around column names
    .split(',')          # one list entry per column
)

headers[:5]

['Page', '2015-07-01', '2015-07-02', '2015-07-03', '2015-07-04']

In [12]:
def _is_data_row(fields):
    """Return True unless the split line is the CSV header row.

    The header is recognised by its first field being the quoted column
    name "Page".
    """
    return fields[0] != '"Page"'


def _to_float_array(fields):
    """Convert the trailing value fields of a split CSV line to a float array.

    Only the last ``len(headers) - 1`` fields are kept, i.e. one per date
    column; whatever precedes them (the Page column) is dropped. Empty
    fields are counted as 0.
    """
    value_fields = fields[-len(headers)+1:]
    return np.array([float(cell) if cell else 0 for cell in value_fields])


# Same per-column total as the dataframe runs, computed with a dask bag of
# raw text lines. A progress bar is shown, and resource usage (sampled every
# 0.1 s) and per-task timings are recorded while the graph runs.
with dg.ProgressBar(), dg.ResourceProfiler(dt=0.1) as rprof, dg.Profiler() as prof:
    dag = (
        db
        .read_text(datasets['train_1'], blocksize=10*MEGABYTES)  # text lines, file read in 10 MB blocks
        .map(str.strip)                                          # drop the trailing newline
        .map(str.split, ',')                                     # line -> list of fields
        .filter(_is_data_row)                                    # skip the header row
        .map(_to_float_array)                                    # date fields -> float array
        .sum()                                                   # element-wise total over all rows
    )

    result2 = dag.compute()

# Plot the resource and task profiles collected above.
dg.visualize([rprof, prof])

[########################################] | 100% Completed | 11.4s
