Skip to content

Commit

Permalink
Updates to stage.py
Browse files Browse the repository at this point in the history
- General cleanup
- Rework determine_stage function
- Rework execute_stage function

Refs NOAA-EMC#2475
  • Loading branch information
KateFriedman-NOAA committed May 30, 2024
1 parent 3e754c1 commit b625906
Showing 1 changed file with 31 additions and 36 deletions.
67 changes: 31 additions & 36 deletions ush/python/pygfs/task/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, config: Dict[str, Any]) -> None:
self.task_config = AttrDict(**self.config, **self.runtime_config)

@logit(logger)
def determine_stage(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]):
def determine_stage(self, stage_dict: Dict[str, Any]) -> List[str]:
"""Determine which initial condition files need to be placed in ROTDIR.
Parameters
Expand All @@ -49,52 +49,39 @@ def determine_stage(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[D
Return
------
stage_set : Dict[str, Any]
Set of FileHandler instructions to copy files to the ROTDIR
stage_sets : List
List of yamls to parse for staging
"""

stage_parm = os.path.join(stage_dict.PARMgfs, "stage")

# Add the os.path.exists function to the dict for yaml parsing
stage_dict['path_exists'] = os.path.exists
stage_sets = []

if not os.path.isdir(stage_dict.ROTDIR):
raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!")

if stage_dict.RUN == "gfs" or stage_dict.RUN == "gdas":
if stage_dict.MODE == "cycled":
master_yaml = "master_cycled.yaml.j2"
elif stage_dict.MODE == "forecast-only":
#master_yaml = "master_forecast_only.yaml.j2"
master_yaml = "fv3_cold.yaml.j2"
elif stage_dict.RUN == "gefs":
raise NotImplementedError("FATAL ERROR: Staging is not yet set up for GEFS runs")
elif stage_dict.RUN == "sfs":
raise NotImplementedError("FATAL ERROR: Staging is not yet set up for SFS runs")
if stage_dict.EXP_WARM_START:
stage_sets.append("fv3_warm.yaml.j2")
else:
raise ValueError(f"FATAL ERROR: Staging is not enabled for {stage_dict.RUN} runs")

#parsed_sets = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict)
stage_set = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict)
#print(f'parsed_sets = {parsed_sets}')
stage_sets.append("fv3_cold.yaml.j2")

#stage_sets = []
if stage_dict.DO_WAVE:
stage_sets.append("wave.yaml.j2")

#for dataset in parsed_sets.datasets.values():
if stage_dict.DO_OCN:
stage_sets.append("ocean.yaml.j2")

# dataset["fileset"] = Stage._create_fileset(dataset)
if stage_dict.DO_ICE:
stage_sets.append("ice.yaml.j2")

# stage_sets.append(dataset)
if stage_dict.DO_NEST:
stage_sets.append("fv3_nest.yaml.j2")

#return stage_sets
return stage_set
return stage_sets

@logit(logger)
def execute_stage(self, stage_set: Dict[str, Any]) -> None:
def execute_stage(self, stage_dict: Dict[str, Any], stage_sets: List) -> None:
"""Perform local staging of initial condition files.
Parameters
----------
stage_sets : List
List of stage sets to send to FileHandler
stage_set : Dict[str, Any]
FileHandler instructions to populate ROTDIR with
Expand All @@ -103,7 +90,15 @@ def execute_stage(self, stage_set: Dict[str, Any]) -> None:
None
"""

# Copy files to ROTDIR
for key in stage_set.keys():
# print(f'key = {key}')
FileHandler(stage_set[key]).sync()
stage_parm = os.path.join(stage_dict.PARMgfs, "stage")

if not os.path.isdir(stage_dict.ROTDIR):
raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({stage_dict.ROTDIR}) does not exist!")

for set_yaml in stage_sets:

stage_set = parse_j2yaml(os.path.join(stage_parm, set_yaml), stage_dict)

# Copy files to ROTDIR
for key in stage_set.keys():
FileHandler(stage_set[key]).sync()

0 comments on commit b625906

Please sign in to comment.