Skip to content

Commit

Permalink
update soca scripts and tests to work with wxflow changes (NOAA-EMC#1192
Browse files Browse the repository at this point in the history
)
  • Loading branch information
RussTreadon-NOAA committed Jun 22, 2024
1 parent 6c235c0 commit 57d6f02
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 90 deletions.
9 changes: 9 additions & 0 deletions test/soca/socaincr2mom6.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ set -ex

srcdir=$1

# Set g-w HOMEgfs
topdir=$(cd "$(dirname "$(readlink -f -n "${srcdir}" )" )/.." && pwd -P)
export HOMEgfs=$topdir

# Set python path for workflow utilities and tasks
wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow"
PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}"
export PYTHONPATH

mom6_iau_incr="gdas.t12z.ocn.incr.nc"

cat > nsst.yaml << EOF
Expand Down
112 changes: 56 additions & 56 deletions ush/soca/marine_recenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ def __init__(self, config: Dict) -> None:

# Variables of convenience
# TODO (AFE) maybe the g- vars should be done in the jjob
PDY = self.runtime_config['PDY']
cyc = self.runtime_config['cyc']
DATA = self.runtime_config.DATA
PDY = self.task_config['PDY']
cyc = self.task_config['cyc']
DATA = self.task_config.DATA
cdate = PDY + timedelta(hours=cyc)
gdate = cdate - timedelta(hours=6)
self.runtime_config['gcyc'] = gdate.strftime("%H")
self.runtime_config['gPDY'] = datetime(gdate.year,
gdate.month,
gdate.day)
self.task_config['gcyc'] = gdate.strftime("%H")
self.task_config['gPDY'] = datetime(gdate.year,
gdate.month,
gdate.day)

gdas_home = os.path.join(config['HOMEgfs'], 'sorc', 'gdas.cd')

Expand All @@ -67,26 +67,26 @@ def __init__(self, config: Dict) -> None:
'ATM_WINDOW_BEGIN': window_begin_iso,
'ATM_WINDOW_MIDDLE': window_middle_iso,
'DATA': DATA,
'dump': self.runtime_config.RUN,
'fv3jedi_stage_files': self.config.FV3JEDI_STAGE_YAML,
'fv3jedi_stage': self.config.FV3JEDI_STAGE_YAML,
'dump': self.task_config.RUN,
'fv3jedi_stage_files': self.task_config.FV3JEDI_STAGE_YAML,
'fv3jedi_stage': self.task_config.FV3JEDI_STAGE_YAML,
'stage_dir': DATA,
'soca_input_fix_dir': self.config.SOCA_INPUT_FIX_DIR,
'NMEM_ENS': self.config.NMEM_ENS,
'soca_input_fix_dir': self.task_config.SOCA_INPUT_FIX_DIR,
'NMEM_ENS': self.task_config.NMEM_ENS,
'ATM_WINDOW_LENGTH': f"PT{config['assim_freq']}H"})

berror_yaml_dir = os.path.join(gdas_home, 'parm', 'soca', 'berror')
self.config['recen_yaml_template'] = os.path.join(berror_yaml_dir, 'soca_ensrecenter.yaml')
self.config['recen_yaml_file'] = os.path.join(DATA, 'soca_ensrecenter.yaml')
self.config['gridgen_yaml'] = os.path.join(gdas_home, 'parm', 'soca', 'gridgen', 'gridgen.yaml')
self.config['BKG_LIST'] = 'bkg_list.yaml'
self.config['window_begin'] = window_begin
self.config['mom_input_nml_src'] = os.path.join(gdas_home, 'parm', 'soca', 'fms', 'input.nml')
self.config['mom_input_nml_tmpl'] = os.path.join(DATA, 'mom_input.nml.tmpl')
self.config['mom_input_nml'] = os.path.join(DATA, 'mom_input.nml')
self.config['bkg_dir'] = os.path.join(DATA, 'bkg')
self.config['INPUT'] = os.path.join(DATA, 'INPUT')
self.config['ens_dir'] = os.path.join(DATA, 'ens')
self.task_config['recen_yaml_template'] = os.path.join(berror_yaml_dir, 'soca_ensrecenter.yaml')
self.task_config['recen_yaml_file'] = os.path.join(DATA, 'soca_ensrecenter.yaml')
self.task_config['gridgen_yaml'] = os.path.join(gdas_home, 'parm', 'soca', 'gridgen', 'gridgen.yaml')
self.task_config['BKG_LIST'] = 'bkg_list.yaml'
self.task_config['window_begin'] = window_begin
self.task_config['mom_input_nml_src'] = os.path.join(gdas_home, 'parm', 'soca', 'fms', 'input.nml')
self.task_config['mom_input_nml_tmpl'] = os.path.join(DATA, 'mom_input.nml.tmpl')
self.task_config['mom_input_nml'] = os.path.join(DATA, 'mom_input.nml')
self.task_config['bkg_dir'] = os.path.join(DATA, 'bkg')
self.task_config['INPUT'] = os.path.join(DATA, 'INPUT')
self.task_config['ens_dir'] = os.path.join(DATA, 'ens')

@logit(logger)
def initialize(self):
Expand All @@ -100,46 +100,46 @@ def initialize(self):
"""

logger.info("initialize")
RUN = self.runtime_config.RUN
gcyc = self.runtime_config.gcyc
RUN = self.task_config.RUN
gcyc = self.task_config.gcyc

ufsda.stage.soca_fix(self.recen_config)

################################################################################
# prepare input.nml
FileHandler({'copy': [[self.config.mom_input_nml_src, self.config.mom_input_nml_tmpl]]}).sync()
FileHandler({'copy': [[self.task_config.mom_input_nml_src, self.task_config.mom_input_nml_tmpl]]}).sync()

# swap date and stack size
domain_stack_size = self.config.DOMAIN_STACK_SIZE
ymdhms = [int(s) for s in self.config.window_begin.strftime('%Y,%m,%d,%H,%M,%S').split(',')]
with open(self.config.mom_input_nml_tmpl, 'r') as nml_file:
domain_stack_size = self.task_config.DOMAIN_STACK_SIZE
ymdhms = [int(s) for s in self.task_config.window_begin.strftime('%Y,%m,%d,%H,%M,%S').split(',')]
with open(self.task_config.mom_input_nml_tmpl, 'r') as nml_file:
nml = f90nml.read(nml_file)
nml['ocean_solo_nml']['date_init'] = ymdhms
nml['fms_nml']['domains_stack_size'] = int(domain_stack_size)
ufsda.disk_utils.removefile(self.config.mom_input_nml)
nml.write(self.config.mom_input_nml)
ufsda.disk_utils.removefile(self.task_config.mom_input_nml)
nml.write(self.task_config.mom_input_nml)

FileHandler({'mkdir': [self.config.bkg_dir]}).sync()
bkg_utils.gen_bkg_list(bkg_path=self.config.COM_OCEAN_HISTORY_PREV,
out_path=self.config.bkg_dir,
window_begin=self.config.window_begin,
yaml_name=self.config.BKG_LIST)
FileHandler({'mkdir': [self.task_config.bkg_dir]}).sync()
bkg_utils.gen_bkg_list(bkg_path=self.task_config.COM_OCEAN_HISTORY_PREV,
out_path=self.task_config.bkg_dir,
window_begin=self.task_config.window_begin,
yaml_name=self.task_config.BKG_LIST)

################################################################################
# Copy initial condition

bkg_utils.stage_ic(self.config.bkg_dir, self.runtime_config.DATA, gcyc)
bkg_utils.stage_ic(self.task_config.bkg_dir, self.task_config.DATA, gcyc)

################################################################################
# stage ensemble members
logger.info("---------------- Stage ensemble members")
FileHandler({'mkdir': [self.config.ens_dir]}).sync()
nmem_ens = self.config.NMEM_ENS
gPDYstr = self.runtime_config.gPDY.strftime("%Y%m%d")
FileHandler({'mkdir': [self.task_config.ens_dir]}).sync()
nmem_ens = self.task_config.NMEM_ENS
gPDYstr = self.task_config.gPDY.strftime("%Y%m%d")
ens_member_list = []
for mem in range(1, nmem_ens+1):
for domain in ['ocean', 'ice']:
mem_dir = os.path.join(self.config.ROTDIR,
mem_dir = os.path.join(self.task_config.ROTDIR,
f'enkf{RUN}.{gPDYstr}',
f'{gcyc}',
f'mem{str(mem).zfill(3)}',
Expand All @@ -150,7 +150,7 @@ def initialize(self):
f009 = f'enkf{RUN}.{domain}.t{gcyc}z.inst.f009.nc'

fname_in = os.path.abspath(os.path.join(mem_dir_real, f009))
fname_out = os.path.realpath(os.path.join(self.config.ens_dir,
fname_out = os.path.realpath(os.path.join(self.task_config.ens_dir,
domain+"."+str(mem)+".nc"))
ens_member_list.append([fname_in, fname_out])

Expand All @@ -161,8 +161,8 @@ def initialize(self):

logger.info(f"---------------- generate soca_ensrecenter.yaml")

recen_yaml = parse_j2yaml(self.config.recen_yaml_template, self.recen_config)
recen_yaml.save(self.config.recen_yaml_file)
recen_yaml = parse_j2yaml(self.task_config.recen_yaml_template, self.recen_config)
recen_yaml.save(self.task_config.recen_yaml_file)

@logit(logger)
def run(self):
Expand All @@ -177,12 +177,12 @@ def run(self):

logger.info("run")

chdir(self.runtime_config.DATA)
chdir(self.task_config.DATA)

exec_cmd_gridgen = Executable(self.config.APRUN_OCNANALECEN)
exec_name_gridgen = os.path.join(self.config.JEDI_BIN, 'gdas_soca_gridgen.x')
exec_cmd_gridgen = Executable(self.task_config.APRUN_OCNANALECEN)
exec_name_gridgen = os.path.join(self.task_config.JEDI_BIN, 'gdas_soca_gridgen.x')
exec_cmd_gridgen.add_default_arg(exec_name_gridgen)
exec_cmd_gridgen.add_default_arg(self.config.gridgen_yaml)
exec_cmd_gridgen.add_default_arg(self.task_config.gridgen_yaml)

try:
logger.debug(f"Executing {exec_cmd_gridgen}")
Expand All @@ -193,10 +193,10 @@ def run(self):
raise WorkflowException(f"An error occured during execution of {exec_cmd_gridgen}")
pass

exec_cmd_recen = Executable(self.config.APRUN_OCNANALECEN)
exec_name_recen = os.path.join(self.config.JEDI_BIN, 'gdas_ens_handler.x')
exec_cmd_recen = Executable(self.task_config.APRUN_OCNANALECEN)
exec_name_recen = os.path.join(self.task_config.JEDI_BIN, 'gdas_ens_handler.x')
exec_cmd_recen.add_default_arg(exec_name_recen)
exec_cmd_recen.add_default_arg(os.path.basename(self.config.recen_yaml_file))
exec_cmd_recen.add_default_arg(os.path.basename(self.task_config.recen_yaml_file))

try:
logger.debug(f"Executing {exec_cmd_recen}")
Expand All @@ -220,16 +220,16 @@ def finalize(self):

logger.info("finalize")

RUN = self.runtime_config.RUN
cyc = self.runtime_config.cyc
RUN = self.task_config.RUN
cyc = self.task_config.cyc
incr_file = f'enkf{RUN}.t{cyc}z.ocninc.nc'
nmem_ens = self.config.NMEM_ENS
PDYstr = self.runtime_config.PDY.strftime("%Y%m%d")
nmem_ens = self.task_config.NMEM_ENS
PDYstr = self.task_config.PDY.strftime("%Y%m%d")
mem_dir_list = []
copy_list = []

for mem in range(1, nmem_ens+1):
mem_dir = os.path.join(self.config.ROTDIR,
mem_dir = os.path.join(self.task_config.ROTDIR,
f'enkf{RUN}.{PDYstr}',
f'{cyc}',
f'mem{str(mem).zfill(3)}',
Expand Down
58 changes: 29 additions & 29 deletions ush/soca/prep_ocean_obs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ def __init__(self, config: Dict) -> None:
logger.info("init")
super().__init__(config)

PDY = self.runtime_config['PDY']
cyc = self.runtime_config['cyc']
PDY = self.task_config['PDY']
cyc = self.task_config['cyc']
cdate = PDY + timedelta(hours=cyc)
assim_freq = self.config['assim_freq']
assim_freq = self.task_config['assim_freq']
half_assim_freq = assim_freq/2

self.runtime_config['cdate'] = cdate
self.task_config['cdate'] = cdate
window_begin_datetime = cdate - timedelta(hours=half_assim_freq)
window_begin_datetime = cdate + timedelta(hours=half_assim_freq)
self.window_begin = window_begin_datetime.strftime('%Y-%m-%dT%H:%M:%SZ')
self.window_end = window_begin_datetime.strftime('%Y-%m-%dT%H:%M:%SZ')

self.config.conversion_list_file = 'conversion_list.yaml'
self.config.save_list_file = 'save_list.yaml'
self.task_config.conversion_list_file = 'conversion_list.yaml'
self.task_config.save_list_file = 'save_list.yaml'

@logit(logger)
def initialize(self):
Expand All @@ -65,36 +65,36 @@ def initialize(self):

logger.info("initialize")

cdate = self.runtime_config['cdate']
cdate = self.task_config['cdate']
cdatestr = cdate.strftime('%Y%m%d%H')
RUN = self.runtime_config.RUN
cyc = self.runtime_config['cyc']
assim_freq = self.config['assim_freq']
RUN = self.task_config.RUN
cyc = self.task_config['cyc']
assim_freq = self.task_config['assim_freq']

SOCA_INPUT_FIX_DIR = self.config['SOCA_INPUT_FIX_DIR']
SOCA_INPUT_FIX_DIR = self.task_config['SOCA_INPUT_FIX_DIR']
ocean_mask_src = os.path.join(SOCA_INPUT_FIX_DIR, 'RECCAP2_region_masks_all_v20221025.nc')
ocean_mask_dest = os.path.join(self.runtime_config.DATA, 'RECCAP2_region_masks_all_v20221025.nc')
ocean_mask_dest = os.path.join(self.task_config.DATA, 'RECCAP2_region_masks_all_v20221025.nc')

try:
FileHandler({'copy': [[ocean_mask_src, ocean_mask_dest]]}).sync()
except OSError:
logger.warning("Could not copy RECCAP2_region_masks_all_v20221025.nc")

OBS_YAML = self.config['OBS_YAML']
OBS_YAML = self.task_config['OBS_YAML']
observer_config = YAMLFile(OBS_YAML)

OBSPREP_YAML = self.config['OBSPREP_YAML']
OBSPREP_YAML = self.task_config['OBSPREP_YAML']
if os.path.exists(OBSPREP_YAML):
obsprep_config = YAMLFile(OBSPREP_YAML)
else:
logger.critical(f"OBSPREP_YAML file {OBSPREP_YAML} does not exist")
raise FileNotFoundError

JSON_TMPL_DIR = self.config.JSON_TMPL_DIR
BUFR2IODA_PY_DIR = self.config.BUFR2IODA_PY_DIR
JSON_TMPL_DIR = self.task_config.JSON_TMPL_DIR
BUFR2IODA_PY_DIR = self.task_config.BUFR2IODA_PY_DIR

COMIN_OBS = self.config.COMIN_OBS
COMOUT_OBS = self.config['COMOUT_OBS']
COMIN_OBS = self.task_config.COMIN_OBS
COMOUT_OBS = self.task_config['COMOUT_OBS']
if not os.path.exists(COMOUT_OBS):
os.makedirs(COMOUT_OBS)

Expand Down Expand Up @@ -132,8 +132,8 @@ def initialize(self):
interval = timedelta(hours=assim_freq * i)
window_cdates.append(cdate + interval)

input_files = prep_ocean_obs_utils.obs_fetch(self.config,
self.runtime_config,
input_files = prep_ocean_obs_utils.obs_fetch(self.task_config,
self.task_config,
obsprep_space,
window_cdates)

Expand Down Expand Up @@ -186,7 +186,7 @@ def initialize(self):

# yes, there is redundancy between the yamls fed to the ioda converter and here,
# this seems safer and easier than being selective about the fields
save_as_yaml({"observations": obsspaces_to_convert}, self.config.conversion_list_file)
save_as_yaml({"observations": obsspaces_to_convert}, self.task_config.conversion_list_file)

@logit(logger)
def run(self):
Expand All @@ -201,9 +201,9 @@ def run(self):

logger.info("run")

chdir(self.runtime_config.DATA)
chdir(self.task_config.DATA)

obsspaces_to_convert = YAMLFile(self.config.conversion_list_file)
obsspaces_to_convert = YAMLFile(self.task_config.conversion_list_file)

processes = []
for observation in obsspaces_to_convert['observations']:
Expand All @@ -213,7 +213,7 @@ def run(self):
logger.info(f"Trying to convert {obtype} to IODA")
if obs_space["type"] == "nc":
process = Process(target=prep_ocean_obs_utils.run_netcdf_to_ioda, args=(obs_space,
self.config.OCNOBS2IODAEXEC))
self.task_config.OCNOBS2IODAEXEC))
elif obs_space["type"] == "bufr":
process = Process(target=prep_ocean_obs_utils.run_bufr_to_ioda, args=(obs_space,))
else:
Expand All @@ -229,7 +229,7 @@ def run(self):
process.join()
completed.append(obs_space)

save_as_yaml({"observations": completed}, self.config.save_list_file)
save_as_yaml({"observations": completed}, self.task_config.save_list_file)

@logit(logger)
def finalize(self):
Expand All @@ -244,11 +244,11 @@ def finalize(self):

logger.info("finalize")

RUN = self.runtime_config.RUN
cyc = self.runtime_config.cyc
COMOUT_OBS = self.config.COMOUT_OBS
RUN = self.task_config.RUN
cyc = self.task_config.cyc
COMOUT_OBS = self.task_config.COMOUT_OBS

obsspaces_to_save = YAMLFile(self.config.save_list_file)
obsspaces_to_save = YAMLFile(self.task_config.save_list_file)

for obsspace_to_save in obsspaces_to_save['observations']:

Expand Down
8 changes: 4 additions & 4 deletions ush/soca/prep_ocean_obs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
# copies them to DATA, and returns a list of the files so handled


def obs_fetch(config, runtime_config, obsprep_space, cycles):
def obs_fetch(config, task_config, obsprep_space, cycles):

DMPDIR = config.DMPDIR
COMIN_OBS = config.COMIN_OBS

RUN = runtime_config.RUN
PDY = runtime_config.PDY
cyc = runtime_config.cyc
RUN = task_config.RUN
PDY = task_config.PDY
cyc = task_config.cyc

subdir = obsprep_space['dmpdir subdir']
dumpdir_regex = obsprep_space['dmpdir regex']
Expand Down
2 changes: 1 addition & 1 deletion ush/soca/run_jjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def export_env_vars_script(self):
self.f.write(f"export CDATE='{CDATE}'\n")

# Add to python environement
self.f.write("PYTHONPATH=${HOMEgfs}/ush/python/wxflow/src:${PYTHONPATH}\n")
self.f.write("PYTHONPATH=${HOMEgfs}/ush/python/wxflow:${PYTHONPATH}\n")

def setupexpt(self):
"""
Expand Down

0 comments on commit 57d6f02

Please sign in to comment.