diff --git a/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE b/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE new file mode 100755 index 0000000000..b4d35da5b4 --- /dev/null +++ b/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE @@ -0,0 +1,62 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +export WIPE_DATA="NO" +export DATA=${DATA:-${DATAROOT}/${RUN}aeroanl_${cyc}} +source "${HOMEgfs}/ush/jjob_header.sh" -e "aeroanlfinal" -c "base aeroanl aeroanlfinal" + +############################################## +# Set variables used in the script +############################################## +export CDATE=${CDATE:-${PDY}${cyc}} +export CDUMP=${CDUMP:-${RUN:-"gfs"}} + + +############################################## +# Begin JOB SPECIFIC work +############################################## + +GDATE=$(date +%Y%m%d%H -d "${CDATE:0:8} ${CDATE:8:2} - ${assim_freq} hours") +export GDATE +gPDY=${GDATE:0:8} +export gcyc=${GDATE:8:2} +export GDUMP=${GDUMP:-"gdas"} + +export OPREFIX="${CDUMP}.t${cyc}z." +export GPREFIX="${GDUMP}.t${gcyc}z." +export APREFIX="${CDUMP}.t${cyc}z." + +export COMOUT=${COMOUT:-${ROTDIR}/${CDUMP}.${PDY}/${cyc}/chem} + +mkdir -p "${COMOUT}" + +# COMIN_GES and COMIN_GES_ENS are used in script +export COMIN_GES="${ROTDIR}/${GDUMP}.${gPDY}/${gcyc}/chem" +export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${gPDY}/${gcyc}/chem" + +############################################################### +# Run relevant script + +EXSCRIPT=${GDASAEROFINALPY:-${HOMEgfs}/scripts/exglobal_aero_analysis_finalize.py} +${EXSCRIPT} +status=$? 
+[[ ${status} -ne 0 ]] && exit "${status}" + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +########################################## +# Remove the Temporary working directory +########################################## +cd "${DATAROOT}" || exit 1 +[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}" + +exit 0 diff --git a/jobs/JGLOBAL_AERO_ANALYSIS_INITIALIZE b/jobs/JGLOBAL_AERO_ANALYSIS_INITIALIZE new file mode 100755 index 0000000000..c73b149fe8 --- /dev/null +++ b/jobs/JGLOBAL_AERO_ANALYSIS_INITIALIZE @@ -0,0 +1,55 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +export DATA=${DATA:-${DATAROOT}/${RUN}aeroanl_${cyc}} +source "${HOMEgfs}/ush/jjob_header.sh" -e "aeroanlinit" -c "base aeroanl aeroanlinit" + +############################################## +# Set variables used in the script +############################################## +export CDATE=${CDATE:-${PDY}${cyc}} +export CDUMP=${CDUMP:-${RUN:-"gfs"}} + + +############################################## +# Begin JOB SPECIFIC work +############################################## + +GDATE=$(date +%Y%m%d%H -d "${CDATE:0:8} ${CDATE:8:2} - ${assim_freq} hours") +export GDATE +gPDY=${GDATE:0:8} +export gcyc=${GDATE:8:2} +export GDUMP=${GDUMP:-"gdas"} + +export OPREFIX="${CDUMP}.t${cyc}z." +export GPREFIX="${GDUMP}.t${gcyc}z." +export APREFIX="${CDUMP}.t${cyc}z." 
+ +export COMOUT=${COMOUT:-${ROTDIR}/${CDUMP}.${PDY}/${cyc}/chem} + +mkdir -p "${COMOUT}" + +# COMIN_GES and COMIN_GES_ENS are used in script +export COMIN_GES="${ROTDIR}/${GDUMP}.${gPDY}/${gcyc}/chem" +export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${gPDY}/${gcyc}/chem" + +############################################################### +# Run relevant script + +EXSCRIPT=${GDASAEROINITPY:-${HOMEgfs}/scripts/exglobal_aero_analysis_initialize.py} +${EXSCRIPT} +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +exit 0 diff --git a/jobs/JGLOBAL_AERO_ANALYSIS_RUN b/jobs/JGLOBAL_AERO_ANALYSIS_RUN new file mode 100755 index 0000000000..4cd8cc2497 --- /dev/null +++ b/jobs/JGLOBAL_AERO_ANALYSIS_RUN @@ -0,0 +1,56 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +export WIPE_DATA="NO" +export DATA=${DATA:-${DATAROOT}/${RUN}aeroanl_${cyc}} +source "${HOMEgfs}/ush/jjob_header.sh" -e "aeroanlrun" -c "base aeroanl aeroanlrun" + +############################################## +# Set variables used in the script +############################################## +export CDATE=${CDATE:-${PDY}${cyc}} +export CDUMP=${CDUMP:-${RUN:-"gfs"}} + + +############################################## +# Begin JOB SPECIFIC work +############################################## + +GDATE=$(date +%Y%m%d%H -d "${CDATE:0:8} ${CDATE:8:2} - ${assim_freq} hours") +export GDATE +gPDY=${GDATE:0:8} +export gcyc=${GDATE:8:2} +export GDUMP=${GDUMP:-"gdas"} + +export OPREFIX="${CDUMP}.t${cyc}z." +export GPREFIX="${GDUMP}.t${gcyc}z." +export APREFIX="${CDUMP}.t${cyc}z." 
+ +export COMOUT=${COMOUT:-${ROTDIR}/${CDUMP}.${PDY}/${cyc}/chem} + +mkdir -p "${COMOUT}" + +# COMIN_GES and COMIN_GES_ENS are used in script +export COMIN_GES="${ROTDIR}/${GDUMP}.${gPDY}/${gcyc}/chem" +export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${gPDY}/${gcyc}/chem" + +############################################################### +# Run relevant script + +EXSCRIPT=${GDASAERORUNSH:-${HOMEgfs}/scripts/exglobal_aero_analysis_run.sh} +${EXSCRIPT} +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +exit 0 diff --git a/jobs/rocoto/aeroanlfinal.sh b/jobs/rocoto/aeroanlfinal.sh index 3056959ac8..8f5a445de4 100755 --- a/jobs/rocoto/aeroanlfinal.sh +++ b/jobs/rocoto/aeroanlfinal.sh @@ -1,15 +1,23 @@ #! /usr/bin/env bash -source "$HOMEgfs/ush/preamble.sh" +source "${HOMEgfs}/ush/preamble.sh" ############################################################### -# Source FV3GFS workflow modules -. $HOMEgfs/ush/load_fv3gfs_modules.sh +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" status=$? -[[ $status -ne 0 ]] && exit $status +[[ ${status} -ne 0 ]] && exit "${status}" export job="aeroanlfinal" export jobid="${job}.$$" ############################################################### -echo "Do nothing for now" +# setup python path for workflow utilities and tasks +pygwPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/pygw/src" +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${pygwPATH}" +export PYTHONPATH +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE" +status=$? 
+exit "${status}" diff --git a/jobs/rocoto/aeroanlinit.sh b/jobs/rocoto/aeroanlinit.sh index a3d0238369..4e3d32ff9f 100755 --- a/jobs/rocoto/aeroanlinit.sh +++ b/jobs/rocoto/aeroanlinit.sh @@ -1,15 +1,24 @@ #! /usr/bin/env bash -source "$HOMEgfs/ush/preamble.sh" +source "${HOMEgfs}/ush/preamble.sh" ############################################################### -# Source FV3GFS workflow modules -. $HOMEgfs/ush/load_fv3gfs_modules.sh +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" status=$? -[[ $status -ne 0 ]] && exit $status +[[ ${status} -ne 0 ]] && exit "${status}" export job="aeroanlinit" export jobid="${job}.$$" ############################################################### -echo "Do nothing for now" +# setup python path for workflow utilities and tasks +pygwPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/pygw/src" +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${pygwPATH}" +export PYTHONPATH + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_AERO_ANALYSIS_INITIALIZE" +status=$? +exit "${status}" diff --git a/jobs/rocoto/aeroanlrun.sh b/jobs/rocoto/aeroanlrun.sh index 5dc731a94c..529bb2d7d1 100755 --- a/jobs/rocoto/aeroanlrun.sh +++ b/jobs/rocoto/aeroanlrun.sh @@ -1,15 +1,18 @@ #! /usr/bin/env bash -source "$HOMEgfs/ush/preamble.sh" +source "${HOMEgfs}/ush/preamble.sh" ############################################################### -# Source FV3GFS workflow modules -. $HOMEgfs/ush/load_fv3gfs_modules.sh +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" status=$? -[[ $status -ne 0 ]] && exit $status +[[ ${status} -ne 0 ]] && exit "${status}" export job="aeroanlrun" export jobid="${job}.$$" ############################################################### -echo "Do nothing for now" +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_AERO_ANALYSIS_RUN" +status=$? 
+exit "${status}" diff --git a/parm/config/config.aeroanl b/parm/config/config.aeroanl index edba514721..8607a0ae8f 100644 --- a/parm/config/config.aeroanl +++ b/parm/config/config.aeroanl @@ -5,13 +5,20 @@ echo "BEGIN: config.aeroanl" -export OBS_YAML_DIR=$HOMEgfs/sorc/gdas.cd/parm/aero/obs/config/ -export OBS_LIST=$HOMEgfs/sorc/gdas.cd/parm/aero/obs/lists/aero_prototype.yaml -export AEROVARYAML=$HOMEgfs/sorc/gdas.cd/parm/aero/variational/3dvar_dripcg.yaml -export BERROR_YAML=$HOMEgfs/sorc/gdas.cd/parm/aero/berror/static_bump.yaml -export FV3JEDI_FIX=$HOMEgfs/fix/gdas +export CASE_ANL=${CASE} +export OBS_YAML_DIR=${HOMEgfs}/sorc/gdas.cd/parm/aero/obs/config/ +export OBS_LIST=${HOMEgfs}/sorc/gdas.cd/parm/aero/obs/lists/gdas_aero_prototype.yaml +export AEROVARYAML=${HOMEgfs}/sorc/gdas.cd/parm/aero/variational/3dvar_gfs_aero.yaml +export STATICB_TYPE='identity' +export BERROR_YAML=${HOMEgfs}/sorc/gdas.cd/parm/aero/berror/staticb_${STATICB_TYPE}.yaml +export FV3JEDI_FIX=${HOMEgfs}/fix/gdas +export BERROR_DATA_DIR=${FV3JEDI_FIX}/bump/aero/${CASE_ANL}/ +export BERROR_DATE="20160630.000000" export io_layout_x=@IO_LAYOUT_X@ export io_layout_y=@IO_LAYOUT_Y@ +export JEDIVAREXE=${HOMEgfs}/exec/fv3jedi_var.x +export CRTM_VER="2.3.0" + echo "END: config.aeroanl" diff --git a/parm/config/config.aeroanlrun b/parm/config/config.aeroanlrun index 537f33081f..da13df2831 100644 --- a/parm/config/config.aeroanlrun +++ b/parm/config/config.aeroanlrun @@ -8,7 +8,4 @@ echo "BEGIN: config.aeroanlrun" # Get task specific resources . 
$EXPDIR/config.resources aeroanlrun
 
-# Task specific variables
-export JEDIVAREXE=$HOMEgfs/exec/fv3jedi_var.x
-
 echo "END: config.aeroanlrun"
diff --git a/parm/config/config.resources b/parm/config/config.resources
index ac2e407d12..617b0cc2c9 100644
--- a/parm/config/config.resources
+++ b/parm/config/config.resources
@@ -193,6 +193,28 @@ elif [[ "${step}" = "atmanalpost" ]]; then
 
 elif [[ "${step}" = "aeroanlinit" ]]; then
 
+    # below lines are for creating JEDI YAML
+    case ${CASE} in
+        C768)
+            layout_x=6
+            layout_y=6
+            ;;
+        C384)
+            layout_x=5
+            layout_y=5
+            ;;
+        C192 | C96 | C48)
+            layout_x=8
+            layout_y=8
+            ;;
+        *)
+            echo "FATAL ERROR: Resolution not supported for aerosol analysis"
+            exit 1
+    esac
+
+    export layout_x
+    export layout_y
+
     export wtime_aeroanlinit="00:10:00"
     export npe_aeroanlinit=1
     export nth_aeroanlinit=1
@@ -212,14 +234,17 @@ elif [[ "${step}" = "aeroanlrun" ]]; then
             layout_y=5
             ;;
         C192 | C96 | C48)
-            layout_x=3
-            layout_y=3
+            layout_x=8
+            layout_y=8
             ;;
         *)
             echo "FATAL: Resolution not supported'"
             exit 1
     esac
 
+    export layout_x
+    export layout_y
+
     export wtime_aeroanlrun="00:30:00"
     npe_aeroanlrun=$(echo "${layout_x} * ${layout_y} * 6" | bc)
     export npe_aeroanlrun
diff --git a/parm/parm_gdas/aero_crtm_coeff.yaml b/parm/parm_gdas/aero_crtm_coeff.yaml
new file mode 100644
index 0000000000..350eb93f61
--- /dev/null
+++ b/parm/parm_gdas/aero_crtm_coeff.yaml
@@ -0,0 +1,23 @@
+mkdir:
+- !ENV ${DATA}/crtm/
+copy:
+- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/AerosolCoeff.bin
+  - !ENV ${DATA}/crtm/AerosolCoeff.bin
+- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/CloudCoeff.bin
+  - !ENV ${DATA}/crtm/CloudCoeff.bin
+- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/v.viirs-m_npp.SpcCoeff.bin
+  - !ENV ${DATA}/crtm/v.viirs-m_npp.SpcCoeff.bin
+- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/v.viirs-m_npp.TauCoeff.bin
+  - !ENV ${DATA}/crtm/v.viirs-m_npp.TauCoeff.bin
+- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/v.viirs-m_j1.SpcCoeff.bin
+  - !ENV ${DATA}/crtm/v.viirs-m_j1.SpcCoeff.bin
+- - !ENV
${FV3JEDI_FIX}/crtm/${CRTM_VER}/v.viirs-m_j1.TauCoeff.bin + - !ENV ${DATA}/crtm/v.viirs-m_j1.TauCoeff.bin +- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/NPOESS.VISice.EmisCoeff.bin + - !ENV ${DATA}/crtm/NPOESS.VISice.EmisCoeff.bin +- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/NPOESS.VISland.EmisCoeff.bin + - !ENV ${DATA}/crtm/NPOESS.VISland.EmisCoeff.bin +- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/NPOESS.VISsnow.EmisCoeff.bin + - !ENV ${DATA}/crtm/NPOESS.VISsnow.EmisCoeff.bin +- - !ENV ${FV3JEDI_FIX}/crtm/${CRTM_VER}/NPOESS.VISwater.EmisCoeff.bin + - !ENV ${DATA}/crtm/NPOESS.VISwater.EmisCoeff.bin diff --git a/parm/parm_gdas/aero_jedi_fix.yaml b/parm/parm_gdas/aero_jedi_fix.yaml new file mode 100644 index 0000000000..31ece4ff8f --- /dev/null +++ b/parm/parm_gdas/aero_jedi_fix.yaml @@ -0,0 +1,11 @@ +mkdir: +- !ENV ${DATA}/fv3jedi +copy: +- - !ENV ${FV3JEDI_FIX}/fv3jedi/fv3files/akbk$(npz).nc4 + - !ENV ${DATA}/fv3jedi/akbk.nc4 +- - !ENV ${FV3JEDI_FIX}/fv3jedi/fv3files/fmsmpp.nml + - !ENV ${DATA}/fv3jedi/fmsmpp.nml +- - !ENV ${FV3JEDI_FIX}/fv3jedi/fv3files/field_table_gfdl + - !ENV ${DATA}/fv3jedi/field_table +- - !ENV ${FV3JEDI_FIX}/fv3jedi/fieldmetadata/gfs-aerosol.yaml + - !ENV ${DATA}/fv3jedi/gfs-restart.yaml diff --git a/parm/parm_gdas/aeroanl_inc_vars.yaml b/parm/parm_gdas/aeroanl_inc_vars.yaml new file mode 100644 index 0000000000..795b4cef04 --- /dev/null +++ b/parm/parm_gdas/aeroanl_inc_vars.yaml @@ -0,0 +1 @@ +['dust1', 'dust2', 'dust3', 'dust4', 'dust5', 'seas1', 'seas2', 'seas3', 'seas4', 'so4', 'oc1', 'oc2', 'bc1', 'bc2'] diff --git a/scripts/exglobal_aero_analysis_finalize.py b/scripts/exglobal_aero_analysis_finalize.py new file mode 100755 index 0000000000..eba47e710a --- /dev/null +++ b/scripts/exglobal_aero_analysis_finalize.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# exgdas_global_aero_analysis_finalize.py +# This script creates an AerosolAnalysis class +# and runs the finalize method +# which perform post-processing and clean up activities +# for a global 
aerosol variational analysis +import os + +from pygw.logger import Logger, logit +from pygw.configuration import cast_strdict_as_dtypedict +from pygfs.task.aero_analysis import AerosolAnalysis + + +# Initialize root logger +logger = Logger(level='DEBUG', colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the aerosol analysis task + AeroAnl = AerosolAnalysis(config) + AeroAnl.finalize() diff --git a/scripts/exglobal_aero_analysis_initialize.py b/scripts/exglobal_aero_analysis_initialize.py new file mode 100755 index 0000000000..bf0c61c8b9 --- /dev/null +++ b/scripts/exglobal_aero_analysis_initialize.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# exgdas_global_aero_analysis_initialize.py +# This script creates an AerosolAnalysis class +# and runs the initialize method +# which create and stage the runtime directory +# and create the YAML configuration +# for a global aerosol variational analysis +import os + +from pygw.logger import Logger +from pygw.configuration import cast_strdict_as_dtypedict +from pygfs.task.aero_analysis import AerosolAnalysis + +# Initialize root logger +logger = Logger(level='DEBUG', colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the aerosol analysis task + AeroAnl = AerosolAnalysis(config) + AeroAnl.initialize() diff --git a/scripts/exglobal_aero_analysis_run.sh b/scripts/exglobal_aero_analysis_run.sh new file mode 100755 index 0000000000..bbbdfc60f0 --- /dev/null +++ b/scripts/exglobal_aero_analysis_run.sh @@ -0,0 +1,15 @@ +#!/bin/bash +################################################################################ +# exgdas_global_aero_analysis_run.sh +# +# This script runs a global aerosol variational analysis with FV3-JEDI. 
+# It assumes the runtime directory has already been staged with the appropriate +# input files and YAML configuration (by the initialize script) before execution. +# +################################################################################ +# run executable +export pgm=${JEDIVAREXE} +. prep_step +${APRUN_AEROANL} "${DATA}/fv3jedi_var.x" "${DATA}/${CDUMP}.t${cyc}z.aerovar.yaml" 1>&1 2>&2 +export err=$?; err_chk +exit "${err}" diff --git a/sorc/build_gdas.sh b/sorc/build_gdas.sh index 9c7e024753..d612eb61a5 100755 --- a/sorc/build_gdas.sh +++ b/sorc/build_gdas.sh @@ -23,7 +23,7 @@ fi cd gdas.cd -BUILD_JOBS="${build_jobs}" ./build.sh -t "${MACHINE_ID}" +WORKFLOW_BUILD="ON" BUILD_JOBS="${build_jobs}" ./build.sh -t "${MACHINE_ID}" exit $? diff --git a/ush/load_ufsda_modules.sh b/ush/load_ufsda_modules.sh index 1c17b073b9..d4e525c613 100755 --- a/ush/load_ufsda_modules.sh +++ b/ush/load_ufsda_modules.sh @@ -41,17 +41,30 @@ elif [[ -d /lfs3 ]] ; then elif [[ -d /scratch1 ]] ; then # We are on NOAA Hera module load "${MODS}/hera" - if [[ "${DEBUG_WORKFLOW}" == "YES" ]] ; then + if [[ "${DEBUG_WORKFLOW:-NO}" == "YES" ]] ; then module list pip list fi + # set NETCDF variable based on ncdump location + NETCDF=$( which ncdump ) + export NETCDF + # prod_util stuff, find a better solution later... + module use /scratch2/NCEPDEV/nwprod/hpc-stack/libs/hpc-stack/modulefiles/compiler/intel/2022.1.2/ + module load prod_util elif [[ -d /work ]] ; then # We are on MSU Orion module load "${MODS}/orion" - if [[ "${DEBUG_WORKFLOW}" == "YES" ]] ; then + if [[ "${DEBUG_WORKFLOW:-NO}" == "YES" ]] ; then module list pip list fi + # set NETCDF variable based on ncdump location + ncdump=$( which ncdump ) + NETCDF=$( echo "${ncdump}" | cut -d " " -f 3 ) + export NETCDF + # prod_util stuff, find a better solution later... 
+ module use /apps/contrib/NCEP/hpc-stack/libs/hpc-stack/modulefiles/compiler/intel/2022.1.2/ + module load prod_util elif [[ -d /glade ]] ; then # We are on NCAR Yellowstone echo WARNING: UFSDA NOT SUPPORTED ON THIS PLATFORM diff --git a/ush/python/pygfs/__init__.py b/ush/python/pygfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ush/python/pygfs/task/__init__.py b/ush/python/pygfs/task/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ush/python/pygfs/task/aero_analysis.py b/ush/python/pygfs/task/aero_analysis.py new file mode 100644 index 0000000000..c658943de5 --- /dev/null +++ b/ush/python/pygfs/task/aero_analysis.py @@ -0,0 +1,287 @@ +#!/usr/bin/env python3 + +import os +import glob +import gzip +import tarfile +from netCDF4 import Dataset +from logging import getLogger +from typing import Dict, List, Any + +from pygw.attrdict import AttrDict +from pygw.file_utils import FileHandler +from pygw.timetools import to_isotime, to_fv3time, to_timedelta +from pygw.fsutils import rm_p +from pygw.template import Template, TemplateConstants +from pygw.yaml_file import YAMLFile +from pygw.logger import logit +from pygfs.task.analysis import Analysis + +logger = getLogger(__name__.split('.')[-1]) + + +class AerosolAnalysis(Analysis): + """ + Class for global aerosol analysis tasks + """ + @logit(logger, name="AerosolAnalysis") + def __init__(self, config): + super().__init__(config) + + _res = int(self.config['CASE'][1:]) + _res_enkf = int(self.config['CASE_ENKF'][1:]) + _window_begin = self.runtime_config.current_cycle - to_timedelta(f"{self.config['assim_freq']}H") / 2 + + # Create a local dictionary that is repeatedly used across this class + local_dict = AttrDict( + { + 'npx_ges': _res + 1, + 'npy_ges': _res + 1, + 'npz_ges': self.config.LEVS - 1, + 'npz': self.config.LEVS - 1, + 'npx_anl': _res_enkf + 1, + 'npy_anl': _res_enkf + 1, + 'npz_anl': self.config['LEVS'] - 1, + 'AERO_WINDOW_BEGIN': 
to_isotime(_window_begin), + 'AERO_WINDOW_LENGTH': f"PT{self.config['assim_freq']}H", + 'BKG_ISOTIME': to_isotime(self.runtime_config.current_cycle), + 'BKG_YYYYmmddHHMMSS': to_fv3time(self.runtime_config.current_cycle), + 'cdate_fv3': to_fv3time(self.runtime_config.current_cycle), + 'comin_ges_atm': self.config.COMIN_GES.replace('chem', 'atmos'), # 'chem' is COMPONENT, aerosol fields are in 'atmos' tracers + } + ) + + # task_config is everything that this task should need + self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + + @logit(logger) + def initialize(self: Analysis) -> None: + """Initialize a global aerosol analysis + + This method will initialize a global aerosol analysis using JEDI. + This includes: + - staging CRTM fix files + - staging FV3-JEDI fix files + - staging B error files + - staging model backgrounds + - generating a YAML file for the JEDI executable + - linking the JEDI executable (TODO make it copyable, requires JEDI fix) + - creating output directories + """ + super().initialize() + + # stage CRTM fix files + crtm_fix_list_path = os.path.join(self.task_config['HOMEgfs'], 'parm', 'parm_gdas', 'aero_crtm_coeff.yaml') + crtm_fix_list = YAMLFile(path=crtm_fix_list_path) + crtm_fix_list = Template.substitute_structure(crtm_fix_list, TemplateConstants.DOLLAR_PARENTHESES, self.task_config.get) + FileHandler(crtm_fix_list).sync() + + # stage fix files + jedi_fix_list_path = os.path.join(self.task_config['HOMEgfs'], 'parm', 'parm_gdas', 'aero_jedi_fix.yaml') + jedi_fix_list = YAMLFile(path=jedi_fix_list_path) + jedi_fix_list = Template.substitute_structure(jedi_fix_list, TemplateConstants.DOLLAR_PARENTHESES, self.task_config.get) + FileHandler(jedi_fix_list).sync() + + # stage berror files + # copy BUMP files, otherwise it will assume ID matrix + if self.task_config.get('STATICB_TYPE', 'bump_aero') in ['bump_aero']: + FileHandler(AerosolAnalysis.get_berror_dict(self.task_config)).sync() + + # stage backgrounds + 
FileHandler(self.get_bkg_dict(AttrDict(self.task_config, **self.task_config))).sync() + + # generate variational YAML file + yaml_out = os.path.join(self.task_config['DATA'], f"{self.task_config['CDUMP']}.t{self.runtime_config['cyc']:02d}z.aerovar.yaml") + varda_yaml = YAMLFile(path=self.task_config['AEROVARYAML']) + varda_yaml = Template.substitute_structure(varda_yaml, TemplateConstants.DOUBLE_CURLY_BRACES, self.task_config.get) + varda_yaml = Template.substitute_structure(varda_yaml, TemplateConstants.DOLLAR_PARENTHESES, self.task_config.get) + varda_yaml.save(yaml_out) + logger.info(f"Wrote YAML to {yaml_out}") + + # link var executable + exe_src = self.task_config['JEDIVAREXE'] + exe_dest = os.path.join(self.task_config['DATA'], os.path.basename(exe_src)) + if os.path.exists(exe_dest): + rm_p(exe_dest) + os.symlink(exe_src, exe_dest) + + # need output dir for diags and anl + newdirs = [ + os.path.join(self.task_config['DATA'], 'anl'), + os.path.join(self.task_config['DATA'], 'diags'), + ] + FileHandler({'mkdir': newdirs}).sync() + + @logit(logger) + def finalize(self: Analysis) -> None: + """Finalize a global aerosol analysis + + This method will finalize a global aerosol analysis using JEDI. + This includes: + - tarring up output diag files and place in ROTDIR + - copying the generated YAML file from initialize to the ROTDIR + - copying the guess files to the ROTDIR + - applying the increments to the original RESTART files + - moving the increment files to the ROTDIR + + Please note that some of these steps are temporary and will be modified + once the model is able to read aerosol tracer increments. 
+ """ + # ---- tar up diags + # path of output tar statfile + aerostat = os.path.join(self.task_config['COMOUTaero'], f"{self.task_config['APREFIX']}aerostat") + + # get list of diag files to put in tarball + diags = glob.glob(os.path.join(self.task_config['DATA'], 'diags', 'diag*nc4')) + + # gzip the files first + for diagfile in diags: + with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: + f_out.writelines(f_in) + + # open tar file for writing + with tarfile.open(aerostat, "w") as archive: + for diagfile in diags: + archive.add(f"{diagfile}.gz") + + # copy full YAML from executable to ROTDIR + src = os.path.join(self.task_config['DATA'], f"{self.task_config['CDUMP']}.t{self.runtime_config['cyc']:02d}z.aerovar.yaml") + dest = os.path.join(self.task_config['COMOUTaero'], f"{self.task_config['CDUMP']}.t{self.runtime_config['cyc']:02d}z.aerovar.yaml") + yaml_copy = { + 'mkdir': self.task_config['COMOUTaero'], + 'copy': [src, dest] + } + FileHandler(yaml_copy).sync() + + # ---- NOTE below is 'temporary', eventually we will not be using FMS RESTART formatted files + # ---- all of the rest of this method will need to be changed but requires model and JEDI changes + # ---- copy RESTART fv_tracer files for future reference + fms_bkg_file_template = os.path.join(self.task_config.comin_ges_atm, 'RESTART', f'{self.task_config.cdate_fv3}.fv_tracer.res.tileX.nc') + bkglist = [] + for itile in range(1, self.task_config.ntiles + 1): + bkg_path = fms_bkg_file_template.replace('tileX', f'tile{itile}') + dest = os.path.join(self.task_config['COMOUTaero'], f'aeroges.{os.path.basename(bkg_path)}') + bkglist.append([bkg_path, dest]) + FileHandler({'copy': bkglist}).sync() + + # ---- add increments to RESTART files + logger.info('Adding increments to RESTART files') + self._add_fms_cube_sphere_increments() + + # ---- move increments to ROTDIR + logger.info('Moving increments to ROTDIR') + fms_inc_file_template = os.path.join(self.task_config['DATA'], 'anl', 
f'aeroinc.{self.task_config.cdate_fv3}.fv_tracer.res.tileX.nc') + inclist = [] + for itile in range(1, self.task_config.ntiles + 1): + inc_path = fms_inc_file_template.replace('tileX', f'tile{itile}') + dest = os.path.join(self.task_config['COMOUTaero'], os.path.basename(inc_path)) + inclist.append([inc_path, dest]) + FileHandler({'copy': inclist}).sync() + + def clean(self): + super().clean() + + @logit(logger) + def _add_fms_cube_sphere_increments(self: Analysis) -> None: + """This method adds increments to RESTART files to get an analysis + NOTE this is only needed for now because the model cannot read aerosol increments. + This method will be assumed to be deprecated before this is implemented operationally + """ + # only need the fv_tracer files + fms_inc_file_template = os.path.join(self.task_config['DATA'], 'anl', f'aeroinc.{self.task_config.cdate_fv3}.fv_tracer.res.tileX.nc') + fms_bkg_file_template = os.path.join(self.task_config.comin_ges_atm, 'RESTART', f'{self.task_config.cdate_fv3}.fv_tracer.res.tileX.nc') + # get list of increment vars + incvars_list_path = os.path.join(self.task_config['HOMEgfs'], 'parm', 'parm_gdas', 'aeroanl_inc_vars.yaml') + incvars = YAMLFile(path=incvars_list_path) + super().add_fv3_increments(fms_inc_file_template, fms_bkg_file_template, incvars) + + @logit(logger) + def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: + """Compile a dictionary of model background files to copy + + This method constructs a dictionary of FV3 RESTART files (coupler, core, tracer) + that are needed for global aerosol DA and returns said dictionary for use by the FileHandler class. 
+ + Parameters + ---------- + task_config: Dict + a dictionary containing all of the configuration needed for the task + + Returns + ---------- + bkg_dict: Dict + a dictionary containing the list of model background files to copy for FileHandler + """ + super().get_bkg_dict(task_config) + # NOTE for now this is FV3 RESTART files and just assumed to be fh006 + + # get FV3 RESTART files, this will be a lot simpler when using history files + rst_dir = os.path.join(task_config.comin_ges_atm, 'RESTART') # for now, option later? + + # aerosol DA only needs core/tracer + bkglist = [] + basename = f'{task_config.cdate_fv3}.coupler.res' + bkglist.append([os.path.join(rst_dir, basename), os.path.join(task_config['DATA'], 'bkg', basename)]) + basename_core = f'{task_config.cdate_fv3}.fv_core.res.tileX.nc' + basename_tracer = f'{task_config.cdate_fv3}.fv_tracer.res.tileX.nc' + for itile in range(1, task_config.ntiles + 1): + basename = basename_core.replace('tileX', f'tile{itile}') + bkglist.append([os.path.join(rst_dir, basename), os.path.join(task_config['DATA'], 'bkg', basename)]) + basename = basename_tracer.replace('tileX', f'tile{itile}') + bkglist.append([os.path.join(rst_dir, basename), os.path.join(task_config['DATA'], 'bkg', basename)]) + bkg_dict = { + 'mkdir': [os.path.join(task_config['DATA'], 'bkg')], + 'copy': bkglist, + } + return bkg_dict + + @logit(logger) + def get_berror_dict(self, config: Dict[str, Any]) -> Dict[str, List[str]]: + """Compile a dictionary of background error files to copy + + This method will construct a dictionary of BUMP background error files + for global aerosol DA and return said dictionary for use by the FileHandler class. + This dictionary contains coupler and fv_tracer files + for correlation and standard deviation as well as NICAS localization. 
+
+        Parameters
+        ----------
+        config: Dict
+            a dictionary containing all of the configuration needed
+
+        Returns
+        ----------
+        berror_dict: Dict
+            a dictionary containing the list of background error files to copy for FileHandler
+        """
+        super().get_berror_dict(config)
+        # aerosol static-B needs nicas, cor_rh, cor_rv and stddev files.
+        b_dir = config['BERROR_DATA_DIR']
+        b_datestr = config['BERROR_DATE']
+        berror_list = []
+
+        for ftype in ['cor_rh', 'cor_rv', 'stddev']:
+            template_bump_coupler = f'{b_datestr}.{ftype}.coupler.res'
+            template_bump_tracer = f'{b_datestr}.{ftype}.fv_tracer.res.tileX.nc'
+            berror_list.append([
+                os.path.join(b_dir, template_bump_coupler),
+                os.path.join(config['DATA'], 'berror', template_bump_coupler)
+            ])
+            for itile in range(1, config.ntiles + 1):
+                bump_tracer = template_bump_tracer.replace('tileX', f'tile{itile}')
+                berror_list.append([
+                    os.path.join(b_dir, bump_tracer),
+                    os.path.join(config['DATA'], 'berror', bump_tracer)
+                ])
+
+        nproc = config.ntiles * config['layout_x'] * config['layout_y']
+        for nn in range(1, nproc + 1):
+            berror_list.append([
+                os.path.join(b_dir, f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc'),
+                os.path.join(config['DATA'], 'berror', f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc')
+            ])
+        berror_dict = {
+            'mkdir': [os.path.join(config['DATA'], 'berror')],
+            'copy': berror_list,
+        }
+        return berror_dict
diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py
new file mode 100644
index 0000000000..10b2a14df5
--- /dev/null
+++ b/ush/python/pygfs/task/analysis.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+
+import os
+from logging import getLogger
+from netCDF4 import Dataset
+from typing import List, Dict, Any
+
+from pygw.yaml_file import YAMLFile
+from pygw.file_utils import FileHandler
+from pygw.logger import logit
+from pygw.task import Task
+
+logger = getLogger(__name__.split('.')[-1])
+
+
+class Analysis(Task):
+    """Parent class for GDAS tasks
+
+    The Analysis class is
the parent class for all + Global Data Assimilation System (GDAS) tasks + directly related to peforming an analysis + """ + + def __init__(self, config: Dict[str, Any]) -> None: + super().__init__(config, ntiles=6) + + def initialize(self) -> None: + super().initialize() + # all analyses need to stage observations + obs_dict = self.get_obs_dict() + FileHandler(obs_dict).sync() + + @logit(logger) + def get_obs_dict(self: Task) -> Dict[str, Any]: + """Compile a dictionary of observation files to copy + + This method uses the OBS_LIST configuration variable to generate a dictionary + from a list of YAML files that specify what observation files are to be + copied to the run directory from the observation input directory + + Parameters + ---------- + + Returns + ---------- + obs_dict: Dict + a dictionary containing the list of observation files to copy for FileHandler + """ + obs_list_config = YAMLFile(path=self.config['OBS_LIST']) + # get observers from master dictionary + observers = obs_list_config['observers'] + copylist = [] + for ob in observers: + obfile = ob['obs space']['obsdatain']['engine']['obsfile'] + basename = os.path.basename(obfile) + copylist.append([os.path.join(self.config['COMIN_OBS'], basename), obfile]) + obs_dict = { + 'mkdir': [os.path.join(self.runtime_config['DATA'], 'obs')], + 'copy': copylist + } + return obs_dict + + @logit(logger) + def add_fv3_increments(self, inc_file_tmpl: str, bkg_file_tmpl: str, incvars: List) -> None: + """Add cubed-sphere increments to cubed-sphere backgrounds + + Parameters + ---------- + inc_file_tmpl : str + template of the FV3 increment file of the form: 'filetype.{tileX}.nc' + bkg_file_tmpl : str + template of the FV3 background file of the form: 'filetype.{tileX}.nc' + incvars : List + List of increment variables to add to the background + """ + + for tt in range(1, self.config.ntiles + 1): + inc_path = inc_file_tmpl.replace('tileX', f'tile{tt}') + bkg_path = bkg_file_tmpl.replace('tileX', f'tile{tt}') + with 
Dataset(inc_path, mode='r') as incfile, Dataset(bkg_path, mode='a') as rstfile: + for vname in incvars: + increment = incfile.variables[vname][:] + bkg = rstfile.variables[vname][:] + anl = bkg + increment + rstfile.variables[vname][:] = anl[:] + try: + rstfile.variables[vname].delncattr('checksum') # remove the checksum so fv3 does not complain + except AttributeError: + pass # checksum is missing, move on + + @logit(logger) + def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]: + """Compile a dictionary of model background files to copy + + This method is a placeholder for now... will be possibly made generic at a later date + + Parameters + ---------- + task_config: Dict + a dictionary containing all of the configuration needed for the task + + Returns + ---------- + bkg_dict: Dict + a dictionary containing the list of model background files to copy for FileHandler + """ + bkg_dict = {'foo': 'bar'} + return bkg_dict + + @logit(logger) + def get_berror_dict(self, config: Dict[str, Any]) -> Dict[str, List[str]]: + """Compile a dictionary of background error files to copy + + This method is a placeholder for now... 
will be possibly made generic at a later date + + Parameters + ---------- + config: Dict + a dictionary containing all of the configuration needed + + Returns + ---------- + berror_dict: Dict + a dictionary containing the list of background error files to copy for FileHandler + """ + berror_dict = {'foo': 'bar'} + return berror_dict diff --git a/ush/python/pygw/src/pygw/task.py b/ush/python/pygw/src/pygw/task.py index 71b427e4bf..7d0db3885a 100644 --- a/ush/python/pygw/src/pygw/task.py +++ b/ush/python/pygw/src/pygw/task.py @@ -1,7 +1,9 @@ +import datetime as dt import logging from typing import Dict from pygw.attrdict import AttrDict +from pygw.timetools import to_datetime logger = logging.getLogger(__name__.split('.')[-1]) @@ -43,9 +45,16 @@ def __init__(self, config: Dict, *args, **kwargs): for kk in runtime_keys: try: self.runtime_config[kk] = config[kk] + del self.config[kk] except KeyError: raise KeyError(f"Encountered an unreferenced runtime_key {kk} in 'config'") + # Any other composite runtime variables that may be needed for the duration of the task + self.runtime_config['current_cycle'] = to_datetime(str(self.runtime_config['PDY'])) + \ + dt.timedelta(hours=self.runtime_config['cyc']) + + pass + def initialize(self): """ Initialize methods for a task diff --git a/ush/python/pygw/src/pygw/timetools.py b/ush/python/pygw/src/pygw/timetools.py index 5554efaacd..51f10d7084 100644 --- a/ush/python/pygw/src/pygw/timetools.py +++ b/ush/python/pygw/src/pygw/timetools.py @@ -125,3 +125,17 @@ def strptime(dtstr, fmt): return datetime.datetime.strptime(dtstr, fmt) except Exception as ee: raise Exception(f"Bad datetime string (format): '{dtstr} ({fmt})'") + + +def to_isotime(dt): + """ + Return an ISO formatted '%Y-%m-%dT%H:%M:%SZ' string from a datetime object. + """ + return strftime(dt, '%Y-%m-%dT%H:%M:%SZ') + + +def to_fv3time(dt): + """ + Return an FV3 formatted '%Y%m%d.%H%M%S' string from a datetime object.
+ """ + return strftime(dt, '%Y%m%d.%H%M%S') diff --git a/workflow/rocoto/workflow_tasks.py b/workflow/rocoto/workflow_tasks.py index 6e560dfb5b..2244504d3c 100644 --- a/workflow/rocoto/workflow_tasks.py +++ b/workflow/rocoto/workflow_tasks.py @@ -661,6 +661,10 @@ def _fcst_cycled(self): dep_dict = {'type': 'task', 'name': f'{self.cdump}waveprep'} dependencies.append(rocoto.add_dependency(dep_dict)) + if self.app_config.do_aero: + dep_dict = {'type': 'task', 'name': f'{self.cdump}aeroanlfinal'} + dependencies.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies) if self.cdump in ['gdas']: