In [None]:
from IPython.display import Image, SVG

import numpy as np
np.set_printoptions(precision=3)

## jseabold/dask-pydata-chi-2016

# About Me

* Data Scientist at [Civis Analytics](https://civisanalytics.com/)
* [@jseabold](http://twitter.com/jseabold)
* <a href="mailto:jsseabold@gmail.com.com" target="_top">jsseabold@gmail.com</a>

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="left"
     width="30%"
     alt="Dask logo">

## Introduction

* At a high-level, dask provides parallel NumPy and Pandas
* It also provides primitives or ad-hoc parallel algorithms (dask.delayed similar to joblib.delayed)

Dask is a flexible parallel computing library for analytics. Dask emphasizes the following virtues:

* **Familiar**: Provides parallelized NumPy array and Pandas DataFrame objects
* **Native**: Enables distributed computing in Pure Python with access to the PyData stack.
* **Fast**: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
* **Flexible**: Supports complex and messy workloads
* **Scales up**: Runs resiliently on clusters with 100s of nodes
* **Scales down**: Trivial to set up and run on a laptop in a single process
* **Responsive**: Designed with interactive computing in mind it provides rapid feedback and diagnostics to aid humans

## Dask Resources

[Examples and Tutorials](http://dask.pydata.org/en/latest/examples-tutorials.html)

[Matt Rocklin's Dask YouTube Playlist](https://www.youtube.com/channel/UCFYhuCL11p3oO9375_NbLQg)

### The Dask Computational Model

* Parallel programming with task scheduling
* Familiar abstractions for executing tasks in parallel on data that doesn't fit into memory
  * Arrays, DataFrames
* Task graphs
  * Representation of a parallel computation
* Scheduling
  * Executes task graphs in parallel on a single machine using threads or processes
  * Support for distributed execution using `dask.distributed`
    * Workflows for the distributed scheduler might be different than those presented below

### Note

* If you don't have a big data problem, don't use a big data tool
* Many of the below examples could easily be handled in-memory with some better choices

In [None]:
Image("http://dask.pydata.org/en/latest/_images/collections-schedulers.png")

## Dask Array

* Subset of numpy ndarray interface using blocked algorithms
* Dask array complements large on-disk array stores like HDF5, NetCDF, and BColz

In [None]:
SVG("http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg")

* Arithmetic and scalar mathematics, `+, *, exp, log, ...`
* Reductions along axes, `sum(), mean(), std(), sum(axis=0), ...`
* Tensor contractions / dot products / matrix multiply, `tensordot`
* Axis reordering / transpose, `transpose`
* Slicing, `x[:100, 500:100:-2]`
* Fancy indexing along single axes with lists or numpy arrays, `x[:, [10, 1, 5]]`
* The array protocol `__array__`
* Some linear algebra `svd, qr, solve, solve_triangular, lstsq`

[Full API Documentation](http://dask.pydata.org/en/latest/array-api.html)

In [None]:
import dask.array as da

* Create a dask array using `da.arange` analogous to `np.arange`
* The idea of the `chunk` is important and has performance implications

In [None]:
x = da.arange(25, chunks=5)

In [None]:
y = x ** 2

* Dask operates on a delayed computation model
* It builds up an expression of the computation in chunks
* Creates a **Task Graph** that you can explore

In [None]:
y

In [None]:
y.visualize()

In [None]:
y.dask

* You can execute the graph by using **`compute`**

In [None]:
y.compute()

* As an example of the `__array__` protocol

In [None]:
np.array(y)

### Exercise

Create a `da.array`, `x` that has 25 chunks and is made up of the first 2500 numbers and take the square root of all of the elements. Print the last element.

In [None]:
# [ Solution Here ]

In [None]:
%load solutions/dask_root.py

### Scheduling Backends

* You can control the scheduler backend that is used by `compute`
* These choices can be important in a few situations
  * Debugging
  * Fast tasks
  * Cross-task communication

#### Synchronous Queue Scheduler

* `dask.get` is an alias for the synchronous backend. Useful for debugging.

In [None]:
import dask

In [None]:
y.compute(get=dask.get)

#### Threaded Scheduler

* `dask.threaded.get` is the default
* Uses a thread pool backend
* A thread is the smallest unit of work that an OS can schedule
  * Threads are "lightweight"
* They execute within the same process and thus share the same memory and file resources ([everything is a file](https://en.wikipedia.org/wiki/Everything_is_a_file) in unix)
* Limitations
  * Limited by the Global Interpreter Lock (GIL)
    * A GIL means that only one thread can execute at the same time
  * Pure python functions likely won't show a speed-up (with a few exceptions)
    * C code can release the GIL
    * I/O tasks are not blocked by the GIL

In [None]:
y.compute(get=dask.threaded.get)

* By default, dask will use as many threads as there are logical processors on your machine

In [None]:
from multiprocessing import cpu_count
cpu_count()

#### Process Scheduler

* Backend that uses multiprocessing
* Uses a process pool backend
  * On unix-like system this is a system call to `fork`
  * Calling `fork` creates a new child process which is a *copy*(-on-write) of the parent process
  * Owns its own resources. This is "heavy"
* Limitations
  * Relies on serializing objects for the workers (slow and error prone)
  * Workers must communicate through parent process

In [None]:
import dask.multiprocessing

y.compute(get=dask.multiprocessing.get)

#### Distributed Executor

* This is part of the `dask.distributed` library
* Distributes work over the network across machines using web sockets and an asychronous web framework for Python (tornado)
  * Some recent additions make this work for, e.g., distributed DataFrames
* We're not going to cover the distributed scheduler
  * Matt and Jim gave a [nice talk](https://www.youtube.com/watch?v=PAGjm4BMKlk) at SciPy 2016 on distributed computing with dask

## Random Array

* Create a billion number array of 32-bit floats on disk using HDF5
* HDF5 is an implementation of the Hierarchical Data Format common in scientific applications
  * Multiple data formats (tables, nd-arrays, raster images)
  * Fast lookups via B-tree indices (like SQL)
  * Filesystem-like data format
  * Support for meta-information
* **Warning**, the result of this operation is 4 GB

In [None]:
import os

import h5py
import numpy as np


def random_array():
    if os.path.exists(os.path.join('data', 'random.hdf5')):
        return

    print("Create random data for array exercise")

    with h5py.File(os.path.join('data', 'random.hdf5')) as f:
        dset = f.create_dataset('/x', shape=(1000000000,), dtype='f4')
        for i in range(0, 1000000000, 1000000):
            dset[i: i + 1000000] = np.random.exponential(size=1000000)

random_array()

### Blocked Algorithms

* Dask works on arrays by executing blocked algorithms on chunks of data
* For example, consider taking the mean of a billion numbers. We might instead break up the array into 1,000 chunks, each of size 1,000,000, take the sum of each chunk, and then take the sum of the intermediate sums and divide this by the total number of observations.
* The result (one sum on one billion numbers) is obtained from calculating many smaller results (one thousand sums on one million numbers each, followed by another sum of a thousand numbers.)

In [None]:
import h5py
import os

f = h5py.File(os.path.join('data', 'random.hdf5'))
dset = f['/x']

* If we were to implement this ourselves it might look like this
1. Computing the sum of each 1,000,000 sized chunk of the array
2. Computing the sum of the 1,000 intermediate sums

In [None]:
sums = []
for i in range(0, 1000000000, 1000000):
    chunk = dset[i: i + 1000000]
    sums.append(chunk.sum())

total = np.sum(sums)
print(total / 1e9)

* Dask does this for you and uses the backend scheduler to do so in parallel
* Create a dask array from an array-like structure (any object that implements numpy-like slicing)

In [None]:
x = da.from_array(dset, chunks=(1000000, ))

* x looks and behaves much like a numpy array
  * Arithmetic, slicing, reductions
* Use tab-completion to look at the methods of `x`

In [None]:
result = x.mean()

In [None]:
result

In [None]:
result.compute()

In [None]:
x[:10].compute()

### Exercise

Use `dask.array.random.normal` to create a 20,000 x 20,000 array $X ~ \sim N(10, .1)$ with `chunks` set to `(1000, 1000)`

Take the mean of every 100th element along axis 0.

*Hint*: Recall you can slice with the following syntax [start:end:step]

In [None]:
x[::2]

In [None]:
# [Solution here]

In [None]:
%load solutions/dask_array.py

## Performance vs. NumPy

Your performance may vary. If you attempt the NumPy version then please ensure that you have more than 4GB of main memory.

In [None]:
import numpy as np

In [None]:
%%time
x = np.random.normal(10, 0.1, size=(20000, 20000)) 
y = x[::100].mean(axis=0)
y

Faster and needs only MB of memory

In [None]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x[::100].mean(axis=0)
y.compute()

## Linear Algebra

* Dask implements a few linear algebra functions that are paraellizable
* `da.linalg.qr`
* `da.linalg.cholesky`
* `da.linalg.svd`

## Dask Bag

* In mathematics, a bags is multiset
* In dask, we can think of them as parallel lists for semi-structured data
  * Nested, variable length, heterogenously typed, etc.
  * E.g., JSON blobs or text data
* Anything that can be represented as a large collection of generic Python objects 
* Mainly used for cleaning and processing
  * I.e., usually the first step in a workflow 
* Bag implements a number of useful methods for operation on sequences like `map`, `filter`, `fold`, `frequencies` and `groupby`
* Streaming computation on top of generators
* Bags use the multiprocessing backend by default

#### Generator Aside

* What are generators in Python?

In [None]:
def func(n=10):
    for i in range(n):
        yield i

In [None]:
func()

In [None]:
for i in func():
    print(i)

## Food Inspection Data

* http://opendatadc.org/dataset/restaurant-inspection-data
* Some processing in `data/clean_inspections.py` results in `data/inspections.zip`
* This archive contains several gzipped files
* Each file contains health inspection data for a month and a year
* Each line in each file is a JSON object

* First unzip the file

In [None]:
import os
import zipfile

inspections_dir = os.path.join('data', 'inspections')
with zipfile.ZipFile(os.path.join('data', 'inspections.zip')) as zf:
    zf.extractall(path=inspections_dir)

* Let's take a look

In [None]:
import dask.bag as db

In [None]:
bag = db.read_text(os.path.join(inspections_dir, 'inspections*.json.gz'))

* Use `take` to grab a single line

In [None]:
bag.take(1)

* This is just unprocessed text
* Using map to process the lines in the text files

In [None]:
import json  # ujson

In [None]:
js = bag.map(json.loads)

In [None]:
js.take(1)

* What are the inspection types?
* We can grab a field using `pluck` and return the `distinct` elements

In [None]:
visits = js.pluck('Inspection Type').distinct()

In [None]:
visits.visualize()

In [None]:
visits.compute()

* How often do we see these inspection types?

In [None]:
counts = js.pluck('Inspection Type').frequencies()

In [None]:
counts.compute()

* Easier to read if sorted

In [None]:
import operator

sorted(counts, key=operator.itemgetter(1), reverse=True)

### Exercise

* Use `filter` and `take` the first 5 records with a `Complaint` inspection type

In [None]:
# [Solution Here]

In [None]:
%load solutions/health_complaints.py

### GroupBy / FoldBy

* GroupBy collects items in your collection so that all items with the same value under some function are collected together into a key-value pair.
* This requires a full on-disk shuffle and is *very* inefficient
* You almost never want to do this in a real workflow if you can avoid it

In [None]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()

* Group by evens and odds

In [None]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()

Group by evens and odds and take the largest value

In [None]:
b.groupby(lambda x: x % 2).map(lambda k, v: (k, max(v))).compute()

* FoldBy, while harder to grok, is much more efficient
* This does a streaming combined groupby and reduction
* Familiar to Spark users as the `combineByKey` method on `RDD`

When using foldby you provide

1. A key function on which to group elements
2. A binary operator such as you would pass to reduce that you use to perform reduction per each group
3. A combine binary operator that can combine the results of two reduce calls on different parts of your dataset

Your reduction must be associative. That is, the order in which the reduction takes place does not matter. 

It will happen in parallel in each of the partitions of your dataset. Then all of these intermediate results will be combined by the combine binary operator.

Taking a step back, this is just what we saw in `sum` above

* `functools.reduce` works like so

In [None]:
import functools

In [None]:
values = range(10)

In [None]:
def func(acc, y):
    print(acc)
    print(y)
    print()
    return acc + y

In [None]:
functools.reduce(func, values)

In [None]:
b.foldby?

In [None]:
b

In [None]:
b.foldby(lambda x: x % 2, binop=max, combine=max).compute()

* Using the restaurant inspection data above, let's find the worst place to eat in DC

In [None]:
js.take(1)

* How many inpections per restuarant are there?
  * These records all appear to be restaurants that have at least one reported violation not each visit

In [None]:
from dask.diagnostics import ProgressBar

In [None]:
counts = js.foldby(key='Name',
                   binop=lambda total, x: total + 1,
                   initial=0,
                   combine=operator.add,            
                   combine_initial=0)

In [None]:
with ProgressBar():
    result = counts.compute()

In [None]:
sorted(result, key=operator.itemgetter(1), reverse=True)[:5]

* Of course, we could have computed this using `frequencies`

In [None]:
result = js.pluck('Name').frequencies()

sorted(result, key=operator.itemgetter(1), reverse=True)[:5]

### Exercise

* In reality, we want to know the worst locations. Re-do the above by Name and Address. Use the `FoldBy` notation rather than frequencies.

* We may be much more concerned with *Critical Violations*. Compute the number of critical violations for each restaurant name.
* Decompose the problem in to pieces
  * First, create a reduce function that computes the total for each inspection for each business name
  * Change the above example to accumulate the total amount instead of count

In [None]:
# [Solution Here]

In [None]:
%load solutions/bag_foldby.py

* What are the types of violations that are caught?

In [None]:
js.take(1)

* We can use the `reduction` method of the `Bag` to compute this
* `reduction` works first within each partition and then across each partition

* First we want to pull ou the `Violations` field

In [None]:
violations = js.pluck('Violations')

* Now we have a Bag of lists, each of which contains the violations for an inspection
* To reduce within the partitions, we need to iterate over the partition and within each list of violations

In [None]:
def get_violations(inspections):
    return {x['Violation Category'] for inspection in inspections for x in inspection}

* To reduce of the partitions, we need to use a set union reduction

In [None]:
def aggregate(sets):
    return functools.reduce(set.union, sets)

In [None]:
js.pluck('Violations').reduction(get_violations, aggregate).compute()

### Exercise

* What are the most frequent violations for each restaurant?

In [None]:
# [Solution here]

In [None]:
%load solutions/violation_counts.py

## Dask DataFrames

## Data

source: http://britains-diet.labs.theodi.org/data/

info: https://www.gov.uk/government/statistics/family-food-open-data

Go ahead and unzip the data.

In [None]:
import zipfile

zf = zipfile.ZipFile('data/nfs.zip')
zf.extractall(path='data/')

### Overview

* subset of the pandas API
* Good for analyzing heterogenously typed tabular data arranged along an index

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg", width="30%">

**Trivially parallelizable operations (fast)**:

  * Elementwise operations: `df.x + df.y, df * df`
  * Row-wise selections: `df[df.x > 0]`
  * Loc: `df.loc[4.0:10.5]`
  * Common aggregations: `df.x.max(), df.max()`
  * Is in: `df[df.x.isin([1, 2, 3])]`
  * Datetime/string accessors: `df.timestamp.dt.month`

**Cleverly parallelizable operations (fast)**:

  * groupby-aggregate (with common aggregations): `df.groupby(df.x).y.max(), df.groupby('x').max()`
  * value_counts: `df.x.value_counts()`
  * Drop duplicates: `df.x.drop_duplicates()`
  * Join on index: `dd.merge(df1, df2, left_index=True, right_index=True)`
  * Join with Pandas DataFrames: `dd.merge(df1, df2, on='id')`
  * Elementwise operations with different partitions: `df1.x + df2.y`
  * Datetime resampling: `df.resample(...)`
  * Rolling averages: `df.rolling(...)`
  * Pearson Correlations: `df[['col1', 'col2']].corr()`

**Operations requiring a shuffle (slow-ish, unless on index)**

  * Set index: `df.set_index(df.x)`
  * groupby-apply (with anything): `df.groupby(df.x).apply(myfunc)`
  * Join not on the index: `dd.merge(df1, df2, on='name')`

[Full DataFrame API](http://dask.pydata.org/en/latest/dataframe-api.html)

### Reading data

In [None]:
import dask.dataframe as dd

In [None]:
df = dd.read_csv("data/nfs/NFS*.csv")

* `DataFrame.head` is one operation that is not lazy

In [None]:
df.head(5)

### Partitions

* By default the data is partitioned by the file
* In our case, this is good. The files have a natural partition
* When this is not the case, you must do a disk-based shuffle which is slow

In [None]:
df.npartitions

In [None]:
df.known_divisions

* We are going to set the partition explicitly to `styr` to make some operations more performant
* Partitions are denoted by the left-side of the bins for the partitions.
* The final value is assumed to be the inclusive right-side for the last bin.


So

```
[1974, 1975, 1976]
```

Would be 2 partitions. The first contains 1974. The second contains 1975 and 1976. To get three partitions, one for the final observation, duplicate it.

```
[1974, 1975, 1976, 1976]
```

In [None]:
partitions = list(range(1974, 2001)) + [2000]

df = df.set_partition('styr', divisions=partitions)

In [None]:
df.known_divisions

In [None]:
df.divisions

* Nothing yet is loaded in to memory
* Meta-information from pandas is available

In [None]:
df.info()

## DataFrame API

* In addition to the (supported) pandas DataFrame API, dask provides a few more convenient methods
    * `DataFrame.categorize`
    * `DataFrame.map_partitions`
    * `DataFrame.get_partition`
    * `DataFrame.repartition`
    * `DataFrame.set_partition`
    * `DataFrame.to_{bag|castra}`
    * `DataFrame.visualize`
    
* A few methods have a slightly different API

    * `DataFrame.apply`
    * `GroupBy.apply`

### get_partition

In [None]:
df2000 = df.get_partition(26)

In [None]:
type(df2000)

What food group was consumed the most times in 2000?

In [None]:
grp = df2000.groupby('minfd')

In [None]:
size = grp.apply(len, meta='size')

* Turn it into a Series first

In [None]:
minfd = size.compute().idxmax()

In [None]:
print(minfd)

* Get the pre-processed mapping across food grouping variables

In [None]:
food_mapping = pd.read_csv("data/nfs/food_mapping.csv")

* Pandas provides the efficient `isin` method

In [None]:
food_mapping.ix[food_mapping.minfd.isin([minfd])]

### Exercise

* What was the most consumed food group in 1974?

In [None]:
# [Solution here]

In [None]:
%load solutions/nfs_most_purchased.py

### map_partitions

* Map partitions does what you might expect
* Maps a function across partitions
* Let's calculate the most frequently purchase food group for each year

In [None]:
def most_frequent_food(partition):
    # partition is a pandas.DataFrame
    grpr = partition.groupby('minfd')
    size = grpr.size()
    minfd = size.idxmax()
    idx = food_mapping.minfd.isin([minfd])
    description = food_mapping.ix[idx].minfddesc.iloc[0]
    year = int(partition.index[0])
    return year, description

In [None]:
mnfd_year = df.map_partitions(most_frequent_food, 
                              meta={'year': int,
                                    'description': str})

In [None]:
mnfd_year.compute()

### Exercise

* Within each year, group by household `minfd` and calculate daily per capita consumption of each food group. Hint, you want to use `map_partitions`.

In [None]:
# [Solution Here]

In [None]:
%load solutions/average_consumption.py