# Parallelism in Python

### John Kirkham

# The Problem

* Typical threading models are hard for (new) users to understand
* Easy to run into difficult-to-debug scenarios (e.g. deadlocks, race conditions)
* Implementations often become tied to a particular scale (e.g. multithreaded code must be rewritten to run on a cluster)
* How could this be done better?

# Task-based parallelism

* Describe the pieces of the computation
* Relate these pieces to each other
* Use a scheduler to perform the computation
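The three steps above can be sketched without any library. This is a toy, not how Dask works internally: the function names and the `tasks` dictionary layout are assumptions for illustration, `graphlib.TopologicalSorter` (Python 3.9+) tracks which tasks are ready, and a thread pool runs each batch of ready tasks. Because of CPython's GIL, the threads here show the structure rather than a speedup for CPU-bound work.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter  # Python 3.9+


# 1. Describe the pieces of the computation as ordinary functions.
def load():
    return [0, 1, 2, 3]


def total(xs):
    return sum(xs)


def count(xs):
    return len(xs)


def mean(s, n):
    return s / n


# 2. Relate the pieces: name -> (function, names of the inputs it needs).
tasks = {
    "data": (load, []),
    "total": (total, ["data"]),
    "count": (count, ["data"]),
    "mean": (mean, ["total", "count"]),
}


# 3. A toy scheduler: repeatedly run every task whose inputs are ready.
def run(tasks, max_workers=4):
    order = TopologicalSorter({name: deps for name, (_, deps) in tasks.items()})
    order.prepare()
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while order.is_active():
            futures = {}
            for name in order.get_ready():
                func, deps = tasks[name]
                futures[name] = pool.submit(func, *[results[d] for d in deps])
            for name, future in futures.items():
                results[name] = future.result()
                order.done(name)
    return results


print(run(tasks)["mean"])  # 1.5
```

`total` and `count` both depend only on `data`, so the scheduler is free to run them at the same time, and the user never writes a lock.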

# Common implementations

* Dask
* ipyparallel
* Luigi

# Dask - Introducing a Task Graph

![]( images/pipeline.svg )

# Dask - A short example

![]( images/dask_example1.svg )

# Dask - A short example


```python
In [1]: import dask

In [2]: a = [0, 1, 2, 3]

In [3]: d = {"a": a, "b": (sum, "a")}
```

# Dask - A short example (question)


```python
In [1]: import dask

In [2]: a = [0, 1, 2, 3]

In [3]: d = {"a": a, "b": (sum, "a")}

In [4]: dask.get(d, "a")
Out[4]: ?

In [5]: dask.get(d, "b")
Out[5]: ?
```

# Dask - A short example (answer)


```python
In [1]: import dask

In [2]: a = [0, 1, 2, 3]

In [3]: d = {"a": a, "b": (sum, "a")}

In [4]: dask.get(d, "a")
Out[4]: [0, 1, 2, 3]

In [5]: dask.get(d, "b")
Out[5]: 6
```
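The rule `dask.get` follows here can be sketched in a few lines of plain Python. This `toy_get` is a hypothetical, simplified evaluator written for illustration, not Dask's scheduler: it assumes keys are strings, treats a tuple whose first item is callable as a task, replaces any string that names a key with that key's computed value, and passes everything else through as a literal.

```python
def is_task(expr):
    # Following Dask's convention, a task is a tuple whose first item is callable.
    return isinstance(expr, tuple) and len(expr) > 0 and callable(expr[0])


def toy_get(graph, key, cache=None):
    """Evaluate `key` in a dict-based graph, computing each key at most once."""
    if cache is None:
        cache = {}
    if key not in cache:
        cache[key] = _evaluate(graph, graph[key], cache)
    return cache[key]


def _evaluate(graph, expr, cache):
    if is_task(expr):
        func, *args = expr
        return func(*[_evaluate(graph, arg, cache) for arg in args])
    if isinstance(expr, list):
        return [_evaluate(graph, item, cache) for item in expr]
    if isinstance(expr, str) and expr in graph:
        return toy_get(graph, expr, cache)  # a reference to another key
    return expr  # a literal value


a = [0, 1, 2, 3]
d = {"a": a, "b": (sum, "a")}
print(toy_get(d, "a"))  # [0, 1, 2, 3]
print(toy_get(d, "b"))  # 6
```

One consequence of this convention (which Dask shares) is that a plain string argument that happens to match a key is treated as a reference to that key.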

# Dask - A short example (follow-up question)


```python
In [1]: import dask

In [2]: d = {"b": (sum, [0, 1, 2, 3])}

In [3]: dask.get(d, "b")
Out[3]: ?
```

# Dask - A short example (follow-up answer)


```python
In [1]: import dask

In [2]: d = {"b": (sum, [0, 1, 2, 3])}

In [3]: dask.get(d, "b")
Out[3]: 6
```
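Both graphs give the same answer, but they have different shapes: in the earlier one, `b` depends on the separate key `a`; here the data is embedded directly in the task, so `b` has no dependencies. A scheduler needs exactly this information to decide what can run when. The helper below is a hypothetical, simplified sketch (string keys only) of reading those dependencies off the dictionary; Dask computes them with its own internal code.

```python
def dependencies(graph, key):
    """Return the set of keys that graph[key] refers to (simplified)."""
    found = set()

    def walk(expr):
        # Look inside tasks (tuples) and lists for strings that name other keys.
        if isinstance(expr, (tuple, list)):
            for item in expr:
                walk(item)
        elif isinstance(expr, str) and expr in graph:
            found.add(expr)

    walk(graph[key])
    return found


d1 = {"a": [0, 1, 2, 3], "b": (sum, "a")}
d2 = {"b": (sum, [0, 1, 2, 3])}
print(dependencies(d1, "b"))  # {'a'}
print(dependencies(d2, "b"))  # set()
```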

# Dask - Using delayed (question)


```python
In [1]: import dask

In [2]: a = [0, 1, 2, 3]

In [3]: r = dask.delayed(sum)(a)

In [4]: d = dict(r.__dask_graph__())

In [5]: d
Out[5]: {'sum-11e9df38-2121-41a5-b9be-f4e83318ac72': (<function sum(iterable, start=0, /)>, [0, 1, 2, 3])}

In [6]: dask.get(d, "sum-11e9df38-2121-41a5-b9be-f4e83318ac72")
Out[6]: ?

In [7]: r.compute()
Out[7]: ?
```

# Dask - Using delayed (answer)


```python
In [1]: import dask

In [2]: a = [0, 1, 2, 3]

In [3]: r = dask.delayed(sum)(a)

In [4]: d = dict(r.__dask_graph__())

In [5]: d
Out[5]: {'sum-11e9df38-2121-41a5-b9be-f4e83318ac72': (<function sum(iterable, start=0, /)>, [0, 1, 2, 3])}

In [6]: dask.get(d, "sum-11e9df38-2121-41a5-b9be-f4e83318ac72")
Out[6]: 6

In [7]: r.compute()
Out[7]: 6
```
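To make `delayed` less magical, here is a heavily simplified sketch of the idea: calling a wrapped function records a `(func, *args)` task under a fresh key instead of running it, and `compute` walks the recorded graph. The names `toy_delayed`, `ToyDelayed`, and the counter-based keys are hypothetical and used only for illustration; Dask's real `Delayed` objects generate their own keys (as in the output above) and hand the graph to a scheduler.

```python
import itertools

_counter = itertools.count()  # source of unique keys (an assumption; Dask uses its own)


class ToyDelayed:
    """Hypothetical stand-in for a Delayed object: a key plus a graph."""

    def __init__(self, key, graph):
        self.key = key
        self.graph = graph  # dict mapping key -> task tuple (func, *args)

    def compute(self):
        return _evaluate(self.graph, self.key, {})


def toy_delayed(func):
    """Wrap func so that calling it records a task instead of running it."""
    def wrapper(*args):
        graph, task_args = {}, []
        for arg in args:
            if isinstance(arg, ToyDelayed):
                graph.update(arg.graph)    # merge in the input's graph
                task_args.append(arg.key)  # refer to the input by its key
            else:
                task_args.append(arg)      # literals are embedded in the task
        key = f"{func.__name__}-{next(_counter)}"
        graph[key] = (func, *task_args)
        return ToyDelayed(key, graph)
    return wrapper


def _evaluate(graph, key, cache):
    """Run the task stored under key, computing each referenced key only once."""
    if key not in cache:
        func, *args = graph[key]
        values = [
            _evaluate(graph, arg, cache) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        cache[key] = func(*values)
    return cache[key]


def double(x):
    return 2 * x


r = toy_delayed(sum)([0, 1, 2, 3])  # nothing is summed yet
r2 = toy_delayed(double)(r)         # builds on r's graph
print(r.compute())   # 6
print(r2.compute())  # 12
```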

# Dask - Map (intro)

![]( images/dask_map.svg )

# Dask - Map (intro)


```python
In [1]: def my_map(func, *iterables):
   ...:     # Like the built-in map: call func with one item from each iterable
   ...:     for args in zip(*iterables):
   ...:         yield func(*args)
   ...:
```
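A quick check, assuming `my_map` is meant to mirror the built-in `map` (one item from each iterable per call). The property that matters for parallelism is that each `func(...)` call depends only on its own inputs, so the calls could run in any order or all at once. `add_two` is an illustrative name.

```python
def my_map(func, *iterables):
    # Lazily call func with one item from each iterable, like the built-in map.
    for args in zip(*iterables):
        yield func(*args)


def add_two(x):
    return x + 2


lazy = my_map(add_two, [0, 1, 2, 3])  # a generator: no calls have happened yet
print(list(lazy))                      # [2, 3, 4, 5]
print(list(my_map(lambda w, v: w * v, [5, 10], [0, 1])))  # [0, 10]
```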

# Dask - Map (question)


```python
In [1]: import dask

In [2]: @dask.delayed
   ...: def addTwo(x):
   ...:     return x + 2
   ...:

In [3]: a = [0, 1, 2, 3]

In [4]: b = list(map(addTwo, a))

In [5]: b
Out[5]: ?

In [6]: dask.compute(*b)
Out[6]: ?
```

# Dask - Map (answer)


```python
In [1]: import dask

In [2]: @dask.delayed
   ...: def addTwo(x):
   ...:     return x + 2
   ...:

In [3]: a = [0, 1, 2, 3]

In [4]: b = list(map(addTwo, a))

In [5]: b
Out[5]:
[Delayed('addTwo-0bec16d0-6189-42a8-9390-8768548cef33'),
 Delayed('addTwo-728c5880-4a29-401c-afe6-144f17875cf7'),
 Delayed('addTwo-c82784a4-9000-49bd-88f9-976b5b07d479'),
 Delayed('addTwo-a492a9a2-9262-4e9e-93dd-4d0d557ad50f')]

In [6]: dask.compute(*b)
Out[6]: (2, 3, 4, 5)
```
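For contrast, the same independent calls can be run eagerly with the standard library's `concurrent.futures` thread pool. This is not how Dask schedules work; it only illustrates that a map has no dependencies between its calls. For CPU-bound pure-Python functions, CPython's GIL limits the speedup threads can give.

```python
from concurrent.futures import ThreadPoolExecutor


def add_two(x):
    return x + 2


a = [0, 1, 2, 3]
# Each call is independent, so the pool may run them concurrently;
# Executor.map still returns the results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    b = tuple(pool.map(add_two, a))

print(b)  # (2, 3, 4, 5)
```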

# Dask - Reduce (intro)

![]( images/dask_reduce.svg )

# Dask - Reduce (question)


```python
In [1]: from functools import reduce

In [2]: from operator import add

In [3]: import dask

In [4]: add = dask.delayed(add)

In [5]: a = [0, 1, 2, 3]

In [6]: b = reduce(add, a)

In [7]: b
Out[7]: ?

In [8]: b.compute()
Out[8]: ?
```

# Dask - Reduce (answer)


```python
In [1]: from functools import reduce

In [2]: from operator import add

In [3]: import dask

In [4]: add = dask.delayed(add)

In [5]: a = [0, 1, 2, 3]

In [6]: b = reduce(add, a)

In [7]: b
Out[7]: Delayed('add-91c483ed-2953-4795-b572-5d61455aaace')

In [8]: b.compute()
Out[8]: 6
```
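Why does this graph form a chain? `functools.reduce` is a left fold: every call takes the previous call's result as its first argument. Swapping the addition for a function that builds a string (a hypothetical `show_add`, used only for illustration) makes the nesting visible:

```python
from functools import reduce


def show_add(x, y):
    # Describe the addition as a string instead of performing it.
    return f"({x} + {y})"


print(reduce(show_add, ["0", "1", "2", "3"]))  # (((0 + 1) + 2) + 3)
```

Each addition must wait for the one before it, so none of them can run in parallel.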

# Dask - Reduce (follow-up question)


```python
In [1]: from functools import reduce

In [2]: from operator import add

In [3]: import dask

In [4]: add = dask.delayed(add)

In [5]: a = [0, 1, 2, 3]

In [6]: b = reduce(add, a)

In [7]: b.visualize()
Out[7]: ?
```

# Dask - Reduce (follow-up answer)

![]( images/dask_reduce_example.svg )

# Dask - Reduce (performance)

1. Where did the values go?
2. How can we make this parallel-friendly?

# Dask - Reduce (performance)


```python
In [1]: from functools import reduce

In [2]: from operator import add

In [3]: import dask

In [4]: add = dask.delayed(add)

In [5]: a = [0, 1, 2, 3]

In [6]: b = reduce(add, a)

In [7]: dict(b.__dask_graph__())
Out[7]:
{
    'add-e2859f48-4e09-4bbb-9ccd-8006a1e5e664': (
        <function _operator.add>, 0, 1
    ),
    'add-b18a107a-b112-49ee-a244-02941b20e0b1': (
        <function _operator.add>, 'add-e2859f48-4e09-4bbb-9ccd-8006a1e5e664', 2
    ),
    'add-04d0c8e9-fde4-4222-b3f3-9777a85a9049': (
        <function _operator.add>, 'add-b18a107a-b112-49ee-a244-02941b20e0b1', 3
    )
}
```

# Dask - Reduce (performance)


```python
In [1]: from operator import add

In [2]: import dask

In [3]: add = dask.delayed(add)

In [4]: def reduce(func, values):
   ...:     # Reduce each half separately, then combine: builds a tree, not a chain
   ...:     n = len(values)
   ...:     if n == 1:
   ...:         return values[0]
   ...:     half = n // 2
   ...:     return func(reduce(func, values[:half]), reduce(func, values[half:]))
   ...:

In [5]: a = [0, 1, 2, 3]

In [6]: b = reduce(add, a)
```
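Both versions perform the same number of additions (n - 1). What changes is the longest chain of additions that must happen one after another, which bounds how quickly a parallel scheduler can finish. A small sketch that tracks that chain length, assuming each addition costs one step and enough workers are available (`traced_add` and `tree_reduce` are illustrative names; `tree_reduce` follows the halving strategy above):

```python
from functools import reduce


def tree_reduce(func, values):
    # Same halving strategy as the reduce defined on the slide.
    n = len(values)
    if n == 1:
        return values[0]
    half = n // 2
    return func(tree_reduce(func, values[:half]), tree_reduce(func, values[half:]))


def traced_add(x, y):
    # Each value is (sum, depth); depth = longest chain of additions behind it.
    return (x[0] + y[0], max(x[1], y[1]) + 1)


values = [(v, 0) for v in range(8)]
print(reduce(traced_add, values))       # (28, 7)
print(tree_reduce(traced_add, values))  # (28, 3)
```

With 8 values the left fold needs 7 sequential steps, while the tree needs only 3 (about log2 of n).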

# Dask - Reduce (performance)

![]( images/dask_reduce_tree.svg )

# Dask - Using Map and Reduce (problem)

1. Write a function to compute a weighted mean.

2. Try it on a list of values and weights.

   1. Inspect the graph it creates.

   2. Compute the result.
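For reference, the weighted mean of values $x_i$ with weights $w_i$ is:

$$
\bar{x}_w = \frac{\sum_i w_i x_i}{\sum_i w_i}
$$

The numerator is a map (multiply each weight by its value) followed by a reduce (sum the products); the denominator is a reduce over the weights.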

# Dask - Using Map and Reduce (answer)


```python
In [1]: from __future__ import division

In [2]: from operator import add, mul

In [3]: import dask

In [4]: add, mul = dask.delayed(add), dask.delayed(mul)

In [5]: def reduce(func, values):
   ...:     # Reduce each half separately, then combine: builds a tree, not a chain
   ...:     n = len(values)
   ...:     if n == 1:
   ...:         return values[0]
   ...:     half = n // 2
   ...:     return func(reduce(func, values[:half]), reduce(func, values[half:]))
   ...:

In [6]: def weighted_mean(values, weights):
   ...:     return reduce(add, list(map(mul, weights, values))) / reduce(add, weights)
   ...:
```

# Dask - Using Map and Reduce (answer)


```python
In [7]: v = [0, 1, 2, 3]; w = [5, 10, 33, 16]

In [8]: wm = weighted_mean(v, w)

In [9]: wm.compute()
Out[9]: 1.9375

In [10]: wm.visualize()
```
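As a sanity check, the same weighted mean can be computed eagerly in plain Python (no Dask, single-threaded), using the values and weights above:

```python
v = [0, 1, 2, 3]
w = [5, 10, 33, 16]

# Eager reference: sum(w_i * x_i) / sum(w_i)
numerator = sum(wi * vi for wi, vi in zip(w, v))  # 0 + 10 + 66 + 48 = 124
denominator = sum(w)                               # 64
print(numerator / denominator)                     # 1.9375
```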

# Dask - Using Map and Reduce (answer)

![]( images/dask_delayed_weighted_mean.svg )