## Dask on Mio test!

**THE FOLLOWING COMMAND STARTS N WORKERS, SUBMITTING AS MANY SLURM JOBS AS NEEDED:**

    cluster.scale(N)

**THOSE JOBS KEEP RUNNING, AND HOLDING THEIR NODES, EVEN WHEN THERE IS NOTHING TO DO.**

It is recommended to use

    cluster.adapt()

instead, which automatically scales the number of jobs up and down with the workload.
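A toy model of the difference, using a hypothetical, much-simplified policy. This is not Dask's adaptive algorithm, which roughly estimates load from expected task durations and memory use; the `tasks_per_job` parameter is invented for illustration. A fixed `scale` keeps the same jobs alive regardless of load, while an adaptive policy sizes the job count to the pending work, capped at `maximum_jobs`.

```python
import math


def fixed_jobs(requested_jobs: int, pending_tasks: int) -> int:
    """scale()-style behaviour: the job count ignores the workload."""
    return requested_jobs


def adaptive_jobs(pending_tasks: int, tasks_per_job: int,
                  minimum_jobs: int = 0, maximum_jobs: int = 20) -> int:
    """Hypothetical adaptive policy: enough jobs for the pending tasks,
    clamped to the range [minimum_jobs, maximum_jobs]."""
    wanted = math.ceil(pending_tasks / tasks_per_job)
    return max(minimum_jobs, min(maximum_jobs, wanted))


for pending in (0, 10, 1000):
    # Idle cluster: fixed keeps 2 jobs, adaptive drops to 0
    print(pending, fixed_jobs(2, pending), adaptive_jobs(pending, tasks_per_job=16))
```

With no pending work the fixed cluster still holds 2 jobs, while the adaptive one releases everything; under heavy load the adaptive one grows until it hits the cap.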

```python
import dask
import dask.array as da
```

```python
from dask_jobqueue import SLURMCluster

# The values passed here can instead be set in ~/.config/dask/jobqueue.yaml
# A copy of my config is included in this repo
# More info: https://jobqueue.dask.org/en/latest/configuration.html
cluster = SLURMCluster(cores=16,  # cores per job
                       memory="100GB",  # memory per job; not sure what the Mio nodes have
                       # processes=...,  # split each job into this many worker processes; the default (~sqrt(cores)) is good
                       project='dask_test',  # SLURM account to charge (renamed `account` in newer dask-jobqueue releases)
                       queue='geop,compute',  # SLURM partitions; SLURM uses whichever can start the job earliest, not necessarily geop
                       walltime='02:00:00',  # wall-clock limit: how long each job reserves its node
                       log_directory="./logs",  # directory for worker logs
                       local_directory="~/scratch/dask_test"  # directory for spilling data to disk if memory runs low
                       )

# cluster.scale(jobs=2)  # Submit 2 jobs matching the description above
cluster.adapt(maximum_jobs=20)  # Automatically submits and cancels jobs based on load

from dask.distributed import Client
client = Client(cluster)  # Connect to that cluster
```
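With `cores=16` and `processes` left at its default, each job is split into several worker processes, so `cluster.scale(n=...)` (which counts workers) and `cluster.scale(jobs=...)` (which counts jobs) request different things. The sketch below assumes the default split used by recent `distributed` releases: up to 4 cores get one process each; above that, the smallest divisor of `cores` that is at least √cores. The helper name `default_processes_threads` is hypothetical; check the behaviour of your installed version before relying on it.

```python
import math


def default_processes_threads(cores: int) -> tuple[int, int]:
    """Approximate default split of one job into (processes, threads per process).

    Assumption: mirrors the ~sqrt(cores) heuristic of recent
    distributed/dask-jobqueue releases; verify against your version.
    """
    if cores <= 4:
        processes = cores
    else:
        # Smallest divisor of `cores` that is >= sqrt(cores)
        processes = min(f for f in range(1, cores + 1)
                        if cores % f == 0 and f >= math.sqrt(cores))
    return processes, cores // processes


processes, threads = default_processes_threads(16)
print(processes, threads)                             # 4 4
print("workers for jobs=2:", 2 * processes)           # 8
print("workers at maximum_jobs=20:", 20 * processes)  # 80
```

Under this assumption, `scale(jobs=2)` gives 8 workers of 4 threads each, not 2 workers.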

## Do something with Dask

```python
x = da.random.random((50000, 50000), chunks=(1000, 1000))
x
```

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 20.00 GB       | 8.00 MB       |
| Shape | (50000, 50000) | (1000, 1000)  |
| Count | 2500 Tasks     | 2500 Chunks   |
| Type  | float64        | numpy.ndarray |
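The numbers in this summary follow directly from the shape, the chunk size, and 8 bytes per `float64` (the summary uses powers of 1000 for GB and MB). A plain-Python check:

```python
import math

shape = (50_000, 50_000)
chunks = (1_000, 1_000)
itemsize = 8  # bytes per float64

array_bytes = math.prod(shape) * itemsize
chunk_bytes = math.prod(chunks) * itemsize
# Number of chunks along each axis (ceil handles shapes that don't divide evenly)
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

print(array_bytes / 1e9, "GB")  # 20.0 GB
print(chunk_bytes / 1e6, "MB")  # 8.0 MB
print(n_chunks, "chunks")       # 2500 chunks
```

Each chunk is produced by one random-number task, hence 2500 tasks for 2500 chunks, and no single task ever holds more than 8 MB.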


```python
y = x + x.T
z = y.mean(axis=1)
z
```

|       | Array       | Chunk         |
|-------|-------------|---------------|
| Bytes | 400.00 kB   | 8.00 kB       |
| Shape | (50000,)    | (1000,)       |
| Count | 10900 Tasks | 50 Chunks     |
| Type  | float64     | numpy.ndarray |
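Each entry of `z` is the mean of one row of `x + x.T`, which equals the mean of row i of `x` plus the mean of column i of `x`. With uniform [0, 1) samples both means are close to 0.5, which is why `z[0] + z[1]` comes out close to 2. A plain-Python check of that identity on a small random matrix (the 4×4 size and seed are arbitrary choices for illustration):

```python
import random

n = 4
random.seed(0)
x = [[random.random() for _ in range(n)] for _ in range(n)]

# y = x + x.T, then z = y.mean(axis=1), written out with plain lists
y = [[x[i][j] + x[j][i] for j in range(n)] for i in range(n)]
z = [sum(row) / n for row in y]

# Same result via the identity: row mean of x plus column mean of x
row_means = [sum(x[i]) / n for i in range(n)]
col_means = [sum(x[j][i] for j in range(n)) / n for i in range(n)]
z_identity = [r + c for r, c in zip(row_means, col_means)]

assert all(abs(a - b) < 1e-12 for a, b in zip(z, z_identity))
print(z)
```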


### `x`, `y`, AND `z` HAVE NOT BEEN COMPUTED YET: that only happens when `compute()` is called
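Laziness here means each operation only adds tasks to a graph; nothing runs until `compute()` walks it. The toy class below (a hypothetical illustration, not Dask's implementation; unlike Dask it does not cache shared subexpressions) records operations and evaluates them only on demand, counting how many tasks actually run.

```python
class Lazy:
    """Toy lazy value: stores a function and its inputs instead of a result."""

    executed = 0  # how many tasks have actually run so far

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __add__(self, other):
        # Building an expression only records a new node; nothing is evaluated
        return Lazy(lambda a, b: a + b, self, other)

    def compute(self):
        # Evaluate inputs first (depth-first), then this node
        values = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        Lazy.executed += 1
        return self.func(*values)


a = Lazy(lambda: 3)
b = Lazy(lambda: 4)
c = a + b
print(Lazy.executed)  # 0: building c ran nothing
print(c.compute())    # 7
print(Lazy.executed)  # 3: a, b, and the addition
```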

```python
result = z.compute()
```

```python
k = z[0] + z[1]
k
```

|       | Array       | Chunk         |
|-------|-------------|---------------|
| Bytes | 8 B         | 8 B           |
| Shape | ()          | ()            |
| Count | 10903 Tasks | 1 Chunks      |
| Type  | float64     | numpy.ndarray |


```python
k.compute()
```

2.000242140843226

```python
result[0] + result[1]
```

2.000242140843226

```python
print(type(result))
```

    <class 'numpy.ndarray'>


## What if we have a custom function?

```python
import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y
```

```python
inc_delay = dask.delayed(inc)
dec_delay = dask.delayed(dec)
add_delay = dask.delayed(add)
add_delay
```

Delayed('add-0b1be891-fbdf-4051-825c-145f0201afb0')

```python
xd = inc_delay(5)
yd = dec_delay(2)
zd = add_delay(xd, yd)
zd
```

Delayed('add-34b8932e-9938-4f9e-b7fb-f657fdee4937')
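Under the hood, `zd` is a small task graph: two independent tasks feeding a third. Dask's graph specification represents this as a plain dict mapping keys to task tuples `(function, *args)`, where an argument may name another key. The sketch below builds such a dict by hand (the keys `"x"`, `"y"`, `"z"` are arbitrary, the sleeps are removed) and evaluates it with a minimal recursive evaluator written only for illustration; Dask's real schedulers also run tasks in parallel and manage memory.

```python
def inc(x):
    return x + 1


def dec(x):
    return x - 1


def add(x, y):
    return x + y


# Graph in the style of Dask's graph specification: key -> task tuple
graph = {
    "x": (inc, 5),
    "y": (dec, 2),
    "z": (add, "x", "y"),
}


def get(graph, key, cache=None):
    """Minimal recursive evaluator for a dict-of-tasks graph (illustration only)."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that name other keys are evaluated first
        values = [get(graph, a, cache) if isinstance(a, str) and a in graph else a
                  for a in args]
        result = func(*values)
    else:
        result = task  # plain data, not a task
    cache[key] = result
    return result


print(get(graph, "z"))  # 7
```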

```python
resultd = zd.compute()
```

```python
resultd
```

7
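Because `inc(5)` and `dec(2)` do not depend on each other, the scheduler is free to run them on different workers at the same time; only `add` has to wait for both. The same dependency structure written by hand with the standard library's thread pool, as a local stand-in for the cluster (sleeps shortened to keep it quick):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    time.sleep(random.random() / 10)
    return x + 1


def dec(x):
    time.sleep(random.random() / 10)
    return x - 1


def add(x, y):
    time.sleep(random.random() / 10)
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    # Independent tasks are submitted together and may overlap in time
    fx = pool.submit(inc, 5)
    fy = pool.submit(dec, 2)
    # add needs both inputs, so it waits on their results
    fz = pool.submit(add, fx.result(), fy.result())
    result = fz.result()

print(result)  # 7
```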

## Cleanup

```python
# This also shuts down the Dask dashboard (status page), so run it only when you are done
client.close()   # Release the client
cluster.close()  # Release the nodes (cancels the SLURM jobs)
```