From ec2dd3ab8fb78c4e0aff7d81cb64095fe6130ad6 Mon Sep 17 00:00:00 2001 From: RussTreadon-NOAA <26926959+RussTreadon-NOAA@users.noreply.github.com> Date: Fri, 28 Apr 2023 14:45:09 -0400 Subject: [PATCH] Update UFSDA ATM ens for new COM directory structure (#1538) g-w PR #1421 changed the GFS COM directory structure. This PR updates UFSDA ATM ensemble analysis jobs and python script to work with the updated GFS COM directory structure. Fixes #1518 --- jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE | 12 +- jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE | 13 +- jobs/JGLOBAL_ATMENS_ANALYSIS_RUN | 3 - ush/python/pygfs/task/atmens_analysis.py | 185 ++++++++++++++--------- 4 files changed, 124 insertions(+), 89 deletions(-) diff --git a/jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE b/jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE index d40d79cf78..37a49e0ae0 100755 --- a/jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE +++ b/jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE @@ -8,19 +8,17 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlfinal" -c "base atmensanl at ############################################## # Set variables used in the script ############################################## -GDATE=$(date +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours") GDUMP="gdas" +GDUMP_ENS="enkf${GDUMP}" ############################################## # Begin JOB SPECIFIC work ############################################## +# Generate COM variable from template +MEMDIR='ensstat' RUN=${GDUMP_ENS} YMD=${PDY} HH=${cyc} generate_com -rx \ + COM_ATMOS_ANALYSIS_ENS:COM_ATMOS_ANALYSIS_TMPL -export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}} -mkdir -p "${COMOUT}" - -# COMIN_GES and COMIN_GES_ENS are used in script -export COMIN_GES="${ROTDIR}/${GDUMP}.${GDATE:0:8}/${GDATE:8:2}/atmos" -export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${GDATE:0:8}/${GDATE:8:2}" +mkdir -m 755 -p "${COM_ATMOS_ANALYSIS_ENS}" ############################################################### # Run relevant script diff --git a/jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE b/jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE index dca7d0ffc6..246502cdfa 100755 --- a/jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE +++ b/jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE @@ -7,19 +7,20 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlinit" -c "base atmensanl atm ############################################## # Set variables used in the script ############################################## +# shellcheck disable=SC2153 GDATE=$(date +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours") +gPDY=${GDATE:0:8} +gcyc=${GDATE:8:2} GDUMP="gdas" - ############################################## # Begin JOB SPECIFIC work ############################################## -export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}} -mkdir -p "${COMOUT}" +# Generate COM variables from templates +RUN=${GDUMP} YMD=${PDY} HH=${cyc} generate_com -rx COM_OBS -# COMIN_GES and COMIN_GES_ENS are used in script -export COMIN_GES="${ROTDIR}/${GDUMP}.${GDATE:0:8}/${GDATE:8:2}/atmos" -export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${GDATE:0:8}/${GDATE:8:2}" +RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} generate_com -rx \ + COM_ATMOS_ANALYSIS_PREV:COM_ATMOS_ANALYSIS_TMPL ############################################################### # Run relevant script diff --git a/jobs/JGLOBAL_ATMENS_ANALYSIS_RUN b/jobs/JGLOBAL_ATMENS_ANALYSIS_RUN index 5a267f197a..0d10c76b05 100755 --- a/jobs/JGLOBAL_ATMENS_ANALYSIS_RUN +++ b/jobs/JGLOBAL_ATMENS_ANALYSIS_RUN @@ -13,9 +13,6 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlrun" -c "base atmensanl atme # Begin JOB SPECIFIC work ############################################## -export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}} -mkdir -p "${COMOUT}" - ############################################################### # Run relevant script diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index 636129d3ee..28b121644a 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -9,12 +9,13 @@ from pygw.attrdict import AttrDict from pygw.file_utils import FileHandler -from pygw.timetools import add_to_datetime, to_fv3time, to_timedelta, to_YMDH +from pygw.timetools import add_to_datetime, to_fv3time, to_timedelta, to_YMDH, to_YMD from pygw.fsutils import rm_p, chdir from pygw.yaml_file import parse_yamltmpl, parse_j2yaml, save_as_yaml from pygw.logger import logit from pygw.executable import Executable from pygw.exceptions import WorkflowException +from pygw.template import Template, TemplateConstants from pygfs.task.analysis import Analysis logger = getLogger(__name__.split('.')[-1]) @@ -45,8 +46,6 @@ def __init__(self, config): 'npz_anl': self.config.LEVS - 1, 'ATM_WINDOW_BEGIN': _window_begin, 'ATM_WINDOW_LENGTH': f"PT{self.config.assim_freq}H", - 'comin_ges_atm': self.config.COMIN_GES, - 'comin_ges_atmens': self.config.COMIN_GES_ENS, 'OPREFIX': f"{self.config.EUPD_CYC}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN 'APREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN 'GPREFIX': f"gdas.t{self.runtime_config.previous_cycle.hour:02d}z.", @@ -79,6 +78,27 @@ def initialize(self: Analysis) -> None: """ super().initialize() + # Make member directories in DATA for background and in DATA and ROTDIR for analysis files + # create template dictionary for output member analysis directories + template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL + tmpl_inc_dict = { + 'ROTDIR': self.task_config.ROTDIR, + 'RUN': self.task_config.RUN, + 'YMD': to_YMD(self.task_config.current_cycle), + 'HH': self.task_config.current_cycle.strftime('%H') + } + dirlist = [] + for imem in range(1, self.task_config.NMEM_ENKF + 1): + dirlist.append(os.path.join(self.task_config.DATA, 'bkg', f'mem{imem:03d}')) + dirlist.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}')) + + # create output directory path for member analysis + tmpl_inc_dict['MEMDIR'] = f"mem{imem:03d}" + incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get) + dirlist.append(incdir) + + FileHandler({'mkdir': dirlist}).sync() + # stage CRTM fix files crtm_fix_list_path = os.path.join(self.task_config.HOMEgfs, 'parm', 'parm_gdas', 'atm_crtm_coeff.yaml') logger.debug(f"Staging CRTM fix files from {crtm_fix_list_path}") @@ -92,7 +112,7 @@ def initialize(self: Analysis) -> None: FileHandler(jedi_fix_list).sync() # stage backgrounds - FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync() + FileHandler(self.get_bkg_dict()).sync() # generate ensemble da YAML file logger.debug(f"Generate ensemble da YAML file: {self.task_config.fv3jedi_yaml}") @@ -108,13 +128,6 @@ def initialize(self: Analysis) -> None: ] FileHandler({'mkdir': newdirs}).sync() - # Make directories for member analysis files - anldir = [] - for imem in range(1, self.task_config.NMEM_ENKF + 1): - memchar = f"mem{imem:03d}" - anldir.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}')) - FileHandler({'mkdir': anldir}).sync() - @logit(logger) def execute(self: Analysis) -> None: """Execute a global atmens analysis @@ -169,7 +182,7 @@ def finalize(self: Analysis) -> None: """ # ---- tar up diags # path of output tar statfile - atmensstat = os.path.join(self.task_config.COMOUT, f"{self.task_config.APREFIX}atmensstat") + atmensstat = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.APREFIX}atmensstat") # get list of diag files to put in tarball diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc4')) @@ -190,12 +203,12 @@ def finalize(self: Analysis) -> None: archive.add(diaggzip, arcname=os.path.basename(diaggzip)) # copy full YAML from executable to ROTDIR - logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COMOUT}") + logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COM_ATMOS_ANALYSIS_ENS}") src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml") - dest = os.path.join(self.task_config.COMOUT, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml") + dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml") logger.debug(f"Copying {src} to {dest}") yaml_copy = { - 'mkdir': [self.task_config.COMOUT], + 'mkdir': [self.task_config.COM_ATMOS_ANALYSIS_ENS], 'copy': [[src, dest]] } FileHandler(yaml_copy).sync() @@ -207,55 +220,6 @@ def finalize(self: Analysis) -> None: def clean(self): super().clean() - @logit(logger) - def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: - """Compile a dictionary of model background files to copy - - This method constructs a dictionary of FV3 RESTART files (coupler, core, tracer) - that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. - - Parameters - ---------- - task_config: Dict - a dictionary containing all of the configuration needed for the task - - Returns - ---------- - bkg_dict: Dict - a dictionary containing the list of model background files to copy for FileHandler - """ - # NOTE for now this is FV3 RESTART files and just assumed to be fh006 - # loop over ensemble members - dirlist = [] - bkglist = [] - for imem in range(1, task_config.NMEM_ENKF + 1): - memchar = f"mem{imem:03d}" - - # accumulate directory list for member restart files - dirlist.append(os.path.join(task_config.DATA, 'bkg', memchar)) - - # get FV3 RESTART files, this will be a lot simpler when using history files - rst_dir = os.path.join(task_config.comin_ges_atmens, memchar, 'atmos/RESTART') - run_dir = os.path.join(task_config.DATA, 'bkg', memchar) - - # atmens DA needs coupler - basename = f'{to_fv3time(task_config.current_cycle)}.coupler.res' - bkglist.append([os.path.join(rst_dir, basename), os.path.join(task_config.DATA, 'bkg', memchar, basename)]) - - # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data - for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: - template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' - for itile in range(1, task_config.ntiles + 1): - basename = template.format(tilenum=itile) - bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) - - bkg_dict = { - 'mkdir': dirlist, - 'copy': bkglist, - } - - return bkg_dict - @logit(logger) def jedi2fv3inc(self: Analysis) -> None: """Generate UFS model readable analysis increment @@ -285,21 +249,37 @@ def jedi2fv3inc(self: Analysis) -> None: # Reference the python script which does the actual work incpy = os.path.join(self.task_config.HOMEgfs, 'ush/jediinc2fv3.py') + # create template dictionaries + template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL + tmpl_inc_dict = { + 'ROTDIR': self.task_config.ROTDIR, + 'RUN': self.task_config.RUN, + 'YMD': to_YMD(self.task_config.current_cycle), + 'HH': self.task_config.current_cycle.strftime('%H') + } + + template_ges = self.task_config.COM_ATMOS_HISTORY_TMPL + tmpl_ges_dict = { + 'ROTDIR': self.task_config.ROTDIR, + 'RUN': self.task_config.RUN, + 'YMD': to_YMD(self.task_config.previous_cycle), + 'HH': self.task_config.previous_cycle.strftime('%H') + } + + # loop over ensemble members for imem in range(1, self.task_config.NMEM_ENKF + 1): memchar = f"mem{imem:03d}" - # make output directory for member increment - incdir = [ - os.path.join(self.task_config.COMOUT, memchar, 'atmos') - ] - FileHandler({'mkdir': incdir}).sync() + # create output path for member analysis increment + tmpl_inc_dict['MEMDIR'] = memchar + incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get) # rewrite UFS-DA atmens increments - atmges_fv3 = os.path.join(self.task_config.COMIN_GES_ENS, memchar, 'atmos', - f"{self.task_config.CDUMP}.t{self.runtime_config.previous_cycle.hour:02d}z.atmf006.nc") + tmpl_ges_dict['MEMDIR'] = memchar + gesdir = Template.substitute_structure(template_ges, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_ges_dict.get) + atmges_fv3 = os.path.join(gesdir, f"{self.task_config.CDUMP}.t{self.task_config.previous_cycle.hour:02d}z.atmf006.nc") atminc_jedi = os.path.join(self.task_config.DATA, 'anl', memchar, f'atminc.{cdate_inc}z.nc4') - atminc_fv3 = os.path.join(self.task_config.COMOUT, memchar, 'atmos', - f"{self.task_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atminc.nc") + atminc_fv3 = os.path.join(incdir, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atminc.nc") # Execute incpy to create the UFS model atm increment file # TODO: use MPMD or parallelize with mpi4py @@ -310,3 +290,62 @@ def jedi2fv3inc(self: Analysis) -> None: cmd.add_default_arg(atminc_fv3) logger.debug(f"Executing {cmd}") cmd(output='stdout', error='stderr') + + @logit(logger) + def get_bkg_dict(self: Analysis) -> Dict[str, List[str]]: + """Compile a dictionary of model background files to copy + + This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer) + that are needed for global atmens DA and returns said dictionary for use by the FileHandler class. + + Parameters + ---------- + None + + Returns + ---------- + bkg_dict: Dict + a dictionary containing the list of model background files to copy for FileHandler + """ + # NOTE for now this is FV3 restart files and just assumed to be fh006 + # loop over ensemble members + rstlist = [] + bkglist = [] + + # get FV3 restart files, this will be a lot simpler when using history files + template_res = self.task_config.COM_ATMOS_RESTART_TMPL + tmpl_res_dict = { + 'ROTDIR': self.task_config.ROTDIR, + 'RUN': self.task_config.RUN, + 'YMD': to_YMD(self.task_config.previous_cycle), + 'HH': self.task_config.previous_cycle.strftime('%H'), + 'MEMDIR': None + } + + for imem in range(1, self.task_config.NMEM_ENKF + 1): + memchar = f"mem{imem:03d}" + + # get FV3 restart files, this will be a lot simpler when using history files + tmpl_res_dict['MEMDIR'] = memchar + rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get) + rstlist.append(rst_dir) + + run_dir = os.path.join(self.task_config.DATA, 'bkg', memchar) + + # atmens DA needs coupler + basename = f'{to_fv3time(self.task_config.current_cycle)}.coupler.res' + bkglist.append([os.path.join(rst_dir, basename), os.path.join(self.task_config.DATA, 'bkg', memchar, basename)]) + + # atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data + for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']: + template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc' + for itile in range(1, self.task_config.ntiles + 1): + basename = template.format(tilenum=itile) + bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)]) + + bkg_dict = { + 'mkdir': rstlist, + 'copy': bkglist, + } + + return bkg_dict