In [1]:
import os
import ipyparallel as ipp
import time
import pickle


from neuralhydrology.utils.config import Config

from neuralhydrology.nh_run import ESDL_start_run, eval_run
from pathlib import Path
import xarray as xr
from neuralhydrology.utils.nh_results_ensemble import create_results_ensemble

In [2]:
# Relative paths used later (e.g. the config path inside ESDL_ensemble) resolve against this directory.
os.getcwd()

'/global/home/groups/pc_dsdisc/ESDL/ESDL_LSTM/neuralhydrology'

In [3]:
# Start one local ipyparallel engine per CPU allotted by SLURM. Outside a SLURM
# allocation the variable is unset (os.getenv returns None and int(None) would
# raise TypeError), so fall back to this machine's CPU count.
n_engines = int(os.getenv('SLURM_CPUS_ON_NODE') or os.cpu_count() or 1)
mycluster = ipp.Cluster(n=n_engines)
c = mycluster.start_and_connect_sync()
# Direct view over every engine; blocking so calls return results synchronously.
dview = c[:]
dview.block = True
# Smoke test: expect one "Hello, World" per engine.
dview.apply(lambda: "Hello, World")

Starting 32 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
100%|██████████| 32/32 [00:05<00:00,  5.57engine/s]


['Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World',
 'Hello, World']

In [4]:
# Number of engines the client is connected to (should match the engines started above).
len(c.ids)

32

In [5]:
# Wider grid (currently commented out):
# hidden_states = [64, 96, 156, 196, 256]
# dropout_rate = [0.0, 0.25, 0.4, 0.5]
# input_seq_lengths = [90, 180, 270, 365]
# stacked_layers = [1, 2]

# Current grid: one hidden size per engine, starting at 63, with dropout and
# sequence length each fixed to a single value -> one combination per engine.
hidden_states = range(63, 63 + len(c.ids))
dropout_rate = [0.5]
input_seq_lengths = [90]

# Cartesian product as (hidden_size, dropout, seq_length) tuples; the hidden
# size varies slowest, the sequence length fastest.
param_combos = [
    (hidden, dropout, seq_len)
    for hidden in hidden_states
    for dropout in dropout_rate
    for seq_len in input_seq_lengths
]

In [6]:
# One combination per engine: hidden_states spans len(c.ids) values and the other two lists hold one value each.
len(param_combos)

32

In [7]:
def ESDL_ensemble(params, num_ensemble_members=1, epochs=5,
                  config_path="../initial_exploration/parallel_grid_search.yml",
                  period="test", basin="Tuler"):
    '''Train an ensemble for one hyperparameter combination and return its NSE.

    Parameters
    ----------
    params : tuple
        (hidden_size, output_dropout, seq_length) -- one entry of param_combos.
    num_ensemble_members : int, optional
        Number of models trained with identical settings. With one member the
        NSE is read directly from that run's pickled results; with more, the
        runs are combined via create_results_ensemble.
    epochs : int, optional
        Training epochs; also selects the "model_epochNNN" results folder, so
        the two can no longer drift apart.
    config_path : str or Path, optional
        Base YAML config. A relative path resolves against the current working
        directory of the process running this function (notebook or engine).
    period : str, optional
        Evaluation period whose NSE is returned. "test" keeps the original
        behaviour. NOTE(review): choosing hyperparameters by test-period NSE
        leaks test information into model selection; "validation" is the usual
        choice for a grid search.
    basin : str, optional
        Basin key in the results dictionary.

    Returns
    -------
    tuple
        (params, nse)

    Raises
    ------
    ValueError
        If num_ensemble_members < 1 (no run would exist to score).
    '''
    # Imported here so the function is self-contained on ipyparallel engines,
    # independent of whatever names were pushed into their namespaces.
    import pickle
    from pathlib import Path
    from neuralhydrology.utils.config import Config
    from neuralhydrology.nh_run import ESDL_start_run, eval_run
    from neuralhydrology.utils.nh_results_ensemble import create_results_ensemble

    if num_ensemble_members < 1:
        raise ValueError("num_ensemble_members must be at least 1")

    hidden_size, output_dropout, seq_length = params[0], params[1], params[2]
    experiment_name = f'parallel_grid_search_test{hidden_size}_{output_dropout}_{seq_length}'

    # TODO: copy the data once per worker so each engine reads its own copy.
    # TODO: read the YAML once and pass the parsed object to all workers.
    def build_config():
        # A fresh Config per member: config.run_dir is read back after
        # ESDL_start_run, which suggests the run writes its concrete folder into
        # it; reusing one Config could then nest later members inside the first.
        config = Config(Path(config_path))
        config.update_config({
            'epochs': epochs,
            'hidden_size': hidden_size,
            'output_dropout': output_dropout,
            'seq_length': seq_length,
            'experiment_name': experiment_name,
        })
        return config

    # Train the members sequentially, remembering each run's folder.
    # gpu=-1 presumably forces CPU training in ESDL_start_run -- confirm.
    paths = []
    for _ in range(num_ensemble_members):
        config = build_config()
        ESDL_start_run(config, gpu=-1)
        paths.append(config.run_dir)

    # Evaluate every member on both periods so both result files exist on disk.
    for p in paths:
        eval_run(run_dir=p, period="test")
        eval_run(run_dir=p, period="validation")

    if num_ensemble_members == 1:
        results_file = paths[0] / period / f"model_epoch{epochs:03d}" / f"{period}_results.p"
        # Pickle written by this project's own eval_run, i.e. trusted input.
        with open(results_file, "rb") as fp:
            results = pickle.load(fp)
        ensemble_nse = results[basin]['1D']['NSE']
    else:
        ensemble_run = create_results_ensemble(paths, period=period)
        ensemble_nse = ensemble_run[basin]['1D']['NSE']

    return (params, ensemble_nse)

In [None]:
# Push imports to workers: engines do not share the notebook's namespace, so the
# names ESDL_ensemble relies on are executed on every engine in one call.

dview.execute(
    'from neuralhydrology.utils.config import Config\n'
    'from neuralhydrology.nh_run import ESDL_start_run, eval_run\n'
    'from pathlib import Path\n'
    'import xarray as xr\n'
    'from neuralhydrology.utils.nh_results_ensemble import create_results_ensemble\n'
    'import pickle'
)

In [None]:
# Load-balanced view: each task goes to whichever engine is free next.
lview = c.load_balanced_view()
# Cause execution on main process to wait while tasks sent to workers finish
lview.block = True 

In [None]:
# In parallel: run every hyperparameter combination on the engines and time it.
# perf_counter is monotonic, so the interval is not skewed by system clock changes
# the way a time.time() difference can be.

start_time = time.perf_counter()

# Map each param combo to ESDL_ensemble; with lview.block = True this returns
# the list of (params, nse) tuples once every task has finished.
all_nse = lview.map(ESDL_ensemble, param_combos)
end_time = time.perf_counter()

elapsed_time = end_time - start_time
print("Elapsed time:", elapsed_time, "seconds")

In [23]:
# Parallel results: one (params, NSE) tuple per hyperparameter combination.
all_nse

[((63, 0.5, 90), 0.640073150396347),
 ((64, 0.5, 90), 0.6261449456214905),
 ((65, 0.5, 90), 0.623724639415741),
 ((66, 0.5, 90), 0.668786495923996),
 ((67, 0.5, 90), 0.6115140318870544),
 ((68, 0.5, 90), 0.6680737435817719),
 ((69, 0.5, 90), 0.6114262342453003),
 ((70, 0.5, 90), 0.5818195939064026),
 ((71, 0.5, 90), 0.6156918704509735),
 ((72, 0.5, 90), 0.6414744555950165),
 ((73, 0.5, 90), 0.5829590857028961),
 ((74, 0.5, 90), 0.5853991806507111),
 ((75, 0.5, 90), 0.606875866651535),
 ((76, 0.5, 90), 0.5530081391334534),
 ((77, 0.5, 90), 0.5351739823818207),
 ((78, 0.5, 90), 0.6493227481842041),
 ((79, 0.5, 90), 0.59109827876091),
 ((80, 0.5, 90), 0.6361516714096069),
 ((81, 0.5, 90), 0.6096990704536438),
 ((82, 0.5, 90), 0.5568223297595978),
 ((83, 0.5, 90), 0.5860029757022858),
 ((84, 0.5, 90), 0.591246634721756),
 ((85, 0.5, 90), 0.5354208946228027),
 ((86, 0.5, 90), 0.5735471546649933),
 ((87, 0.5, 90), 0.6563299000263214),
 ((88, 0.5, 90), 0.5440194308757782),
 ((89, 0.5, 90), 0.

In [24]:
# In series: run the same combinations one after another in this process, for
# comparison with the parallel timing. Results are appended as each finishes,
# so all_nse keeps the completed combinations if the loop is interrupted.
# perf_counter is monotonic, so the interval is immune to wall-clock adjustments.
all_nse = []
start_time = time.perf_counter()

for combo in param_combos:
    all_nse.append(ESDL_ensemble(combo))

end_time = time.perf_counter()
elapsed_time = end_time - start_time
print("Elapsed time:", elapsed_time, "seconds")

2024-09-29 13:58:43,763: Logging to /global/scratch/users/evanrobert/parallel_test/parallel_grid_search_test63_0.5_90_2909_135843/output.log initialized.
2024-09-29 13:58:43,764: ### Folder structure created at /global/scratch/users/evanrobert/parallel_test/parallel_grid_search_test63_0.5_90_2909_135843
2024-09-29 13:58:43,764: ### Run configurations for parallel_grid_search_test63_0.5_90
2024-09-29 13:58:43,764: data_dir: /global/scratch/users/evanrobert
2024-09-29 13:58:43,765: experiment_name: parallel_grid_search_test63_0.5_90
2024-09-29 13:58:43,765: run_dir: /global/scratch/users/evanrobert/parallel_test/parallel_grid_search_test63_0.5_90_2909_135843
2024-09-29 13:58:43,766: train_basin_file: ../initial_exploration/exploration_tuler
2024-09-29 13:58:43,766: validation_basin_file: ../initial_exploration/exploration_tuler
2024-09-29 13:58:43,766: test_basin_file: ../initial_exploration/exploration_tuler
2024-09-29 13:58:43,767: train_start_date: 1981-10-01 00:00:00
2024-09-29 13:58

In [43]:
# Serial results. The serial cell reuses the name all_nse, replacing the parallel
# results; the list may be partial if that loop was interrupted.
all_nse

[((63, 0.5, 90), 0.595575600862503),
 ((64, 0.5, 90), 0.5938922166824341),
 ((65, 0.5, 90), 0.5654114782810211),
 ((66, 0.5, 90), 0.6782033443450928)]

In [89]:
# Timing notes (units presumably seconds where not stated):
# 1 run with 1 worker: 25
# 2 runs with 1 worker: 150
# 56 with 24 workers


# 2 runs with 2 ensemble members with 1 epoch: 152 seconds
# 1 run with 2 ensemble members with 1 epoch: 12 seconds

In [91]:
# Record the ipyparallel version these timings were measured with.
ipp.__version__

'8.8.0'

In [94]:
# NOTE(review): the saved output lists only two combos, whereas the grid cell builds
# one per engine -- param_combos was reassigned in a cell not shown here. A fresh
# Restart & Run All would display the full grid instead.
param_combos

[(66, 0.5, 90), (68, 0.5, 90)]