
Commit

Adding PR 489 for RTOFS bugfix
perry shafran committed Jun 11, 2024
1 parent 8d8c0a1 commit 9253fcc
Showing 1 changed file with 72 additions and 80 deletions.
152 changes: 72 additions & 80 deletions ush/rtofs/df_preprocessing.py
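In substance, the diff below has create_df take the filtering arguments (verif_type, fcst_var_names, obs_var_names, interp, domain, date_type, date_hours) and apply the new run_filters helper to each model's pruned stats before concatenation; it also downgrades non-fatal logger.error calls to logger.warning, prefixes fatal messages with "FATAL ERROR:", and, as the diff reads, has create_df return None where it previously logged "Quitting ..." and called sys.exit(1). The call below is only an illustrative sketch of the updated signature; every argument value is a placeholder, not taken from the commit.

# Illustrative only: placeholder paths, names, and settings, not values from the commit.
import datetime
import logging

from df_preprocessing import create_df  # assumes ush/rtofs is on sys.path

logger = logging.getLogger(__name__)
date_range = (datetime.datetime(2024, 6, 1, 0),
              datetime.datetime(2024, 6, 10, 0))

df = create_df(
    logger, '/path/to/stats', '/path/to/pruned', 'SL1L2', date_range,  # logger, stats_dir, pruned_data_dir, line_type, date_range
    ['model_a'], '11.0.2', True,                # model_list, met_version, clear_prune_dir
    'sfc',                                      # verif_type (new argument)
    ['TMP'], ['TMP'],                           # fcst_var_names, obs_var_names (new arguments)
    'NEAREST', 'GLOBAL', 'VALID', [0, 12],      # interp, domain, date_type, date_hours (new arguments)
)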
@@ -27,7 +27,7 @@

def get_valid_range(logger, date_type, date_range, date_hours, fleads):
if None in date_hours:
e = (f"One or more FCST_{date_type}_HOURS is Nonetype. This may be"
e = (f"FATAL ERROR: One or more FCST_{date_type}_HOURS is Nonetype. This may be"
+ f" because the input string is empty.")
logger.error(e)
raise ValueError(e)
@@ -45,7 +45,7 @@ def get_valid_range(logger, date_type, date_range, date_hours, fleads):
elif date_type == 'VALID':
valid_range = date_range
else:
e = (f"Invalid DATE_TYPE: {str(date_type).upper()}. Valid values are"
e = (f"FATAL ERROR: Invalid DATE_TYPE: {str(date_type).upper()}. Valid values are"
+ f" VALID or INIT")
logger.error(e)
raise ValueError(e)
@@ -78,13 +78,13 @@ def run_prune_data(logger, stats_dir, prune_dir, output_base_template, verif_cas
str(var_name).upper(), model_list, obtype
)
else:
e1 = f"{stats_dir} exists but is empty."
e1 = f"FATAL ERROR: {stats_dir} exists but is empty."
e2 = f"Populate {stats_dir} and retry."
logger.error(e1)
logger.error(e2)
raise OSError(e1+"\n"+e2)
else:
e1 = f"{stats_dir} does not exist."
e1 = f"FATAL ERROR: {stats_dir} does not exist."
e2 = f"Create and populate {stats_dir} and retry."
logger.error(e1)
logger.error(e2)
@@ -93,24 +93,30 @@ def run_prune_data(logger, stats_dir, prune_dir, output_base_template, verif_cas

def check_empty(df, logger, called_from):
if df.empty:
logger.error(f"Called from {called_from}:")
logger.error(f"Empty Dataframe. Continuing onto next plot...")
logger.warning(f"Called from {called_from}:")
logger.warning(f"Empty Dataframe encountered while filtering a subset"
+ f" of input statistics...")
logger.info("========================================")
return True
else:
return False

def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
- model_list, met_version, clear_prune_dir):
+ model_list, met_version, clear_prune_dir, verif_type,
+ fcst_var_names, obs_var_names, interp, domain, date_type,
+ date_hours):
model_list = [str(model) for model in model_list]
# Create df combining pruned stats for all models in model_list
start_string = date_range[0].strftime('%HZ %d %B %Y')
end_string = date_range[1].strftime('%HZ %d %B %Y')
for model in model_list:
fpath = os.path.join(pruned_data_dir,f'{str(model)}.stat')
if not os.path.isfile(fpath):
- logger.warning(
- f"The stat file for {str(model)} does not exist in"
+ if not any(
+ group_name in str(model) for group_name in ["group", "set"]
+ ):
+ logger.warning(
+ f"{str(model)} is not a model in"
+ f" {pruned_data_dir}."
)
logger.warning(
@@ -137,47 +143,49 @@ def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
i = -1*len(df_line_type_colnames)
for col_name in df_colnames[i:]:
df_tmp[col_name] = df_tmp[col_name].astype(float)
+ df_tmp = run_filters(
+ df_tmp, logger, verif_type, fcst_var_names, obs_var_names,
+ interp, domain, date_type, date_range, date_hours
+ )
try:
df = pd.concat([df, df_tmp])
- except NameError:
- df = df_tmp
+ except UnboundLocalError as e:
+ df = df_tmp
except pd.errors.EmptyDataError as e:
- logger.error(e)
- logger.error(f"The file in question:")
- logger.error(f"{fpath}")
- logger.error("Continuing ...")
+ logger.warning(e)
+ logger.warning(f"The file in question:")
+ logger.warning(f"{fpath}")
+ logger.warning("Continuing ...")
except OSError as e:
- logger.error(e)
- logger.error(f"The file in question:")
- logger.error(f"{fpath}")
- logger.error("Continuing ...")
+ logger.warning(e)
+ logger.warning(f"The file in question:")
+ logger.warning(f"{fpath}")
+ logger.warning("Continuing ...")
if clear_prune_dir:
try:
shutil.rmtree(pruned_data_dir)
except OSError as e:
- logger.error(e)
- logger.error(f"The directory in question:")
- logger.error(f"{pruned_data_dir}")
- logger.error("Continuing ...")
+ logger.warning(e)
+ logger.warning(f"The directory in question:")
+ logger.warning(f"{pruned_data_dir}")
+ logger.warning("Continuing ...")
try:
if check_empty(df, logger, 'create_df'):
return None
else:
df.reset_index(drop=True, inplace=True)
return df
except UnboundLocalError as e:
- logger.error(e)
- logger.error(
+ logger.warning(e)
+ logger.warning(
"Nonexistent dataframe. Check the logfile for more details."
)
logger.error("Quitting ...")
sys.exit(1)

+ return None
def filter_by_level_type(df, logger, verif_type):
if df is None:
- return None
+ return df
if str(verif_type).lower() in ['pres', 'upper_air']:
df = df[
df['FCST_LEV'].str.startswith('P')
@@ -189,101 +197,79 @@ def filter_by_level_type(df, logger, verif_type):
~(df['FCST_LEV'].str.startswith('P')
| df['OBS_LEV'].str.startswith('P'))
]
- if check_empty(df, logger, 'filter_by_level_type'):
- return None
- else:
- return df
+ check_empty(df, logger, 'filter_by_level_type')
+ return df

def filter_by_var_name(df, logger, fcst_var_names, obs_var_names):
if df is None:
- return None
+ return df
df = df[
df['FCST_VAR'].isin(fcst_var_names)
& df['OBS_VAR'].isin(obs_var_names)
]
- if check_empty(df, logger, 'filter_by_var_name'):
- return None
- else:
- return df
+ check_empty(df, logger, 'filter_by_var_name')
+ return df

def filter_by_interp(df, logger, interp):
if df is None:
return None
- df = df[df['INTERP_MTHD'].eq(str(interp).upper())]
- if check_empty(df, logger, 'filter_by_interp'):
- return None
- else:
- return df
+ df = df[df['INTERP_MTHD'].eq(str(interp).upper())]
+ check_empty(df, logger, 'filter_by_interp')
+ return df

def filter_by_obtype(df, logger, obtype):
if df is None:
return None
- df = df[df['OBTYPE'].eq(str(obtype))]
- if check_empty(df, logger, 'filter_by_obtype'):
- return None
- else:
- return df
+ df = df[df['OBTYPE'].eq(str(obtype))]
+ check_empty(df, logger, 'filter_by_obtype')
+ return df

def filter_by_domain(df, logger, domain):
if df is None:
return None
- df = df[df['VX_MASK'].eq(str(domain))]
- if check_empty(df, logger, 'filter_by_domain'):
- return None
- else:
- return df
+ df = df[df['VX_MASK'].eq(str(domain))]
+ check_empty(df, logger, 'filter_by_domain')
+ return df

def create_lead_hours(df, logger):
if df is None:
return None
df['LEAD_HOURS'] = np.array([int(lead[:-4]) for lead in df['FCST_LEAD']])
- if check_empty(df, logger, 'create_lead_hours'):
- return None
- else:
- return df
+ check_empty(df, logger, 'create_lead_hours')
+ return df

def create_valid_datetime(df, logger):
if df is None:
return None
df['VALID'] = pd.to_datetime(df['FCST_VALID_END'], format='%Y%m%d_%H%M%S')
- if check_empty(df, logger, 'create_valid_datetime'):
- return None
- else:
- return df
+ check_empty(df, logger, 'create_valid_datetime')
+ return df

def create_init_datetime(df, logger):
if df is None:
- return None
+ return df
df.reset_index(drop=True, inplace=True)
df['INIT'] = [
df['VALID'][v] - pd.DateOffset(hours=int(hour))
for v, hour in enumerate(df['LEAD_HOURS'])
]
- if check_empty(df, logger, 'create_init_datetime'):
- return None
- else:
- return df
+ check_empty(df, logger, 'create_init_datetime')
+ return df

def filter_by_date_range(df, logger, date_type, date_range):
if df is None:
- return None
+ return df
df = df.loc[
(df[str(date_type).upper()] >= date_range[0])
& (df[str(date_type).upper()] <= date_range[1])
]
- if check_empty(df, logger, 'filter_by_date_range'):
- return None
- else:
- return df
+ check_empty(df, logger, 'filter_by_date_range')
+ return df

def filter_by_hour(df, logger, date_type, date_hours):
if df is None:
return None
- df = df.loc[[x in date_hours for x in df[str(date_type).upper()].dt.hour]]
- return df
- if check_empty(df, logger, 'filter_by_hour'):
- return None
- else:
- return df
- else:
+ df = df.loc[[x in date_hours for x in df[str(date_type).upper()].dt.hour]]
+ check_empty(df, logger, 'filter_by_hour')
+ return df

def get_preprocessed_data(logger, stats_dir, prune_dir, output_base_template,
verif_case, verif_type, line_type, date_type,
@@ -300,12 +286,18 @@ def get_preprocessed_data(logger, stats_dir, prune_dir, output_base_template,
)
df = create_df(
logger, stats_dir, pruned_data_dir, line_type, date_range, model_list,
- met_version, clear_prune_dir
+ met_version, clear_prune_dir, verif_type, fcst_var_names, obs_var_names,
+ interp, domain, date_type, date_hours
)
+ if df is not None and check_empty(df, logger, 'get_preprocessed_data'):
+ df = None
+ return df

+ def run_filters(df, logger, verif_type, fcst_var_names, obs_var_names,
+ interp, domain, date_type, date_hours):
df = filter_by_level_type(df, logger, verif_type)
df = filter_by_var_name(df, logger, fcst_var_names, obs_var_names)
df = filter_by_interp(df, logger, interp)
- df = filter_by_obtype(df, logger, obtype)
df = filter_by_domain(df, logger, domain)
df = create_lead_hours(df, logger)
df = create_valid_datetime(df, logger)
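A net effect of the rewritten filter helpers is that they no longer return None when a filter empties the DataFrame: check_empty now only logs a warning, the (possibly empty) frame is passed along, and, per the last hunk above, get_preprocessed_data folds an empty result into None in one place at the end. Below is a minimal self-contained sketch of that pattern, restating two of the helpers outside their module purely for illustration.

import logging

import pandas as pd

logger = logging.getLogger(__name__)

def check_empty(df, logger, called_from):
    # As in the updated module: warn on an empty frame instead of treating it as an error.
    if df.empty:
        logger.warning(f"Called from {called_from}:")
        logger.warning("Empty Dataframe encountered while filtering a subset"
                       " of input statistics...")
        return True
    return False

def filter_by_domain(df, logger, domain):
    # Shape shared by the rewritten filters: filter, warn if empty, still return df.
    if df is None:
        return None
    df = df[df['VX_MASK'].eq(str(domain))]
    check_empty(df, logger, 'filter_by_domain')
    return df

# get_preprocessed_data then converts an empty result to None in a single spot:
df = filter_by_domain(pd.DataFrame({'VX_MASK': ['GLOBAL']}), logger, 'GLOBAL')
if df is not None and check_empty(df, logger, 'get_preprocessed_data'):
    df = None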
