# DASK

In [1]:
# Execution backend selector read by the cluster-setup cell:
#   'local'  -> dask.distributed.LocalCluster on this machine
#   'merlin' -> dask_jobqueue.SLURMCluster
option_comp = 'local'

In [2]:
from dask.distributed import Client

# Build the dask cluster for the backend selected by `option_comp`.
if option_comp == "merlin":

    from dask_jobqueue import SLURMCluster

    # NOTE(review): local_directory / log_directory are hard-coded, user-specific
    # absolute paths; adjust them before running under a different account.
    cluster = SLURMCluster(
        cores=8,
        memory='4GB',
        walltime='06:00:00',
        interface='ib0',
        local_directory='/data/user/kim_a',
        log_directory='/data/user/kim_a',
        # job_extra expects a *list* of extra directives, each one emitted as its own
        # "#SBATCH <item>" line. A bare string would be iterated character by
        # character and produce a broken job script.
        job_extra=["--exclusive"],
    )

elif option_comp == "local":

    from dask.distributed import LocalCluster

    cluster = LocalCluster()

else:
    # Fail here rather than with a NameError on `cluster` in a later cell.
    raise ValueError(
        "Unknown option_comp {!r}; expected 'merlin' or 'local'".format(option_comp)
    )

In [3]:
# Connect a distributed client to the cluster object built by the setup cell.
client = Client(cluster)

In [6]:
# Number of dask workers to request from the cluster. The same value is reused
# further down as the number of chunks the Monte Carlo sample is split into.
n_workers = 8
cluster.scale(n_workers)

In [7]:
# Display the client summary (scheduler address, dashboard link, worker/core/memory totals).
client

0,1
Client  Scheduler: tcp://127.0.0.1:60228  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 24  Memory: 34.36 GB


In [None]:
# client.close()
# cluster.close()

# Conventional reference model

In [38]:
# NOTE(review): the name `cge_parameters` imported here is rebound below to the
# value returned by setup_gt_project, so this import is shadowed.
from cge_klausen import parameters as cge_parameters
# NOTE(review): GeothermalConventionalModel is not referenced in the visible cells.
from cge_model import GeothermalConventionalModel
# Star import: presumably the source of setup_gt_project and run_mc (and of `np`,
# which later cells use without importing it) -- TODO confirm and import explicitly.
from setup_files_gsa import *

# Monte Carlo configuration for the reference run.
n_iter = 320
project = 'Geothermal'
option = 'cge'
# NOTE(review): `seed` is defined but not used anywhere in the visible cells.
seed = 13413203

electricity_conv_prod, methods, cge_model, cge_parameters = setup_gt_project(project, option)

# Generate stochastic values
cge_parameters.stochastic(iterations=n_iter) # TODO add seed
# Reference model: run the parameter set through the model to obtain the sampled
# parameters that are passed to run_mc in the next cell.
cge_parameters_sto = cge_model.run_ps(cge_parameters)

In [39]:
%%time 
# Reference run without dask: a single run_mc call over all n_iter iterations.
ref_cge = run_mc(cge_parameters_sto, electricity_conv_prod, methods, n_iter)

CPU times: user 14min 55s, sys: 26.5 s, total: 15min 22s
Wall time: 2min 39s


In [40]:
# Inspect the reference results (per the captured output: a dict of method name -> array of scores).
ref_cge

{'climate change total': array([0.41734607, 0.12133854, 0.09826054, 0.15811886, 0.04907548,
        0.05991758, 0.0638623 , 0.50512962, 0.29830347, 0.03930585,
        0.12548262, 0.01797248, 0.0877036 , 0.52166457, 0.28749149,
        0.19084863, 0.2356731 , 0.15225142, 0.32001406, 0.02104983,
        0.15557913, 0.12801752, 0.11136883, 0.05742966, 0.21607734,
        0.16151139, 0.47560842, 0.15393066, 0.10173475, 0.02385928,
        0.02529589, 0.09088552, 0.09236601, 0.15464815, 0.68278413,
        0.13694845, 0.09083727, 0.41861313, 0.29822641, 0.20812619,
        0.13204847, 0.08656033, 0.06402493, 0.07333204, 0.07433397,
        0.17656752, 0.17561163, 0.14529046, 0.08377878, 0.12487511,
        0.04702133, 0.24497703, 0.09188469, 0.1083788 , 0.09121464,
        0.12848928, 0.03970776, 0.04128465, 0.14032762, 0.0953026 ,
        0.18355509, 0.0361099 , 0.0791804 , 0.02975068, 0.06272862,
        0.02276636, 0.07892886, 0.02729774, 0.32115529, 0.13385602,
        0.06898445, 0.14

# Computations with dask

In [41]:
import dask

In [42]:
def task_per_worker(n_workers, i_worker, option, n_iter, project='Geothermal'):
    """Run the Monte Carlo evaluation for one worker's share of the sample.

    The project is set up inside the call via ``setup_gt_project``, a stochastic
    sample of ``n_iter`` iterations is generated, and only the slice that belongs
    to worker ``i_worker`` is handed to ``run_mc``.

    Chunking: every worker gets ``n_iter // n_workers`` consecutive iterations;
    the last worker (``i_worker == n_workers - 1``) also takes the remainder, so
    the chunks of all workers together cover ``range(n_iter)`` exactly once.

    NOTE(review): no seed is passed to ``stochastic`` (see the TODO below), so
    each call presumably draws its own sample rather than slicing one shared
    sample -- confirm once seeding is implemented.

    Parameters
    ----------
    n_workers : int
        Total number of chunks/workers; must be >= 1.
    i_worker : int
        Zero-based index of the current worker; must satisfy
        ``0 <= i_worker < n_workers``.
    option : str
        Model option forwarded to ``setup_gt_project`` (the notebook uses 'cge').
    n_iter : int
        Total number of Monte Carlo iterations across all workers.
    project : str, optional
        Project name forwarded to ``setup_gt_project``. Defaults to 'Geothermal',
        the value that was previously hard-coded.

    Returns
    -------
    The value returned by ``run_mc`` for this worker's chunk.

    Raises
    ------
    ValueError
        If ``n_workers`` is smaller than 1 or ``i_worker`` is out of range.
    """
    # Validate before the expensive project setup: an out-of-range index would
    # otherwise slice a wrong (possibly empty) chunk without any error, and
    # n_workers == 0 would surface as an opaque ZeroDivisionError.
    if n_workers < 1:
        raise ValueError("n_workers must be >= 1, got {!r}".format(n_workers))
    if not 0 <= i_worker < n_workers:
        raise ValueError(
            "i_worker must satisfy 0 <= i_worker < n_workers, got i_worker={!r}, "
            "n_workers={!r}".format(i_worker, n_workers)
        )

    electricity_prod, methods, gt_model, parameters = setup_gt_project(project, option)
    # Generate stochastic values
    parameters.stochastic(iterations=n_iter)  # TODO add seed, also as the function input
    # Reference model
    parameters_sto = gt_model.run_ps(parameters)

    # Bounds of this worker's chunk; the last worker absorbs the remainder.
    chunk_size = n_iter // n_workers
    start = i_worker * chunk_size
    end = n_iter if i_worker == n_workers - 1 else start + chunk_size

    # Each entry is indexed as (p[0], p[1], p[2]); only the third element is
    # sliced down to this worker's range.
    parameters_sto_chunk = [(p[0], p[1], p[2][start:end]) for p in parameters_sto]

    return run_mc(parameters_sto_chunk, electricity_prod, methods, end - start)

In [43]:
%%time
# Smoke-test one chunk eagerly before distributing the work: evaluate the last
# worker's share. The worker count comes from n_workers (instead of a hard-coded
# 8, 7) so this check always matches the chunking used for the dask run.
task_per_worker(n_workers, n_workers - 1, option, n_iter)

CPU times: user 2min 5s, sys: 5.23 s, total: 2min 10s
Wall time: 32.1 s


{'climate change total': array([0.07130548, 0.08807215, 0.19253477, 0.05507062, 0.07715278,
        0.1907083 , 0.12364092, 0.08937042, 0.04541838, 0.04468774,
        0.032495  , 0.01616493, 0.74355135, 0.069473  , 0.04605995,
        0.18769338, 0.04136605, 0.02451439, 0.1447042 , 0.18918838,
        0.03057007, 0.10865247, 0.09955529, 0.04895091, 0.09966091,
        0.0272939 , 0.10300581, 0.05225791, 0.25358862, 0.17718849,
        0.02853241, 0.05716507, 0.24100327, 0.07466111, 0.04691314,
        0.09525337, 0.12870479, 0.03687186, 0.0695842 , 0.05760639]),
 'carcinogenic effects': array([1.46247775e-09, 7.07408972e-10, 9.58832237e-10, 1.18435521e-09,
        1.59900679e-09, 3.65301561e-09, 6.85028652e-10, 7.26625417e-10,
        2.03647645e-09, 7.17727442e-10, 9.43759861e-10, 1.05809258e-09,
        1.81201305e-09, 1.24827324e-09, 1.13440428e-09, 1.71469521e-09,
        1.29219065e-09, 5.71581777e-10, 1.27679111e-09, 5.96552141e-10,
        1.07702299e-09, 7.99543148e-10, 3.6570

In [44]:
# Wrap the function with dask.delayed so that calling it builds a lazy task
# instead of executing immediately.
# NOTE(review): this rebinds the name `task_per_worker`; after this cell a direct
# call returns a Delayed object, not results. Re-run the defining cell to get the
# eager function back.
task_per_worker = dask.delayed(task_per_worker)

In [45]:
# One lazy task per worker index; nothing is executed until dask.compute is called.
model_evals = [
    task_per_worker(n_workers, worker_index, option, n_iter)
    for worker_index in range(n_workers)
]

In [46]:
%%time
# Execute all delayed tasks. dask.compute returns a tuple with one entry per
# positional argument, so this is a 1-tuple holding the list of per-worker results.
res_intermediate = dask.compute(model_evals)

CPU times: user 15.5 s, sys: 2.49 s, total: 18 s
Wall time: 1min 58s


In [47]:
# Flatten the dask.compute output (a tuple wrapping the list of per-worker results)
# into a 1-D object array with one result per worker. reshape(-1) is used instead
# of squeeze() because squeeze() collapses a single-worker result to a 0-d array,
# which cannot be iterated by the aggregation step.
res_intermediate = np.array(res_intermediate, dtype=object).reshape(-1)

In [48]:
# Stitch the per-worker chunks back together: one flat array per method, keyed by
# the last element of each method entry. The empty float array in front keeps the
# result an (empty) float array even when there are no chunks.
res_all = {
    method[-1]: np.hstack(
        [np.array([])] + [chunk[method[-1]] for chunk in res_intermediate]
    )
    for method in methods
}

In [53]:
# Inspect the combined scores for a single method key.
res_all['land use']

array([0.04650413, 0.02660039, 0.01976293, 0.03939582, 0.01578489,
       0.02865784, 0.02361102, 0.01991875, 0.01875124, 0.03261028,
       0.07890252, 0.01166512, 0.11846756, 0.01845049, 0.01227474,
       0.01560099, 0.02322211, 0.01422899, 0.03961534, 0.03297618,
       0.10937777, 0.02475989, 0.0848552 , 0.01136605, 0.03829226,
       0.05904455, 0.00987994, 0.02294232, 0.02456274, 0.00834451,
       0.03545793, 0.01057476, 0.04079371, 0.0110157 , 0.00988588,
       0.01732084, 0.03135755, 0.06750052, 0.05464802, 0.0327824 ,
       0.03113498, 0.01425338, 0.05741362, 0.02726948, 0.03342815,
       0.0179017 , 0.01548511, 0.01716777, 0.07778232, 0.01357079,
       0.04933858, 0.06093204, 0.01197928, 0.01379968, 0.04819299,
       0.02834668, 0.01409636, 0.02527596, 0.03940808, 0.00971621,
       0.03067633, 0.01593218, 0.01839923, 0.03716485, 0.04717036,
       0.02553126, 0.06232392, 0.0317425 , 0.01930992, 0.04424967,
       0.02386473, 0.05810445, 0.01383576, 0.04034561, 0.05628

In [50]:
# Don't forget to close dask client and cluster
# Don't forget to close dask client and cluster: the client is closed first,
# then the cluster it was connected to.
client.close()
cluster.close()

In [None]:
# TODO Save results