# Working with the Python DASK library

Mat Larade -- mat.larade@ace-net.ca

## Notes about the slides ...

They are in a Jupyter notebook using the RISE extension.

https://github.com/ualberta-rcg/wg-dask-webinar

RISE stands for Reveal.js - Jupyter/IPython Slideshow Extension

https://rise.readthedocs.io/en/maint-5.5/
    
Template shamelessly ripped from one Chris Want -- christ.want@ualberta.ca

## Other resources

I won't be doing a comprehensive overview of Dask.
Here is some good supplemental material:
* Sharcnet webinar (last week) by Jinhui Qin
  * Great overview of Dask
  * https://youtube.sharcnet.ca
* U of A Workshop
  * https://ualberta-rcg.github.io/python-dask/
* Google for `dask tutorial`
  * Lots of good notebooks in a GitHub repository

## Python - General

* Python is an interpreted programming language that has a great deal of support in scientific and mathematical computing

* Two of the major packages we will be referencing today are Numpy and Pandas

## Numpy

* Numpy is a widely used and comprehensive mathematics package for Python
* Numpy stands for "Numerical Python"
* Is very useful for matrix math, and matrix-like operations, such as loading data into machine learning algorithms
* This talk will not deep-dive into numpy, but will instead talk about a few key features and how they apply to dask

In [1]:
import numpy as np

## Numpy

* Numpy works on arrays of data.
* Arrays may have any number of dimensions (1-D vectors, 2-D matrices, and so on).

In [None]:
data = np.loadtxt(fname='sample_data.csv', delimiter=',')
print(data)

* Numpy is backed by well-written compiled code, so its calculations are often much faster and more reliable than equivalent code you write yourself in pure Python
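
As a small illustration of the points above, the sketch below builds a one-dimensional and a two-dimensional array, then computes column sums both with a vectorized Numpy call and with a plain Python loop. The array contents and variable names are made up for this example.

```python
import numpy as np

# A 1-D array (vector) and a 2-D array (matrix); values are illustrative only.
vector = np.arange(6)
matrix = np.arange(12).reshape(3, 4)   # 3 rows, 4 columns

print(vector.ndim, vector.shape)
print(matrix.ndim, matrix.shape)

# Vectorized: the loop over elements runs inside Numpy's compiled code.
column_sums = matrix.sum(axis=0)

# Equivalent pure-Python loop, for comparison.
rows = matrix.tolist()
loop_sums = [sum(row[j] for row in rows) for j in range(4)]

print(column_sums.tolist() == loop_sums)
```

Both approaches give the same numbers; on large arrays the vectorized call is usually the faster of the two.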


In [2]:
import pandas as pd


## DASK - General

Dask is a lazy framework that automates parallel operations. Lazy, meaning that it doesn’t operate until it is told to.

Dask is conceptually similar to a dishwasher\*, where it will wait idle until it’s told to do everything all at once**.

\*Using Dask does not guarantee your code will be clean.

\*\*Not everything is actually done all at once; Dask does several things at a time.
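
To make "lazy" concrete, here is a minimal sketch using `dask.delayed`. The function `double` and the `calls` list are hypothetical names invented for this example, and `scheduler="synchronous"` is chosen only to keep the demonstration simple and deterministic.

```python
import dask

calls = []  # records every time the function body actually runs


def double(x):
    calls.append(x)
    return 2 * x


# Wrapping the call only builds a task; the function has not run yet.
lazy_result = dask.delayed(double)(21)
calls_before = list(calls)

# Only now is Dask told to do the work.
value = lazy_result.compute(scheduler="synchronous")
calls_after = list(calls)

print(calls_before, value, calls_after)
```

The list stays empty until `.compute()` is called, which is the "waiting idle" part of the dishwasher analogy.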



## An artist's rendition of DASK before running based on my description
<left>![](./Picture1.png)</left>

## Dask - Delayed

* The Delayed command holds back the operations and assigns them to different cores.
* Rather than waiting for tasks to finish sequentially, initial tasks are assigned to different cores that operate simultaneously. 
* When a core finishes its job, it gets a new operation.
* Similar to customs in an airport.

![](./Picture2.gif)
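
The customs analogy can be sketched roughly as follows. The function `inspect_passenger`, the eight passengers, and the two workers are all assumptions made up for illustration; `num_workers` limits how many tasks the local scheduler runs at once.

```python
import dask
from time import sleep


def inspect_passenger(passenger_id):
    # Hypothetical stand-in for a slow, independent task.
    sleep(0.05)
    return passenger_id


# Eight passengers queue up; nothing runs while the tasks are being built.
tasks = [dask.delayed(inspect_passenger)(i) for i in range(8)]
cleared = dask.delayed(sum)(tasks)

# Two "customs officers": each worker takes the next waiting task
# as soon as it finishes its current one.
total = cleared.compute(scheduler="threads", num_workers=2)
print(total)
```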

## Dask - Delayed

We are going to demonstrate how dask.delayed works using an increment function that takes one second to execute, and an add function that takes one second to execute.

In [2]:
import dask
from time import sleep

def increment(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [3]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = increment(1)
y = increment(2)
z = add(x, y)

Wall time: 3 s


In [4]:
%%time
# This runs immediately, all it does is build a graph

x = dask.delayed(increment)(1)
y = dask.delayed(increment)(2)
z = dask.delayed(add)(x, y)

Wall time: 0 ns


In [5]:
%%time
# This actually runs our computation using a local thread pool

z.compute()

Wall time: 2.01 s


5

In [9]:
z.visualize()

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.
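
If `graphviz` is not available, the task graph can still be inspected as plain data. The sketch below is one way to do that, assuming the same `increment` and `add` functions as above (without the one-second sleeps); it lists the task keys through the `__dask_graph__()` method that Dask collections provide. The keys typically combine the function name with a hash, so the exact strings will differ from run to run.

```python
import dask


def increment(x):
    return x + 1


def add(x, y):
    return x + y


x = dask.delayed(increment)(1)
y = dask.delayed(increment)(2)
z = dask.delayed(add)(x, y)

# The task graph behaves like a mapping from task keys to tasks.
task_keys = [str(key) for key in z.__dask_graph__()]
for key in task_keys:
    print(key)
```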

## Dask - Bagging

* Bagging works by taking semi-structured data and preparing that data to be operated on in parallel.
* Bags work on what they have to at any given time, so once a data chunk is processed, it is removed from memory, thus reducing the overall footprint of operations.
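
A minimal sketch of how a bag portions its data, using a made-up sequence of ten numbers. `map_partitions` applies a function to one whole partition at a time; `scheduler="threads"` is a choice made for this example so that it behaves the same in a notebook or a plain script.

```python
from dask import bag

# Ten items split into two partitions of five items each.
numbers = bag.from_sequence(range(10), npartitions=2)
print(numbers.npartitions)

# Reduce each partition to a single number: the sum of its items.
partition_sums = numbers.map_partitions(lambda part: [sum(part)])

result = partition_sums.compute(scheduler="threads")
print(result)
```

Each partition is processed independently, so only the partitions currently being worked on need to be in memory.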

## Portions up your data so you can...
![](./Picture4.jpg)

## You can run more data through faster than single threads
 
![](./Picture3.jpg)
\*In the metaphor, each core is its own pizza oven.

In [7]:
from dask import bag
bag1 = bag.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
bag1.take(3)

(1, 2, 3)

`Bag` objects hold the standard functional API found in projects like the Python standard library, `toolz`, or `pyspark`, including `map`, `filter`, `groupby`, etc.

Operations on `Bag` objects create new bags.  Call the `.compute()` method to trigger execution, as we saw for `Delayed` objects.  

In [8]:
def is_even(n):
    return n % 2 == 0

In [21]:
bag2 = bag.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
filtered_bag2a = bag2.filter(is_even).map(lambda x: x ** 2)
filtered_bag2a.compute()

[4, 16, 36, 64, 100]

In [25]:
filtered_bag2b = bag2.filter(lambda x: x > 5)
filtered_bag2b.compute()

[6, 7, 8, 9, 10]

## Dask - Arrays

* Dask arrays are a multidimensional, structured data format that is useful for manipulating large amounts of numerical data.
* A Dask array is made up of many smaller Numpy arrays (chunks), which may live on one machine or be spread across several.
* Dask arrays can be manipulated in parallel.
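
A small sketch of a chunked array, assuming `dask.array` is installed alongside Numpy. The sizes are deliberately tiny so the example runs instantly; real workloads would use far larger arrays and chunks.

```python
import dask.array as da

# One thousand integers stored as ten chunks of 100 elements each.
x = da.arange(1000, chunks=100)
print(x.numblocks)   # number of chunks along each dimension

# Building the expression is lazy; each chunk is an ordinary Numpy array
# and the chunks can be processed in parallel.
total = x.sum()

result = total.compute()
print(result)
```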


## Instead of moving this once
![](./Picture6.jpg)

## You move this hundreds of times
![](./Picture5.jpg)

## Dask - DataFrames

* Dask DataFrames extend Pandas DataFrames (Excel for Python), and work best when the dataset is LARGER than your machine’s available RAM, or when there are many files that need to be inspected.
* If your data fits in RAM, it tends to be more effective to just use Pandas, or even Excel, instead.


## Dask - Distributed

Dask can operate as its own task manager in one of three ways:
1. Threaded – Uses small, independent chunks of code running in the same instance of a computer program. Best for operations on numerical data that release the Global Interpreter Lock (e.g. Numpy, Pandas, Scikit-Learn).
2. Processes – Sends data to separate instances of a program to be processed. Generally works best for code that holds the Global Interpreter Lock, since several instances of the program can then run simultaneously.
3. Single-Threaded – Does one chunk of code at a time, with no parallel capacity. Primarily for debugging.
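
The three options above can be selected with the `scheduler` keyword of `.compute()`. The sketch below reuses simplified `increment` and `add` functions (no sleeps) and runs only the threaded and single-threaded schedulers; the process-based call is left as a comment because, in a standalone script, it is safest to run it under a main guard.

```python
import dask


def increment(x):
    return x + 1


def add(x, y):
    return x + y


z = dask.delayed(add)(dask.delayed(increment)(1), dask.delayed(increment)(2))

# 1. Threaded: a pool of threads inside the current process.
threaded = z.compute(scheduler="threads")

# 3. Single-threaded: runs tasks one at a time, which makes debugging easier.
single = z.compute(scheduler="synchronous")

# A scheduler can also be set for a whole block of code.
with dask.config.set(scheduler="threads"):
    configured = z.compute()

# 2. Processes: z.compute(scheduler="processes") uses a pool of separate
# Python processes. In a standalone script, call it from inside an
# `if __name__ == "__main__":` block.

print(threaded, single, configured)
```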
