<img src="images/dask_horizontal.svg"
     width="45%"
     alt="Dask logo\">
     
# Custom Operations

In the overview notabook we discussed some of the many algorithms that are pre-defined for different types of Dask collections
(such as Arrays and DataFrames). These include operations like `mean`, `max`, `value_counts` and many other standard operations.

In this notebook we'll:
 - explore how those operations are implemented
 - learn how to construct our own custom operations
 - gain deeper insight into the task graph system

**Related Documentation**

  - [Array Tutorial](https://tutorial.dask.org/03_array.html)
  - [Best Practices](https://docs.dask.org/en/latest/best-practices.html#learn-techniques-for-customization)

## Blocked Algorithms

Dask computations are implemented using _blocked algorithms_. These algorithms break up a computation on a large array into many computations on smaller pieces of the array. This minimizes the memory load (amount of RAM) of computations and allows for working with larger-than-memory datasets in parallel.

In [None]:
import dask.array as da

x = da.random.random(size=(1_000, 1_000), chunks=(250, 500))
x

In the overview notebook we looked at the task graph for the following computation:

In [None]:
result = (x + x.T).sum(axis=0).mean()

Now let's break that down a bit and look at the task graph for just one part of that computation.

In [None]:
x.T.visualize()

This graph demonstrates how blocked algorithms work. In the perfectly parallelizable situation, Dask can operate on each block in isolation and then reassemble the results from the outputs. Dask makes it easy to contruct graphs like this using a numpy-like API. 

## Custom Block Computations
Block computations operate on a per-block basis. So each block gets the function applied to it, and the output has the same chunk location as the input.

Some examples include the following:
- custom IO operations
- applying embarassingly parallel functions for which there is no exising Dask implementation

![map_blocks](images/custom_operations_map_blocks.png)

**Related Documentation**

   - [`dask.array.map_blocks`](https://docs.dask.org/en/latest/array-api.html?highlight=map_blocks#dask.array.Array.map_blocks)
   - [`dask.dataframe.map_partitions`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions)

### `map_blocks`

Let's imagine that there was no `da.random.random` method. We can create our own version using `map_blocks`. 

In [None]:
import numpy as np

def random_sample():
    return np.random.random(size=(250, 500))

x = da.map_blocks(random_sample, chunks=((250, 250, 250, 250), (500, 500)), dtype=float)
x

In [None]:
x.visualize()

> #### Understanding `chunks` argument
>
> In the example above we explicitly declare what the size of the output chunks will be ``chunks=((250, 250, 250, 250), (500, 500))`` this means 8 chunks each with shape `(250, 500)` you'll also see the chunks argument written in the short version where only the shape of one chunk is defined ``chunks=(250, 500)``. These mean the same thing.
>
> Specifying the output chunks is very useful when doing more involved operations with ``map_blocks``. By specifying ``chunks``, you can guarantee that the output will have the right shape. Having the right shapelets you properly chain together other operations. 

In that example we created an array from scratch by passing in `dtype` and `chunks`. Next we'll consider the case of applying `map_blocks` to existing arrays.

#### Multiple arrays

``map_blocks`` can be used on single arrays or to combine several arrays. When multiple arrays are passed, ``map_blocks``
aligns blocks by block location without regard to shape.

In the following example we have two arrays with the same number of blocks
but with different shape and chunk sizes.

In [None]:
a = da.arange(1000, chunks=(100,))
b = da.arange(100, chunks=(10,))

Let's take a look at these arrays:

In [None]:
a

In [None]:
b

We can pass these arrays into ``map_blocks`` using a function that takes two inputs, calculates the max of each, than then returns a numpy array of the outputs. 

In [None]:
def func(a, b):
    return np.array([a.max(), b.max()])

result = da.map_blocks(func, a, b, chunks=(2,))
result.visualize()

#### Special arguments

There are special arguments (``block_info`` and ``block_id``) that you can use within ``map_blocks`` functions. 

 - ``block_id`` gives the index of the block within the chunks, so for a 1D array it will be something like `(i,)`. 
 - ``block_info`` is a dictionary where there is an integer key for each input dask array and a `None` key for the output array.

### ``map_partitions``

In Dask dataframe there is a similar method to ``map_blocks`` but it is called ``map_partitions``.

Here is an example of using it to check if the sum of two columns is greater than some arbitrary threshold.

In [None]:
import dask
import dask.dataframe as dd

ddf = dask.datasets.timeseries()

result = ddf.map_partitions(lambda df, threshold: (df.x + df.y) > 0, threshold=0)
result.compute()

#### Internal uses
In practice ``map_partitions`` is used to implement many of the helper dataframe methods
that let Dask dataframe mimic Pandas. Here is the implementation of `ddf.index` for instance:

```python
@property
def index(self):
    """Return dask Index instance"""
    return self.map_partitions(
        getattr,
        "index",
        token=self._name + "-index",
        meta=self._meta.index,
        enforce_metadata=False,
    )
```

[source](https://github.com/dask/dask/blob/09862ed99a02bf3a617ac53b116f9ecf81eea338/dask/dataframe/core.py#L458-L467)


#### Understanding `meta` argument

Dask dataframes and dask arrays have a special attribute called `_meta` that allows them to know metadata about the type of dataframe/array that they represent. This metadata includes:
 - dtype (int, float)
 - column names and order
 - name
 - type (pandas dataframe, cudf dataframe)
 
**Related documentation**

- [Dataframe metadata](https://docs.dask.org/en/latest/dataframe-design.html#metadata)

This information is stored in an empty object of the proper type.

In [None]:
print(ddf._meta)

That's how dask knows what to render when you display a dask object:

In [None]:
print(ddf)

When you add an item to the task graph, Dask tries to run the function on the meta before you call compute. 

This approach has several benefits:

- it gives Dask a sense of what the output will look like. 
- if there are fundamental issues, Dask will fail fast

Here's a few examples. 

In [None]:
ddf.sum()

In [None]:
ddf.name.str.startswith("A")

See how the output looks right? The dtypes are correct, the type is a `Series` rather than a `DataFrame` like the input.

**Exercise**

Try using `startswith` on a different column and see what you get :)

In [None]:
# solution
ddf.x.str.startswith("A")

### Declaring meta

Sometimes running the function on a miniature version of the data doesn't produce a result that is similar enough to your expected output. 

In those cases you can provide a `meta` manually.

In [None]:
result = ddf.map_partitions(lambda df, threshold: (df.x + df.y) > threshold, threshold=0, meta=("greater_than", bool))
result.compute()

### `map_overlap`
Sometimes you want to operate on a per-block basis, but you need some information from neighboring blocks. 

Example operations include the following:

- Convolve a filter across an image
- Rolling sum/mean/max, …
- Search for image motifs like a Gaussian blob that might span the border of a block
- Evaluate a partial derivative

Dask Array supports these operations by creating a new array where each block is slightly expanded by the borders of its neighbors. 

![](https://docs.dask.org/en/latest/_images/overlapping-neighbors.svg)

This costs an excess copy and the communication of many small chunks, but allows localized functions to evaluate in an embarrassingly parallel manner.

**Related Documentation**
   - [Array Overlap](https://docs.dask.org/en/latest/array-overlap.html)

The main API for these computations is the ``map_overlap`` method. ``map_overlap`` is very similar to ``map_blocks`` but has the additional arguments: ``depth``, ``boundary``, and ``trim``.

Here is an example of calculating the derivative:

In [None]:
import numpy as np
import dask.array as da
import matplotlib.pyplot as plt

a = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
a = da.from_array(a, chunks=5)

plt.plot(a)

In [None]:
def derivative(a):
    return a - np.roll(a, 1)

b = a.map_overlap(derivative, depth=1, boundary=None)
b.compute()

In this case each block shares 1 value from its neighboring block: ``depth``. And since we set ``boundary=0``on the outer edges of the array, the first and last block are padded with the integer 0. Since we haven't specified ``trim`` it is true by default meaning that the overlap is removed before returning the results.

If you inspect the task graph you'll see two mostly independent towers of tasks, with just some value sharing at the edges.

In [None]:
b.visualize(collapse_outputs=True)

**Exercise**

Lets apply a gaussian filter to an image following the example from the [scipy docs](https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.gaussian_filter.html).

First create a dask array from the numpy array:

In [None]:
from scipy.datasets import ascent
import dask.array as da

a = da.from_array(ascent(), chunks=(128, 128))
a

Now use ``map_overlap`` to apply ``gausian_filter`` to each block.

```python
from scipy.ndimage import gaussian_filter

b = a.map_overlap(gaussian_filter, sigma=5, ...)
```

In [None]:
# solution
from scipy.ndimage import gaussian_filter

b = a.map_overlap(gaussian_filter, sigma=5, depth=10, boundary="periodic")

Check what you've come up with by plotting the results:

In [None]:
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))
ax1.imshow(a)
ax2.imshow(b)
plt.show()

> Notice that if you set the depth to a smaller value, you can see the edges of the blocks in the output image.

## Reduction
Each dask collection has a `reduction` method. This is the generalized method that supports operations that reduce the dimensionality of the inputs.

The difference between `blockwise` and `reduction` is that with `reduction` you have finer grained control over the behavior of the tree-reduce.

![Custom operations: reduction](images/custom_operations_reduction.png)

**Related Documentation**
   - [`dask.array.reduction`](http://dask.pydata.org/en/latest/array-api.html#dask.dataframe.Array.reduction)
   - [`dask.dataframe.reduction`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction)

### Internal uses

This is the internal definition of sum on dask.Array. In it you can see that there is a
regular ``np.sum`` applied across each block and then tree-reduced with ``np.sum`` again.

```python
def sum(a, axis=None, dtype=None, keepdims=False, split_every=None, out=None):
    if dtype is None:
        dtype = getattr(np.zeros(1, dtype=a.dtype).sum(), "dtype", object)
    result = reduction(
        a,
        chunk.sum,  # this is just `np.sum`
        chunk.sum,  # this is just `np.sum`
        axis=axis,
        keepdims=keepdims,
        dtype=dtype,
        split_every=split_every,
        out=out,
    )
    return result
```
[source](https://github.com/dask/dask/blob/ac1bd05cfd40207d68f6eb8603178d7ac0ded922/dask/array/reductions.py#L344-L357)

Here is `da.sum` reimplemented as a custom reduction.

In [None]:
da.reduction(x, np.sum, np.sum, dtype=x.dtype).visualize()

By visualizing `b` we can see how the tree reduction works. First ``sum`` is applied to each block, then every 4 chunks are combined using ``sum-partial``. This keeps going until there are less than 4 results left, then ``sum-aggregate`` is used to finish up.

**Exercise**

See how the graph changes when you set the chunks - maybe to `(100, 250)` or `(250, 250)`

In [None]:
# solution
x = da.random.random(size=(1_000, 1_000), chunks=(100, 250))
x.sum().visualize()

### Understanding ``split_every``

``split_every`` controls the number of chunk outputs that are used as input to each ``partial`` call. 

Here is an example of doing partial aggregation on every 5 blocks along the 0 axis and every 2 blocks along the 1 axis (so 10 blocks go into each `partial-sum`)

In [None]:
x.sum(split_every={0: 5, 1: 2}).visualize()

**Exercise**

Try setting different values for `split_every` and visualizing the task graph to see the impact.

In [None]:
# solution
x.sum(split_every={0: 10, 1: 2}).visualize()

> **Side note**
>
> You can use reductions to calculate aggregations per-block reduction even if you don't want to combine and aggregate the results of those blocks:
>
> ```python
> da.reduction(x, np.sum, lambda x, **kwargs: x, dtype=int).compute()
> ```

## When to use which method

In this notebook we've covered several different mechanisms for applying arbitrary functions to the blocks in arrays or dataframes. Here's a brief summary of when you should use these various methods

- `map_block`, `map_partition` - block organization of the input matches the block organization of the output and the function is fully parallelizable. 
- `map_overlap` - block organizations of input and output match, but the function is not fully parallelizable (requires input from neighboring chunks).
- `blockwise` - same function can be applied to the blocks as to the partial and aggregated versions. Also output blocks can be in different orientations.
- `reduction` - dimensionality of output does not necessarily match that of input and function is fully parallelizable.
- `groupby().agg` - data needs to be aggregated per group (the index of the output will be the group keys).
- `dask.delayed` - data doesn't have a complex block organization or the data is small and the computation is pretty fast.