## This notebook demonstrates simple distributed computing with Dask.

In [None]:
from dask_janelia import auto_cluster
from distributed import Client, progress
import dask.array as da
from dask import bag
import numpy as np
from dataclasses import dataclass
from IPython.core.display import display, HTML
import time
def html_display(arr):
    """Show `arr` in the notebook using the markup from its `_repr_html_` method."""
    markup = arr._repr_html_()
    display(HTML(markup))

### Introduction to the two objects that manage our distributed computation

In [7]:
from distributed import LocalCluster
num_workers = 10

# Entering both objects as context managers ensures the cluster and the
# client are closed when the block exits, even if a statement inside raises.
with LocalCluster(threads_per_worker=1, n_workers=0) as clust, Client(clust) as client:
    # The cluster is created with n_workers=0, so these show an empty cluster.
    html_display(client.cluster)
    html_display(client)
    print(f'\n After requesting {num_workers} workers....')
    client.cluster.scale(num_workers)
    # Block until all requested workers have registered with the scheduler.
    # A fixed-length sleep is a race: on a slow or loaded machine the display
    # below could report fewer workers than were requested.
    client.wait_for_workers(n_workers=num_workers)
    html_display(client)
    # NOTE(review): presumably keeps the cluster alive long enough to look at
    # the dashboard before the context managers tear it down — confirm.
    time.sleep(10)

0,1
Client  Scheduler: tcp://127.0.0.1:35601  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B



 After requesting 10 workers....


0,1
Client  Scheduler: tcp://127.0.0.1:35601  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 10  Cores: 10  Memory: 112.52 GB


### Distributed computing of arbitrary Python functions

In [32]:
# Two parallel sequences of coordinates; paired element-wise they give
# (0, 0), (1, 1), ..., (99, 99).
points = (range(100), range(100))

def distance(arg1, arg2):
    """Return the Euclidean norm of the pair (arg1, arg2): sqrt(arg1^2 + arg2^2)."""
    squared_norm = arg1 ** 2 + arg2 ** 2
    return squared_norm ** 0.5

# Start a two-worker cluster and connect a client to it; both are released
# when their `with` blocks exit.
with auto_cluster(n_workers=2) as clust:
    with Client(clust) as cl:
        print(clust.dashboard_link)
        # Submit one `distance` call per pair of coordinates, then collect
        # the results of all futures back into this process.
        futures = cl.map(distance, points[0], points[1])
        results = cl.gather(futures)

print(results[:10])

http://127.0.0.1:8787/status
[0.0, 1.4142135623730951, 2.8284271247461903, 4.242640687119285, 5.656854249492381, 7.0710678118654755, 8.48528137423857, 9.899494936611665, 11.313708498984761, 12.727922061357855]


### Distribute arbitrary Python objects and call their methods on workers

In [33]:
@dataclass
class point2d:
    """A point in the plane described by its `x` and `y` coordinates."""
    x: float
    y: float

    def distance(self):
        """Return the Euclidean distance of this point from the origin."""
        horizontal, vertical = self.x, self.y
        return np.hypot(horizontal, vertical)
    
num_points = 100
# Each row of the (num_points, 2) array of standard-normal samples becomes one point2d.
points = [point2d(*row) for row in np.random.randn(num_points, 2)]
points_bag = bag.from_sequence(points)

with auto_cluster(n_workers=2) as clust:
    with Client(clust) as cl:
        print(clust.dashboard_link)
        # Call each object's own `distance` method on the workers and bring
        # the resulting list back to this process.
        distances = points_bag.map(lambda pt: pt.distance()).compute(scheduler=cl)

print(distances[:10])

http://127.0.0.1:8787/status
[1.329203016819606, 0.5310734383827062, 2.8247553619979, 0.33218738642967494, 0.6828809793793926, 1.496594513969543, 1.6920124957092155, 1.322823111554193, 0.8034387491667473, 0.47581444323546235]


In [60]:
# A lazy 2 x 10000 array of uniform random samples, chunked along the second axis.
points = da.random.random(size=(2, 10000), chunks=(-1, 100))
# Row 0 and row 1 are combined element-wise; nothing is computed yet,
# `a` is still a lazy dask array.
a = np.hypot(points[0], points[1])
html_display(a)

Unnamed: 0,Array,Chunk
Bytes,80.00 kB,800 B
Shape,"(10000,)","(100,)"
Count,400 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 80.00 kB 800 B Shape (10000,) (100,) Count 400 Tasks 100 Chunks Type float64 numpy.ndarray",10000  1,

Unnamed: 0,Array,Chunk
Bytes,80.00 kB,800 B
Shape,"(10000,)","(100,)"
Count,400 Tasks,100 Chunks
Type,float64,numpy.ndarray


In [67]:
# Overall shape, per-block shape and element type of the lazily loaded dataset.
dataset_shape = (2, 10000)
dataset_chunks = (2, 100)
dataset_dtype = 'float'

# Expand the per-axis chunk sizes into dask's explicit per-block chunk description.
normalized_chunks = da.core.normalize_chunks(dataset_chunks, dataset_shape)
# Stand-in "file names": the integers 0..dataset_shape[0]-1 rendered as strings.
# NOTE(review): this gives dataset_shape[0] names rather than one per chunk — confirm intent.
filenames = np.arange(dataset_shape[0]).astype('str')

def load_data(filename):
    """Stand-in loader: `filename` is not read; a random block of shape `dataset_chunks` is returned."""
    block_shape = dataset_chunks
    return np.random.random(size=block_shape)

# Describe the dataset lazily: `load_data` supplies the blocks, and the
# declared chunks and dtype tell dask what each block will look like.
dataset = da.map_blocks(
    load_data,
    filenames,
    dtype=dataset_dtype,
    chunks=normalized_chunks,
)
html_display(dataset)

Unnamed: 0,Array,Chunk
Bytes,160.00 kB,1.60 kB
Shape,"(2, 10000)","(2, 100)"
Count,100 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 160.00 kB 1.60 kB Shape (2, 10000) (2, 100) Count 100 Tasks 100 Chunks Type float64 numpy.ndarray",10000  2,

Unnamed: 0,Array,Chunk
Bytes,160.00 kB,1.60 kB
Shape,"(2, 10000)","(2, 100)"
Count,100 Tasks,100 Chunks
Type,float64,numpy.ndarray


We can take a slice of the data, which returns a smaller lazy array object