diff --git a/workflow/applications.py b/workflow/applications.py index e91475e36f..f6889ac72f 100644 --- a/workflow/applications.py +++ b/workflow/applications.py @@ -105,6 +105,8 @@ def __init__(self, configuration: Configuration) -> None: self.do_wafs = _base.get('DO_WAFS', False) self.do_vrfy = _base.get('DO_VRFY', True) self.do_metp = _base.get('DO_METP', False) + self.do_jedivar = _base.get('DO_JEDIVAR', False) + self.do_jediens = _base.get('DO_JEDIENS', False) self.do_hpssarch = _base.get('HPSSARCH', False) @@ -170,15 +172,27 @@ def _cycled_configs(self): Returns the config_files that are involved in the cycled app """ - configs = ['prep', - 'anal', 'sfcanl', 'analdiag', 'analcalc', - 'fcst', 'post', 'vrfy', 'arch'] + configs = ['prep'] +## 'sfcanl', 'analcalc', +## 'fcst', 'post', 'vrfy', 'arch'] + if self.do_jedivar: + configs += ['atmanalprep', 'atmanalrun', 'atmanalpost'] + else: + configs += ['anal', 'analdiag'] + + configs += ['sfcanl', 'analcalc', 'fcst', 'post', 'vrfy', 'arch'] + + if self.do_gldas: configs += ['gldas'] if self.do_hybvar: - configs += ['eobs', 'eomg', 'ediag', 'eupd', 'ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc'] + if self.do_jediens: + configs += ['atmensanalprep', 'atmensanalrun', 'atmensanalpost'] + else: + configs += ['eobs', 'eomg', 'ediag', 'eupd'] + configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc'] if self.do_metp: configs += ['metp'] @@ -316,10 +330,17 @@ def _get_cycled_task_names(self): This is the place where that order is set. """ - gdas_gfs_common_tasks_before_fcst = ['prep', 'anal', 'sfcanl', 'analcalc'] + gdas_gfs_common_tasks_before_fcst = ['prep'] gdas_gfs_common_tasks_after_fcst = ['post', 'vrfy'] gdas_gfs_common_cleanup_tasks = ['arch'] + if self.do_jedivar: + gdas_gfs_common_tasks_before_fcst += ['atmanalprep', 'atmanalrun', 'atmanalpost'] + else: + gdas_gfs_common_tasks_before_fcst += ['anal'] + + gdas_gfs_common_tasks_before_fcst += ['sfcanl', 'analcalc'] + gldas_tasks = ['gldas'] wave_prep_tasks = ['waveinit', 'waveprep'] wave_bndpnt_tasks = ['wavepostbndpnt', 'wavepostbndpntbll'] @@ -328,12 +349,19 @@ def _get_cycled_task_names(self): hybrid_gdas_or_gfs_tasks = [] hybrid_gdas_tasks = [] if self.do_hybvar: - hybrid_gdas_or_gfs_tasks += ['eobs', 'eupd', 'echgres'] - hybrid_gdas_or_gfs_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg'] + if self.do_jediens: + hybrid_gdas_or_gfs_tasks += ['atmensanalprep', 'atmensanalrun', 'atmensanalpost', 'echgres'] + else: + hybrid_gdas_or_gfs_tasks += ['eobs', 'eupd', 'echgres'] + hybrid_gdas_or_gfs_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg'] hybrid_gdas_tasks += ['ecen', 'esfc', 'efcs', 'epos', 'earc'] # Collect all "gdas" cycle tasks - gdas_tasks = gdas_gfs_common_tasks_before_fcst + ['analdiag'] + gdas_tasks = [] + if self.do_jedivar: + gdas_tasks += gdas_gfs_common_tasks_before_fcst + else: + gdas_tasks += gdas_gfs_common_tasks_before_fcst + ['analdiag'] if self.do_gldas: gdas_tasks += gldas_tasks diff --git a/workflow/rocoto/workflow_tasks.py b/workflow/rocoto/workflow_tasks.py index 8e356a2a52..e10f5f6779 100644 --- a/workflow/rocoto/workflow_tasks.py +++ b/workflow/rocoto/workflow_tasks.py @@ -12,8 +12,10 @@ class Tasks: SERVICE_TASKS = ['arch', 'earc', 'getic'] VALID_TASKS = ['aerosol_init', 'coupled_ic', 'getic', 'init', 'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'gldas', 'arch', + 'atmanalprep', 'atmanalrun', 'atmanalpost', 'earc', 'ecen', 'echgres', 'ediag', 'efcs', 'eobs', 'eomg', 'epos', 'esfc', 'eupd', + 'atmensanalprep', 'atmensanalrun', 'atmensanalpost', 'fcst', 'post', 'ocnpost', 'vrfy', 'metp', 'postsnd', 'awips', 'gempak', 'wafs', 'wafsblending', 'wafsblending0p25', @@ -376,6 +378,65 @@ def analdiag(self): return task + def atmanalprep(self): + + suffix = self._base["SUFFIX"] + dump_suffix = self._base["DUMP_SUFFIX"] + gfs_cyc = self._base["gfs_cyc"] + dmpdir = self._base["DMPDIR"] + gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.eupd_cdumps else False + + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'&ROTDIR;/gdas.@Y@m@d/@H/atmos/gdas.t@Hz.atmf009{suffix}' + dep_dict = {'type': 'data', 'data': data, 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'{dmpdir}/{self.cdump}{dump_suffix}.@Y@m@d/@H/{self.cdump}.t@Hz.updated.status.tm00.bufr_d' + dep_dict = {'type': 'data', 'data': data} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + cycledef = self.cdump + if self.cdump in ['gfs'] and gfs_enkf and gfs_cyc != 4: + cycledef = 'gdas' + + resources = self.get_resource('atmanalprep') + task = create_wf_task('atmanalprep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) + return task + + def atmanalrun(self): + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalprep'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_hybvar: + dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + else: + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('atmanalrun') + task = create_wf_task('atmanalrun', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def atmanalpost(self): + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalrun'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('atmanalpost') + task = create_wf_task('atmanalpost', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + def gldas(self): deps = [] @@ -880,6 +941,61 @@ def eupd(self): return task + def atmensanalprep(self): + + suffix = self._base["SUFFIX"] + dump_suffix = self._base["DUMP_SUFFIX"] + gfs_cyc = self._base["gfs_cyc"] + dmpdir = self._base["DMPDIR"] + gfs_enkf = True if self.app_config.do_hybvar and 'gfs' in self.app_config.eupd_cdumps else False + + deps = [] + dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'&ROTDIR;/gdas.@Y@m@d/@H/atmos/gdas.t@Hz.atmf009{suffix}' + dep_dict = {'type': 'data', 'data': data, 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'{dmpdir}/{self.cdump}{dump_suffix}.@Y@m@d/@H/{self.cdump}.t@Hz.updated.status.tm00.bufr_d' + dep_dict = {'type': 'data', 'data': data} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + cycledef = self.cdump + if self.cdump in ['gfs'] and gfs_enkf and gfs_cyc != 4: + cycledef = 'gdas' + + resources = self.get_resource('atmensanalprep') + task = create_wf_task('atmensanalprep', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) + + return task + + def atmensanalrun(self): + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}atmensanalprep'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('atmensanalrun') + task = create_wf_task('atmensanalrun', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + + def atmensanalpost(self): + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}atmensanalrun'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps) + + resources = self.get_resource('atmensanalpost') + task = create_wf_task('atmensanalpost', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + + return task + def ecen(self): self._is_this_a_gdas_task(self.cdump, 'ecen')