Skip to content

Commit

Permalink
Merge pull request #188 from USDA-ARS-NWRC/186_awsm_integration
Browse files Browse the repository at this point in the history
186 awsm integration
  • Loading branch information
Scott Havens committed Oct 2, 2020
2 parents c93c175 + 4cf0d20 commit dc717e8
Showing 1 changed file with 125 additions and 113 deletions.
238 changes: 125 additions & 113 deletions smrf/framework/model_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
>>> s.create_distribution() # initialize the distribution
>>> s.initializeOutput() # initialize the outputs if desired
>>> s.loadData() # load weather data and station metadata
>>> s.distributeData() # distribute
>>> s.disttribute_data() # distribute
"""

Expand Down Expand Up @@ -218,6 +218,14 @@ def __exit__(self, exc_type, exc_value, traceback):
self._logger.info('SMRF closed --> %s' % datetime.now())
logging.shutdown()

@property
def possible_output_variables(self):
# Collect the potential output variables
variables = {}
for variable, module in self.distribute.items():
variables.update(module.output_variables)
return variables

def loadTopo(self):
"""
Load the information from the configFile in the ['topo'] section. See
Expand Down Expand Up @@ -332,19 +340,19 @@ def loadData(self):
self._logger.info('Backing up input data...')
backup_input(self.data, self.ucfg)

def distributeData(self):
def disttribute_data(self):
"""
Wrapper for various distribute methods. If threading was set in
configFile, then
:func:`~smrf.framework.model_framework.SMRF.distributeData_threaded`
:func:`~smrf.framework.model_framework.SMRF.disttribute_data_threaded`
will be called. Default will call
:func:`~smrf.framework.model_framework.SMRF.distributeData_single`.
:func:`~smrf.framework.model_framework.SMRF.disttribute_data_serial`.
"""

if self.threading:
self.distributeData_threaded()
self.disttribute_data_threaded()
else:
self.distributeData_single()
self.disttribute_data_serial()

def initialize_distribution(self, date_time=None):
"""Call the initialize method for each distribute module
Expand All @@ -357,7 +365,7 @@ def initialize_distribution(self, date_time=None):
for v in self.distribute:
self.distribute[v].initialize(self.topo, self.data, date_time)

def distributeData_single(self):
def disttribute_data_serial(self):
"""
Distribute the measurement point data for all variables in serial. Each
variable is initialized first using the :func:`smrf.data.loadTopo.Topo`
Expand All @@ -384,90 +392,10 @@ def distributeData_single(self):
# -------------------------------------
# Distribute the data
for output_count, t in enumerate(self.date_time):
# wait here for the model to catch up if needed

startTime = datetime.now()
self._logger.info('Distributing time step %s' % t)

if self.hrrr_data_timestep:
self.data.load_class.load_timestep(t)
self.data.set_variables()

# 0.1 sun angle for time step
cosz, azimuth, rad_vec = sunang.sunang(
t.astimezone(pytz.utc),
self.topo.basin_lat,
self.topo.basin_long)

# 0.2 illumination angle
illum_ang = None
if cosz > 0:
illum_ang = shade(
self.topo.sin_slope,
self.topo.aspect,
azimuth,
cosz)

# 1. Air temperature
self.distribute['air_temp'].distribute(self.data.air_temp.loc[t])

# 2. Vapor pressure
self.distribute['vapor_pressure'].distribute(
self.data.vapor_pressure.loc[t],
self.distribute['air_temp'].air_temp)

# 3. Wind_speed and wind_direction
self.distribute['wind'].distribute(
self.data.wind_speed.loc[t],
self.data.wind_direction.loc[t],
t)

# 4. Precipitation
self.distribute['precipitation'].distribute(
self.data.precip.loc[t],
self.distribute['vapor_pressure'].dew_point,
self.distribute['vapor_pressure'].precip_temp,
self.distribute['air_temp'].air_temp,
t,
self.data.wind_speed.loc[t],
self.data.air_temp.loc[t],
self.distribute['wind'].wind_direction,
self.distribute['wind'].wind_model.dir_round_cell,
self.distribute['wind'].wind_speed,
self.distribute['wind'].wind_model.cellmaxus)

# 5. Albedo
self.distribute['albedo'].distribute(
t,
illum_ang,
self.distribute['precipitation'].storm_days)

# 6. cloud_factor
self.distribute['cloud_factor'].distribute(
self.data.cloud_factor.loc[t])

# 7. Solar
self.distribute['solar'].distribute(
t,
self.distribute["cloud_factor"].cloud_factor,
illum_ang,
cosz,
azimuth,
self.distribute['albedo'].albedo_vis,
self.distribute['albedo'].albedo_ir)

# 8. thermal radiation
self.distribute['thermal'].distribute(
t,
self.distribute['air_temp'].air_temp,
self.distribute['vapor_pressure'].vapor_pressure,
self.distribute['vapor_pressure'].dew_point,
self.distribute['cloud_factor'].cloud_factor)

# 9. Soil temperature
self.distribute['soil_temp'].distribute()

# 10. output at the frequency and the last time step
self.distribute_single_timestep(t)
self.output(t)

telapsed = datetime.now() - startTime
Expand All @@ -476,7 +404,89 @@ def distributeData_single(self):

self.forcing_data = 1

def distributeData_threaded(self):
def distribute_single_timestep(self, t):

self._logger.info('Distributing time step {}'.format(t))

if self.hrrr_data_timestep:
self.data.load_class.load_timestep(t)
self.data.set_variables()

# 0.1 sun angle for time step
cosz, azimuth, rad_vec = sunang.sunang(
t.astimezone(pytz.utc),
self.topo.basin_lat,
self.topo.basin_long)

# 0.2 illumination angle
illum_ang = None
if cosz > 0:
illum_ang = shade(
self.topo.sin_slope,
self.topo.aspect,
azimuth,
cosz)

# 1. Air temperature
self.distribute['air_temp'].distribute(self.data.air_temp.loc[t])

# 2. Vapor pressure
self.distribute['vapor_pressure'].distribute(
self.data.vapor_pressure.loc[t],
self.distribute['air_temp'].air_temp)

# 3. Wind_speed and wind_direction
self.distribute['wind'].distribute(
self.data.wind_speed.loc[t],
self.data.wind_direction.loc[t],
t)

# 4. Precipitation
self.distribute['precipitation'].distribute(
self.data.precip.loc[t],
self.distribute['vapor_pressure'].dew_point,
self.distribute['vapor_pressure'].precip_temp,
self.distribute['air_temp'].air_temp,
t,
self.data.wind_speed.loc[t],
self.data.air_temp.loc[t],
self.distribute['wind'].wind_direction,
self.distribute['wind'].wind_model.dir_round_cell,
self.distribute['wind'].wind_speed,
self.distribute['wind'].wind_model.cellmaxus)

# 5. Albedo
self.distribute['albedo'].distribute(
t,
illum_ang,
self.distribute['precipitation'].storm_days)

# 6. cloud_factor
self.distribute['cloud_factor'].distribute(
self.data.cloud_factor.loc[t])

# 7. Solar
self.distribute['solar'].distribute(
t,
self.distribute["cloud_factor"].cloud_factor,
illum_ang,
cosz,
azimuth,
self.distribute['albedo'].albedo_vis,
self.distribute['albedo'].albedo_ir)

# 8. thermal radiation
self.distribute['thermal'].distribute(
t,
self.distribute['air_temp'].air_temp,
self.distribute['vapor_pressure'].vapor_pressure,
self.distribute['vapor_pressure'].dew_point,
self.distribute['cloud_factor'].cloud_factor)

# 9. Soil temperature
self.distribute['soil_temp'].distribute()

def disttribute_data_threaded(self):
"""
Distribute the measurement point data for all variables using threading
and queues. Each variable is initialized first using the
Expand Down Expand Up @@ -624,32 +634,15 @@ def create_distributed_threads(self, other_queue=None):
args=(self.smrf_queue, self.data_queue))
)

def initializeOutput(self):
"""
Initialize the output files based on the configFile section ['output'].
Currently only :func:`NetCDF files
<smrf.output.output_netcdf.OutputNetcdf>` are supported.
"""
out_location = self.config['output']['out_location']

# determine the variables to be output
s_var_list = ", ".join(self.config['output']['variables'])
self._logger.info('{} variables will be output'.format(s_var_list))

output_variables = self.config['output']['variables']

# Collect the potential output variables
possible_output_variables = {}
for variable, module in self.distribute.items():
possible_output_variables.update(module.output_variables)
def create_output_variable_dict(self, output_variables, out_location):

# determine which variables belong where
variable_dict = {}
for output_variable in output_variables:

if output_variable in possible_output_variables.keys():
if output_variable in self.possible_output_variables.keys():
fname = join(out_location, output_variable)
module = possible_output_variables[output_variable]['module']
module = self.possible_output_variables[output_variable]['module'] # noqa

# TODO this is a hack to not have to redo the gold files
if module == 'precipitation':
Expand All @@ -668,8 +661,27 @@ def initializeOutput(self):
self._logger.error(
'{} not an output variable'.format(output_variable))

return variable_dict

def initializeOutput(self):
"""
Initialize the output files based on the configFile section ['output'].
Currently only :func:`NetCDF files
<smrf.output.output_netcdf.OutputNetcdf>` are supported.
"""
out_location = self.config['output']['out_location']

# determine the variables to be output
s_var_list = ", ".join(self.config['output']['variables'])
self._logger.info('{} variables will be output'.format(s_var_list))

output_variables = self.config['output']['variables']

variable_dict = self.create_output_variable_dict(
output_variables, out_location)

self._logger.debug('{} of {} variables will be output'.format(
len(output_variables), len(possible_output_variables)))
len(output_variables), len(self.possible_output_variables)))

# determine what type of file to output
if self.config['output']['file_type'].lower() == 'netcdf':
Expand Down Expand Up @@ -762,7 +774,7 @@ def title(self, option):
self._logger.info(line)


def run_smrf(config):
def run_smrf(config, external_logger=None):
"""
Function that runs smrf how it should be operate for full runs.
Expand All @@ -771,7 +783,7 @@ def run_smrf(config):
"""
start = datetime.now()
# initialize
with SMRF(config) as s:
with SMRF(config, external_logger) as s:
# load topo data
s.loadTopo()

Expand All @@ -785,7 +797,7 @@ def run_smrf(config):
s.loadData()

# distribute
s.distributeData()
s.disttribute_data()

# post process if necessary
s.post_process()
Expand Down

0 comments on commit dc717e8

Please sign in to comment.