
added option to use forcing start/stop dates and misc improvements to… #93

Merged
merged 2 commits on Aug 17, 2017
1 change: 1 addition & 0 deletions ci/requirements-3.5.yml
@@ -7,6 +7,7 @@ dependencies:
   - pandas
   - netcdf4
   - numba
+  - bottleneck
   - numpy
   - scipy
   - pytest
1 change: 1 addition & 0 deletions ci/requirements-3.6.yml
@@ -6,6 +6,7 @@ dependencies:
   - xarray
   - pandas
   - netcdf4
+  - bottleneck
   - numba
   - numpy
   - scipy
10 changes: 7 additions & 3 deletions metsim/datetime.py
@@ -58,11 +58,12 @@ def date_range(start=None, end=None, periods=None, freq='D', tz=None,
         start_num, end_num = date2num(
             pd.to_datetime([start, end]).to_pydatetime(),
             units, calendar=calendar)
-        periods = int((end_num - start_num) / steps)  # Todo divide by freq
+        # Todo divide by freq
+        periods = int((end_num - start_num) / steps) + 1

         times = num2date(
             np.linspace(start_num, end_num, periods,
-                        endpoint=False,
+                        endpoint=True,
                         dtype=np.float128), units, calendar)

         index = pd.DatetimeIndex(xr.conventions.nctime_to_nptime(times))
@@ -89,5 +90,8 @@ def units_from_freq(freq, origin=DEFAULT_ORIGIN):
         return 'hours since %s' % origin
     elif 'D' in freq:
         return 'days since %s' % origin
+    elif 'T' in freq:
+        return 'minutes since %s' % origin
     else:
-        raise NotImplementedError()
+        raise NotImplementedError(
+            'freq {} not supported at this time'.format(freq))
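
For intuition about the endpoint change above, a minimal numpy sketch (not MetSim code): with the old endpoint=False the stop date was silently dropped, so the period count is bumped by one and the endpoint included.

import numpy as np

# Old behavior: 3 steps over [0, 3) drops the endpoint
np.linspace(0., 3., 3, endpoint=False)      # array([0., 1., 2.])

# New behavior: periods + 1 steps over [0, 3] includes the endpoint
np.linspace(0., 3., 3 + 1, endpoint=True)   # array([0., 1., 2., 3.])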
21 changes: 10 additions & 11 deletions metsim/io.py
@@ -63,11 +63,11 @@ def read_state(params: dict, domain: xr.Dataset) -> xr.Dataset:
         "netcdf": read_netcdf,
         "data": read_data
     }
-    start = params['start'] - pd.Timedelta('90 days')
-    stop = params['start'] - pd.Timedelta('1 days')

     return read_funcs[params['state_fmt']](
         params['state'], domain=domain, iter_dims=params['iter_dims'],
-        start=start, stop=stop, calendar=params['calendar'],
+        start=params['state_start'], stop=params['state_stop'],
+        calendar=params['calendar'],
         var_dict=params.get('state_vars', None))
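
For concreteness, the window read_state now receives is computed upstream in _validate_force_times (see metsim.py below); a small sketch with a hypothetical start date:

import pandas as pd

start = pd.Timestamp('1990-01-01')              # hypothetical run start
state_start = start - pd.Timedelta('90 days')   # Timestamp('1989-10-03')
state_stop = start - pd.Timedelta('1 days')     # Timestamp('1989-12-31')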


@@ -146,18 +146,18 @@ def read_netcdf(data_handle, domain=None, iter_dims=['lat', 'lon'],
     ds = xr.open_dataset(data_handle)

     if var_dict is not None:
+        var_list = list(var_dict.keys())
+        ds = ds[var_list]
         ds.rename(var_dict, inplace=True)

-    if start is not None and stop is not None:
+    if start is not None or stop is not None:
Member: Doesn't this have the chance to break? For example, if start=None and stop!=None we will end up with (in the line below) ds = ds.sel(time=slice(None, stop)).

Member Author: ds = ds.sel(time=slice(None, stop)) will work. Just like this works:

In [1]: x = list(range(10))

In [2]: x
Out[2]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [3]: x[slice(None, 4)]
Out[3]: [0, 1, 2, 3]

Member: Interesting, I had no idea that you could do that.
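
For reference, the same open-ended slicing works on a time-indexed Dataset; a minimal sketch with hypothetical data (note that label-based slices in pandas/xarray include both endpoints):

import pandas as pd
import xarray as xr

times = pd.date_range('2000-01-01', periods=5)
ds = xr.Dataset({'prec': ('time', [0., 1., 2., 3., 4.])},
                coords={'time': times})

# No start label: everything up to and including the stop label
ds.sel(time=slice(None, '2000-01-03'))   # days 1-3

# No stop label: everything from the start label onward
ds.sel(time=slice('2000-01-02', None))   # days 2-5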

         ds = ds.sel(time=slice(start, stop))
         dates = ds.indexes['time']
         ds['day_of_year'] = xr.Variable(('time', ), dates.dayofyear)

     if domain is not None:
         ds = ds.sel(**{d: domain[d] for d in iter_dims})
-    out = ds.load()
-    ds.close()
-    return out
+    return ds


 def read_data(data_handle, domain=None, iter_dims=['lat', 'lon'],
@@ -168,17 +168,16 @@ def read_data(data_handle, domain=None, iter_dims=['lat', 'lon'],
     if var_dict is not None:
         data_handle.rename(var_dict, inplace=True)
         varlist = list(var_dict.values())
+        data_handle = data_handle[varlist]

     if start is not None and stop is not None:
-        data_handle = data_handle[varlist].sel(time=slice(start, stop))
+        data_handle = data_handle.sel(time=slice(start, stop))
         dates = data_handle.indexes['time']
         data_handle['day_of_year'] = xr.Variable(('time', ), dates.dayofyear)

     if domain is not None:
         data_handle = data_handle.sel(**{d: domain[d] for d in iter_dims})
-    out = data_handle.load()
-    data_handle.close()
-    return out
+    return data_handle


 def read_binary(data_handle, domain=None, iter_dims=['lat', 'lon'],
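
Both readers share a select-then-rename idiom; a standalone sketch with a hypothetical file and variable mapping (the diff uses the then-current rename(..., inplace=True) form):

import xarray as xr

var_dict = {'Prec': 'prec', 'Tmax': 't_max', 'Tmin': 't_min'}  # hypothetical

ds = xr.open_dataset('forcings.nc')          # hypothetical file
ds = ds[list(var_dict.keys())]               # subset to the variables needed
ds = ds.rename(var_dict)                     # map to MetSim's naming
ds = ds.sel(time=slice(None, '1990-12-31'))  # open-ended slices are fine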
102 changes: 91 additions & 11 deletions metsim/metsim.py
@@ -148,12 +148,83 @@ def __init__(self, params: dict):
         logger.setLevel(self.params['verbose'])
         ch.setLevel(self.params['verbose'])
         logger.addHandler(ch)
+        logger.info("read_domain")
         self.domain = io.read_domain(self.params)
+        self._normalize_times()
+        logger.info("read_met_data")
         self.met_data = io.read_met_data(self.params, self.domain)
+        self._validate_force_times(force_times=self.met_data['time'])
+        logger.info("read_state")
         self.state = io.read_state(self.params, self.domain)
         self.met_data['elev'] = self.domain['elev']
         self.met_data['lat'] = self.domain['lat']
+        logger.info("_aggregate_state")
         self._aggregate_state()
+        logger.info("load_inputs")
+        self.load_inputs()
Member: I'm wary of having self.load_inputs() after self._aggregate_state(). The whole reason we were doing the load and close in the IO routines was to avoid the situation where the state and input forcings are looking at the same file and undefined behavior occurred. It might be possible to reason through and prove that this situation won't occur with this setup, but I think it's easier to side with caution on this one.

Member Author: I'm skeptical that this is actually a problem. This is all happening before we start up MetSim's multiprocessing, which is where I'd imagine any odd behavior came from. The reason I like this method is that we fully define exactly what data we need in the previous lines before we ever load any data. I think this is preferable to blindly loading data we don't know if we need.

Member: That's a good point, and I think it's reasonable to say that no issues will occur after looking at it a bit more. I'm still a bit sensitive to that bug because it took so long to track down.
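
A sketch of the lazy-open / late-load pattern being discussed (hypothetical file name): xarray keeps data on disk through selection, so load_inputs can materialize exactly the slices chosen earlier and then release the file handles.

import xarray as xr

ds = xr.open_dataset('forcings.nc')                   # lazy; nothing read yet
ds = ds.sel(time=slice('1990-01-01', '1990-12-31'))   # still lazy
ds = ds.load()    # pull only the selected data into memory
ds.close()        # safe to close; everything needed is in memory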


+    def _normalize_times(self):
+        # handle when start/stop times are not specified
+        for p in ['start', 'stop']:
+            # TODO: make sure these options get documented
+            if self.params[p] in [None, 'forcing', '']:
+                self.params[p] = None
+            elif isinstance(self.params[p], str):
+                if ':' in self.params[p]:
+                    # start format from config file: 'YYYY/MM/DD:HH'
+                    date, hour = self.params[p].split(':')
+                    year, month, day = date.split('/')
+                    self.params[p] = pd.datetime(int(year), int(month),
+                                                 int(day), int(hour))
+                elif '/' in self.params[p]:
+                    # stop format from config file: 'YYYY/MM/DD'
+                    year, month, day = self.params[p].split('/')
+                    self.params[p] = pd.datetime(int(year), int(month),
+                                                 int(day))
+                else:
+                    self.params[p] = pd.to_datetime(self.params[p])
+            else:
+                self.params[p] = pd.to_datetime(self.params[p])
+
+        logger.info('start {}'.format(self.params['start']))
+        logger.info('stop {}'.format(self.params['stop']))
+
+    def _validate_force_times(self, force_times):
+        for p, i in [('start', 0), ('stop', -1)]:
+            # infer times from force_times
+            if self.params[p] is None:
+                self.params[p] = pd.Timestamp(
+                    force_times.values[i]).to_pydatetime()
+
+        assert self.params['start'] >= pd.Timestamp(
+            force_times.values[0]).to_pydatetime()
+        assert self.params['stop'] <= pd.Timestamp(
+            force_times.values[-1]).to_pydatetime()
+
+        self.params['state_start'] = (self.params['start'] -
+                                      pd.Timedelta("90 days"))
+        self.params['state_stop'] = (self.params['start'] -
+                                     pd.Timedelta("1 days"))
+        logger.info('start {}'.format(self.params['start']))
+        logger.info('stop {}'.format(self.params['stop']))
+
+        logger.info('force start {}'.format(pd.Timestamp(
+            force_times.values[0]).to_pydatetime()))
+        logger.info('force stop {}'.format(pd.Timestamp(
+            force_times.values[-1]).to_pydatetime()))
+
+        logger.info('state start {}'.format(self.params['state_start']))
+        logger.info('state stop {}'.format(self.params['state_stop']))
+
+    def load_inputs(self, close=True):
+        self.domain = self.domain.load()
+        self.met_data = self.met_data.load()
+        self.state = self.state.load()
+        if close:
+            self.domain.close()
+            self.met_data.close()
+            self.state.close()

     def launch(self):
         """Farm out the jobs to separate processes"""
@@ -307,6 +378,9 @@ def setup_output(self, prototype: xr.Dataset=None):
             # Need to convert some parameters to strings
             if k in ['start', 'stop', 'annual', 'mtclim_swe_corr']:
                 v = str(v)
+            elif k in ['state_start', 'state_stop']:
+                # skip -- derived in _validate_force_times, not user-facing
+                continue
             # Don't include complex types
             if isinstance(v, dict):
                 v = json.dumps(v)
@@ -326,31 +400,37 @@
     def _aggregate_state(self):
         """Aggregate data out of the state file and load it into `met_data`"""
         # Precipitation record
-        begin_record = self.params['start'] - pd.Timedelta("90 days")
-        end_record = self.params['start'] - pd.Timedelta("1 days")
-        record_dates = date_range(begin_record, end_record,
+
+        assert self.state.dims['time'] == 90
Member: I really don't like this check, because it seems to limit some cases that are pretty useful in my mind. For example, if you have a long timeseries broken up into 1 year chunks you could previously just use the previous year's data as your state file. Now it looks like you'd have to preprocess all of those files and create an additional state file for each year's input.

Member Author: Not really, our read_state function handles this correctly now with improved slice selection.

Member Author: I'm open to other ideas for how to pin down the behavior in this method, but I was finding it useful to explicitly check the state shape before going into the computations in this method.

Member: Ah, I didn't catch that that's what you were doing with state_start and all that. In that case, I like this.


+        record_dates = date_range(self.params['state_start'],
+                                  self.params['state_stop'],
                                   calendar=self.params['calendar'])
-        trailing = self.state['prec'].sel(time=record_dates)
+        trailing = self.state['prec']
+        trailing['time'] = record_dates
         total_precip = xr.concat([trailing, self.met_data['prec']], dim='time')
         total_precip = (cnst.DAYS_PER_YEAR * total_precip.rolling(
-            time=90).mean().drop(record_dates, dim='time'))
+            time=90).mean().sel(time=slice(self.params['start'],
+                                           self.params['stop'])))

         self.met_data['seasonal_prec'] = total_precip

         # Smoothed daily temperature range
         trailing = self.state['t_max'] - self.state['t_min']
-        begin_record = self.params['start'] - pd.Timedelta("90 days")
-        end_record = self.params['start'] - pd.Timedelta("1 days")
-        record_dates = date_range(begin_record, end_record,
-                                  calendar=self.params['calendar'])
-
         trailing['time'] = record_dates
         dtr = self.met_data['t_max'] - self.met_data['t_min']
         sm_dtr = xr.concat([trailing, dtr], dim='time')
         sm_dtr = sm_dtr.rolling(time=30).mean().drop(record_dates, dim='time')
         self.met_data['smoothed_dtr'] = sm_dtr

         # Put in SWE data
-        self.state['swe'] = self.state.sel(time=end_record).swe.drop('time')
-        self.met_data['swe'] = self.state.swe.copy()
+        if 'time' in self.state['swe'].dims:
+            self.state['swe'] = self.state['swe'].sel(
+                time=self.params['state_stop']).drop('time')
+        else:
+            self.state['swe'] = self.state['swe']  # no time dim; use as-is
+        self.met_data['swe'] = self.state['swe'].copy()

     def _validate_setup(self):
         """Updates the global parameters dictionary"""
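
A standalone sketch of the seasonal-precipitation aggregation above (hypothetical data; DAYS_PER_YEAR stands in for cnst.DAYS_PER_YEAR): the 90-day state record is prepended so the 90-day rolling mean is defined from the first simulation day, then the result is sliced back to the run period. xarray's rolling is accelerated by bottleneck, which may explain the CI requirement additions.

import numpy as np
import pandas as pd
import xarray as xr

DAYS_PER_YEAR = 365.25   # stand-in for cnst.DAYS_PER_YEAR

state_times = pd.date_range('1989-10-03', '1989-12-31')  # 90-day state record
run_times = pd.date_range('1990-01-01', '1990-12-31')    # simulation period

trailing = xr.DataArray(np.ones(90), coords=[state_times], dims='time')
prec = xr.DataArray(np.ones(365), coords=[run_times], dims='time')

total = xr.concat([trailing, prec], dim='time')
seasonal = (DAYS_PER_YEAR * total.rolling(time=90).mean()
            .sel(time=slice('1990-01-01', '1990-12-31')))
# One value per simulation day, with no NaNs: the 90 state days
# guarantee a full rolling window at the first run day.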
19 changes: 2 additions & 17 deletions scripts/ms
@@ -24,7 +24,6 @@ import sys
 import json
 import logging
 import argparse
-import pandas as pd
 from configparser import SafeConfigParser
 from collections import OrderedDict
@@ -42,7 +41,7 @@ def parse(args):
 def init(opts):
     """Initialize some information based on the options & config"""
     if not os.path.isfile(opts.config):
-        exit("Configuration file does not exist."
+        exit("Configuration file {} does not exist.".format(opts.config)
             + "Use `ms -h` for more information.")
     config = SafeConfigParser()
     config.optionxform = str
@@ -69,24 +68,10 @@ def init(opts):
     domain_file = conf['domain']
     state_file = conf['state']

-    # Generate the date range that will be put into the data frame
-    start_date, start_hour = conf['start'].split(':')
-    start_year, start_month, start_day = start_date.split('/')
-    end_year, end_month, end_day = conf['stop'].split('/')
-    start = pd.datetime(int(start_year),
-                        int(start_month),
-                        int(start_day),
-                        int(start_hour))
-    stop = pd.datetime(int(end_year),
-                       int(end_month),
-                       int(end_day))
-
     def to_list(s):
         return json.loads(s.replace("'", '"'))

-    conf.update({"stop": stop,
-                 "start": start,
-                 "calendar": conf.get('calendar', 'standard'),
+    conf.update({"calendar": conf.get('calendar', 'standard'),
                  "nprocs": opts.n_processes,
                  "method": method,
                  "out_dir": out_dir,
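
Finally, to tie the script changes together: date parsing now happens in MetSim._normalize_times rather than in this script. A sketch of what that method accepts (hypothetical free-standing helper mirroring its logic; pd.datetime was a valid alias for datetime.datetime at the time):

import pandas as pd

def normalize(value):
    # Mirrors MetSim._normalize_times for a single config value
    if value in [None, 'forcing', '']:
        return None                      # defer to the forcing time range
    if ':' in value:                     # start format: 'YYYY/MM/DD:HH'
        date, hour = value.split(':')
        year, month, day = date.split('/')
        return pd.datetime(int(year), int(month), int(day), int(hour))
    if '/' in value:                     # stop format: 'YYYY/MM/DD'
        year, month, day = value.split('/')
        return pd.datetime(int(year), int(month), int(day))
    return pd.to_datetime(value)

normalize('1990/01/01:0')   # datetime.datetime(1990, 1, 1, 0, 0)
normalize('forcing')        # None -> later inferred from the forcing data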