Delayed computation
===================
Dask can split a workload (Python function calls) across multiple workers using `Futures`.
However, futures are hard to use when tasks are not fully independent.

`dask.delayed` submits tasks much like futures do, but in a more Pythonic way:  
once a function is decorated with `delayed`, it can be called as usual.  
Its output can be passed as input to other delayed functions before being computed.  

`delayed` can distribute many ordinary Python workflows, provided they are written in a functional style.


In [None]:
from dask import delayed, compute
from dask.distributed import Client
import numpy as np

client = Client(processes=False)
client

## Basic usage

In [None]:
def func(a, b, c):
    return (2 * a) + (b - c)


a, b, c = 3, 4, 5
func(a, b, c)

In [None]:
@delayed
def lazy_func(a, b, c):
    return (2 * a) + (b - c)


func_delayed = lazy_func(a, b, c)
func_delayed

The output is a `Delayed` object. No computation has started yet.  
The task is sent to the workers only when `compute` is called.

In [None]:
func_delayed.compute()
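
One consequence of this laziness is that errors inside a delayed function only surface when `compute` runs. The sketch below illustrates this with a hypothetical `parse_int` helper (not part of the notebook) and assumes the local synchronous scheduler so that it runs without a cluster:

```python
from dask import delayed


@delayed
def parse_int(text):
    # Raises ValueError for non-numeric text, but only when the task runs.
    return int(text)


# Building the task succeeds even though the input is invalid.
lazy_bad = parse_int("not a number")

try:
    # The function body is executed here, so the error appears here.
    lazy_bad.compute(scheduler="synchronous")
except ValueError as err:
    message = str(err)
    print("Failed at compute time:", message)
```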

Outputs can be used in other delayed functions without being computed.

In [None]:
from operator import add, sub, mul


e = delayed(mul)(2, a)
f = delayed(sub)(b, c)
g = delayed(add)(e, f)
g

Operators on `Delayed` objects also work as expected:

In [None]:
h = e + f
h

When a `Delayed` object is built this way from multiple tasks, `dask` records all the operations in a graph, which can be displayed with `visualize`:

In [None]:
g.visualize()

In [None]:
g.compute(), h.compute()
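
`visualize` needs the optional graphviz dependency. As a rough alternative, the recorded tasks can also be inspected as a plain mapping. The sketch below rebuilds the same three operations with literal values and assumes that `__dask_graph__()` from the Dask collection protocol is enough for a quick look; the exact layout of each graph entry may differ between Dask versions:

```python
from operator import add, mul, sub

from dask import delayed

e = delayed(mul)(2, 3)
f = delayed(sub)(4, 5)
g = delayed(add)(e, f)

# The graph maps task keys to task definitions; nothing has run yet.
graph = dict(g.__dask_graph__())

for key in graph:
    print(key)

# Each Delayed object knows the key of the task that produces it.
print("final task:", g.key)
```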

Delayed Data
------------
When the same data is used as input to several delayed functions, wrapping it in `delayed` helps the scheduler.
Methods and indexing of the underlying object can then be used lazily on the delayed data.

In [None]:
# Load the string in the operation tree
var1 = delayed("12345")
var1

In [None]:
# Build a list of strings so its items can be concatenated with characters of var1
var2 = delayed(lambda N: ["1"] * N)(5)
var2

In [None]:
# Operation between function output and delayed data
oper1 = var1[2] + var2[3] + var1[0]
oper1

In [None]:
oper1.visualize(rankdir="LR")

In [None]:
oper1.compute()
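
The same idea applies to arrays. The following minimal sketch assumes a small NumPy array as the shared input and the local synchronous scheduler; the variable names are illustrative only:

```python
import numpy as np
from dask import compute, delayed

# Wrap the data once, then pass the same Delayed object to every task.
shared = delayed(np.arange(10))

total = delayed(np.sum)(shared)
largest = delayed(np.max)(shared)

# Both results are computed in a single call.
total_value, largest_value = compute(total, largest, scheduler="synchronous")
print(total_value, largest_value)
```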

## Side effects
**Delayed functions should not rely on side effects!**

In [None]:
l = []
tasks = [delayed(l.append)(i) for i in range(10)]
compute(tasks)  # Better than [task.compute() for task in tasks]
l

In [None]:
l = delayed([])
tasks = [l.append(i) for i in range(10)]
compute(tasks)
l.compute()
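
A functional alternative is to have each task return its value and let `compute` collect the results. This is a minimal sketch with a hypothetical `square` function, run on the local synchronous scheduler:

```python
from dask import compute, delayed


@delayed
def square(i):
    # No shared state is touched: the result is simply returned.
    return i * i


tasks = [square(i) for i in range(10)]

# compute returns a tuple with one entry per argument; here, a single list.
(results,) = compute(tasks, scheduler="synchronous")
print(results)
```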

## Data creation

With large data, take care that the full dataset never sits in the client's memory.  
Large datasets should be created or loaded inside a delayed task.

In [None]:
@delayed
def make_sample(N):
    return np.expm1(5 * np.random.rand(N)**3)

samples = [make_sample(1000) for _ in range(5)]

stds = [delayed(np.std)(sample) for sample in samples]
std_avg = delayed(np.mean)(stds)
std_avg.visualize(rankdir="LR")

In [None]:
std_avg.compute()

Memoization
-----------
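Within a single `compute` call, Dask runs each task only once, even when several results depend on it. Separate `compute` calls do not share results, so common tasks are run again.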

In [None]:
@delayed
def inc(i):
    from time import sleep
    sleep(5)
    return i+1

a = inc(1)
b = inc(1)
c = inc(a)
d = b + c
%time d.compute()

In [None]:
%time [a.compute(), b.compute(), c.compute()]
%time compute([a, b, c])
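
The timings above can be cross-checked by counting how often a function body runs. The sketch below uses a hypothetical `traced_inc` that records its calls in a list. This is a side effect, used here only for tracing, and it is only reliable because the synchronous scheduler runs everything in the current process:

```python
from dask import delayed

calls = []


@delayed
def traced_inc(i):
    calls.append(i)  # tracing only; see the note on side effects
    return i + 1


a = traced_inc(1)
c = traced_inc(a)
d = a + c  # `a` feeds both `c` and `d`

# One compute call: `a` runs once although two tasks depend on it.
result = d.compute(scheduler="synchronous")
calls_single = len(calls)

# Separate compute calls: `a` runs once on its own and again for `c`.
calls.clear()
a.compute(scheduler="synchronous")
c.compute(scheduler="synchronous")
calls_separate = len(calls)

print(result, calls_single, calls_separate)
```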

## Exercise: Putting it all together

Use `dask.delayed` to compute the following workflow:

- Open an image.
- Take `N` random 100x100 pixel blocks.
- Compute the differences between each pair of blocks.
- Make a histogram of these differences.

In [None]:
from PIL import Image
import itertools

im = np.array(Image.open("images/exemple_1.png"))[:, :, :3]


def crop(im: np.ndarray, x: float, y: float):
    """
    Cut a 100x100 block out of the image. x and y are fractions in [0, 1)
    of the range of valid starting positions.
    """
    Nx, Ny, _ = im.shape
    x0 = int(x * (Nx-100))
    y0 = int(y * (Ny-100))
    return im[x0:x0+100, y0:y0+100]


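def diff(im1: np.ndarray, im2: np.ndarray):
    """
    Compute the absolute difference between two images.
    """
    # Cast to a signed type first: subtracting uint8 pixels would wrap around.
    return np.abs(im1.astype(int) - im2.astype(int))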


def build_histogram(im, N):
    """
    Create a histogram of the differences between
    pairs of N random blocks in the image.
    """
    xs = np.random.random(N)
    ys = np.random.random(N)
    
    pieces = [
        crop(im, x, y)
        for x, y in zip(xs, ys)
    ]
    
    diffs = [
        diff(im1, im2)
        for im1, im2 in itertools.combinations(pieces, 2)
    ]
    
    means = [np.mean(diff) for diff in diffs]
    
    return np.histogram(means)

build_histogram(im, 50)

## Solution

<!---
im = delayed(Image.open)("images/exemple_1.png")
im = delayed(lambda I: np.array(I)[:, :, :3])(im)

@delayed
def crop(im: np.ndarray, x: float, y: float):
    """
    Cut a 100x100 block out of the image. x and y are fractions in [0, 1)
    of the range of valid starting positions.
    """
    Nx, Ny, _ = im.shape
    x0 = int(x * (Nx-100))
    y0 = int(y * (Ny-100))
    return im[x0:x0+100, y0:y0+100]

@delayed
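def diff(im1: np.ndarray, im2: np.ndarray):
    """
    Compute the absolute difference between two images.
    """
    # Cast to a signed type first: subtracting uint8 pixels would wrap around.
    return np.abs(im1.astype(int) - im2.astype(int))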

# Do not delay a function that itself calls delayed functions
def build_histogram(im, N):
    """
    Create a histogram of the differences between
    pairs of N random blocks in the image.
    """
    xs = np.random.random(N)
    ys = np.random.random(N)
    
    pieces = [
        crop(im, x, y)
        for x, y in zip(xs, ys)
    ]
    
    diffs = [
        diff(im1, im2)
        for im1, im2 in itertools.combinations(pieces, 2)
    ]
    
    means = [delayed(np.mean)(diff) for diff in diffs]
    
    histogram = delayed(np.histogram)(means)
    return histogram

histogram = build_histogram(im, 4)
histogram.visualize(rankdir="LR")

histogram = build_histogram(im, 50)
%time histogram.compute()
--->

Exercise: Block randomize an image
==================================

1. Open an image.
2. Split it into blocks.
3. Apply a filter to each block.
4. Randomize the positions of each block.
5. Save it to a file.

_Hint:_
<!-- 
Don't overthink it.
You don't need to understand how image manipulation works.
-->



In [None]:
from PIL import ImageFilter
from glob import glob

In [None]:
files = glob("images/*.png")
block_size = 200

for file in files:
    # Image.open does not load the full image at once, but only when needed.
    image = Image.open(file)
    
    # Compute the number of blocks and the output size
    num_blocks = (image.size[0] // block_size, image.size[1] // block_size)
    out_size = (num_blocks[0] * block_size, num_blocks[1] * block_size)
    
    # Cut borders so the image is a multiple of block_size pixels
    left_border = (image.size[0] - out_size[0]) // 2
    right_border = out_size[0] + left_border
    up_border = (image.size[1] - out_size[1]) // 2
    down_border = out_size[1] + up_border
    image = image.crop((left_border, up_border, right_border, down_border))
    
    # Create the output image
    new = Image.new("RGBA", out_size)

    # Create output shuffle order
    shuffled_order = np.arange(num_blocks[0] * num_blocks[1])
    np.random.shuffle(shuffled_order)

    # Paste each block in the new image
    for old_idx, new_idx in enumerate(shuffled_order):
        x, y = np.unravel_index(old_idx, num_blocks)
        # Get the block
        block = image.crop(
            (x * block_size, y * block_size, (x+1)*block_size, (y+1)*block_size)
        )
        # Filter it
        block = block.filter(ImageFilter.SHARPEN)
        x, y = np.unravel_index(new_idx, num_blocks)
        # Insert it
        new.paste(block, (x * block_size, y * block_size))
    # Save the file
    new.save(f"shuffled_{file.split('.')[0][7:]}.png")

Solution
--------
<!-- 
# Without the decorator, every image would be processed eagerly on the client
@delayed
def block_random(file, block_size):
    # Image.open does not load the full image at once, but only when needed.
    image = Image.open(file)
    
    # Compute the number of blocks and the output size
    num_blocks = (image.size[0] // block_size, image.size[1] // block_size)
    out_size = (num_blocks[0] * block_size, num_blocks[1] * block_size)
    
    # Cut borders so the image is a multiple of block_size pixels
    left_border = (image.size[0] - out_size[0]) // 2
    right_border = out_size[0] + left_border
    up_border = (image.size[1] - out_size[1]) // 2
    down_border = out_size[1] + up_border
    image = image.crop((left_border, up_border, right_border, down_border))
    
    # Create the output image
    new = Image.new("RGBA", out_size)

    # Create output shuffle order
    shuffled_order = np.arange(num_blocks[0] * num_blocks[1])
    np.random.shuffle(shuffled_order)

    # Paste each block in the new image
    for old_idx, new_idx in enumerate(shuffled_order):
        x, y = np.unravel_index(old_idx, num_blocks)
        # Get the block
        block = image.crop(
            (x * block_size, y * block_size, (x+1)*block_size, (y+1)*block_size)
        )
        # Filter it
        block = block.filter(ImageFilter.SHARPEN)
        x, y = np.unravel_index(new_idx, num_blocks)
        # Insert it
        new.paste(block, (x * block_size, y * block_size))
    # Save the file
    new.save(f"shuffled_{file.split('.')[0][7:]}.png")


files = glob("images/*.png")
block_size = 200
    
compute([
    block_random(file, block_size)
    for file in files
])
-->

## Exercise: Working with mutables

We have a function that computes two values from an input, and we want to store them in a dict.
The dict should be created in a delayed function.

_Hint:_
<!---
The dict must be part of the return value.
--->

In [None]:
from collections import defaultdict

def analyse(data):
    avg = np.mean(data, axis=0)
    key = int(avg.sum() * 10)
    value = data - np.add.outer(avg, avg)
    return key, value


result = defaultdict(lambda : 0)
datas = [np.random.rand(10, 10) for _ in range(5)]

for data in datas:
    key, value = analyse(data)
    result[key] += value

len(result)

## Solution
<!---
# Method one: Create the full dict in one call
@delayed
def make():
    return np.random.rand(10, 10)


@delayed
def analyse(data):
    avg = np.mean(data, axis=0)
    key = int(avg.sum() * 10)
    value = data - np.add.outer(avg, avg)
    return key, value


@delayed
def make_dict(analysed):
    result = defaultdict(lambda: 0)
    for key, value in analysed:
        # Accumulate, as in the sequential version
        result[key] += value
    return result


datas = [make() for _ in range(5)]
analysed = [analyse(data) for data in datas]
result = make_dict(analysed)


# Method two: Create an add_item function
@delayed
def add_item(result, key, value):
    if key in result:
        value = result[key] + value
    result[key] = value
    return result


result2 = {}
for _ in range(5):
    data = make()
    analysed = analyse(data)
    result2 = add_item(result2, analysed[0], analysed[1])

result2.compute()

--->