Skip to content

Commit

Permalink
Issue #521: extend rocoto workflow xml generator to include UFS DA
Browse files Browse the repository at this point in the history
  • Loading branch information
RussTreadon-NOAA committed Feb 15, 2022
1 parent 29c9ce2 commit 134fbe9
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 22 deletions.
3 changes: 3 additions & 0 deletions parm/config/config.base.emc.dyn
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ export gldas_cyc=00
export imp_physics=11

# Shared parameters
# DA engine
export DO_UFSDA="NO"

# Hybrid related
export DOHYBVAR="YES"
export NMEM_ENKF=@NMEM_ENKF@
Expand Down
3 changes: 3 additions & 0 deletions parm/config/config.base.nco.static
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ export WAVE_CDUMP="both"
export imp_physics=11

# Shared parameters
# DA engine
export DO_UFSDA="NO"

# Hybrid related
export DOHYBVAR="YES"
export NMEM_ENKF="80"
Expand Down
65 changes: 44 additions & 21 deletions ush/rocoto/setup_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def main():
print(f'input arg: --expdir = {repr(args.expdir)}')
sys.exit(1)

gfs_steps = ['prep', 'anal', 'analdiag', 'analcalc', 'gldas', 'fcst', 'postsnd', 'post', 'vrfy', 'arch']
gfs_steps = ['analcalc', 'gldas', 'fcst', 'postsnd', 'post', 'vrfy', 'arch']
gfs_steps_gempak = ['gempak']
gfs_steps_awips = ['awips']
gfs_steps_wafs = ['wafs', 'wafsgrib2', 'wafsblending', 'wafsgcip', 'wafsgrib20p25', 'wafsblending0p25']
Expand All @@ -55,8 +55,11 @@ def main():
# From gfsv16b latest
# gfs_steps = ['prep', 'anal', 'gldas', 'fcst', 'postsnd', 'post', 'awips', 'gempak', 'vrfy', 'metp', 'arch']
hyb_steps = ['eobs', 'ediag', 'eomg', 'eupd', 'ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc']
gsida_steps = ['prep', 'anal', 'analdiag']
ufsda_steps = ['atmanalprep', 'atmanalrun', 'atmanalpost']

steps = gfs_steps + hyb_steps if _base.get('DOHYBVAR', 'NO') == 'YES' else gfs_steps
steps = gfs_steps + ufsda_steps if _base.get('DO_UFSDA', 'NO') == 'YES' else gfs_steps + gsida_steps
steps = steps + hyb_steps if _base.get('DOHYBVAR', 'NO') == 'YES' else steps
steps = steps + metp_steps if _base.get('DO_METP', 'NO') == 'YES' else steps
steps = steps + gfs_steps_gempak if _base.get('DO_GEMPAK', 'NO') == 'YES' else steps
steps = steps + gfs_steps_awips if _base.get('DO_AWIPS', 'NO') == 'YES' else steps
Expand Down Expand Up @@ -241,13 +244,23 @@ def get_gdasgfs_resources(dict_configs, cdump='gdas'):
do_gldas = base.get('DO_GLDAS', 'NO').upper()
do_wave = base.get('DO_WAVE', 'NO').upper()
do_wave_cdump = base.get('WAVE_CDUMP', 'BOTH').upper()
do_ufsda = base.get('DO_UFSDA', 'NO').upper()
reservation = base.get('RESERVATION', 'NONE').upper()

#tasks = ['prep', 'anal', 'fcst', 'post', 'vrfy', 'arch']
tasks = ['prep', 'anal', 'analcalc']
if do_ufsda in ['Y', 'YES']:
task_prep = ['atmanalprep']
task_anal = ['atmanalrun']
task_diag = ['atmanalpost']
else:
task_prep = ['prep']
task_anal = ['anal']
task_diag = ['analdiag']

tasks = task_prep + task_anal
tasks += ['analcalc']
if cdump in ['gdas']:
tasks += ['analdiag']
tasks += task_diag
if cdump in ['gdas'] and do_gldas in ['Y', 'YES']:
tasks += ['gldas']
if cdump in ['gdas'] and do_wave in ['Y', 'YES'] and do_wave_cdump in ['GDAS', 'BOTH']:
Expand Down Expand Up @@ -410,6 +423,7 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):
do_metp = base.get('DO_METP', 'NO').upper()
do_gldas = base.get('DO_GLDAS', 'NO').upper()
do_wave = base.get('DO_WAVE', 'NO').upper()
do_ufsda = base.get('DO_UFSDA', 'NO').upper()
if do_wave in ['YES']:
do_wave_bnd = dict_configs['wavepostsbs'].get('DOBNDPNT_WAVE', "YES").upper()
do_wave_cdump = base.get('WAVE_CDUMP', 'BOTH').upper()
Expand All @@ -418,6 +432,15 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):

dict_tasks = OrderedDict()

if do_ufsda in ['Y', 'YES']:
task_prep = 'atmanalprep'
task_anal = 'atmanalrun'
task_diag = 'atmanalpost'
else:
task_prep = 'prep'
task_anal = 'anal'
task_diag = 'analdiag'

# prep
deps = []
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'}
Expand All @@ -434,14 +457,14 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):

if gfs_enkf and cdump in ['gfs']:
if gfs_cyc == 4:
task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies)
task = wfu.create_wf_task(task=task_prep, cdump=cdump, envar=envars, dependency=dependencies)
else:
task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies, cycledef='gdas')
task = wfu.create_wf_task(task=task_prep, cdump=cdump, envar=envars, dependency=dependencies, cycledef='gdas')

else:
task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies)
task = wfu.create_wf_task(task=task_prep, cdump=cdump, envar=envars, dependency=dependencies)

dict_tasks[f'{cdump}prep'] = task
dict_tasks[f'{cdump}{task_prep}'] = task

# wave tasks in gdas or gfs or both
if do_wave_cdump in ['BOTH']:
Expand All @@ -454,7 +477,7 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):
# waveinit
if do_wave in ['Y', 'YES'] and cdump in cdumps:
deps = []
dep_dict = {'type': 'task', 'name': '{cdump}prep'}
dep_dict = {'type': 'task', 'name': '{cdump}{task_prep}'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
Expand All @@ -473,24 +496,24 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):

# anal
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}prep'}
dep_dict = {'type': 'task', 'name': f'{cdump}{task_prep}'}
deps.append(rocoto.add_dependency(dep_dict))
if dohybvar in ['y', 'Y', 'yes', 'YES']:
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
else:
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('anal', cdump=cdump, envar=envars, dependency=dependencies)
task = wfu.create_wf_task(task=task_anal, cdump=cdump, envar=envars, dependency=dependencies)

dict_tasks[f'{cdump}anal'] = task
dict_tasks[f'{cdump}{task_anal}'] = task

# analcalc
deps1 = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loginc.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
dep_dict = {'type': 'task', 'name': f'{cdump}{task_anal}'}
deps.append(rocoto.add_dependency(dep_dict))
if dohybvar in ['y', 'Y', 'yes', 'YES'] and cdump == 'gdas':
dep_dict = {'type': 'task', 'name': f'{"gdas"}echgres', 'offset': '-06:00:00'}
Expand All @@ -508,7 +531,7 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loginc.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
dep_dict = {'type': 'task', 'name': f'{cdump}{task_anal}'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)

Expand All @@ -518,17 +541,17 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):
deps2.append(rocoto.add_dependency(dep_dict))
dependencies2 = rocoto.create_dependency(dep_condition='and', dep=deps2)

task = wfu.create_wf_task('analdiag', cdump=cdump, envar=envars, dependency=dependencies2)
task = wfu.create_wf_task(task=task_diag, cdump=cdump, envar=envars, dependency=dependencies2)

dict_tasks[f'{cdump}analdiag'] = task
dict_tasks[f'{cdump}{task_diag}'] = task

# gldas
if cdump in ['gdas'] and do_gldas in ['Y', 'YES']:
deps1 = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loginc.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
dep_dict = {'type': 'task', 'name': f'{cdump}{task_anal}'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)

Expand Down Expand Up @@ -556,7 +579,7 @@ def get_gdasgfs_tasks(dict_configs, cdump='gdas'):
dep_dict = {'type': 'task', 'name': f'{cdump}analcalc'}
deps1.append(rocoto.add_dependency(dep_dict))
elif cdump in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
dep_dict = {'type': 'task', 'name': f'{cdump}{task_anal}'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)

Expand Down Expand Up @@ -987,7 +1010,7 @@ def get_hyb_tasks(dict_configs, cycledef='enkf'):

# eobs
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}prep'}
dep_dict = {'type': 'task', 'name': f'{cdump}{task_prep}'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
Expand Down Expand Up @@ -1402,7 +1425,7 @@ def create_xml(dict_configs):
if gfs_cyc != 0:
xmlfile.append(dict_to_strings(dict_gfs_resources))
elif gfs_cyc == 0 and dohybvar in ['Y', 'YES'] and eupd_cyc in ['BOTH', 'GFS']:
xmlfile.append(dict_gfs_resources['gfsprep'])
xmlfile.append(dict_gfs_resources['gfs{task_prep}'])

xmlfile.append(workflow_header)

Expand All @@ -1414,7 +1437,7 @@ def create_xml(dict_configs):
if gfs_cyc != 0:
xmlfile.append(dict_to_strings(dict_gfs_tasks))
elif gfs_cyc == 0 and dohybvar in ['Y', 'YES'] and eupd_cyc in ['BOTH', 'GFS']:
xmlfile.append(dict_gfs_tasks['gfsprep'])
xmlfile.append(dict_gfs_tasks['gfs{task_prep}'])
xmlfile.append('\n')

xmlfile.append(workflow_footer)
Expand Down
2 changes: 1 addition & 1 deletion ush/rocoto/workflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def get_scheduler(machine):
except KeyError:
raise UnknownMachineError(f'Unknown machine: {machine}, ABORT!')

def create_wf_task(task, cdump='gdas', cycledef=None, envar=None, dependency=None, \
def create_wf_task(task=None, cdump='gdas', cycledef=None, envar=None, dependency=None, \
metatask=None, varname=None, varval=None, vardict=None, \
final=False):

Expand Down

0 comments on commit 134fbe9

Please sign in to comment.