Skip to content

Commit

Permalink
Update UFSDA ATM ens for new COM directory structure (#1538)
Browse files Browse the repository at this point in the history
g-w PR #1421 changed the GFS COM directory structure. This PR updates UFSDA ATM ensemble analysis jobs and python script to work with the updated GFS COM directory structure.

Fixes #1518
  • Loading branch information
RussTreadon-NOAA committed Apr 28, 2023
1 parent 406d690 commit ec2dd3a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 89 deletions.
12 changes: 5 additions & 7 deletions jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,17 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlfinal" -c "base atmensanl at
##############################################
# Set variables used in the script
##############################################
GDATE=$(date +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours")
GDUMP="gdas"
GDUMP_ENS="enkf${GDUMP}"

##############################################
# Begin JOB SPECIFIC work
##############################################
# Generate COM variable from template
MEMDIR='ensstat' RUN=${GDUMP_ENS} YMD=${PDY} HH=${cyc} generate_com -rx \
COM_ATMOS_ANALYSIS_ENS:COM_ATMOS_ANALYSIS_TMPL

export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}}
mkdir -p "${COMOUT}"

# COMIN_GES and COMIN_GES_ENS are used in script
export COMIN_GES="${ROTDIR}/${GDUMP}.${GDATE:0:8}/${GDATE:8:2}/atmos"
export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${GDATE:0:8}/${GDATE:8:2}"
mkdir -m 755 -p "${COM_ATMOS_ANALYSIS_ENS}"

###############################################################
# Run relevant script
Expand Down
13 changes: 7 additions & 6 deletions jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlinit" -c "base atmensanl atm
##############################################
# Set variables used in the script
##############################################
# shellcheck disable=SC2153
GDATE=$(date +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours")
gPDY=${GDATE:0:8}
gcyc=${GDATE:8:2}
GDUMP="gdas"


##############################################
# Begin JOB SPECIFIC work
##############################################
export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}}
mkdir -p "${COMOUT}"
# Generate COM variables from templates
RUN=${GDUMP} YMD=${PDY} HH=${cyc} generate_com -rx COM_OBS

# COMIN_GES and COMIN_GES_ENS are used in script
export COMIN_GES="${ROTDIR}/${GDUMP}.${GDATE:0:8}/${GDATE:8:2}/atmos"
export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${GDATE:0:8}/${GDATE:8:2}"
RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} generate_com -rx \
COM_ATMOS_ANALYSIS_PREV:COM_ATMOS_ANALYSIS_TMPL

###############################################################
# Run relevant script
Expand Down
3 changes: 0 additions & 3 deletions jobs/JGLOBAL_ATMENS_ANALYSIS_RUN
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlrun" -c "base atmensanl atme
# Begin JOB SPECIFIC work
##############################################

export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}}
mkdir -p "${COMOUT}"

###############################################################
# Run relevant script

Expand Down
185 changes: 112 additions & 73 deletions ush/python/pygfs/task/atmens_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

from pygw.attrdict import AttrDict
from pygw.file_utils import FileHandler
from pygw.timetools import add_to_datetime, to_fv3time, to_timedelta, to_YMDH
from pygw.timetools import add_to_datetime, to_fv3time, to_timedelta, to_YMDH, to_YMD
from pygw.fsutils import rm_p, chdir
from pygw.yaml_file import parse_yamltmpl, parse_j2yaml, save_as_yaml
from pygw.logger import logit
from pygw.executable import Executable
from pygw.exceptions import WorkflowException
from pygw.template import Template, TemplateConstants
from pygfs.task.analysis import Analysis

logger = getLogger(__name__.split('.')[-1])
Expand Down Expand Up @@ -45,8 +46,6 @@ def __init__(self, config):
'npz_anl': self.config.LEVS - 1,
'ATM_WINDOW_BEGIN': _window_begin,
'ATM_WINDOW_LENGTH': f"PT{self.config.assim_freq}H",
'comin_ges_atm': self.config.COMIN_GES,
'comin_ges_atmens': self.config.COMIN_GES_ENS,
'OPREFIX': f"{self.config.EUPD_CYC}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN
'APREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN
'GPREFIX': f"gdas.t{self.runtime_config.previous_cycle.hour:02d}z.",
Expand Down Expand Up @@ -79,6 +78,27 @@ def initialize(self: Analysis) -> None:
"""
super().initialize()

# Make member directories in DATA for background and in DATA and ROTDIR for analysis files
# create template dictionary for output member analysis directories
template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL
tmpl_inc_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.current_cycle),
'HH': self.task_config.current_cycle.strftime('%H')
}
dirlist = []
for imem in range(1, self.task_config.NMEM_ENKF + 1):
dirlist.append(os.path.join(self.task_config.DATA, 'bkg', f'mem{imem:03d}'))
dirlist.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}'))

# create output directory path for member analysis
tmpl_inc_dict['MEMDIR'] = f"mem{imem:03d}"
incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get)
dirlist.append(incdir)

FileHandler({'mkdir': dirlist}).sync()

# stage CRTM fix files
crtm_fix_list_path = os.path.join(self.task_config.HOMEgfs, 'parm', 'parm_gdas', 'atm_crtm_coeff.yaml')
logger.debug(f"Staging CRTM fix files from {crtm_fix_list_path}")
Expand All @@ -92,7 +112,7 @@ def initialize(self: Analysis) -> None:
FileHandler(jedi_fix_list).sync()

# stage backgrounds
FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync()
FileHandler(self.get_bkg_dict()).sync()

# generate ensemble da YAML file
logger.debug(f"Generate ensemble da YAML file: {self.task_config.fv3jedi_yaml}")
Expand All @@ -108,13 +128,6 @@ def initialize(self: Analysis) -> None:
]
FileHandler({'mkdir': newdirs}).sync()

# Make directories for member analysis files
anldir = []
for imem in range(1, self.task_config.NMEM_ENKF + 1):
memchar = f"mem{imem:03d}"
anldir.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}'))
FileHandler({'mkdir': anldir}).sync()

@logit(logger)
def execute(self: Analysis) -> None:
"""Execute a global atmens analysis
Expand Down Expand Up @@ -169,7 +182,7 @@ def finalize(self: Analysis) -> None:
"""
# ---- tar up diags
# path of output tar statfile
atmensstat = os.path.join(self.task_config.COMOUT, f"{self.task_config.APREFIX}atmensstat")
atmensstat = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.APREFIX}atmensstat")

# get list of diag files to put in tarball
diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc4'))
Expand All @@ -190,12 +203,12 @@ def finalize(self: Analysis) -> None:
archive.add(diaggzip, arcname=os.path.basename(diaggzip))

# copy full YAML from executable to ROTDIR
logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COMOUT}")
logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COM_ATMOS_ANALYSIS_ENS}")
src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml")
dest = os.path.join(self.task_config.COMOUT, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml")
dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml")
logger.debug(f"Copying {src} to {dest}")
yaml_copy = {
'mkdir': [self.task_config.COMOUT],
'mkdir': [self.task_config.COM_ATMOS_ANALYSIS_ENS],
'copy': [[src, dest]]
}
FileHandler(yaml_copy).sync()
Expand All @@ -207,55 +220,6 @@ def finalize(self: Analysis) -> None:
def clean(self):
super().clean()

@logit(logger)
def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]:
"""Compile a dictionary of model background files to copy
This method constructs a dictionary of FV3 RESTART files (coupler, core, tracer)
that are needed for global atmens DA and returns said dictionary for use by the FileHandler class.
Parameters
----------
task_config: Dict
a dictionary containing all of the configuration needed for the task
Returns
----------
bkg_dict: Dict
a dictionary containing the list of model background files to copy for FileHandler
"""
# NOTE for now this is FV3 RESTART files and just assumed to be fh006
# loop over ensemble members
dirlist = []
bkglist = []
for imem in range(1, task_config.NMEM_ENKF + 1):
memchar = f"mem{imem:03d}"

# accumulate directory list for member restart files
dirlist.append(os.path.join(task_config.DATA, 'bkg', memchar))

# get FV3 RESTART files, this will be a lot simpler when using history files
rst_dir = os.path.join(task_config.comin_ges_atmens, memchar, 'atmos/RESTART')
run_dir = os.path.join(task_config.DATA, 'bkg', memchar)

# atmens DA needs coupler
basename = f'{to_fv3time(task_config.current_cycle)}.coupler.res'
bkglist.append([os.path.join(rst_dir, basename), os.path.join(task_config.DATA, 'bkg', memchar, basename)])

# atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data
for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']:
template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
for itile in range(1, task_config.ntiles + 1):
basename = template.format(tilenum=itile)
bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])

bkg_dict = {
'mkdir': dirlist,
'copy': bkglist,
}

return bkg_dict

@logit(logger)
def jedi2fv3inc(self: Analysis) -> None:
"""Generate UFS model readable analysis increment
Expand Down Expand Up @@ -285,21 +249,37 @@ def jedi2fv3inc(self: Analysis) -> None:
# Reference the python script which does the actual work
incpy = os.path.join(self.task_config.HOMEgfs, 'ush/jediinc2fv3.py')

# create template dictionaries
template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL
tmpl_inc_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.current_cycle),
'HH': self.task_config.current_cycle.strftime('%H')
}

template_ges = self.task_config.COM_ATMOS_HISTORY_TMPL
tmpl_ges_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.previous_cycle),
'HH': self.task_config.previous_cycle.strftime('%H')
}

# loop over ensemble members
for imem in range(1, self.task_config.NMEM_ENKF + 1):
memchar = f"mem{imem:03d}"

# make output directory for member increment
incdir = [
os.path.join(self.task_config.COMOUT, memchar, 'atmos')
]
FileHandler({'mkdir': incdir}).sync()
# create output path for member analysis increment
tmpl_inc_dict['MEMDIR'] = memchar
incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get)

# rewrite UFS-DA atmens increments
atmges_fv3 = os.path.join(self.task_config.COMIN_GES_ENS, memchar, 'atmos',
f"{self.task_config.CDUMP}.t{self.runtime_config.previous_cycle.hour:02d}z.atmf006.nc")
tmpl_ges_dict['MEMDIR'] = memchar
gesdir = Template.substitute_structure(template_ges, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_ges_dict.get)
atmges_fv3 = os.path.join(gesdir, f"{self.task_config.CDUMP}.t{self.task_config.previous_cycle.hour:02d}z.atmf006.nc")
atminc_jedi = os.path.join(self.task_config.DATA, 'anl', memchar, f'atminc.{cdate_inc}z.nc4')
atminc_fv3 = os.path.join(self.task_config.COMOUT, memchar, 'atmos',
f"{self.task_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atminc.nc")
atminc_fv3 = os.path.join(incdir, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atminc.nc")

# Execute incpy to create the UFS model atm increment file
# TODO: use MPMD or parallelize with mpi4py
Expand All @@ -310,3 +290,62 @@ def jedi2fv3inc(self: Analysis) -> None:
cmd.add_default_arg(atminc_fv3)
logger.debug(f"Executing {cmd}")
cmd(output='stdout', error='stderr')

@logit(logger)
def get_bkg_dict(self: Analysis) -> Dict[str, List[str]]:
"""Compile a dictionary of model background files to copy
This method constructs a dictionary of ensemble FV3 restart files (coupler, core, tracer)
that are needed for global atmens DA and returns said dictionary for use by the FileHandler class.
Parameters
----------
None
Returns
----------
bkg_dict: Dict
a dictionary containing the list of model background files to copy for FileHandler
"""
# NOTE for now this is FV3 restart files and just assumed to be fh006
# loop over ensemble members
rstlist = []
bkglist = []

# get FV3 restart files, this will be a lot simpler when using history files
template_res = self.task_config.COM_ATMOS_RESTART_TMPL
tmpl_res_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.previous_cycle),
'HH': self.task_config.previous_cycle.strftime('%H'),
'MEMDIR': None
}

for imem in range(1, self.task_config.NMEM_ENKF + 1):
memchar = f"mem{imem:03d}"

# get FV3 restart files, this will be a lot simpler when using history files
tmpl_res_dict['MEMDIR'] = memchar
rst_dir = Template.substitute_structure(template_res, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_res_dict.get)
rstlist.append(rst_dir)

run_dir = os.path.join(self.task_config.DATA, 'bkg', memchar)

# atmens DA needs coupler
basename = f'{to_fv3time(self.task_config.current_cycle)}.coupler.res'
bkglist.append([os.path.join(rst_dir, basename), os.path.join(self.task_config.DATA, 'bkg', memchar, basename)])

# atmens DA needs core, srf_wnd, tracer, phy_data, sfc_data
for ftype in ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']:
template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
for itile in range(1, self.task_config.ntiles + 1):
basename = template.format(tilenum=itile)
bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])

bkg_dict = {
'mkdir': rstlist,
'copy': bkglist,
}

return bkg_dict

0 comments on commit ec2dd3a

Please sign in to comment.