Independent parallel example with Dask, from https://examples.dask.org/applications/embarrassingly-parallel.html
    
Before running this notebook, complete the following setup. Steps 1 and 2 are run in a terminal on one of the cluster login nodes:

1. Install Miniconda3 as listed in https://www.chpc.utah.edu/documentation/software/python-anaconda.php, that is:
        
> wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh  
> bash ./Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/software/pkg/miniconda3  
> mkdir -p $HOME/MyModules/miniconda3  
> cp /uufs/chpc.utah.edu/sys/installdir/python/modules/miniconda3/latest.lua $HOME/MyModules/miniconda3 

2. In the terminal, load the new miniconda3 module and install Dask:
> module use $HOME/MyModules  
> module load miniconda3/latest  
> conda install dask 

3. Log into ondemand.chpc.utah.edu with your CHPC credentials

4. Go to Interactive Apps - Jupyter Notebook on notchpeak

5. In the Environment Setup text box, put:
> module use $HOME/MyModules  
> module load miniconda3/latest  

This ensures that the Jupyter notebook started through the Open OnDemand job loads your own Miniconda installation, which has Dask installed.

6. Use notchpeak-shared-short for both the account and the partition, and select the number of CPU cores and walltime hours (within the listed limits). Then click Launch to submit the job.

7. Once the job starts, hit the blue Connect to Jupyter button and open this notebook in it.
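
Once the notebook is open, a quick sanity check can confirm that the kernel actually picked up the Miniconda environment from steps 1 to 5. The cell below is a minimal sketch, not part of the original example: `module_available` is a hypothetical helper name, and the list of packages checked is an assumption about what the rest of the notebook imports.

```python
import importlib.util
import os

def module_available(name):
    """Return True if the top-level module `name` can be found by this kernel."""
    return importlib.util.find_spec(name) is not None

# Packages imported later in this notebook (assumed list).
for name in ("dask", "distributed", "pandas", "numpy"):
    status = "found" if module_available(name) else "MISSING"
    print(f"{name}: {status}")

# CPU cores this process may use; on Linux this reflects the job's allocation
# when the scheduler restricts CPU affinity, otherwise fall back to the node total.
if hasattr(os, "sched_getaffinity"):
    n_cores = len(os.sched_getaffinity(0))
else:
    n_cores = os.cpu_count()
print("CPU cores visible to this process:", n_cores)
```

If any package is reported as missing, the Environment Setup box in step 5 is the first thing to re-check.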


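This notebook follows the embarrassingly parallel example at https://examples.dask.org/applications/embarrassingly-parallel.html: it runs many independent calls of a slow function, first serially and then in parallel with Dask.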

In [1]:
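from dask.distributed import Client, progress

# Start a local Dask cluster inside the job: one worker process with four threads.
# threads_per_worker should not exceed the CPU cores requested in Open OnDemand.
client = Client(threads_per_worker=4, n_workers=1)
client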

Client
  Scheduler: tcp://127.0.0.1:44193
  Dashboard: http://127.0.0.1:8787/status
Cluster
  Workers: 1
  Cores: 4
  Memory: 4.19 GB


In [2]:
import time
import random

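def costly_simulation(list_param):
    # Stand-in for an expensive computation: sleep for a random time
    # between 0 and 1 second, then return the sum of the inputs.
    time.sleep(random.random())
    return sum(list_param)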

In [3]:
%time costly_simulation([1, 2, 3, 4])

CPU times: user 7.44 ms, sys: 2.01 ms, total: 9.45 ms
Wall time: 98.6 ms


10

In [4]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()

    param_a   param_b   param_c   param_d
0  0.596837  0.561082  0.429463  0.473729
1  0.354301  0.843361  0.727380  0.266897
2  0.827863  0.472732  0.336698  0.986588
3  0.098369  0.119939  0.363779  0.392682
4  0.718471  0.309634  0.980632  0.982298


In [5]:
%%time
results = []
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)
results

CPU times: user 134 ms, sys: 31.3 ms, total: 165 ms
Wall time: 4.27 s


[2.0611104661055744,
 2.1919391753293813,
 2.623881531073982,
 0.9747683500528982,
 2.991035546149825,
 0.9627170938083086,
 2.203029404493799,
 2.541805980108227,
 1.2280634650473525,
 1.936713653567948]

In [6]:
import dask
lazy_results = []

In [7]:
%%time

# dask.delayed wraps each call without running it; this loop only builds tasks.
for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
lazy_results[0]

CPU times: user 3.28 ms, sys: 2.82 ms, total: 6.1 ms
Wall time: 5.52 ms


Delayed('costly_simulation-34ceeba3-6b19-4e46-96a6-9abef86cce6c')

In [8]:
%time dask.compute(*lazy_results)

CPU times: user 92.4 ms, sys: 15.9 ms, total: 108 ms
Wall time: 1.39 s


(2.0611104661055744,
 2.1919391753293813,
 2.623881531073982,
 0.9747683500528982,
 2.991035546149825,
 0.9627170938083086,
 2.203029404493799,
 2.541805980108227,
 1.2280634650473525,
 1.936713653567948)
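
The two wall times above give a rough measure of what the four worker threads bought. The sketch below does that arithmetic with the values reported in this run (4.27 s for the serial loop, 1.39 s with `dask.compute`). `speedup_and_efficiency` is a hypothetical helper name, and the comparison is only approximate: each call sleeps for a fresh random time, so the serial and parallel runs did not do exactly the same amount of waiting, and the numbers will differ from run to run.

```python
def speedup_and_efficiency(serial_s, parallel_s, n_threads):
    """Return (speedup, parallel efficiency) from two wall-clock times in seconds."""
    speedup = serial_s / parallel_s
    efficiency = speedup / n_threads
    return speedup, efficiency

# Wall times copied from the cell outputs above; 4 threads as set in Client(...).
speedup, efficiency = speedup_and_efficiency(serial_s=4.27, parallel_s=1.39, n_threads=4)
print(f"speedup: {speedup:.2f}x, efficiency: {efficiency:.0%}")
# prints "speedup: 3.07x, efficiency: 77%"
```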

In [9]:
import dask
lazy_results = []

# Build one delayed task per row, this time for all 500 parameter sets.
for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results)  # trigger computation in the background

In [10]:
%time results = dask.compute(*futures)
results[:5]

CPU times: user 3.93 s, sys: 556 ms, total: 4.48 s
Wall time: 57.1 s


(2.0611104661055744,
 2.1919391753293813,
 2.623881531073982,
 0.9747683500528982,
 2.991035546149825)
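
As a rough cross-check on this timing: `random.random()` is uniform on [0, 1), so each task sleeps for 0.5 s on average. Assuming the sleep dominates the cost of a task and all four threads stay busy, an idealized estimate of the wall time is the total sleep divided by the thread count. `ideal_wall_time` below is a hypothetical helper, not part of Dask. Note also that `dask.persist` already started the work in the previous cell, so the `%time` measurement here covers only the remaining wait and can come in under the estimate.

```python
def ideal_wall_time(n_tasks, mean_task_s, n_threads):
    """Idealized wall time in seconds: total work spread evenly over the threads."""
    return n_tasks * mean_task_s / n_threads

# 500 rows in input_params, 0.5 s mean sleep per task.
print(ideal_wall_time(n_tasks=500, mean_task_s=0.5, n_threads=4))  # 62.5
print(ideal_wall_time(n_tasks=500, mean_task_s=0.5, n_threads=1))  # 250.0
```

The second line is the corresponding estimate for a plain serial loop over all 500 rows.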

In [11]:
client

Client
  Scheduler: tcp://127.0.0.1:37257
  Dashboard: http://127.0.0.1:8787/status
Cluster
  Workers: 1
  Cores: 4
  Memory: 4.19 GB
