# Introduction to Dask

This notebook is aimed at new users of Dask.  A working knowledge of pandas is assumed.

# What is Dask?

Dask lets users work with datasets too large to fit in memory while using syntax similar to some of the most popular Python packages.  Dask has high-level APIs similar to the following packages:
- pandas:  Tabular Categorical and Numeric Data
- NumPy:  Numeric Data
- Python lists (similar to Spark RDDs):  Unstructured Data

Dask also scales well.  It performs well on a single machine and on a cluster with thousands of cores.

# High-Level APIs

## Dask DataFrame mimics pandas

## Dask Array mimics NumPy Arrays
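A small sketch, assuming a toy 3x4 array: the Dask array is built from chunks of a NumPy array and accepts the same arithmetic and reduction syntax. The shapes and chunk sizes are arbitrary choices for illustration.

```python
import numpy as np
import dask.array as da

a = np.arange(12).reshape(3, 4)

# Wrap the NumPy array in a Dask array made of two 3x2 chunks.
x = da.from_array(a, chunks=(3, 2))

# NumPy-style expression; evaluated chunk by chunk when .compute() is called.
dask_result = (x * 2).sum(axis=0).compute()
numpy_result = (a * 2).sum(axis=0)

print(dask_result)
print(numpy_result)
```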

## Dask Bag mimics iterators, Toolz, and PySpark
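A brief sketch of the Bag interface on a made-up list of words. Bag uses the process-based scheduler by default, so this example asks for the synchronous scheduler to keep the toy run lightweight; that choice is an assumption of the sketch, not a recommendation.

```python
import dask.bag as db

words = ["dask", "pandas", "numpy", "bag", "toolz", "spark"]

# Two partitions of three words each.
bag = db.from_sequence(words, npartitions=2)

# Chain filter and map lazily, much like with iterators or PySpark RDDs.
long_upper = bag.filter(lambda w: len(w) > 4).map(str.upper)

result = long_upper.compute(scheduler="synchronous")
print(result)  # ['PANDAS', 'NUMPY', 'TOOLZ', 'SPARK']
```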

***

# Dask DataFrame Under the Hood

- Dask divides a large dataset, one that potentially can't fit into memory, into many smaller pandas DataFrames.  Each pandas DataFrame makes up a partition.
- Dask loads each partition into memory and works with it as needed to perform the requested operations.
- The user can, and sometimes needs to, specify the size of Dask partitions.  Dask partitions should fit comfortably in memory.

`Rule of Thumb: Keep partition sizes ~ 100MB.`

`Better formula: total_memory/(9*cores)`
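To make the formula concrete, here is a small worked calculation. It assumes the formula gives a target partition size in bytes, and it plugs in the 8.26 GB and 2 cores reported by the local cluster in the Examples section; the function name is hypothetical.

```python
def partition_size_bytes(total_memory_bytes, cores):
    """Apply the total_memory/(9*cores) formula from the text."""
    return total_memory_bytes / (9 * cores)

# Assumed inputs: 8.26 GB of memory shared by 2 cores.
size_mb = partition_size_bytes(8.26e9, 2) / 1e6

print(f"~{size_mb:.0f} MB per partition")  # ~459 MB per partition
```

On a small machine the formula allows partitions larger than the 100 MB rule of thumb; on a machine with many cores and modest memory it gives a tighter limit.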

In general, the high- and low-level APIs (Collections) build task graphs consisting of the requested operations, which a Scheduler then runs in an appropriate order.

<img src='dask-overview.jpg' style="width:70%">
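The split between building a task graph and running it can be seen in a minimal sketch with `dask.delayed`. The `inc` and `add` functions are hypothetical stand-ins for real work.

```python
from dask import delayed

def inc(x):
    return x + 1

def add(a, b):
    return a + b

# Nothing runs here: each call only records a task and its dependencies.
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)

# The collection exposes its task graph as a mapping keyed by task name.
graph_keys = list(total.__dask_graph__())
print(len(graph_keys), "tasks recorded")

# The scheduler walks the graph, running both inc tasks before add.
result = total.compute()
print(result)  # 5
```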

***

# Schedulers

- Various Scheduler Options Exist
    - Local Threads - Default for Dask Array, Dask Dataframe, Dask Delayed
    - Local Processes - Recommended to use Distributed Instead
    - Synchronous (Debugging)
    - Distributed - Works on a cluster or run locally on a single machine
        - Dashboard
        - Better Data Locality

https://docs.dask.org/en/latest/scheduling.html
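As a hedged sketch of how the local schedulers are chosen, the `scheduler=` keyword selects one per call, and `dask.config.set` selects one for a block of code. The array here is arbitrary. The distributed scheduler is selected differently: creating a `Client`, as the Examples section does, makes it the default.

```python
import dask
import dask.array as da

x = da.arange(1_000_000, chunks=100_000, dtype="int64")
total = x.sum()

# Choose a scheduler for a single call.
with_threads = total.compute(scheduler="threads")
with_sync = total.compute(scheduler="synchronous")  # single-threaded, handy for debugging

# Or choose one for everything inside a block.
with dask.config.set(scheduler="synchronous"):
    with_config = total.compute()

print(with_threads, with_sync, with_config)
```

All three calls return the same value; only where and how the tasks run changes.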

***

# Examples

In [1]:
import dask
import dask.dataframe as dd
import distributed
import sys
import pandas as pd

In [2]:
cluster = distributed.LocalCluster(n_workers=2, threads_per_worker=1) 
client = distributed.Client(cluster)
client

- Client: Scheduler: tcp://127.0.0.1:40795, Dashboard: http://127.0.0.1:8787/status
- Cluster: Workers: 2, Cores: 2, Memory: 8.26 GB


In [None]:
size_GB = dask.datasets.timeseries(start='2000-01-01',
                        end='2000-01-31',
                        freq='1s',
                        partition_freq='1d',
                        seed=42).compute().memory_usage(deep=True).sum()/2**30
print(f'~{size_GB:0.2f} GB for 1 month')
print(f'~{size_GB*12*5:0.2f} GB for 5 years')

In [None]:
!free -h |  tail -n +2 | awk '{ print $1 $3 " used"}'

In [51]:
df = dask.datasets.timeseries(start='2000-01-01',
                        end='2000-06-30',
                        freq='1s',
                        partition_freq='1d',
                        seed=42)
df = df.persist()

In [None]:
!free -h |  tail -n +2 | awk '{ print $1 $3 " used"}'

In [None]:
df.head()

I couldn't work with this in pandas on my machine, but I can in Dask.

# Cheap/Expensive Operations in Dask

## Cheap Operations

### Trivially parallelizable operations (fast):

- Element-wise operations: df.x + df.y, df * df
- Row-wise selections: df[df.x > 0]
- Indexing by Value: df.loc[4.0:10.5]
- Common aggregations: df.x.max(), df.max()
- Is in: df[df.x.isin([1, 2, 3])]
- Datetime/string accessors: df.timestamp.dt.month

### Cleverly parallelizable operations (fast):

- groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()
- groupby-apply on index: df.groupby(['idx', 'x']).apply(myfunc), where idx is the index level name
- value_counts: df.x.value_counts()
- Drop duplicates: df.x.drop_duplicates()
- Join on index: dd.merge(df1, df2, left_index=True, right_index=True) or dd.merge(df1, df2, on=['idx', 'x']) where idx is the index name for both df1 and df2
- Join with Pandas DataFrames: dd.merge(df1, df2, on='id')
- Element-wise operations with different partitions / divisions: df1.x + df2.y
- Date time resampling: df.resample(...)
- Rolling averages: df.rolling(...)
- Pearson’s correlation: df[['col1', 'col2']].corr()



## Expensive Operations

### Operations requiring a shuffle (slow-ish, unless on index)
- Set index: df.set_index(df.x)
- groupby-apply not on index (with anything): df.groupby(df.x).apply(myfunc)
- Join not on the index: dd.merge(df1, df2, on='name')

# Optimal Number of Partitions

Ideally, fewer partitions are better, but each partition needs to fit comfortably in memory.  Dask commonly has 2-3 partitions in memory at a time so it always has something to work on.  

Fewer partitions are better because each task carries an overhead of ~1 ms, and more partitions mean more tasks and thus more overhead.  The overhead from a few thousand tasks is usually not noticeable.

- https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
- https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-graphs
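A back-of-the-envelope sketch of that overhead for a 10,000,000-element float64 array. It assumes only one task per chunk, so it is a lower bound; real graphs contain additional tasks for generating and combining chunks. The function and constant names are hypothetical.

```python
import math

TASK_OVERHEAD_SECONDS = 1e-3  # ~1 ms per task, the figure quoted above
N_ELEMENTS = 10_000_000       # assumed array length
BYTES_PER_ELEMENT = 8         # float64

def estimate(chunk_elements):
    """Return (number of chunks, chunk size in kB, minimum overhead in seconds)."""
    n_chunks = math.ceil(N_ELEMENTS / chunk_elements)
    chunk_kb = chunk_elements * BYTES_PER_ELEMENT / 1e3
    overhead_s = n_chunks * TASK_OVERHEAD_SECONDS
    return n_chunks, chunk_kb, overhead_s

for chunk_elements in (1_000, 10_000, 1_000_000):
    n_chunks, chunk_kb, overhead_s = estimate(chunk_elements)
    print(f"{chunk_elements:>9,} elements/chunk: {n_chunks:>6,} chunks "
          f"of {chunk_kb:,.0f} kB, at least {overhead_s:.2f} s of overhead")
```

With 1,000-element chunks the scheduling overhead alone is on the order of 10 seconds, while each chunk holds only 8 kB of data.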

### Trivial Example of Too Many Partitions

In [22]:
%%time
import dask.array as da
CHUNK_SIZE = 1000  # elements per chunk, not bytes (about 8 kB of float64 each)
x = da.random.random(10_000_000, chunks=CHUNK_SIZE)
x.sum().compute()



CPU times: user 44.4 s, sys: 2.33 s, total: 46.7 s
Wall time: 57.3 s


5001634.491113036

In [26]:
%%time
import dask.array as da
CHUNK_SIZE = 10_000  # elements per chunk, not bytes (about 80 kB of float64 each)
x = da.random.random(10_000_000, chunks=CHUNK_SIZE)
x.sum().compute()

CPU times: user 3.86 s, sys: 186 ms, total: 4.04 s
Wall time: 4.6 s


4999800.495494896

An array this small (10,000,000 float64 values, about 80 MB) fits in memory, so you really should be doing this computation in NumPy.

## Look at the dashboard and statistical profiler (requires Bokeh) at the Dashboard link above. It's a good way to debug.

- Task stream 
    - Each row is a thread
    - Colors match the tasks in the Progress bars below
    - Red rectangles are communication between workers
- Progress bars 
    - Gray means ready to be computed, but waiting
    - Bright color means in memory
    - The semi-transparent colored portion on the left means processed and no longer in memory
- Tasks Processing
    - Shows how many tasks are ready to be processed by each worker; red means not enough tasks are queued to keep the worker busy
- Bytes Stored
    - Shows how much data each worker holds in memory at any given time.  Turns orange when it reaches a certain level, and red when it's higher still.
- Many other features; click on the other tabs to see them

- https://distributed.dask.org/en/latest/diagnosing-performance.html
- https://www.youtube.com/watch?v=N_GqzcuGLCY

Often you will open a large dataset and then reduce its size, for example by grouping.  It's a good idea to repartition after doing so.

In [146]:
df = dask.datasets.timeseries(start='2000-01-01',
                        end='2001-06-30',
                        freq='1s',
                        partition_freq='1d',
                        seed=42)

In [147]:
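# Group by a year-and-day key such as 2000001 (year * 1000 + day of year)
df = df.groupby(df.index.year * 1000 + df.index.dayofyear).sum()
# Far less data remains, so use fewer partitions (but always at least one)
df = df.repartition(npartitions=max(1, df.npartitions // 60**2))
df = df.persist()  # keep the result in memory for reuse in later calculations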

In [148]:
df.head()

| timestamp | id | x | y |
|---|---|---|---|
| 2000001 | 86398019 | -6.885473 | 41.831835 |
| 2000002 | 86403247 | 52.697918 | -75.17113 |
| 2000003 | 86396708 | -388.17118 | 76.150513 |
| 2000004 | 86399204 | -172.614676 | -20.051186 |
| 2000005 | 86392161 | 256.048209 | 48.594984 |


***


# When NOT to use Dask

Dask is a great tool when the data is too large to work with in memory, but it is not the best tool to use for all use cases.

In the Dask documentation Best Practices Section (https://docs.dask.org/en/latest/best-practices.html):
- The Dask documentation recommends using plain pandas, NumPy, etc. instead of Dask whenever those tools are sufficient.
- The Dask documentation also recommends a common workflow: reduce a large dataset with Dask (e.g. by computing statistics) until it fits in memory, and then switch to pandas, NumPy, etc.
- Parallelism brings extra complexity and overhead. Sometimes it’s necessary for larger problems, but often it’s not. Before adding a parallel computing system like Dask to your workload you may want to first try some alternatives:
    - Better Algorithms
    - Better File Formats
    - Compiled Code
    - Sampling
    - Reduce Bottlenecks via Profiling


***

# Low-Level APIs

## Dask Delayed mimics for loops and wraps custom code

In [None]:
from dask import delayed
# filenames, load, process, and summarize are placeholders for your own inputs and functions
L = []
for fn in filenames:                  # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)
result.compute()

In [None]:
import dask
# A, B, f, and g are placeholders for your own inputs and functions
lazy_results = []
for a in A:
    for b in B:
        if a < b:
            c = dask.delayed(f)(a, b) # add lazy task
        else: 
            c = dask.delayed(g)(a, b) # add lazy task
        lazy_results.append(c)


results = dask.compute(*lazy_results) # compute all in parallel
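The cells above rely on placeholder names. For a version that runs as-is, here is a minimal sketch of the same load, process, summarize pattern with toy stand-in functions; the function bodies and the `sizes` list are assumptions made for illustration.

```python
from dask import delayed

def load(n):
    # Stand-in for reading a file: returns the numbers 0..n-1.
    return list(range(n))

def process(data):
    # Stand-in for per-file work: sums the values.
    return sum(data)

def summarize(totals):
    # Stand-in for the final reduction: picks the largest total.
    return max(totals)

sizes = [3, 4, 5]

# Build the lazy computation; no function has been called yet.
processed = [delayed(process)(delayed(load)(n)) for n in sizes]
result = delayed(summarize)(processed)

# Run the whole graph; the load/process chains are independent of each other.
value = result.compute()
print(value)  # 10
```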

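Be purposeful in choosing threads vs. processes for Dask.  Code that spends most of its time in pandas/NumPy functions usually does better with threads, because those functions release Python's global interpreter lock (GIL) for much of their work.  Code that spends most of its time in pure-Python functions holds the GIL and usually does better with processes.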

## Dask Futures interface provides general submission of custom tasks

In [None]:
from dask.distributed import Client
client = Client('scheduler:port')  # placeholder: replace with your scheduler's address

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
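For trying the futures interface without a remote scheduler, the sketch below starts a temporary in-process cluster instead. The `load` and `summarize` functions are toy stand-ins, and the `processes=False` and `dashboard_address=None` settings are choices made to keep the example lightweight, not requirements.

```python
from dask.distributed import Client

def load(n):
    # Stand-in for reading a file: returns the numbers 0..n-1.
    return list(range(n))

def summarize(chunks):
    # Stand-in for a final reduction: counts all loaded items.
    return sum(len(chunk) for chunk in chunks)

# Start a throwaway local cluster in this process, with the dashboard disabled.
with Client(processes=False, dashboard_address=None) as client:
    # submit() starts work immediately and returns a future right away.
    futures = [client.submit(load, n) for n in (3, 4, 5)]

    # Futures can be passed as arguments; Dask resolves them on the workers.
    summary = client.submit(summarize, futures)
    total = summary.result()  # block until the result is ready

print(total)  # 12
```

Unlike `delayed`, which builds the whole graph before anything runs, `submit` begins executing each task as soon as it is called.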

# Dataset

https://anaconda.org/jonmmease/spatial-partitioning-for-datashader-points-rendering/notebook

# Useful Hyperlinks

Dask Official Documentation - https://docs.dask.org/en/latest/


In [4]:
%%html
<iframe src="https://docs.dask.org/en/latest/" width="900" height="400"></iframe>