## What is Dask?

- [Dask](https://docs.dask.org/en/stable/)
- [10 minute intro](https://docs.dask.org/en/stable/10-minutes-to-dask.html)
- [API Reference](https://docs.dask.org/en/stable/dataframe-api.html)






**Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index**. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These pandas objects may live on disk or on other machines.

Internally, a Dask DataFrame is split into many partitions, where each partition is one pandas DataFrame. When the index is sorted and the division values between partitions are known, Dask knows which partitions hold which index values and can run expensive algorithms (e.g. groupbys and joins) more efficiently.

**Use Cases:**

Dask DataFrame is used in situations where pandas is commonly needed, usually when pandas fails due to data size or speed of computation. Common use cases are:

- Manipulating large datasets, even when those datasets don’t fit in memory

- Accelerating long computations by using many cores

- Distributed computing on large datasets with standard pandas operations like `groupby`, `join`, and time series computations

Dask DataFrame may not be the best choice in the following situations:

- If your dataset fits comfortably into RAM on your laptop, then you may be better off just using pandas. There may be simpler ways to improve performance than through parallelism

- If your dataset doesn’t fit neatly into the pandas tabular model, then you might find more use in [dask.bag](https://docs.dask.org/en/stable/bag.html) or [dask.array](https://docs.dask.org/en/stable/array.html)

- If you need functions that are not implemented in Dask DataFrame, then you might want to look at [dask.delayed](https://docs.dask.org/en/stable/delayed.html) which offers more flexibility

- If you need a proper database with all of the features that databases offer you might prefer something like [Postgres](https://www.postgresql.org/) or [SQLite](https://www.sqlite.org/index.html)
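
For the `dask.delayed` route mentioned in the list above, a minimal sketch might look like the following. The functions `load_chunk` and `summarise` are hypothetical stand-ins for custom logic that has no Dask DataFrame equivalent.

```python
import dask

@dask.delayed
def load_chunk(n):
    # Hypothetical loader: pretend each chunk is a small list of numbers
    return list(range(n))

@dask.delayed
def summarise(chunk):
    # Arbitrary Python logic applied to one chunk
    return sum(chunk)

# Calling delayed functions only records tasks; nothing runs yet
partials = [summarise(load_chunk(n)) for n in (1, 2, 3)]
total = dask.delayed(sum)(partials)

# compute() executes the recorded task graph
result = total.compute()
print(result)
```

The three `load_chunk`/`summarise` pairs are independent of each other, so a scheduler is free to run them in parallel before the final `sum`.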





In [4]:
#Import libraries and datasets
import pandas as pd
import numpy as np
import scipy as sp
import seaborn as sns
import dask.datasets
import dask.dataframe as dd

ts_data = dask.datasets.timeseries()
df = sns.load_dataset('diamonds')


## Transitioning to Dask DataFrames

In [5]:
# Load a Dask DataFrame (ddf) from an existing pandas DataFrame
ddf = dd.from_pandas(df, npartitions=2)
# Many other loading options are available

ddf  # Dask DataFrame
# Execution is lazy by default: computations are triggered by compute() (or head())
ddf.compute()  # convert the Dask DataFrame to a pd.DataFrame
ddf.head(2)

# Attributes of a Dask DataFrame that pd.DataFrame does not have
ddf.npartitions  # number of partitions
ddf.divisions  # the minimum index value of every partition plus the maximum index value of the last partition
ddf.partitions[1]  # access a particular partition
ddf.partitions[1].index  # partitions have attributes similar to pd.DataFrame

# Special consideration

# By default, groupby methods return an object with only 1 partition.
# This optimizes performance and assumes the groupby reduction returns an object small enough to fit into memory.
# If the returned object is larger than this, increase the number of output partitions with the split_out argument.
# Note: recent pandas versions may require numeric_only=True (or selecting the numeric columns first) here.
ddf.groupby('cut').mean()  # npartitions=1
ddf.groupby('cut').mean(split_out=2)  # npartitions=2




| | carat | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|
| npartitions=2 | | | | | | | |
| | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... |


In [6]:
# Dask syntax intentionally mimics the best-known pandas APIs
ddf.loc[15:20]  # subset rows
ddf[["carat", "price"]]  # subset columns
ddf.dtypes  # access attributes
ddf.head(3)
ddf.query('price > 50')  # same as pd.DataFrame

lazy_manipulations = (ddf.query('price > 50')
    .groupby('clarity')
    .price.mean())
lazy_manipulations.compute()  # trigger computation; returns a pandas object

# aggregate() accepts a dict, so different reductions can be applied to different columns of the same groups in one call
ddf_aggs = (ddf.groupby('cut')
    .aggregate({"price": "mean", "carat": "sum"}))

# Persist the (small) result in RAM, if it fits, to make future operations on it faster
ddf_aggs = ddf_aggs.repartition(npartitions=1).persist()

df_merged = ddf.merge(ddf_aggs, left_on="cut", right_index=True, suffixes=("_original", "_aggregated"))

df_merged.head(2)



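| | carat_original | cut | color | clarity | depth | table | price_original | x | y | z | price_aggregated | carat_aggregated |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.23 | Ideal | E | SI2 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 | 3457.54197 | 15146.84 |
| 11 | 0.23 | Ideal | J | VS1 | 62.8 | 56.0 | 340 | 3.93 | 3.9 | 2.46 | 3457.54197 | 15146.84 |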


Note that not all pandas APIs are available in Dask. For example, `ddf.filter(['carat','price'])` is not available. For more details and a list of available options, see the [Dask DataFrame API reference](https://docs.dask.org/en/stable/dataframe-api.html).

<div class="challenge">

### Challenge 

a. What is the price per carat over the entire dataset?

b. Create a column called `price_to_carat` that calculates this for each row

c. Create a column called `expensive` that flags whether price is greater than the dataset-wide price per carat from (a)

d. How many expensive diamonds are there?

<details>
<summary>Solution</summary>

- Average price per carat: $4928
- 15003 diamonds are expensive relative to the whole dataset

```python
price_per_carat = (ddf.price.sum() / ddf.carat.sum()).compute()

ddf = ddf.assign(price_to_carat = ddf.price / ddf.carat)

# Vectorised comparison against the dataset-wide price per carat;
# this avoids a row-wise apply() and the meta inference it requires
ddf = ddf.assign(expensive = ddf.price > price_per_carat)
ddf.sort_values('expensive',ascending= False).compute()
number_expensive = ddf.expensive.sum().compute()

```
</details>
</div>

## Dask Best Practice Guide

1. **Use `set_index()` sparingly**
    - Setting the index is expensive, but it speeds up later operations on data that is naturally sorted on a single index.
    - Use `ddf.set_index('column')`

2. **Persist intelligently**
    - If you have enough RAM for your dataset, you can persist the data in memory. On distributed systems, persisting tells the cluster to start executing the computations you have defined so far and to try to keep the results in memory.
    - `df = df.persist()`

3. **Repartition to reduce overhead**
    - As you reduce or increase the size of your pandas DataFrames by filtering or joining, it may be wise to reconsider how many partitions you need. Adjust partitions accordingly using repartition. 
    - `df = df.repartition(npartitions=df.npartitions // 100)`

4. **Consider storing large data in [Apache Parquet](https://parquet.apache.org/) Format** (binary column based format)


In [14]:
# Time series data with one observation per second, starting in the year 2000
ts_data

# Dask can use the datetime index to downsample the data efficiently
ts_data[["x", "y"]].resample("1h").mean().head()

# Build up lazy data manipulations and compute selectively to reduce data

ts_subset = ts_data.groupby('name').aggregate({"x": "sum", "y": "max"})

# Repartition appropriately: a smaller dataset doesn't need many partitions
ts_subset = ts_subset.repartition(npartitions=1)

ts_subset.head(10)

# Set the index selectively, as it's expensive
# Note: after the groupby above, "name" is already the index, so this call only illustrates the API
ts_subset = ts_subset.set_index("name")

# Persist in RAM, if possible, after expensive calculations rather than continuing to build up lazy operations
ts_subset = ts_subset.persist()

# Continue with pandas if the result fits in memory
ts_subset_df = ts_subset.compute()
ts_subset_df.sort_values("name").head(3)




| name | x | y |
|---|---|---|
| Alice | 322.982262 | 0.999975 |
| Bob | 34.466002 | 0.999986 |
| Charlie | 222.431201 | 0.999984 |


## Using external functions in Dask

In [8]:
from sklearn.linear_model import LinearRegression

def train(partition):
    if not len(partition):
        return
    est = LinearRegression()
    est.fit(partition[["x"]].values, partition.y.values)
    return est

# The meta argument tells Dask how to create the DataFrame or Series that will hold the result of .apply().
# In this case, train() returns a single value, so .apply() will create a Series.
# This means we need to tell Dask what the type of that single column should be and optionally give it a name.
results = ts_subset.groupby("name").apply(
    train, meta=("LinearRegression", object)
).compute()

results["Bob"] # linear model of a particular group


## DataFrames: Reading in messy data

Go through the [existing Binder example](https://examples.dask.org/dataframes/04-reading-messy-data-into-dataframes.html), which demonstrates both Dask DataFrames and delayed functions.

## Dask Arrays

In [9]:
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (10000, 10000) | (1000, 1000) |
| Count | 1 Graph Layer | 100 Chunks |
| Type | float64 | numpy.ndarray |
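
The relationship between an array's shape and its chunks can be inspected directly. The sketch below uses a deliberately tiny array (`a` is a hypothetical name) so that the numbers are easy to check by hand.

```python
import dask.array as da

# A 4 x 6 array of ones, split into blocks of 2 rows by 3 columns
a = da.ones((4, 6), chunks=(2, 3))

print(a.chunks)     # block sizes along each axis
print(a.numblocks)  # number of blocks along each axis

# Reductions are computed per block and then combined
total = a.sum().compute()
print(total)
```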


In [10]:
# numpy syntax as usual
y = x + x.T
z = y[::2, 5000:].mean(axis=1) # axis 0 is index, axis 1 is columns
z
# Trigger compute and investigate Client

| | Array | Chunk |
|---|---|---|
| Bytes | 39.06 kiB | 3.91 kiB |
| Shape | (5000,) | (500,) |
| Count | 7 Graph Layers | 10 Chunks |
| Type | float64 | numpy.ndarray |
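
To check that the same NumPy-style expression gives the same answer in Dask, it can be run on a small array and compared with plain NumPy. This is a sketch on invented data; `data`, `x_small`, `y_small` and `z_small` are hypothetical names.

```python
import numpy as np
import dask.array as da

# Small NumPy array wrapped as a Dask array with 2 x 2 chunks
data = np.arange(16, dtype="float64").reshape(4, 4)
x_small = da.from_array(data, chunks=(2, 2))

# Same kind of expression as in the cell above; nothing is computed yet
y_small = x_small + x_small.T
z_small = y_small[::2, 2:].mean(axis=1)

# Reference result from plain NumPy
expected = (data + data.T)[::2, 2:].mean(axis=1)

# compute() runs the task graph and returns a NumPy array
result = z_small.compute()
print(result)
```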


### More info on arrays

Go through the tutorial at [https://tutorial.dask.org/](https://tutorial.dask.org/).



## Diagnostics - Profile resource efficiency in real time

The Dask Dashboard enables resource monitoring across RAM, CPU, workers, threads and tasks (functions).

[https://docs.dask.org/en/stable/dashboard.html](https://docs.dask.org/en/stable/dashboard.html)


#### A few key definitions:

- **Bytes Stored and Bytes per Worker**: Cluster memory and memory per worker.

- **Task Processing/CPU Utilization/Occupancy**: Tasks being processed by each worker / CPU utilization per worker / expected runtime for all tasks currently on a worker. Workers are shown in one of three colors:
    - **Blue**: Processing tasks.
    - **Green**: Saturated: it has enough work to stay busy.
    - **Red**: Idle: it does not have enough work to stay busy.

- **Progress**: Progress of a set of tasks.

- **Task Stream**: Individual tasks across threads.
    - White represents dead time.



In [11]:
# Start a local cluster so the diagnostics dashboard can be opened in a web browser
from dask.distributed import Client
client = Client()
client  # call client.shutdown() after use


| | |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 5 |
| Total threads | 10 |
| Total memory | 32.00 GiB |
| Status | running |
| Using processes | True |
| Comm | tcp://127.0.0.1:63334 |
| Started | Just now |

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:63357 | http://127.0.0.1:63364/status | tcp://127.0.0.1:63337 | 2 | 6.40 GiB | `/var/folders/1b/_jymrbj17cz6t7cxdl86xshh0000gr/T/dask-worker-space/worker-0x5qvryt` |
| tcp://127.0.0.1:63355 | http://127.0.0.1:63362/status | tcp://127.0.0.1:63340 | 2 | 6.40 GiB | `/var/folders/1b/_jymrbj17cz6t7cxdl86xshh0000gr/T/dask-worker-space/worker-0s__jczq` |
| tcp://127.0.0.1:63358 | http://127.0.0.1:63360/status | tcp://127.0.0.1:63338 | 2 | 6.40 GiB | `/var/folders/1b/_jymrbj17cz6t7cxdl86xshh0000gr/T/dask-worker-space/worker-mnj91k25` |
| tcp://127.0.0.1:63356 | http://127.0.0.1:63361/status | tcp://127.0.0.1:63341 | 2 | 6.40 GiB | `/var/folders/1b/_jymrbj17cz6t7cxdl86xshh0000gr/T/dask-worker-space/worker-nvggbtok` |
| tcp://127.0.0.1:63359 | http://127.0.0.1:63363/status | tcp://127.0.0.1:63339 | 2 | 6.40 GiB | `/var/folders/1b/_jymrbj17cz6t7cxdl86xshh0000gr/T/dask-worker-space/worker-sgo11i0u` |
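
One way to make sure the cluster is shut down after use is to create the client in a `with` block. The sketch below assumes the `distributed` package is installed; the worker counts and the `dashboard_address=":0"` setting (pick any free port) are arbitrary choices for a small, in-process example.

```python
from dask.distributed import Client

# processes=False keeps the workers as threads inside the current process,
# which is enough for a quick look at the dashboard
with Client(
    n_workers=1,
    threads_per_worker=2,
    processes=False,
    dashboard_address=":0",
) as client:
    # URL of the diagnostics dashboard for this cluster
    print(client.dashboard_link)

    # Submit a small task to the cluster and wait for its result
    future = client.submit(sum, [1, 2, 3])
    result = future.result()

# Leaving the with block closes the client and the cluster it started
print(result)
```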


In [12]:
# Example of efficient resource utilisation
import dask.array as da
x = da.random.random(size = (10_000,10_000,10), chunks= (1000,1000,5))
y = da.random.random(size = (10_000,10_000,10), chunks= (1000,1000,5))
z = (da.arcsin(x) + da.arcsin(y)).sum(axis = (1,2))
z.compute()

array([114139.43869439, 114133.41973571, 114035.048779  , ...,
       114502.06960059, 114077.42169854, 114266.38837534])

In [13]:
# Inefficient resource utilisation: 10,000 tiny chunks mean Dask's task overhead dominates a problem that NumPy handles well on its own
x = da.random.random(size = (10_000_000),chunks = (1000,))
x.sum().compute()

4999875.009376019
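
The overhead in the last cell comes from the number of chunks, since each chunk becomes at least one task. A small sketch of how chunk size drives the block count, and how `rechunk` can reduce it, is shown below; the array names and sizes are chosen only for illustration.

```python
import dask.array as da

# Same data, two chunking choices
many_small = da.ones(1_000_000, chunks=1_000)
few_large = da.ones(1_000_000, chunks=100_000)

print(many_small.numblocks)  # many blocks, so many tasks to schedule
print(few_large.numblocks)   # far fewer blocks for the same data

# rechunk() merges the small blocks into larger ones
rechunked = many_small.rechunk(100_000)
print(rechunked.numblocks)

total = rechunked.sum().compute()
print(total)
```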

<div class="keypoints">

### Key points

- The similarity-by-design of the Dask API with pandas makes the transition easy compared to alternatives, although not all functions are replicated.
- Because Dask scales up to distributed systems or down to a single laptop, code is easily transferable between different resources.
- Dask enables parallelism without low-level alterations to code.


</div>
