Skip to content

Commit

Permalink
Merge branch 'develop' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
arbennett committed Mar 18, 2020
2 parents 7e16c43 + 831a431 commit 04d7da8
Show file tree
Hide file tree
Showing 39 changed files with 547 additions and 903 deletions.
4 changes: 2 additions & 2 deletions pysumma/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
from .output_control import OutputControl
from .local_param_info import LocalParamInfo
from .force_file_list import ForceFileList
#from . import plotting
#from . import utils
from . import plotting
from . import utils
12 changes: 6 additions & 6 deletions pysumma/decisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ def set_value(self, new_value):
self.value = new_value
else:
raise ValueError(os.linesep.join([
'Invalid option given for decision: {}'.format(self.name),
'You gave a value of: {}'.format(new_value),
'Valid options include: {}'.format(self.available_options)]))
'Invalid option given for decision: {}'.format(self.name),
'You gave a value of: {}'.format(new_value),
'Valid options include: {}'.format(self.available_options)]))

def __str__(self):
if self.name in ['simulStart', 'simulFinsh']:
value = "'{}'".format(self.value)
else:
value = self.value
return "{0} {1: <20} ! {2}".format(
self.name, value, self.description)
self.name, value, self.description)


class Decisions(OptionContainer):
Expand All @@ -49,8 +49,8 @@ class Decisions(OptionContainer):
a SUMMA decisions file.
"""

def __init__(self, path):
super().__init__(path, DecisionOption)
def __init__(self, dirpath, filepath):
super().__init__(DecisionOption, dirpath, filepath)

def set_option(self, key, value):
try:
Expand Down
114 changes: 94 additions & 20 deletions pysumma/ensemble.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,63 @@
from copy import deepcopy
from distributed import Client, get_client
import os
import pandas as pd
import time
import xarray as xr

from .simulation import Simulation
from .utils import ChainDict, product_dict

OMP_NUM_THREADS = int(os.environ.get('OMP_NUM_THREADS', 1))


class Ensemble(object):
'''
Ensembles represent an multiple SUMMA configurations based on
changing the decisions or parameters of a given run.
'''

executable: str = None
simulations: dict = {}
submissions: list = []

def __init__(self, executable: str, filemanager: str,
configuration: dict, num_workers: int=1):
def __init__(self, executable: str,configuration: dict,
filemanager: str=None, num_workers: int=1,
threads_per_worker: int=OMP_NUM_THREADS,
scheduler: str=None):
"""
Create a new Ensemble object. The API mirrors that of the
Simulation object.
"""
self._status = 'Initialized'
self.executable = executable
self.filemanager = filemanager
self.configuration = configuration
self.num_workers = num_workers
self.executable: str = executable
self.filemanager: str = filemanager
self.configuration: dict = configuration
self.num_workers: int = num_workers
self.simulations: dict = {}
self.submissions: list = []
# Try to get a client, and if none exists then start a new one
try:
client = Client()
self._client = get_client()
# Start more workers if necessary
# Start more workers if necessary:
workers = len(self._client.get_worker_logs())
if workers <= self.num_workers:
self._client.cluster.scale(workers)
except ValueError:
self._client = Client(n_workers=workers, threads_per_worker=1)
self._client = Client(n_workers=self.num_workers,
threads_per_worker=threads_per_worker)
self._generate_simulation_objects()

def _generate_simulation_objects(self):
"""
Create a mapping of configurations to the simulation objects.
"""
for name, config in self.configuration.items():
self.simulations[name] = Simulation(
self.executable, self.filemanager, False)
if self.filemanager:
for name, config in self.configuration.items():
self.simulations[name] = Simulation(
self.executable, self.filemanager, False)
else:
for name, config in self.configuration.items():
assert config['file_manager'] is not None, \
"No filemanager found in configuration or Ensemble!"
self.simulations[name] = Simulation(
self.executable, config['file_manager'], False)

def _generate_coords(self):
"""
Expand Down Expand Up @@ -83,6 +93,10 @@ def merge_output(self):
if '=' in l else l for l in t))
decision_names = ['++'.join(tuple(n.split('++')[1:-1]))
for n in self.configuration.keys()]
if sum([len(dt) for dt in decision_tuples]) == 0:
raise NameError("Simulations in the ensemble do not share all"
" common decisions! Please use `open_output`"
" to retrieve the output of this Ensemble")
for i, t in enumerate(decision_names):
decision_names[i] = '++'.join(l.split('=')[0] for l in t)
new_idx = pd.MultiIndex.from_tuples(
Expand Down Expand Up @@ -111,9 +125,20 @@ def start(self, run_option: str, prerun_cmds: list=None):
config = self.configuration[n]
self.submissions.append(self._client.submit(
_submit, s, n, run_option, prerun_cmds, config))
time.sleep(2.0)

def run(self, run_option: str, prerun_cmds=None, monitor: bool=True):
"""
Run the ensemble
Parameters
----------
run_option:
Where to run the simulation. Can be ``local`` or ``docker``
prerun_cmds:
A list of shell commands to run before running SUMMA
monitor:
Whether to halt operation until runs are complete
"""
self.start(run_option, prerun_cmds)
if monitor:
return self.monitor()
Expand All @@ -128,6 +153,47 @@ def monitor(self):
for s in simulations:
self.simulations[s.run_suffix] = s

def summary(self):
"""
Show the user information about ensemble status
"""
success, error, other = [], [], []
for n, s in self.simulations.items():
if s.status == 'Success':
success.append(n)
elif s.status == 'Error':
error.append(n)
else:
other.append(n)
return {'success': success, 'error': error, 'other': other}

def rerun_failed(self, run_option: str, prerun_cmds=None,
monitor: bool=True):
"""
Try to re-run failed simulations.
Parameters
----------
run_option:
Where to run the simulation. Can be ``local`` or ``docker``
prerun_cmds:
A list of shell commands to run before running SUMMA
monitor:
Whether to halt operation until runs are complete
"""
run_summary = self.summary()
self.submissions = []
for n in run_summary['error']:
config = self.configuration[n]
s = self.simulations[n]
s.reset()
self.submissions.append(self._client.submit(
_submit, s, n, run_option, prerun_cmds, config))
if monitor:
return self.monitor()
else:
return True


def _submit(s: Simulation, name: str, run_option: str, prerun_cmds, config):
s.initialize()
Expand All @@ -147,19 +213,27 @@ def parameter_product(list_config):
{'parameters': d} for d in product_dict(**list_config)}


def total_product(dec_conf={}, param_conf={}):
def attribute_product(list_config):
return {'++'+'++'.join('{}={}'.format(k, v) for k, v in d.items())+'++':
{'attributes': d} for d in product_dict(**list_config)}


def total_product(dec_conf={}, param_conf={}, attr_conf={}):
full_conf = deepcopy(dec_conf)
full_conf.update(param_conf)
full_conf.update(attr_conf)
prod_dict = product_dict(**full_conf)
config = {}
for d in prod_dict:
name = '++' + '++'.join(
'{}={}'.format(k, v) if k in param_conf else v
'{}={}'.format(k, v) if k in param_conf or k in attr_conf else v
for k, v in d.items()) + '++'
config[name] = {'decisions': {}, 'parameters': {}}
config[name] = {'decisions': {}, 'parameters': {}, 'attributes': {}}
for k, v in d.items():
if k in dec_conf:
config[name]['decisions'][k] = v
elif k in param_conf:
config[name]['parameters'][k] = v
elif k in attr_conf:
config[name]['attributes'][k] = v
return config
90 changes: 68 additions & 22 deletions pysumma/file_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
import json
import pkg_resources
import xarray as xr

from pathlib import Path
from .option import BaseOption, OptionContainer
from .decisions import Decisions
from .output_control import OutputControl
Expand Down Expand Up @@ -38,8 +40,8 @@ class FileManager(OptionContainer):
a SUMMA file manager file.
"""

def __init__(self, path):
super().__init__(path, FileManagerOption)
def __init__(self, path, name):
super().__init__(FileManagerOption, path, name)

def set_option(self, key, value):
o = self.get_option(key)
Expand All @@ -51,41 +53,85 @@ def get_constructor_args(self, line):

@property
def decisions(self):
p = self.get_value('settings_path') + self.get_value('decisions_path')
return Decisions(p)
p1 = self.get_value('settings_path')
p2 = self.get_value('decisions_path')
self._decisions = Decisions(p1, p2)
return self._decisions

@property
def output_control(self):
p = self.get_value('settings_path') + self.get_value('output_control')
return OutputControl(p)
p1 = self.get_value('settings_path')
p2 = self.get_value('output_control')
self._output_control = OutputControl(p1, p2)
return self._output_control

@property
def local_param_info(self):
p = (self.get_value('settings_path')
+ self.get_value('local_param_info'))
return LocalParamInfo(p)
p1 = self.get_value('settings_path')
p2 = self.get_value('local_param_info')
self._local_param_info = LocalParamInfo(p1, p2)
return self._local_param_info

@property
def basin_param_info(self):
p = (self.get_value('settings_path')
+ self.get_value('basin_param_info'))
return LocalParamInfo(p)
p1 = self.get_value('settings_path')
p2 = self.get_value('basin_param_info')
self._basin_param_info = LocalParamInfo(p1, p2)
return self._basin_param_info

@property
def force_file_list(self):
p1 = (self.get_value('settings_path')
+ self.get_value('forcing_file_list'))
p2 = self.get_value('input_path')
return ForceFileList(p1, p2)
p1 = self.get_value('settings_path')
p2 = self.get_value('forcing_file_list')
p3 = self.get_value('input_path')
self._force_file_list = ForceFileList(p1, p2, p3)
return self._force_file_list

@property
def local_attributes(self):
p = (self.get_value('settings_path')
+ self.get_value('local_attributes'))
return xr.open_dataset(p)
p1 = self.get_value('settings_path')
p2 = self.get_value('local_attributes')
self._local_attrs = xr.open_dataset(p1 + p2)
return self._local_attrs

@property
def parameter_trial(self):
p = (self.get_value('settings_path')
+ self.get_value('parameter_trial'))
return xr.open_dataset(p)
p1 = self.get_value('settings_path')
p2 = self.get_value('parameter_trial')
self._param_trial = xr.open_dataset(p1 + p2)
return self._param_trial

@property
def initial_conditions(self):
p1 = self.get_value('settings_path')
p2 = self.get_value('model_init_cond')
self._init_cond = xr.open_dataset(p1 + p2)
return self._init_cond

@property
def genparm(self):
p1, p2 = self.get_value('settings_path'), 'GENPARM.TBL'
with open(p1 + p2, 'r') as f:
self._genparm = f.readlines()
return self._genparm

@property
def mptable(self):
p1, p2 = self.get_value('settings_path'), 'MPTABLE.TBL'
with open(p1 + p2, 'r') as f:
self._mptable = f.readlines()
return self._mptable

@property
def soilparm(self):
p1, p2 = self.get_value('settings_path'), 'SOILPARM.TBL'
with open(p1 + p2, 'r') as f:
self._soilparm = f.readlines()
return self._soilparm

@property
def vegparm(self):
p1, p2 = self.get_value('settings_path'), 'VEGPARM.TBL'
with open(p1 + p2, 'r') as f:
self._vegparm = f.readlines()
return self._vegparm
18 changes: 15 additions & 3 deletions pysumma/force_file_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ def __init__(self, name):
def set_value(self, new_value):
self.value = new_value

@property
def path(self):
return self.name.replace("'", "")

@path.setter
def path(self, value):
self.set_value(value)

def __str__(self):
return "'{}'".format(self.name.split('/')[-1])

Expand All @@ -22,9 +30,9 @@ class ForceFileList(OptionContainer):

prefix: str = ''

def __init__(self, file_list_path, force_file_prefix_path):
def __init__(self, dirpath, filepath, force_file_prefix_path):
self.prefix = force_file_prefix_path
super().__init__(file_list_path, ForceFileListOption)
super().__init__(ForceFileListOption, dirpath, filepath)

def set_option(self, key, value):
o = self.get_option(key)
Expand All @@ -35,5 +43,9 @@ def get_constructor_args(self, line):
return (os.path.join(self.prefix, file_name.strip()), )

@property
def forcing_list(self):
def forcing_paths(self):
return [o.path for o in self.options]

@property
def forcing_data(self):
return [o.value for o in self.options]

0 comments on commit 04d7da8

Please sign in to comment.