In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.imperative import delayed
from dask.diagnostics import ProgressBar
import h5py
from datetime import datetime
import matplotlib.pyplot as plt

import time
import os

%matplotlib notebook


  warn("dask.imperative has been moved to dask.delayed")


In [2]:
# This will show a progress bar on all operations of dask 
from dask.diagnostics import ProgressBar
ProgressBar().register()

In [None]:
# This creates a disk cache for operations that spill out of disk
import chest
chest= chest.Chest()

# Loading data, easy as pie
If you know **pandas** you'll be right at home.

In [8]:
# Dataframes
train = dd.read_csv('data/train.csv', parse_dates=['date_time'])
test = dd.read_csv('data/test.csv', parse_dates=['date_time'])


### That happened very quick, didn't it?
That's because dask execution is *lazy*, nothing actually happened when you create a *dask.dataframe* instance. It does not go and look up for the data until you do some calculation. What it did was create the structure of the dataframe:


In [9]:
train

dd.DataFrame<from-de..., npartitions=122>

# Some quick look at the data:

In [None]:
print(len(train))

In [10]:
train.head()

[########################################] | 100% Completed |  1.8s


Unnamed: 0,date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,is_package,...,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,is_booking,cnt,hotel_continent,hotel_country,hotel_market,hotel_cluster
0,2014-08-11 07:46:59,2,3,66,348,48862,2234.2641,12,0,1,...,0,1,8250,1,0.0,3.0,2.0,50.0,628.0,1.0
1,2014-08-11 08:22:12,2,3,66,348,48862,2234.2641,12,0,1,...,0,1,8250,1,1.0,1.0,2.0,50.0,628.0,1.0
2,2014-08-11 08:24:33,2,3,66,348,48862,2234.2641,12,0,0,...,0,1,8250,1,0.0,1.0,2.0,50.0,628.0,1.0
3,2014-08-09 18:05:16,2,3,66,442,35390,913.1932,93,0,0,...,0,1,14984,1,0.0,1.0,2.0,50.0,1457.0,80.0
4,2014-08-09 18:08:18,2,3,66,442,35390,913.6259,93,0,0,...,0,1,14984,1,0.0,1.0,2.0,50.0,1457.0,21.0


In [None]:
train.dtypes

In [None]:
# Categorize some vars:
train.srch_adults_cnt = train.srch_adults_cnt.astype('category')
train.srch_children_cnt = train.srch_children_cnt.astype('category')
train.is_mobile = train.is_mobile.astype('category')

# Basic operations

In [None]:
train.groupby(['srch_adults_cnt','srch_children_cnt']).user_id.count().compute()

In [None]:
# How many distinct countries do we have
train.user_location_country.value_counts().compute().head()

In [None]:
# At what time of the day people make hotel requests?
result = train.groupby(train.date_time.dt.time).user_id.count()
res=result.compute()


In [None]:
res.plot()
plt.show()

In [None]:
result = train.groupby(train.date_time.dt.date).user_id.count()
res=result.compute()

In [None]:
res.plot()
plt.show()

In [None]:
del res

# Indices
We saw the performance of dask isn't _that_ good. Can we hack around to improve it?
Let's try with indices. This is the variable that defines **partititions** in the dataframes.

In [None]:
train_ind=train
train_ind.set_index(train_ind.date_time.dt.date)

In [None]:
result1 = train_ind.groupby(train_ind.date_time.dt.time).user_id.count()
result1.compute()

In [None]:
result2 = train_ind.groupby(train_ind.date_time.dt.date).user_id.count()
res=result2.compute()

In [None]:
del result, result2, train_ind

# Caching
**dask** provides an experimental freature called *opportunistic caching*, that intelligently stores intermediate results for faster access later.
Bear in mind that this feature is *experimental*, dask may underestimate the size of an object in memory, and when loaded, you may enjoy some swapping fun.

The approach used is distinct from the *Least Recently Used (LRU)* approach used in **Spark**.
The philosophy is more about keeping cached what you'll be needing in analytic computations, i.e.:
- Expensive to recompute (in seconds)
- Cheap to store (in bytes)
- Frequently used
- Recenty used

If you want to know more: 
- https://en.wikipedia.org/wiki/Cache_algorithms
- https://dask.pydata.org/en/latest/caching.html
- http://matthewrocklin.com/blog/work/2015/08/03/Caching
- https://github.com/blaze/cachey

In [None]:
# Let's perform some ops on the same variable
max_dis=train.orig_destination_distance.max().compute()
min_dis=train.orig_destination_distance.min().compute()
mean_dis=train.orig_destination_distance.mean().compute()
print(min_dis, max_dis, mean_dis)

In [None]:
# Let's cache 
# you need package cachey installed to do this
from dask.cache import Cache
cache = Cache(2e9)
cache.register()

In [None]:
# Repeat the operation
max_dis=train.orig_destination_distance.max().compute()
min_dis=train.orig_destination_distance.min().compute()
mean_dis=train.orig_destination_distance.mean().compute()
print(max_dis, min_dis, mean_dis)

# Storing data
dask provides convenient methods to store data on common formats, such as *csv* and *hdf*.
Besides, it supports its own format called *castra*. It's two main characteristics:
- It's **chunked**: Data is splitted in order to apply methods on parallel.
- It's **columnar**: when you load a csv, to retrieve a column of this file you have to process the whole file and split lines. With a *columnar* format, you can just load the column you're interested. Most analytic operations just require some of the columns of a datasets, not all of them, so processing them is a waste of time.

Running this example you see that writing to *hdf* is ridiculously faster, but this happens because the first time of the write, it had to scan the files, and the second it didn't.
This happens even though caching is *not* activated in dask. This may be because caching ocurring at OS level.

In [4]:
train.to_csv('data/dask_output.csv')

[########################################] | 100% Completed | 23min 37.0s


In [7]:
train.to_hdf('data/dask_output.hdf5', key='x')

[########################################] | 100% Completed |  6min 23.3s


In [11]:
total=dd.concat(train,test)

dd.DataFrame<from-de..., npartitions=122>

# Dask Arrays
Probably the most successful part of the project but maybe with least application on Kernel/LR in itself.
It's the on-disk counterpart of **Numpy** (not the whole API, just the most common functions)

In [48]:
def random_array(array_name='random.hdf5', size=1000000000):
    import numpy as np
    np.random.seed(0)
    if os.path.exists(array_name):
        return None
        
    print("Store random array in %s" % array_name)
    import h5py

    with h5py.File(os.path.join('data',array_name)) as f:
        dset = f.create_dataset('/x', shape=(size,), dtype='f4')
        for i in range(0, size, 1000000):
            dset[i: i + 1000000] = np.random.exponential(size=1000000)

In [47]:
# Let's create 2 arrays of 1 billion (1e9) elements.
# Don't try this in memory
random_array('random1.hdf5', size=int(2e9))
random_array('random2.hdf5', size=int(2e9))

Store random array in random2.hdf5


In [3]:
# Let's load this into dask arrays.
import h5py
files = [h5py.File('data/random%s.hdf5' % (i+1))['/x'] for i in range(2)]

In [4]:
files

[<HDF5 dataset "x": shape (1000000000,), type "<f4">,
 <HDF5 dataset "x": shape (1000000000,), type "<f4">]

In [5]:
import dask.array as da
from dask.diagnostics import ProgressBar
ProgressBar().register()
arrays = [da.from_array(file,chunks=(10000000, 10000000)) for file in files]

In [6]:
# This creates a disk cache for operations that spill out of disk
import chest
chest= chest.Chest(available_memory=1e9)

In [25]:
dot=arrays[0].dot(arrays[1])
dot.compute()

[########################################] | 100% Completed |  2min 31.4s


1000059832.0

### What's the memory usage for these two arrays?

In [49]:
print('Array sizes: %s MB ' % ((arrays[0].nbytes+arrays[1].nbytes)/1024))

Array sizes: 7812500.0 MB 


### Neither of these fitted in your laptop.
You just went big data, yay!

### Outputting files
**dask** provides methods to export objects s
Previous operation was ok because the output only produced one number. If you want to export files greater than memory, **dask** struggles a bit.

In [8]:
# What about the outer product?
outer=arrays[0]*arrays[1]

In [10]:
import bcolz
out = bcolz.zeros(shape=outer.shape, rootdir='output.bcolz')

In [11]:
# Store array
da.store(outer, out)  # doctest: +SKIP

[########################################] | 100% Completed |  3min 41.4s


In [24]:
outer[10000:120000].compute()

[########################################] | 100% Completed |  0.2s


array([  0.02259074,   1.22242928,   0.46698231, ...,   6.49351358,
        12.0568552 ,   1.98702204], dtype=float32)

## Don't try this at home

In [None]:
da.to_hdf5('data/product.hdf5', '/x', outer.compute(chest=chest))

[########################################] | 100% Completed |  1min 46.4s
