# Dask workers

In [1]:
from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster
import os

In [2]:
# Select the execution environment from the machine tag: a SLURM-backed cluster
# when the tag contains 'merlin', an in-process LocalCluster when it contains 'local'.
which_pc = "merlin_paper_gsa"
if 'merlin' in which_pc:
    # Worker scratch space and SLURM job logs share one directory.
    path_dask_logs = '/data/user/kim_a/dask_logs'
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(path_dask_logs, exist_ok=True)
    # cores / processes / memory are per-SLURM-job settings in dask_jobqueue.
    cluster = SLURMCluster(cores     = 8,
                           processes = 6,
                           memory    ="12GB",
                           walltime  = '20:00:00',
                           interface ='ib0',
                           local_directory = path_dask_logs,
                           log_directory   = path_dask_logs,
                           queue="daily",
                           )
elif 'local' in which_pc:
    cluster = LocalCluster(memory_limit='7GB')
else:
    # Fail here rather than with a NameError when `cluster` is first used.
    raise ValueError(
        "Unknown which_pc {!r}: expected it to contain 'merlin' or 'local'".format(which_pc)
    )

In [3]:
# Connect a distributed client to the cluster created in the previous cell.
client = Client(cluster)

In [4]:
# Number of dask workers to request. n_workers is reused further down to split
# the sampling iterations into per-worker chunks and to size task batches.
n_workers = 60
cluster.scale(n_workers)

In [6]:
# Display the client summary (scheduler address, dashboard link, worker count).
client

0,1
Client  Scheduler: tcp://192.168.196.21:38026  Dashboard: http://192.168.196.21:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [None]:
# client.close()
# cluster.close()

# Overall setup

In [26]:
from gsa_framework.lca import LCAModel
from gsa_framework.methods.correlations import CorrelationCoefficients
from gsa_framework.methods.extended_FAST import eFAST
from gsa_framework.methods.saltelli_sobol import SaltelliSobol
from gsa_framework.methods.gradient_boosting import GradientBoosting
from gsa_framework.validation import Validation
from gsa_framework.convergence import Convergence
from pathlib import Path
import brightway2 as bw
import time
import numpy as np
from gsa_framework.plotting import histogram_Y1_Y2
from gsa_framework.utils import read_hdf5_array, read_pickle, write_hdf5_array, write_pickle
import h5py
import dask

In [27]:
def generate_model_output_from_workers(n_workers, dirpath_Y, filepath_Y):
    """Assemble the per-worker score chunks into one output array.

    If ``filepath_Y`` already exists, its content is read and returned
    flattened. Otherwise the chunk pickles ``"<i>.<n_workers>.pickle"`` for
    ``i`` in ``0 .. n_workers - 1`` are read from ``dirpath_Y``, stacked in
    worker order, written to ``filepath_Y`` and returned.

    Parameters
    ----------
    n_workers : int
        Number of chunk files to collect; it is also part of each file name.
    dirpath_Y : pathlib.Path
        Directory that contains the chunk pickles.
    filepath_Y : pathlib.Path
        HDF5 file used as cache for the assembled array.

    Returns
    -------
    numpy.ndarray
        The assembled model outputs.
    """
    if filepath_Y.exists():
        Y = read_hdf5_array(filepath_Y).flatten()
        print("{} already exists".format(filepath_Y.name))
        return Y
    # Seed the list with an empty float array so that the result (including the
    # n_workers == 0 case) matches incremental stacking onto np.array([]).
    chunks = [np.array([])]
    for i in range(n_workers):
        Y_chunk_filename = "{}.{}.pickle".format(i, n_workers)
        chunks.append(read_pickle(dirpath_Y / Y_chunk_filename))
    # Stack once instead of re-copying the growing array on every iteration.
    Y = np.hstack(chunks)  # TODO change to vstack for multidimensional output
    write_hdf5_array(Y, filepath_Y)
    return Y

In [28]:
def compute_scores_per_worker(
    num_params=None,
    iterations=None,
    n_workers=None,
    i_worker=None,
    gsa=None,
    X_chunk_unitcube=None,
):
    """Evaluate the model on one chunk of samples and pickle the resulting scores.

    Two calling styles are supported:

    * ``compute_scores_per_worker(num_params, iterations, n_workers, i_worker)``
      builds the GSA object with ``setup_sobol`` and reads the sample chunk
      from ``<write_dir>/arrays/x_chunks/<i_worker>.<n_workers>.pickle``.
    * ``compute_scores_per_worker(gsa=..., X_chunk_unitcube=..., n_workers=...,
      i_worker=...)`` uses an already constructed GSA object and/or an
      in-memory sample chunk, as the correlations and eFAST cells do.

    Parameters
    ----------
    num_params, iterations : int, optional
        Forwarded to ``setup_sobol``; required only when ``gsa`` is not given.
    n_workers : int
        Total number of chunks; part of the chunk file names.
    i_worker : int
        Index of the chunk handled by this call; part of the chunk file names.
    gsa : object, optional
        GSA object providing ``model``, ``write_dir`` and ``dirpath_Y``.
    X_chunk_unitcube : array-like, optional
        Sample chunk to evaluate. Read from the chunk pickle when omitted.

    Returns
    -------
    The scores returned by ``gsa.model`` for the rescaled chunk. They are also
    written to ``gsa.dirpath_Y / "<i_worker>.<n_workers>.pickle"``.

    Raises
    ------
    ValueError
        If ``n_workers`` / ``i_worker`` are missing, or if neither ``gsa`` nor
        both ``num_params`` and ``iterations`` are provided.
    """
    if n_workers is None or i_worker is None:
        raise ValueError("n_workers and i_worker must be provided")
    if gsa is None:
        if num_params is None or iterations is None:
            raise ValueError(
                "Provide either gsa or both num_params and iterations"
            )
        # NOTE: setup_sobol is defined in a later cell and must have been run
        # before this function is called.
        gsa = setup_sobol(num_params, iterations)
    gsa.dirpath_Y.mkdir(parents=True, exist_ok=True)
    # Input and output chunks share the same "<index>.<total>.pickle" file name.
    chunk_filename = "{}.{}.pickle".format(i_worker, n_workers)
    if X_chunk_unitcube is None:
        filepath_X_chunk = gsa.write_dir / "arrays" / "x_chunks" / chunk_filename
        X_chunk_unitcube = read_pickle(filepath_X_chunk)
    X_chunk_rescaled = gsa.model.rescale(X_chunk_unitcube)
    scores = gsa.model(X_chunk_rescaled)
    filepath = gsa.dirpath_Y / chunk_filename
    write_pickle(scores, filepath)
    return scores

# 1. Correlations, 10'000 and 30'000 params

In [None]:
def setup_correlations(num_params, iterations):
    """Create the correlation-coefficient GSA object for the LCA model.

    Activates the brightway project "GSA for paper", takes the first activity
    of the "CH consumption 1.0" database whose name contains "Food" as the
    functional unit, and wraps the resulting ``LCAModel`` in a
    ``CorrelationCoefficients`` instance seeded with 3403.

    Parameters
    ----------
    num_params : int
        Forwarded to ``LCAModel``; also selects the ``lca_model_<num_params>``
        write directory.
    iterations : int
        Forwarded to ``CorrelationCoefficients``.

    Returns
    -------
    CorrelationCoefficients
    """
    output_dir = (
        Path('/data/user/kim_a/paper_gsa/gsa_framework_files')
        / "lca_model_{}".format(num_params)
    )
    # LCA model: one unit of the first "Food" activity, scored with IPCC 2013 GTP 100a
    bw.projects.set_current("GSA for paper")
    consumption_db = bw.Database("CH consumption 1.0")
    food_activities = [
        activity for activity in consumption_db if "Food" in activity['name']
    ]
    functional_unit = {food_activities[0]: 1}
    impact_method = ("IPCC 2013", "climate change", "GTP 100a")
    lca_model = LCAModel(
        functional_unit, impact_method, output_dir, num_params=num_params
    )
    # GSA setup
    return CorrelationCoefficients(
        iterations=iterations,
        model=lca_model,
        write_dir=output_dir,
        seed=3403,
    )

In [None]:
%%time
# Correlation-coefficient run: 30'000 parameters, two iterations per parameter.
num_params = 30000
iterations = 2 * num_params
gsa = setup_correlations(num_params, iterations)
# Samples are kept in memory; the dask cell below slices them into per-worker chunks.
X_unitcube = gsa.generate_unitcube_samples()

In [None]:
# %%time
# # Test
# X_chunk_unitcube = X_unitcube[20:30,:]
# n_workers = 20
# i = 3
# scores_test = compute_scores_per_worker(
#     gsa = gsa,
#     X_chunk_unitcube=X_chunk_unitcube, 
#     n_workers=n_workers, 
#     i_worker=i,
# )

### Run dask

In [None]:
# Build one delayed model-evaluation task per worker, each on a contiguous
# block of rows of X_unitcube. Nothing runs until dask.compute is called.
# NOTE(review): the tasks pass `gsa` and `X_chunk_unitcube` as keywords, so
# compute_scores_per_worker must accept both - confirm its signature before running.
task_per_worker = dask.delayed(compute_scores_per_worker)
n_samples = len(X_unitcube)
iterations_per_worker = n_samples // n_workers
model_evals = []
for i in range(n_workers):
    start = i * iterations_per_worker
    # The last worker also takes the remainder rows, so no sample is dropped
    # when n_samples is not a multiple of n_workers.
    end = n_samples if i == n_workers - 1 else (i + 1) * iterations_per_worker
    X_chunk_unitcube = X_unitcube[start:end, :]
    model_eval = task_per_worker(
        gsa = gsa,
        X_chunk_unitcube=X_chunk_unitcube,
        n_workers=n_workers,
        i_worker=i,
    )
    model_evals.append(model_eval)

In [None]:
# %%time
# dask.compute(model_evals)

### Collect scores

In [None]:
# Merge the per-worker score pickles from gsa.dirpath_Y into a single array;
# the merged result is cached at gsa.filepath_Y and reused if already present.
dirpath_Y = gsa.dirpath_Y
filepath_Y = gsa.filepath_Y
Y = generate_model_output_from_workers(n_workers, dirpath_Y, filepath_Y)

# 2. eFAST 

In [None]:
def setup_efast(num_params, iterations):
    """Create the eFAST GSA object for the LCA model.

    Activates the brightway project "GSA for paper", takes the first activity
    of the "CH consumption 1.0" database whose name contains "Food" as the
    functional unit, and wraps the resulting ``LCAModel`` in an ``eFAST``
    instance with ``M=4`` and seed 3403.

    Parameters
    ----------
    num_params : int
        Forwarded to ``LCAModel``; also selects the ``lca_model_<num_params>``
        write directory.
    iterations : int
        Forwarded to ``eFAST``.

    Returns
    -------
    eFAST
    """
    output_dir = (
        Path('/data/user/kim_a/paper_gsa/gsa_framework_files')
        / "lca_model_{}".format(num_params)
    )
    # LCA model: one unit of the first "Food" activity, scored with IPCC 2013 GTP 100a
    bw.projects.set_current("GSA for paper")
    consumption_db = bw.Database("CH consumption 1.0")
    food_activities = [
        activity for activity in consumption_db if "Food" in activity['name']
    ]
    functional_unit = {food_activities[0]: 1}
    impact_method = ("IPCC 2013", "climate change", "GTP 100a")
    lca_model = LCAModel(
        functional_unit, impact_method, output_dir, num_params=num_params
    )
    # GSA setup
    return eFAST(
        M=4,
        iterations=iterations,
        model=lca_model,
        write_dir=output_dir,
        seed=3403,
    )

In [None]:
%%time
# eFAST run: 10'000 parameters, 65 iterations per parameter.
num_params = 10000
iterations = 65 * num_params
gsa = setup_efast(num_params, iterations)
# Sampling is disabled here; the dask cell below reads the samples from
# gsa.filepath_X_unitcube, so that file must already exist.
# X_unitcube = gsa.generate_unitcube_samples() # TODO do it only once!

In [None]:
# %%time
# # Test
# start = 20
# end = 30
# with h5py.File(gsa.filepath_X_unitcube, "r") as f:
#     X_chunk_unitcube = np.array(f["dataset"][start:end,:])    
# n_workers = 20
# i = 3
# scores_test = compute_scores_per_worker(
#     gsa=gsa,
#     X_chunk_unitcube=X_chunk_unitcube, 
#     n_workers=n_workers, 
#     i_worker=i,
# )

In [None]:
# Build one delayed model-evaluation task per worker, each on a contiguous
# block of rows read from the HDF5 sample file. Nothing runs until
# dask.compute is called.
# NOTE(review): the tasks pass `gsa` and `X_chunk_unitcube` as keywords, so
# compute_scores_per_worker must accept both - confirm its signature before running.
task_per_worker = dask.delayed(compute_scores_per_worker)
model_evals = []
# Open the sample file once instead of once per worker.
with h5py.File(gsa.filepath_X_unitcube, "r") as f:
    dataset = f["dataset"]
    # Chunk on the number of rows actually stored, so every sample is evaluated.
    n_samples = dataset.shape[0]
    iterations_per_worker = n_samples // n_workers
    for i in range(n_workers):
        start = i * iterations_per_worker
        # The last worker also takes the remainder rows that integer division
        # would otherwise drop.
        end = n_samples if i == n_workers - 1 else (i + 1) * iterations_per_worker
        X_chunk_unitcube = np.array(dataset[start:end, :])
        model_eval = task_per_worker(
            gsa=gsa,
            X_chunk_unitcube=X_chunk_unitcube,
            n_workers=n_workers,
            i_worker=i,
        )
        # list.append returns None, so its result is not bound to a name.
        model_evals.append(model_eval)

In [None]:
%%time
# Execute all queued delayed tasks. The return value is not kept here; each
# task also pickles its scores into gsa.dirpath_Y.
dask.compute(model_evals)

# 3. Sobol method

In [29]:
def setup_sobol(num_params, iterations):
    """Create the Saltelli/Sobol GSA object for the LCA model.

    Activates the brightway project "GSA for paper", takes the first activity
    of the "CH consumption 1.0" database whose name contains "Food" as the
    functional unit, and wraps the resulting ``LCAModel`` in a
    ``SaltelliSobol`` instance.

    Parameters
    ----------
    num_params : int
        Forwarded to ``LCAModel``; also selects the ``lca_model_<num_params>``
        write directory.
    iterations : int
        Forwarded to ``SaltelliSobol``.

    Returns
    -------
    SaltelliSobol
    """
    path_base = Path('/data/user/kim_a/paper_gsa/gsa_framework_files')
    # LCA model
    bw.projects.set_current("GSA for paper")
    co = bw.Database("CH consumption 1.0")
    act = [act for act in co if "Food" in act['name']][0]
    demand = {act: 1}
    method = ("IPCC 2013", "climate change", "GTP 100a")
    # Define some variables
    write_dir = path_base / "lca_model_{}".format(num_params)
    model = LCAModel(demand, method, write_dir, num_params=num_params)
    # No seed is passed: the unused local `gsa_seed = 3403` was removed.
    # NOTE(review): the other setup_* functions pass seed=3403 - confirm that
    # SaltelliSobol is meant to be constructed without one.
    gsa = SaltelliSobol(iterations=iterations, model=model, write_dir=write_dir)
    return gsa

In [30]:
%%time
# Sobol run: 10'000 parameters, 100 iterations per parameter.
num_params = 10000
iterations = 100 * num_params
gsa = setup_sobol(num_params, iterations)
# Sampling is disabled here; the samples are expected to exist on disk already.
# X_unitcube = gsa.generate_unitcube_samples() # TODO do it only once!

CPU times: user 8.89 s, sys: 323 ms, total: 9.21 s
Wall time: 2.2 s


In [None]:
# %%time
# Create x_chunk files
# iterations_per_worker = 1000
# n_times = gsa.iterations // n_workers // iterations_per_worker + 1
# n_tasks = gsa.iterations // iterations_per_worker

# k=0
# with h5py.File(gsa.filepath_X_unitcube, "r") as f:
#     for j in range(n_times):
#         for i in range(n_workers):
#             start = k * iterations_per_worker
#             end = (k + 1) * iterations_per_worker
#             if k <= n_tasks:
#                 print(k, start, end)
#                 X_chunk_unitcube = np.array(f["dataset"][start:end,:]) 
#                 filepath_X_chunk = gsa.write_dir / "arrays" / "x_chunks" / "{}.{}.pickle".format(k, n_tasks)
#                 write_pickle(X_chunk_unitcube, filepath_X_chunk)
#                 k += 1

In [None]:
# task_per_worker = dask.delayed(compute_scores_per_worker)
# iterations_per_worker = 1000
# n_times = gsa.iterations // n_workers // iterations_per_worker + 1
# n_tasks = gsa.iterations // iterations_per_worker

# k=0
# for j in range(n_times):
#     print(j)
#     model_evals = []
#     for i in range(n_workers):
#         if k <= n_tasks:
#             model_eval = task_per_worker(
#                 gsa=gsa,
#                 n_workers=n_tasks, 
#                 i_worker=k,
#             )
#             model_evals.append(model_eval)
#             k += 1
#     dask.compute(model_evals)

In [None]:
# Evaluate the Sobol samples in chunks of `iterations_per_worker` rows, one
# delayed task per chunk, submitted to dask in batches of at most `n_workers`.
# Chunks whose score pickle already exists in gsa.dirpath_Y are skipped, so the
# cell can be re-run to resume an interrupted computation.
task_per_worker = dask.delayed(compute_scores_per_worker)
iterations_per_worker = 1000
n_tasks = gsa.iterations // iterations_per_worker

# Each task rebuilds its GSA object from these two values; they must match the
# ones used to create `gsa` above.
num_params = 10000
iterations = 100 * num_params

# Chunk indices run from 0 to n_tasks inclusive: index n_tasks is the
# remainder chunk left over by the integer division.
pending_chunks = [
    k
    for k in range(n_tasks + 1)
    if not (gsa.dirpath_Y / "{}.{}.pickle".format(k, n_tasks)).exists()
]

for j, batch_start in enumerate(range(0, len(pending_chunks), n_workers)):
    print(j)
    model_evals = [
        task_per_worker(
            num_params=num_params,
            iterations=iterations,
            n_workers=n_tasks,
            i_worker=k,
        )
        for k in pending_chunks[batch_start:batch_start + n_workers]
    ]
    # Blocks until the whole batch has finished; each task pickles its own scores.
    dask.compute(model_evals)

# Leave an empty queue behind, as after the last flush of a completed run.
model_evals = []
    

0
1
2
3
4
5
6


In [23]:
# Debug check: number of delayed tasks currently held in model_evals.
len(model_evals)

31

In [19]:
# Debug check: gsa.iterations // iterations_per_worker, as computed in the batching cell.
n_tasks

990

# Constructing LCA models test

In [None]:
from gsa_framework.lca import LCAModel
from gsa_framework.methods.correlations import CorrelationCoefficients
from gsa_framework.methods.extended_FAST import eFAST
from gsa_framework.methods.saltelli_sobol import SaltelliSobol
from gsa_framework.methods.gradient_boosting import GradientBoosting
from gsa_framework.validation import Validation
from pathlib import Path
import brightway2 as bw
import time
import numpy as np
from gsa_framework.plotting import histogram_Y1_Y2
from gsa_framework.utils import read_hdf5_array

if __name__ == "__main__":
    # Check that restricting the LCA model to the `num_params` parameters
    # selected from the LSA scores gives an output distribution comparable to
    # the full model: plots the histogram of Y for all parameters against Y for
    # the selected subset.

    # Local development alternative:
    #     path_base = Path(
    #         "/Users/akim/PycharmProjects/gsa_framework/dev/write_files/paper_gsa/"
    #     )
    path_base = Path('/data/user/kim_a/paper_gsa/gsa_framework_files')

    # LCA model
    bw.projects.set_current("GSA for paper")
    co = bw.Database("CH consumption 1.0")
    act = [act for act in co if "Food" in act['name']][0]
    demand = {act: 1}
    method = ("IPCC 2013", "climate change", "GTP 100a")

    # Define some variables
    num_params = 10000
    iterations_validation = 2000
    write_dir = path_base / "lca_model_{}".format(num_params)
    model = LCAModel(demand, method, write_dir) # TODO add num_params later
    gsa_seed = 3403
    validation_seed = 7043
    fig_format = ["html", "pickle"]
    parameter_inds_convergence_plot = [0,1,2] #TODO

    # Make sure  that the chosen num_params in LCA are appropriate
    val = Validation(
        model=model,
        # Use the named constant instead of repeating the literal 2000.
        iterations=iterations_validation,
        # NOTE(review): validation_seed (7043) is defined above but 4444 is
        # what is passed here - confirm which seed is intended. Left unchanged
        # so that results stay comparable with earlier runs.
        seed=4444,
        default_x_rescaled=model.default_uncertain_amounts,
        write_dir=write_dir,
    )
    tag = "numParams{}".format(num_params)
    scores_dict = model.get_lsa_scores_pickle(model.write_dir / "LSA_scores")
    uncertain_tech_params_where_subset, _ = model.get_nonzero_params_from_num_params(scores_dict, num_params)
    # Map each selected entry to its position within model.uncertain_tech_params_where.
    parameter_choice = []
    for u in uncertain_tech_params_where_subset:
        where_temp = np.where(model.uncertain_tech_params_where == u)[0]
        # Every selected entry must occur exactly once in the model's array.
        assert len(where_temp) == 1
        parameter_choice.append(where_temp[0])
    parameter_choice.sort()
    Y_subset = val.get_influential_Y_from_parameter_choice(parameter_choice=parameter_choice, tag=tag)
    val.plot_histogram_Y_all_Y_inf(Y_subset, num_influential=num_params)