Skip to content

Commit

Permalink
Update xml generation to include JEDI_VAR and JEDI_ENS
Browse files Browse the repository at this point in the history
  • Loading branch information
RussTreadon-NOAA committed Jul 21, 2022
1 parent cbb3f6d commit fbbc582
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 8 deletions.
44 changes: 36 additions & 8 deletions workflow/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def __init__(self, configuration: Configuration) -> None:
self.do_wafs = _base.get('DO_WAFS', False)
self.do_vrfy = _base.get('DO_VRFY', True)
self.do_metp = _base.get('DO_METP', False)
self.do_jedivar = _base.get('DO_JEDIVAR', False)
self.do_jediens = _base.get('DO_JEDIENS', False)

self.do_hpssarch = _base.get('HPSSARCH', False)

Expand Down Expand Up @@ -170,15 +172,27 @@ def _cycled_configs(self):
Returns the config_files that are involved in the cycled app
"""

configs = ['prep',
'anal', 'sfcanl', 'analdiag', 'analcalc',
'fcst', 'post', 'vrfy', 'arch']
configs = ['prep']
## 'sfcanl', 'analcalc',
## 'fcst', 'post', 'vrfy', 'arch']

if self.do_jedivar:
configs += ['atmanalprep', 'atmanalrun', 'atmanalpost']
else:
configs += ['anal', 'analdiag']

configs += ['sfcanl', 'analcalc', 'fcst', 'post', 'vrfy', 'arch']


if self.do_gldas:
configs += ['gldas']

if self.do_hybvar:
configs += ['eobs', 'eomg', 'ediag', 'eupd', 'ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc']
if self.do_jediens:
configs += ['atmensanalprep', 'atmensanalrun', 'atmensanalpost']
else:
configs += ['eobs', 'eomg', 'ediag', 'eupd']
configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc']

if self.do_metp:
configs += ['metp']
Expand Down Expand Up @@ -316,10 +330,17 @@ def _get_cycled_task_names(self):
This is the place where that order is set.
"""

gdas_gfs_common_tasks_before_fcst = ['prep', 'anal', 'sfcanl', 'analcalc']
gdas_gfs_common_tasks_before_fcst = ['prep']
gdas_gfs_common_tasks_after_fcst = ['post', 'vrfy']
gdas_gfs_common_cleanup_tasks = ['arch']

if self.do_jedivar:
gdas_gfs_common_tasks_before_fcst += ['atmanalprep', 'atmanalrun', 'atmanalpost']
else:
gdas_gfs_common_tasks_before_fcst += ['anal']

gdas_gfs_common_tasks_before_fcst += ['sfcanl', 'analcalc']

gldas_tasks = ['gldas']
wave_prep_tasks = ['waveinit', 'waveprep']
wave_bndpnt_tasks = ['wavepostbndpnt', 'wavepostbndpntbll']
Expand All @@ -328,12 +349,19 @@ def _get_cycled_task_names(self):
hybrid_gdas_or_gfs_tasks = []
hybrid_gdas_tasks = []
if self.do_hybvar:
hybrid_gdas_or_gfs_tasks += ['eobs', 'eupd', 'echgres']
hybrid_gdas_or_gfs_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg']
if self.do_jediens:
hybrid_gdas_or_gfs_tasks += ['atmensanalprep', 'atmensanalrun', 'atmensanalpost', 'echgres']
else:
hybrid_gdas_or_gfs_tasks += ['eobs', 'eupd', 'echgres']
hybrid_gdas_or_gfs_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg']
hybrid_gdas_tasks += ['ecen', 'esfc', 'efcs', 'epos', 'earc']

# Collect all "gdas" cycle tasks
gdas_tasks = gdas_gfs_common_tasks_before_fcst + ['analdiag']
gdas_tasks = []
if self.do_jedivar:
gdas_tasks += gdas_gfs_common_tasks_before_fcst
else:
gdas_tasks += gdas_gfs_common_tasks_before_fcst + ['analdiag']

if self.do_gldas:
gdas_tasks += gldas_tasks
Expand Down
116 changes: 116 additions & 0 deletions workflow/rocoto/workflow_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ class Tasks:
SERVICE_TASKS = ['arch', 'earc', 'getic']
VALID_TASKS = ['aerosol_init', 'coupled_ic', 'getic', 'init',
'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'gldas', 'arch',
'atmanalprep', 'atmanalrun', 'atmanalpost',
'earc', 'ecen', 'echgres', 'ediag', 'efcs',
'eobs', 'eomg', 'epos', 'esfc', 'eupd',
'atmensanalprep', 'atmensanalrun', 'atmensanalpost',
'fcst', 'post', 'ocnpost', 'vrfy', 'metp',
'postsnd', 'awips', 'gempak',
'wafs', 'wafsblending', 'wafsblending0p25',
Expand Down Expand Up @@ -376,6 +378,65 @@ def analdiag(self):

return task

def atmanalprep(self):

suffix = self._base["SUFFIX"]
dump_suffix = self._base["DUMP_SUFFIX"]
gfs_cyc = self._base["gfs_cyc"]
dmpdir = self._base["DMPDIR"]
gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.eupd_cdumps else False

deps = []
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&ROTDIR;/gdas.@Y@m@d/@H/atmos/gdas.t@Hz.atmf009{suffix}'
dep_dict = {'type': 'data', 'data': data, 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'{dmpdir}/{self.cdump}{dump_suffix}.@Y@m@d/@H/{self.cdump}.t@Hz.updated.status.tm00.bufr_d'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

cycledef = self.cdump
if self.cdump in ['gfs'] and gfs_enkf and gfs_cyc != 4:
cycledef = 'gdas'

resources = self.get_resource('atmanalprep')
task = create_wf_task('atmanalprep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
cycledef=cycledef)
return task

def atmanalrun(self):

deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalprep'}
deps.append(rocoto.add_dependency(dep_dict))
if self.app_config.do_hybvar:
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
else:
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('atmanalrun')
task = create_wf_task('atmanalrun', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def atmanalpost(self):

deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalrun'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('atmanalpost')
task = create_wf_task('atmanalpost', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def gldas(self):

deps = []
Expand Down Expand Up @@ -880,6 +941,61 @@ def eupd(self):

return task

def atmensanalprep(self):

suffix = self._base["SUFFIX"]
dump_suffix = self._base["DUMP_SUFFIX"]
gfs_cyc = self._base["gfs_cyc"]
dmpdir = self._base["DMPDIR"]
gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.eupd_cdumps else False

deps = []
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&ROTDIR;/gdas.@Y@m@d/@H/atmos/gdas.t@Hz.atmf009{suffix}'
dep_dict = {'type': 'data', 'data': data, 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'{dmpdir}/{self.cdump}{dump_suffix}.@Y@m@d/@H/{self.cdump}.t@Hz.updated.status.tm00.bufr_d'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

cycledef = self.cdump
if self.cdump in ['gfs'] and gfs_enkf and gfs_cyc != 4:
cycledef = 'gdas'

resources = self.get_resource('atmensanalprep')
task = create_wf_task('atmensanalprep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
cycledef=cycledef)

return task

def atmensanalrun(self):

deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}atmensanalprep'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('atmensanalrun')
task = create_wf_task('atmensanalrun', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def atmensanalpost(self):

deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}atmensanalrun'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

resources = self.get_resource('atmensanalpost')
task = create_wf_task('atmensanalpost', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)

return task

def ecen(self):

self._is_this_a_gdas_task(self.cdump, 'ecen')
Expand Down

0 comments on commit fbbc582

Please sign in to comment.