Skip to content

Commit

Permalink
Staging job updates
Browse files Browse the repository at this point in the history
- Add new keys
- Remove determine function
- Update execute function to use single stage.yaml.j2

Refs NOAA-EMC#2475
  • Loading branch information
KateFriedman-NOAA committed Jun 17, 2024
1 parent 03b5024 commit f3c72b2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 65 deletions.
10 changes: 3 additions & 7 deletions scripts/exglobal_stage_ic.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def main():
# Pull out all the configuration keys needed to run stage job
keys = ['RUN', 'MODE', 'CASE', 'CASE_ENS', 'OCNRES', 'ICERES', 'waveGRD',
'EXP_WARM_START', 'current_cycle', 'CDUMP', 'rCDUMP',
'ROTDIR', 'PARMgfs', 'ICSDIR',
'ntiles', 'MEMDIR', 'USE_OCN_PERTURB_FILES',
'ROTDIR', 'PARMgfs', 'ICSDIR', 'DTG_PREFIX',
'ntiles', 'MEMDIR', 'REPLAY_ICS',
'DO_WAVE', 'DO_OCN', 'DO_ICE', 'DO_NEST', # TODO: Add DO_MED
'CPL_ATMIC', 'CPL_ICEIC', 'CPL_OCNIC', 'CPL_WAVIC']

Expand All @@ -37,12 +37,8 @@ def main():
# Add the os.path.exists function to the dict for yaml parsing
stage_dict['path_exists'] = os.path.exists

# Determine which ICs to stage
stage_sets = stage.determine_stage(stage_dict)

# Stage ICs
stage.execute_stage(stage_dict, stage_sets)

stage.execute_stage(stage_dict)

if __name__ == '__main__':
main()
64 changes: 6 additions & 58 deletions ush/python/pygfs/task/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,69 +37,17 @@ def __init__(self, config: Dict[str, Any]) -> None:
self.task_config = AttrDict(**self.config, **self.runtime_config)

@logit(logger)
def determine_stage(self, stage_dict: Dict[str, Any]) -> List[str]:
"""Determine which initial condition files need to be placed in ROTDIR.
Parameters
----------
stage_dict : Dict[str, Any]
Task specific keys, e.g. runtime options (DO_WAVE, DO_ICE, etc)
Return
------
stage_sets : List
List of yamls to parse for staging
"""

stage_sets = []

if stage_dict.MODE == "cycled" and stage_dict.RUN == "gdas":
stage_sets.append("analysis.yaml.j2")

if stage_dict.EXP_WARM_START:
stage_sets.append("atmosphere_warm.yaml.j2")
else:
stage_sets.append("atmosphere_cold.yaml.j2")

if stage_dict.DO_WAVE:
stage_sets.append("wave.yaml.j2")

if stage_dict.DO_OCN:
stage_sets.append("ocean.yaml.j2")

if stage_dict.DO_ICE:
stage_sets.append("ice.yaml.j2")

if stage_dict.DO_NEST:
stage_sets.append("atmosphere_nest.yaml.j2")

return stage_sets

@logit(logger)
def execute_stage(self, stage_dict: Dict[str, Any], stage_sets: List) -> None:
def execute_stage(self, stage_dict: Dict[str, Any]) -> None:
"""Perform local staging of initial condition files.
Parameters
----------
stage_sets : List
List of stage sets to send to FileHandler
stage_set : Dict[str, Any]
FileHandler instructions to populate ROTDIR with
Return
------
None
"""

stage_parm = os.path.join(stage_dict.PARMgfs, "stage")

if not os.path.isdir(stage_dict.ROTDIR):
raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!")

for set_yaml in stage_sets:
stage_parm = os.path.join(stage_dict.PARMgfs, "stage")

stage_set = parse_j2yaml(os.path.join(stage_parm, set_yaml), stage_dict)
stage_set = parse_j2yaml(os.path.join(stage_parm, "stage.yaml.j2"), stage_dict)

# Copy files to ROTDIR
for key in stage_set.keys():
FileHandler(stage_set[key]).sync()
# Copy files to ROTDIR
for key in stage_set.keys():
FileHandler(stage_set[key]).sync()

0 comments on commit f3c72b2

Please sign in to comment.