Skip to content

Commit

Permalink
Update stage python scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
KateFriedman-NOAA committed May 22, 2024
1 parent 4861ed1 commit b7a8c82
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 39 deletions.
23 changes: 15 additions & 8 deletions scripts/exglobal_stage_ic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ def main():
stage = Stage(config)

#Pull out all the configuration keys needed to run stage job
keys = ['RUN','MODE','EXP_WARM_START','CDUMP','rCDUMP',
keys = ['RUN','MODE','current_cycle','EXP_WARM_START','CDUMP','rCDUMP',
'ROTDIR','PARMgfs',
'ntiles',
'BASE_CPLIC','waveGRD','OCNRES','USE_OCN_PERTURB_FILES',
'CPL_ATMIC','CPL_ICEIC','CPL_MEDIC','CPL_OCNIC','CPL_WAVIC']
'BASE_CPLIC','waveGRD','OCNRES',
#TODO: GEFS only#'USE_OCN_PERTURB_FILES',
#TODO: Need this#'CPL_MEDIC',
'CPL_ATMIC','CPL_ICEIC','CPL_OCNIC','CPL_WAVIC']

stage_dict = AttrDict()
for key in keys:
Expand All @@ -32,19 +35,23 @@ def main():
stage_dict[key] = stage.task_config[key]

#TEST PRINT
#for key in stage_dict:
# print(f'{key} = {stage_dict[key]}')
for key in stage_dict:
print(f'{key} = {stage_dict[key]}')

cwd = os.getcwd()

os.chdir(config.ROTDIR)

# Determine which ICs to stage
stage_set = stage.determine(stage_dict)
#stage_sets = stage.determine_stage(stage_dict)
stage_set = stage.determine_stage(stage_dict)

# Stage ICs
# TODO - create and invoke copies
stage.execute(stage_set)
#for stage_set in stage_sets:
# print(f'set = {stage_set}')
# stage.execute_stage(stage_set)
print(f'set = {stage_set}')
stage.execute_stage(stage_set)

os.chdir(cwd)

Expand Down
105 changes: 74 additions & 31 deletions ush/python/pygfs/task/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,9 @@
from logging import getLogger
from typing import Any, Dict, List

from wxflow import (AttrDict,
FileHandler,
Hsi,
Htar,
Task,
cast_strdict_as_dtypedict,
chgrp,
get_gid,
logit,
mkdir_p,
parse_j2yaml,
rm_p,
strftime,
to_YMD,
to_YMDH,
Template,
TemplateConstants)
from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, cast_strdict_as_dtypedict,
chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime,
to_YMD, to_YMDH, Template, TemplateConstants)

logger = getLogger(__name__.split('.')[-1])

Expand Down Expand Up @@ -83,14 +69,54 @@ def _gen_relative_paths(self, root_path: str) -> Dict:

return rel_path_dict

@staticmethod
@logit(logger)
def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]):
def _create_fileset(stage_set: Dict[str, Any]) -> List:
    """
    Collect the list of all available files from the parsed yaml dict.

    Globs are expanded; a missing required entry raises an error, while a
    missing optional entry only produces a warning.

    TODO: expand all globs in the jinja yaml files instead of expanding
          them here and issue errors here if globbing patterns (*, ?, [])
          are found.

    Parameters
    ----------
    stage_set: Dict
        Contains full paths for required and optional files to be staged.
    """

    fileset = []

    # Required entries: every glob must resolve to at least one path.
    required = stage_set.required if "required" in stage_set else None
    for item in required or []:
        matches = glob.glob(item)
        if not matches:
            raise FileNotFoundError(f"FATAL ERROR: Required file, directory, or glob {item} not found!")
        fileset.extend(matches)

    # Optional entries: an unmatched glob is logged but not fatal.
    optional = stage_set.optional if "optional" in stage_set else None
    for item in optional or []:
        matches = glob.glob(item)
        if not matches:
            logger.warning(f"WARNING: optional file/glob {item} not found!")
        else:
            fileset.extend(matches)

    return fileset

@logit(logger)
def determine_stage(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]):
"""Determine which initial condition files need to be placed in ROTDIR.
Parameters
----------
stage_dict : Dict[str, Any]
Task specific keys, e.g. runtime options (DO_AERO, DO_ICE, etc)
Task specific keys, e.g. runtime options (DO_WAVE, DO_ICE, etc)
Return
------
Expand All @@ -100,10 +126,6 @@ def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[st

stage_parm = os.path.join(stage_dict.PARMgfs, "stage")

# Add the glob.glob function for capturing log filenames
# TODO remove this kludge once log filenames are explicit
stage_dict['glob'] = glob.glob

# Add the os.path.exists function to the dict for yaml parsing
stage_dict['path_exists'] = os.path.exists

Expand All @@ -114,24 +136,45 @@ def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[st
if stage_dict.MODE == "cycled":
master_yaml = "master_cycled.yaml.j2"
elif stage_dict.MODE == "forecast-only":
master_yaml = "master_forecast_only.yaml.j2"
#master_yaml = "master_forecast_only.yaml.j2"
master_yaml = "fv3_cold.yaml.j2"
elif stage_dict.RUN == "gefs":
raise NotImplementedError("FATAL ERROR: Staging is not yet set up for GEFS runs")
elif stage_dict.RUN == "sfs":
raise NotImplementedError("FATAL ERROR: Staging is not yet set up for SFS runs")
else:
raise ValueError(f"FATAL ERROR: Staging is not enabled for {stage_dict.RUN} runs")

parsed_sets = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict)
#parsed_sets = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict)
stage_set = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict)
#print(f'parsed_sets = {parsed_sets}')

#stage_sets = []

stage_sets = []
#for dataset in parsed_sets.datasets.values():

for dataset in parsed_sets.datasets.values():
# dataset["fileset"] = Stage._create_fileset(dataset)

dataset["fileset"] = Stage._create_fileset(dataset)
# stage_sets.append(dataset)

stage_sets.append(dataset)
#return stage_sets
return stage_set

return stage_sets
@logit(logger)
def execute_stage(self, stage_set: Dict[str, Any]) -> None:
    """Perform local staging of initial condition files.

    Runs each FileHandler instruction set in ``stage_set`` to copy the
    initial condition files into place (the calling script is expected to
    have already chdir'd to ROTDIR — confirm against caller).

    Parameters
    ----------
    stage_set : Dict[str, Any]
        FileHandler instructions to populate ROTDIR with

    Return
    ------
    None
    """

    # Only the instruction lists are needed, so iterate the values
    # directly instead of indexing back through the keys.
    for instructions in stage_set.values():
        FileHandler(instructions).sync()

0 comments on commit b7a8c82

Please sign in to comment.