Skip to content

Commit

Permalink
Refactor rocoto task XML creation (#2189)
Browse files Browse the repository at this point in the history
Refactors the rocoto task generation to be recursive. 
This will allow nested metatasks to loop over multiple variables, which is needed for GEFS product generation.

As part of this refactor, there is no longer separate arguments to designate metatasks. 
Instead, task dicts can include a nested 'task_dict' as well as a 'var_dict' containing the variables to loop over. 
The nested task dict can then either have another layer, or be the innermost task.

To accommodate the new recursive nature, some defaults that were previously defined in create_wf_task() had to be pushed down into the function that creates the innermost task. 
Also, former keywords have been absorbed by the task dict.

Refs #823
Refs #827
  • Loading branch information
WalterKolczynski-NOAA committed Jan 8, 2024
1 parent 2b81cfa commit ef6827d
Show file tree
Hide file tree
Showing 5 changed files with 1,341 additions and 226 deletions.
2 changes: 1 addition & 1 deletion jobs/JGFS_ATMOS_GEMPAK_META
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# GFS GEMPAK META PRODUCT GENERATION
############################################
source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "gempak_meta" -e "base"
source "${HOMEgfs}/ush/jjob_header.sh" -e "gempak_meta" -c "base"


###############################################
Expand Down
71 changes: 61 additions & 10 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from applications.applications import AppConfig
from rocoto.tasks import Tasks, create_wf_task
from rocoto.tasks import Tasks
import rocoto.rocoto as rocoto


Expand Down Expand Up @@ -57,41 +57,74 @@ def stage_ic(self):
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

resources = self.get_resource('stage_ic')
task = create_wf_task('stage_ic', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
task_name = f'stage_ic'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/stage_ic.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}
task = rocoto.create_task(task_dict)

return task

def waveinit(self):

resources = self.get_resource('waveinit')
task = create_wf_task('waveinit', resources, cdump=self.cdump, envar=self.envars, dependency=None)
task_name = f'waveinit'
task_dict = {'task_name': task_name,
'resources': resources,
'envars': self.envars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/waveinit.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}
task = rocoto.create_task(task_dict)

return task

def fcst(self):

# TODO: Add real dependencies
dependencies = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}stage_ic'}
dep_dict = {'type': 'task', 'name': f'stage_ic'}
dependencies.append(rocoto.add_dependency(dep_dict))

if self.app_config.do_wave:
dep_dict = {'type': 'task', 'name': f'{self.cdump}waveinit'}
dep_dict = {'type': 'task', 'name': f'waveinit'}
dependencies.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)

resources = self.get_resource('fcst')
task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
task_name = f'fcst'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': self.envars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}
task = rocoto.create_task(task_dict)

return task

def efcs(self):
dependencies = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}stage_ic'}
dep_dict = {'type': 'task', 'name': f'stage_ic'}
dependencies.append(rocoto.add_dependency(dep_dict))

if self.app_config.do_wave:
dep_dict = {'type': 'task', 'name': f'{self.cdump}waveinit'}
dep_dict = {'type': 'task', 'name': f'waveinit'}
dependencies.append(rocoto.add_dependency(dep_dict))

dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)
Expand All @@ -100,9 +133,27 @@ def efcs(self):
efcsenvars.append(rocoto.create_envar(name='ENSGRP', value='#grp#'))

groups = self._get_hybgroups(self._base['NMEM_ENS'], self._configs['efcs']['NMEM_EFCSGRP'])
var_dict = {'grp': groups}

resources = self.get_resource('efcs')
task = create_wf_task('efcs', resources, cdump=self.cdump, envar=efcsenvars, dependency=dependencies,
metatask='efmn', varname='grp', varval=groups, cycledef='gefs')

task_name = f'efcs#grp#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
'envars': efcsenvars,
'cycledef': 'gefs',
'command': f'{self.HOMEgfs}/jobs/rocoto/efcs.sh',
'job_name': f'{self.pslot}_{task_name}_@H',
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'
}

metatask_dict = {'task_name': 'efmn',
'var_dict': var_dict,
'task_dict': task_dict
}

task = rocoto.create_task(metatask_dict)

return task
Loading

0 comments on commit ef6827d

Please sign in to comment.