![dask.png](figs/dask.png)

### Dask installation:
`` pip install "dask[complete]" pyarrow s3fs graphviz``

In [None]:
import numpy as np
import time
import os

import dask
import dask.dataframe as dd
from dask.distributed import LocalCluster, Client
import dask.array as da
import pandas as pd



In [None]:
import os
# The jupyter notebook is launched from your $HOME directory.
# Change the working directory to the workshop directory
# which was created in your username directory under /scratch/vp91
os.chdir(os.path.expandvars("/scratch/vp91/$USER/"))

### Dask Collections

* **High-level collections**: Mimic NumPy, lists, and pandas but can operate in parallel on datasets that don’t fit into memory 
    * Array
    * DataFrame
    * Bag
    
* **Low-level collections**: Give finer control to build custom parallel and distributed computations
    * Delayed
    * Futures
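
Array, DataFrame, Delayed, and Futures are each covered below, but Bag is not. As a minimal sketch of the Bag collection, the toy sequence below stands in for real data, and the names `numbers` and `even_squares` are illustrative only.

```python
import dask.bag as db

# Build a bag from a plain Python sequence, split into 2 partitions
numbers = db.from_sequence(range(10), npartitions=2)

# filter and map only extend the task graph; nothing runs yet
even_squares = numbers.filter(lambda n: n % 2 == 0).map(lambda n: n * n)

# compute() executes the graph. The threaded scheduler is requested here
# only to keep this small example lightweight.
total = even_squares.sum().compute(scheduler="threads")
print(total)
```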



# Dask Dataframes

![dataframe.png](figs/dataframe.png)

* One Dask DataFrame is comprised of many in-memory pandas DataFrames separated along the index. 
* One operation on a Dask DataFrame triggers many pandas operations on the constituent pandas DataFrames 
* These operations are mindful of potential parallelism and memory constraints.

In [None]:
!ls dask-intro/data/nycflights/*.csv

In [None]:
# Read all the CSV files into a single Dask DataFrame
ddf = dd.read_csv(
    os.path.join("dask-intro/data", "nycflights", "*.csv"), parse_dates={"Date": [0, 1, 2]}
)

* dask.dataframe.read_csv reads only a sample from the beginning of the file to infer the datatypes
* These inferred datatypes are then enforced when reading all partitions
* Sometimes the datatypes inferred from the sample are incorrect. 
    * For example, the first n rows have no value for CRSElapsedTime (which pandas infers as a float), but later rows turn out to contain strings (object dtype). 

* Good practice: specify dtypes directly using the dtype keyword. 

In [None]:
ddf = dd.read_csv(
    os.path.join("dask-intro/data", "nycflights", "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
)


In [None]:
ddf

### Lazy evaluation
* Representation of the DataFrame object contains no data 
* Dask has just done enough to read the start of the first file, and infer the column names and dtypes

* Dask **constructs** the logic (called a task graph) of your computation immediately
* Dask **evaluates** it only when necessary

In [None]:

ddf.visualize()

* Functions like len, head, and tail also trigger an evaluation. For len, Dask will:
    * load the actual data (that is, load each file into a pandas DataFrame)
    * apply the corresponding function to each pandas DataFrame (also known as a partition)
    * combine the subtotals to give you the final grand total

In [None]:
len(ddf)

In [None]:
ddf.head()

### Operation on multiple files in Pandas

In [None]:
%%time

# find the max value of the DepDelay column across all 10 dataframes
files = os.listdir(os.path.join('dask-intro/data', 'nycflights'))
maxes = []

for file in files:
    df = pd.read_csv(os.path.join('dask-intro/data', 'nycflights', file))
    maxes.append(df.DepDelay.max())

final_max = max(maxes)
print(final_max)

### Operation on multiple files in Dask

In [None]:
# find the max value of the DepDelay column across all 10 dataframes


# This only creates the task graph, it does not execute the operation
result = ddf.DepDelay.max()

In [None]:
result.visualize()

In [None]:
%%time
result.compute()

### Exercise: Find the number of flights from each city

* We can also combine multiple compute steps into a single instruction
* This is usually more efficient
    * Task graphs for both results are merged when calling dask.compute
    * This allows shared operations to be done only once instead of twice

In [None]:
non_canceled = ddf[~ddf.Cancelled]
mean_delay = non_canceled.DepDelay.mean()
std_delay = non_canceled.DepDelay.std()

In [None]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

In [None]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

# Dask Arrays - parallelized NumPy

![arrays.png](figs/arrays.png)

* Dask Array implements a subset of the NumPy ndarray interface using **blocked** algorithms
* Large array is cut into many small arrays
* Large computations are performed by combining many smaller computations

In [None]:

# NumPy array
a_np = np.ones(10)
a_np

In [None]:
# This is how a blocked operation is done in NumPy. We divide the whole ndarray
# of size 10 into 2 slices, each of size 5

a_np_sum = a_np[:5].sum() + a_np[5:].sum()
a_np_sum

In [None]:
# Dask array

# In a Dask array we specify the slices using the keyword chunks. 
# chunks defines the number of elements in each slice

a_da = da.ones(10, chunks=5)
a_da

In [None]:
a_da_sum = a_da.sum()
a_da_sum

In [None]:
a_da_sum.visualize()

In [None]:
a_da_sum.compute()

* Dask can also choose a chunk size by itself when chunks is not specified
* If your chunks are too small
    * the amount of actual work done by every task is very tiny
    * the overhead of coordinating all these tasks results in a very inefficient process
* If your chunks are too big
    * you will likely run out of memory
    * data will have to be moved to the disk 
    * this will degrade performance
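
To see how a chunk choice translates into a number of tasks, the chunk layout can be inspected and changed with `rechunk`. The array sizes below are arbitrary and chosen only for illustration.

```python
import dask.array as da

# 1000 x 1000 array cut into 100 x 100 chunks: many small blocks
x = da.ones((1000, 1000), chunks=(100, 100))
print(x.chunksize)   # (100, 100)
print(x.numblocks)   # (10, 10)

# Rechunk into fewer, larger blocks
y = x.rechunk((500, 500))
print(y.chunksize)   # (500, 500)
print(y.numblocks)   # (2, 2)

# The result does not depend on the chunking, only the task count does
total = y.sum().compute()
print(total)
```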

In [None]:
%%time

xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000)) # We specify the chunk
yd = xd.mean(axis=0)
yd.compute()

In [None]:
%%time

xd = da.random.normal(10, 0.1, size=(30_000, 30_000)) # Dask finds the chunk
yd = xd.mean(axis=0)
yd.compute()

In [None]:
xd.chunksize

# Delayed decorator

* A block of code can contain operations that can happen in parallel
* Normally in Python these operations will happen sequentially
    * Or the user will identify the parallel sections and write parallel code
* The Dask **delayed** function decorates your functions so that they operate lazily 
* Dask will defer execution of the function, placing the function and its arguments into a task graph
* Dask will then identify opportunities for parallelism in the task graph
* The Dask schedulers will exploit this parallelism, generally improving performance

In [None]:
@dask.delayed
def inc(x):
    time.sleep(1)
    return x + 1




In [None]:
@dask.delayed
def add(x, y):
    time.sleep(1)
    return x + y

In [None]:
# As the two increments are independent of each other, we can run them in parallel

x = inc(1)
y = inc(2)
z = add(x, y)

In [None]:
# Here z is a Delayed object

z.visualize()

In [None]:
z.compute()
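
The same pattern extends to loops, where many independent calls feed a single reduction. The sketch below uses a hypothetical `square` function, not one from the workshop, and wraps the built-in `sum` with `dask.delayed` so that the reduction also becomes part of the task graph.

```python
import dask


@dask.delayed
def square(n):
    return n * n


# Five independent delayed calls; none of them has run yet
squares = [square(n) for n in range(5)]

# Wrap the built-in sum so it becomes the final node of the graph
total = dask.delayed(sum)(squares)

# Executes all five square tasks, then the sum
result = total.compute()
print(result)
```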

# Dask future

* We can submit individual functions for evaluation
* The call returns immediately, giving one or more futures
    * whose status begins as “pending”
    * later becomes “finished”
* There is no **blocking** of the local Python session.

* Difference between futures and delayed
    * delayed is lazy (it just constructs a graph) 
    * futures are eager. 
    * With futures, as soon as the inputs are available and there is compute available, the computation starts

In [None]:
client = Client(n_workers=4)

def inc(x):
    time.sleep(1)
    return x + 1


def double(x):
    time.sleep(2)
    return 2 * x


def add(x, y):
    time.sleep(1)
    return x + y

In [None]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

#### If we check the future after a few seconds we can see that it is complete

In [None]:
future

In [None]:

future.result()

In [None]:
client.close()
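
Futures can also be passed as arguments to further `submit` calls, so dependent work starts as soon as its inputs finish. The sketch below is self-contained: it starts its own small in-process client (`processes=False` is an assumption made to keep the example light) and uses shorter sleeps than the functions above.

```python
import time
from dask.distributed import Client


def inc(x):
    time.sleep(0.1)
    return x + 1


def double(x):
    time.sleep(0.1)
    return 2 * x


def add(x, y):
    time.sleep(0.1)
    return x + y


# The context manager closes the client (and the cluster it started) on exit
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    a = client.submit(inc, 1)      # starts immediately
    b = client.submit(double, 2)   # runs alongside a
    c = client.submit(add, a, b)   # waits for a and b, then runs
    total = c.result()             # blocks until c has finished

print(total)
```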

# Distributed Dask 

![dask_cluster.png](figs/dask_cluster.png)

* Dask can work on a cluster
* We have been using the distributed scheduler for our work, but just on a single machine.
* When we instantiate a Client() object with no arguments it will attempt to locate a Dask cluster
    * It will check your local Dask config and environment variables to see if connection information has been specified
    * If not it will create an instance of LocalCluster and use that

In [None]:
cluster = LocalCluster()
cluster

In [None]:
cluster.get_logs()

In [None]:
client = Client(cluster)
client

In [None]:

# Shut down the client and the local cluster to release their resources
client.close()
cluster.close()

## Compute Vs Persist

In [None]:
df = dask.datasets.timeseries()
df.npartitions

In [None]:
type(df)

In [None]:
computed_df = df.compute()
type(computed_df)

In [None]:
df_persist = df.persist()
type(df_persist)

In [None]:
df_persist.npartitions