Skip to content

Commit

Permalink
Merge pull request #124 from arbennett/feature/spinup
Browse files Browse the repository at this point in the history
Feature/spinup
  • Loading branch information
arbennett committed Sep 17, 2020
2 parents 4b277f8 + d465c1b commit d0122fa
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 21 deletions.
31 changes: 28 additions & 3 deletions pysumma/calibration/meta/model_executable.template
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!$pythonPath
import os
import pysumma as ps
import pysumma.evaluation as pse
import shutil
Expand All @@ -13,9 +14,11 @@ if __name__ == '__main__':
sim_calib_var = '$simVarName'
obs_calib_var = '$obsVarName'
out_file = '$outFile'
metrics_log = '$metricsLog'
param_mapping_file = '$paramMappingFile'
param_weight_file = '$paramWeightFile'
param_file = '$paramFile'
allow_failures = $allowFailures
simulation_args = $simulationArgs
conversion = $conversionFunc
filter = $filterFunc
Expand All @@ -37,18 +40,37 @@ if __name__ == '__main__':

# initialize simulation object
s = ps.Simulation(summa_exe, file_manager, **simulation_args)

if os.path.exists(out_file):
os.remove(out_file)
# run the simulation
s.run('local')
if s.status != 'Success':
print(s.stdout)
print('--------------------------------------------')
print(s.stderr)
if allow_failures:
kge = -3
nse = -3
mae = 9999.0
mse = 999999.0
rmse = 9999.0
with open(out_file, 'w+') as f:
f.write('%.6f'%kge + '\t #KGE\n')
f.write('%.6f'%mae + '\t #MAE\n')
f.write('%.6f'%mse + '\t #MSE\n')
f.write('%.6f'%rmse + '\t #RMSE\n')
f.write('%.6f'%nse + '\t #NSE\n')

with open(metrics_log, 'a') as f:
f.write('%.6f'%kge + ', %.6f'%mae
+ ', %.6f'%mse + ', %.6f'%rmse
+ ', %.6f'%nse + '\n')

assert s.status == 'Success'

# open output and calculate diagnostics
sim_ds = s.output
obs_ds = xr.open_dataset(obs_data_file)
sim_ds = s.output.load()
obs_ds = xr.open_dataset(obs_data_file).load()

# trim sim and obs to common time length
time_slice = pse.trim_time(sim_ds, obs_ds)
Expand All @@ -69,3 +91,6 @@ if __name__ == '__main__':
f.write('%.6f'%mse + '\t #MSE\n')
f.write('%.6f'%rmse + '\t #RMSE\n')
f.write('%.6f'%nse + '\t #NSE\n')

with open(metrics_log, 'a') as f:
f.write('%.6f'%kge + ', %.6f'%mae + ', %.6f'%mse + ', %.6f'%rmse + ', %.6f'%nse + '\n')
8 changes: 6 additions & 2 deletions pysumma/calibration/ostrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(self, ostrich_executable, summa_executable, file_manager, python_pa
self.run_script: Path = self.config_path / 'run_script.py'
self.save_script: Path = self.config_path / 'save_script.py'
self.metrics_file: Path = self.config_path / 'metrics.txt'
self.metrics_log: Path = self.config_path / 'metrics_log.csv'
self.impot_strings: str = ''
self.conversion_function: callable = lambda x: x
self.filter_function: callable = lambda x, y: (x, y)
Expand All @@ -122,6 +123,7 @@ def __init__(self, ostrich_executable, summa_executable, file_manager, python_pa
self.objective_function: str = 'gcop'
self.maximize: bool = True
self.simulation_kwargs: Dict = {}
self.allow_failures: bool = False

def run(self, prerun_cmds=[], monitor=True):
"""Start calibration run"""
Expand Down Expand Up @@ -195,8 +197,8 @@ def write_weight_value_section(self, file_name='param_weights.txt') -> Path:
for cp in self.calib_params]) + '\n')
return Path('.') / file_name

def add_tied_param(self, param_name, lower_bound, upper_bound):
self.calib_params.append(OstrichParam(f'{param_name}', 0.5, (0.01, 0.99), weightname=f'{param_name}_scale'))
def add_tied_param(self, param_name, lower_bound, upper_bound, initial_value=0.5):
self.calib_params.append(OstrichParam(f'{param_name}', initial_value, (0.01, 0.99), weightname=f'{param_name}_scale'))
self.tied_params.append(OstrichTiedParam(param_name, lower_bound, upper_bound))

@property
Expand Down Expand Up @@ -265,12 +267,14 @@ def map_vars_to_run_template(self):
'simVarName': self.sim_calib_var,
'obsVarName': self.obs_calib_var,
'outFile': self.metrics_file,
'metricsLog': self.metrics_log,
'importStrings': self.import_strings,
'conversionFunc': "=".join(inspect.getsource(self.conversion_function).split('=')[1:]),
'filterFunc': "=".join(inspect.getsource(self.filter_function).split('=')[1:]),
'paramMappingFile': self.weightTemplateFile,
'paramWeightFile': self.weightValueFile,
'simulationArgs': self.simulation_kwargs,
'allowFailures': self.allow_failures,
'paramFile': (self.simulation.manager['settingsPath'].value
+ self.simulation.manager['trialParamFile'].value),
}
Expand Down
14 changes: 14 additions & 0 deletions pysumma/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ def monitor(self):
def merge_output(self):
pass

def map(self, fun, args, include_sims=True, monitor=True):
for i, (n, s) in enumerate(self.simulations.items()):
kwargs = self.chunk_args[i]
if include_sims:
all_args = (s, n, *args, kwargs)
else:
all_args = (*args, kwargs)
self.submissions.append(self._client.submit(
fun, *all_args))
if monitor:
return self.monitor()
else:
return True


def _submit(s: Simulation, name: str, run_option: str,
prerun_cmds: List[str], run_args: dict, **kwargs):
Expand Down
14 changes: 14 additions & 0 deletions pysumma/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ def run(self, run_option: str, prerun_cmds=None, monitor: bool=True):
else:
return True

def map(self, fun, args, include_sims=True, monitor=True):
for n, s in self.simulations.items():
config = self.configuration[n]
if include_sims:
all_args = (s, n, *args, {'config': config})
else:
all_args = (*args, {'config': config})
self.submissions.append(self._client.submit(
fun, *all_args))
if monitor:
return self.monitor()
else:
return True

def monitor(self):
"""
Halt computation until submitted simulations are complete
Expand Down
12 changes: 6 additions & 6 deletions pysumma/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def trim_time(sim, obs, roundto='min'):
def kling_gupta_efficiency(sim, obs):
obs = np.asarray(obs)
sim = np.asarray(sim)
obs_filtered = obs[~np.isnan(obs)]
sim_filtered = sim[~np.isnan(obs)]
obs_filtered = obs[np.logical_and(~np.isnan(obs), ~np.isnan(sim))]
sim_filtered = sim[np.logical_and(~np.isnan(obs), ~np.isnan(sim))]
sim_std = np.std(sim_filtered, ddof=1)
obs_std = np.std(obs_filtered, ddof=1)
sim_mu = np.mean(sim_filtered)
Expand All @@ -33,8 +33,8 @@ def kling_gupta_efficiency(sim, obs):
def decomposed_kling_gupta_efficiency(sim, obs):
obs = np.asarray(obs)
sim = np.asarray(sim)
obs_filtered = obs[~np.isnan(obs)]
sim_filtered = sim[~np.isnan(obs)]
obs_filtered = obs[np.logical_and(~np.isnan(obs), ~np.isnan(sim))]
sim_filtered = sim[np.logical_and(~np.isnan(obs), ~np.isnan(sim))]
sim_std = np.std(sim_filtered, ddof=1)
obs_std = np.std(obs_filtered, ddof=1)
sim_mu = np.mean(sim_filtered)
Expand All @@ -49,8 +49,8 @@ def decomposed_kling_gupta_efficiency(sim, obs):
def nash_sutcliffe_efficiency(sim, obs):
obs = np.asarray(obs)
sim = np.asarray(sim)
obs_filtered = obs[~np.isnan(obs)]
sim_filtered = sim[~np.isnan(obs)]
obs_filtered = obs[np.logical_and(~np.isnan(obs), ~np.isnan(sim))]
sim_filtered = sim[np.logical_and(~np.isnan(obs), ~np.isnan(sim))]
obs_mu = np.mean(obs_filtered)
num = np.sum( (sim_filtered - obs_filtered) ** 2 )
den = np.sum( (obs_filtered - obs_mu) ** 2 )
Expand Down
55 changes: 45 additions & 10 deletions pysumma/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import shutil
import subprocess
import numpy as np
import pandas as pd
import xarray as xr
from glob import glob
from pathlib import Path
from typing import List

from .decisions import Decisions
from .file_manager import FileManager
from .output_control import OutputControl
from .global_params import GlobalParams
from .force_file_list import ForcingList
from .force_file_list import ForcingList, ForcingOption


class Simulation():
Expand Down Expand Up @@ -203,12 +205,12 @@ def reset(self):
def validate_layer_params(self, params):
"""Ensure that the layer parameters are valid"""
for i in range(1, 5):
assert (params[f'zmaxLayer{i}_upper']
<= params[f'zmaxLayer{i}_lower'], i)
assert (params[f'zmaxLayer{i}_upper'] / params[f'zminLayer{i}']
>= 2.5, i)
assert (params[f'zmaxLayer{i}_upper'] / params[f'zminLayer{i+1}']
>= 2.5, i)
assert params[f'zmaxLayer{i}_upper'] \
<= params[f'zmaxLayer{i}_lower'], i
assert params[f'zmaxLayer{i}_upper'] / params[f'zminLayer{i}'] \
>= 2.5, i
assert params[f'zmaxLayer{i}_upper'] / params[f'zminLayer{i+1}'] \
>= 2.5, i

def _gen_summa_cmd(self, run_suffix, processes=1, prerun_cmds=[],
startGRU=None, countGRU=None, iHRU=None,
Expand Down Expand Up @@ -270,7 +272,7 @@ def _run_docker(self, run_suffix, processes=1,

def start(self, run_option, run_suffix='pysumma_run', processes=1,
prerun_cmds=[], startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None):
freq_restart=None, progress=None, **kwargs):
"""
Run a SUMMA simulation without halting. Most likely this should
not be used. Use the ``run`` method for most common use cases.
Expand All @@ -292,7 +294,7 @@ def start(self, run_option, run_suffix='pysumma_run', processes=1,

def run(self, run_option, run_suffix='pysumma_run', processes=1,
prerun_cmds=None, startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None):
freq_restart=None, progress=None, **kwargs):
"""
Run a SUMMA simulation and halt until completion or error.
Expand Down Expand Up @@ -328,7 +330,7 @@ def run(self, run_option, run_suffix='pysumma_run', processes=1,
hourly output.
"""
self.start(run_option, run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart, progress)
startGRU, countGRU, iHRU, freq_restart, progress, **kwargs)
self.monitor()

def monitor(self):
Expand Down Expand Up @@ -361,12 +363,45 @@ def monitor(self):

return self.status

def spinup(self, run_option, period='1Y', niters=10, run_suffix='pysumma_spinup', **kwargs):
# open forcings
spinup_force_name = f'{run_suffix}.nc'
with xr.open_mfdataset(self.force_file_list.forcing_paths) as ds:
start_date = pd.to_datetime(ds['time'].values[0])
end_date = pd.to_datetime(start_date + pd.Timedelta(period))
forcings = ds.sel(time=slice(start_date, end_date)).load()
spinup_force_path = self.manager['forcingPath'].value + spinup_force_name
forcings.to_netcdf(spinup_force_path)
self.force_file_list.options = [ForcingOption(spinup_force_path)]
self.manager['simStartTime'] = start_date
self.manager['simEndTime'] = end_date
ymdh = str(self.manager['simEndTime'].value).replace(' ', '-').replace('-', '').split(':')[0]
for n in range(niters):
self.run(run_option, run_suffix=run_suffix, freq_restart='e', **kwargs)
out_dir = self.manager['outputPath'].value
prefix = self.manager['outFilePrefix'].value
restart_file_name = f'{prefix}_restart_{ymdh}_{run_suffix}*nc'
restart_file_path = glob(f'{out_dir}{restart_file_name}')[0]
restart_file_name = restart_file_path.split(os.path.sep)[-1]
self.reset()
self.manager['simStartTime'] = start_date
self.manager['simEndTime'] = end_date
restart_dest = os.path.dirname(self.manager['initConditionFile'].value) + f'/{restart_file_name}'
shutil.copy(restart_file_path, self.manager['settingsPath'].value + restart_dest)
[os.remove(f) for f in self.get_output_files()]
self.manager['initConditionFile'] = restart_dest
with xr.open_dataset(self.manager['settingsPath'].value + restart_dest) as ds:
self.initial_conditions = ds.load()

def _write_configuration(self, name=''):
import shutil

self.config_path = self.config_path / name
self.config_path.mkdir(parents=True, exist_ok=True)
manager_path = str(self.manager_path.parent)
settings_path = os.path.abspath(os.path.realpath(str(self.manager['settingsPath'].value)))
settings_path = Path(settings_path.replace(manager_path, str(self.config_path)))

self.manager_path = self.config_path / self.manager.file_name
self.manager['settingsPath'] = str(settings_path) + os.sep
self.manager.write(path=self.config_path)
Expand Down

0 comments on commit d0122fa

Please sign in to comment.