## Parallel Processing

Launching multiple copies of the Veneer command line using veneer-py


Running simulations in parallel

In [1]:
import os

import veneer
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
%load_ext autoreload
%autoreload 2
import source_runner as sr
from source_runner import *

In [3]:
#### Run this to improve model performance, mainly through parallel computing. These can also be modified through Source UI
def configure_options(self, options):
    """Set Dynamic SedNet ``DSScenarioDetails`` options on a running Veneer server.

    Builds a small generated script that imports ``DSScenarioDetails`` and
    assigns each option, then executes it via ``self.model._safe_run``.

    Parameters
    ----------
    self : veneer.Veneer
        Client for the target server (named ``self`` so the function can also
        be bound as a method).
    options : dict
        Mapping of ``DSScenarioDetails`` attribute name -> value. Values are
        formatted with ``%s``, so they must render as valid script literals
        (e.g. ``True``/``False``/numbers); string values are NOT quoted.

    Returns
    -------
    The result of ``self.model._safe_run`` (previously discarded), so callers
    can inspect it if they need to.
    """
    lines = ["# Generated Script", "from Dynamic_SedNet.PluginSetup import DSScenarioDetails"]
    lines += ["DSScenarioDetails.%s = %s" % (k, v) for (k, v) in options.items()]
    script = '\n'.join(lines)
    return self.model._safe_run(script)

In [4]:
from veneer.manage import create_command_line
from veneer.manage import start,kill_all_now

In [5]:
# Create the Veneer command-line executable in cmd_directory from the Veneer
# plugin shipped inside that directory's Source 4.1.2 plugin tree.
# force=False: presumably an existing copy is reused rather than rebuilt — TODO confirm.
source_version = '4.1.2'
cmd_directory = 'pest_source/vcmd412'
# Derived from cmd_directory so the two paths cannot drift apart.
veneer_install = cmd_directory + '/Plugins/Plugins412/4.1.2.5828_CommunityPlugins/Veneer/'
veneer_cmd = create_command_line(veneer_install, source_version, dest=cmd_directory, force=False)

In [6]:
# Launch num_copies Veneer command-line servers for the project (ports starting
# at first_port), then open one veneer-py client per reported port.
project = 'pest_source/MW_BASE_RC8_411_4712.rsproj'
num_copies = 1
first_port = 9877
processes, ports = start(project,
                         n_instances=num_copies,
                         ports=first_port,
                         veneer_exe=veneer_cmd,
                         remote=False,
                         debug=True)
vs = list(map(lambda port: veneer.Veneer(port=port), ports))

Starting pest_source\vcmd412\FlowMatters.Source.VeneerCmd.exe -p 9877 -s  "e:\cloudStor\Projects\factor_fixing\run_source\pest_source\MW_BASE_RC8_411_4712.rsproj"
[0] Loading plugins

[0] Loaded \\E:\\cloudStor\\Projects\\factor_fixing\\run_source\\pest_source\\vcmd45\\Plugins\\latest\\CodeProjectWizardDemo.dll

[0] Plugins loaded (1/1)

[0] Opening project file: e:\cloudStor\Projects\factor_fixing\run_source\pest_source\MW_BASE_RC8_411_4712.rsproj

ERROR[0] log4net:ERROR Failed to find configuration section 'log4net' in the application's .config file. Check your .config file for the <log4net> and <configSections> elements. The configuration section should look like: <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler,log4net" />

[0] Loading project

[0] Cannot instantiate abstract class or interface: TIME.ScenarioManagement.RunManagement.RunningConfiguration

[0]     NHibernate.Tuple.PocoInstantiator.Instantiate()

[0]     NHibernate.Tuple.Entity.AbstractE

Exception: One or more instances of Veneer failed to start. Try again with debug=True to see output

Configure model for runs

#Start

In [9]:
# Configure every server for batch runs.
# DSScenarioDetails options: parallel computation settings (these can also be
# set through the Source UI), plus turning off the Dynamic SedNet results
# window after each run and overwriting previous results.
ds_options = {'RunNetworksInParallel': True, 'PreRunCatchments': True, 'ParallelFlowPhase': True,
              'ShowResultsAfterRun': False, 'OverwriteResults': True}
for v in vs:
    # One generated script per server instead of two separate round trips.
    configure_options(v, ds_options)
    v.model.sourceScenarioOptions("PerformanceConfiguration", "ProcessCatchmentsInParallel", True)

In [10]:
# Simulation period, dd/mm/yyyy: 1 July 2005 to 30 June 2014.
# NOTE(review): an earlier comment mentioned a half-year warm-up period, but
# nothing here sets one — confirm whether warm-up is handled elsewhere.
start_date = '01/07/2005'
end_date = '30/06/2014'
# Network element and recording variable of interest (fine-sediment mass).
# NOTE(review): this element name differs from the 'NetworkElement' used for
# recording in ts_match_criteria ('gauge_124001B_OConnellRvStaffordsCrossing');
# confirm which element is intended.
record_gauge = 'gauge_124001B_AndromacheRvJochheims'
record_var = 'Constituents@Sediment - Fine@Downstream Flow Mass'
# NOTE(review): set_filter receives the `veneer` module, not a client such as
# vs[0]; confirm this is what source_runner.set_filter expects.
filter_elements = set_filter(veneer, record_gauge, record_var)

In [11]:
# Enable recording of the fine-sediment mass at the target gauge on every server.
# The same criteria are used after each run to retrieve the series.
# record_var is reused so the recorded variable cannot drift from the filter setup.
ts_match_criteria = {'RecordingVariable': record_var,
                     'NetworkElement': 'gauge_124001B_OConnellRvStaffordsCrossing'}
for v in vs:
    v.configure_recording(enable=[ts_match_criteria])

#End

Parameter processing

#Start

In [12]:
# Build a joint distribution with one uniform marginal per parameter, bounded by
# the 'min' and 'max' columns of the parameter file.
# NOTE(review): `cp` (chaospy) is not imported explicitly in this notebook;
# presumably it arrives via `from source_runner import *` — confirm.
param_file = '../input/upper v1.csv'
parameters = sr.load_parameter_file(param_file)
parameters = parameters[:-1]  # the file's last row is excluded (reason not stated here)
len_params = len(parameters)
num_runs = 1500
param_dist = [
    cp.Uniform(lower=parameters.loc[row, 'min'], upper=parameters.loc[row, 'max'])
    for row in range(len_params)
]
dist = cp.J(*param_dist)

# Samples are not drawn here; the next cell loads a pre-generated set from CSV.
# They can be regenerated with Sobol sampling, e.g.:
#     np.random.seed(1234)
#     nodes = np.round(dist.sample(size=num_runs, rule='S'), 4)

In [13]:
# Load the pre-generated parameter samples (one row per sample, indexed by 'id')
# and transpose them so that each column of `nodes` is one model run's sample.
f_dir = '../output/22params/MW_results/'
f_name = 'samples.csv'
samples_df = pd.read_csv(f_dir + f_name, index_col='id')
nodes = samples_df.values.T

In [13]:
# Save a copy of the parameter samples (one row per run, one column per
# parameter, labelled by Veneer name) into the results directory.
f_dir = '../output/MW_results/'  # later cells also write model results here
cols = parameters['Veneer_name'].tolist()
if nodes.shape[0] != len(cols):
    raise ValueError('samples have %d parameters but the parameter file lists %d'
                     % (nodes.shape[0], len(cols)))
os.makedirs(f_dir, exist_ok=True)  # to_csv does not create missing directories
results_df = pd.DataFrame(nodes.T, columns=cols)  # default index is 0..n-1
results_df.index.name = 'id'
results_df.to_csv(f_dir + 'samples.csv')

In [14]:
# Group the parameters by model component, then read their current
# (pre-sampling) values from the first server. These are scaled during the runs
# and restored at the end.
grouping = sr.group_parameters(parameters)
param_names, param_vename_dic, param_vename, param_types = grouping
initial_params = get_initial_param_vals(vs[0], param_names, param_vename, param_vename_dic)

#End

In [17]:
%%time
tss_results = []
total_runs = 1400
group_loops = np.floor_divide(num_runs, num_copies) + 1
for i in range(772, 773):
    group_run_responses = []
    for j in range(num_copies):
        total_runs += 1
        scaling_factor_index =  i * num_copies + j
        print(scaling_factor_index)
        if scaling_factor_index >= num_runs:
            break
        scaling_factor = nodes[:, scaling_factor_index]
        v= vs[j]
        v.drop_all_runs()
        
        # set scaling_factor to the current model
        for k in range(len_params):
            name = parameters.Veneer_name[k]
            param_new_factor = scaling_factor[k]
            param_value_ini = initial_params[name]
            if param_types[k] == 0:
                param_value_new = [param_new_factor for value in initial_params[name]]
            else:
                param_value_new = [param_new_factor * value for value in initial_params[name]]
            #set parameter values
            if name in param_vename_dic[param_vename[0]]:
                assert v.model.catchment.generation.set_param_values(name, param_value_new, fromList = True)
            if name in  param_vename_dic[param_vename[1]]:
                assert v.model.link.constituents.set_param_values(name, param_value_new,fromList = True)
            if name in  param_vename_dic[param_vename[2]]:
                assert v.model.node.set_param_values(name, param_value_new,fromList = True)
            if name in  param_vename_dic[param_vename[3]]:
                assert v.model.link.routing.set_param_values(name, param_value_new,fromList = True)

        response = v.run_model(start=start_date, end=end_date, run_async=True)
        group_run_responses.append(response)
        
    # Retrieve the time series of fine sediment mass at the interested gauge
    for j in range(num_copies):
        scaling_factor_index =  i * num_copies + j
        if scaling_factor_index >= (num_runs):
            break
        v = vs[j]
        r = group_run_responses[j]   
        code = r.getresponse().getcode() # wait until the job finished   
        run_results = v.retrieve_multiple_time_series(criteria=ts_match_criteria, name_fn=veneer.name_for_variable)
        # store the daily results and the index of sampling
        result_np =  run_results.values
        try:
            tss_results = np.hstack((tss_results, result_np))
        except (ValueError, UnboundLocalError) as e:
            timeframe = np.array(run_results.index.date.tolist()).reshape(len(run_results.index), 1)
            tss_results = np.hstack((timeframe, result_np))
#     if ((total_runs % 100) == 0 | (total_runs == num_runs)):
#         num_write = tss_results.shape[1]
#         np.savetxt('{}{}{}'.format(f_dir, str(total_runs), '_Tss_124001B.csv'), 
#            tss_results, delimiter=',', newline='\n',
#            header=", ".join(["Date"] + [str(i+1) for i in range(num_write)]), fmt='%s')
#         tss_results = []    
    veneer.log('Completed %d runs'%total_runs)

772
Completed 1401 runs
Wall time: 2min 5s


In [18]:
# Save the collected series. Column 0 holds the dates; each further column is one run.
# NOTE: np.savetxt prefixes the header line with '# ' (its default `comments`).
batch_label = 772  # group index of the batch run above; used in the file name
num_write = tss_results.shape[1]
# num_write includes the Date column, so there are num_write - 1 run columns.
# The previous header had one label too many.
np.savetxt('{}{}{}'.format(f_dir, str(batch_label), '_Tss_124001B.csv'),
           tss_results, delimiter=',', newline='\n',
           header=", ".join(["Date"] + [str(c + 1) for c in range(num_write - 1)]), fmt='%s')

In [24]:
%%time
for v in vs:    
    for k in range(len_params):
        name = parameters.Veneer_name[k]
        ini_param_val = initial_params[name]
        if name in param_vename_dic[param_vename[0]]:
            v.model.catchment.generation.set_param_values(name, ini_param_val, fromList=True)
        if name in param_vename_dic[param_vename[1]]:
            v.model.link.constituents.set_param_values(name, ini_param_val, fromList=True)
        if name in param_vename_dic[param_vename[2]]:
            v.model.node.set_param_values(name, ini_param_val, fromList=True)
        if name in param_vename_dic[param_vename[3]]:
            v.model.link.routing.set_param_values(name, ini_param_val, fromList=True)
        if name in param_vename_dic[param_vename[4]]:
            v.model.node.constituents.set_param_values(name, ini_param_val, fromList=True, 
                                        node_types=['StorageNodeModel'],aspect='model')

Wall time: 4.38 s


In [25]:
# Terminate the Veneer server processes that were launched with start().
# Run this last: the clients in `vs` cannot be used afterwards.
kill_all_now(processes)