diff --git a/ush/rtofs/df_preprocessing.py b/ush/rtofs/df_preprocessing.py
index 8df4157bbb..fe9e36b0be 100755
--- a/ush/rtofs/df_preprocessing.py
+++ b/ush/rtofs/df_preprocessing.py
@@ -27,7 +27,7 @@ def get_valid_range(logger, date_type, date_range, date_hours, fleads):
     if None in date_hours:
-        e = (f"One or more FCST_{date_type}_HOURS is Nonetype. This may be"
+        e = (f"FATAL ERROR: One or more FCST_{date_type}_HOURS is Nonetype. This may be"
              + f" because the input string is empty.")
         logger.error(e)
         raise ValueError(e)
@@ -45,7 +45,7 @@ def get_valid_range(logger, date_type, date_range, date_hours, fleads):
     elif date_type == 'VALID':
         valid_range = date_range
     else:
-        e = (f"Invalid DATE_TYPE: {str(date_type).upper()}. Valid values are"
+        e = (f"FATAL ERROR: Invalid DATE_TYPE: {str(date_type).upper()}. Valid values are"
              + f" VALID or INIT")
         logger.error(e)
         raise ValueError(e)
@@ -78,13 +78,13 @@ def run_prune_data(logger, stats_dir, prune_dir, output_base_template, verif_cas
                 str(var_name).upper(), model_list, obtype
             )
         else:
-            e1 = f"{stats_dir} exists but is empty."
+            e1 = f"FATAL ERROR: {stats_dir} exists but is empty."
             e2 = f"Populate {stats_dir} and retry."
             logger.error(e1)
             logger.error(e2)
             raise OSError(e1+"\n"+e2)
     else:
-        e1 = f"{stats_dir} does not exist."
+        e1 = f"FATAL ERROR: {stats_dir} does not exist."
         e2 = f"Create and populate {stats_dir} and retry."
         logger.error(e1)
         logger.error(e2)
@@ -93,15 +93,18 @@ def run_prune_data(logger, stats_dir, prune_dir, output_base_template, verif_cas
 def check_empty(df, logger, called_from):
     if df.empty:
-        logger.error(f"Called from {called_from}:")
-        logger.error(f"Empty Dataframe. Continuing onto next plot...")
+        logger.warning(f"Called from {called_from}:")
+        logger.warning(f"Empty Dataframe encountered while filtering a subset"
+                       + f" of input statistics...")
         logger.info("========================================")
         return True
     else:
         return False
 
 def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
-              model_list, met_version, clear_prune_dir):
+              model_list, met_version, clear_prune_dir, verif_type,
+              fcst_var_names, obs_var_names, interp, domain, date_type,
+              date_hours):
     model_list = [str(model) for model in model_list]
     # Create df combining pruned stats for all models in model_list
     start_string = date_range[0].strftime('%HZ %d %B %Y')
@@ -109,8 +112,11 @@ def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
     for model in model_list:
         fpath = os.path.join(pruned_data_dir,f'{str(model)}.stat')
         if not os.path.isfile(fpath):
-            logger.warning(
-                f"The stat file for {str(model)} does not exist in"
+            if not any(
+                group_name in str(model) for group_name in ["group", "set"]
+            ):
+                logger.warning(
+                    f"{str(model)} is not a model in"
                 + f" {pruned_data_dir}."
             )
             logger.warning(
@@ -137,6 +143,10 @@ def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
             i = -1*len(df_line_type_colnames)
             for col_name in df_colnames[i:]:
                 df_tmp[col_name] = df_tmp[col_name].astype(float)
+            df_tmp = run_filters(
+                df_tmp, logger, verif_type, fcst_var_names, obs_var_names,
+                interp, domain, date_type, date_range, date_hours
+            )
             try:
                 df = pd.concat([df, df_tmp])
             except NameError:
@@ -144,23 +154,23 @@ def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
             except UnboundLocalError as e:
                 df = df_tmp
             except pd.errors.EmptyDataError as e:
-                logger.error(e)
-                logger.error(f"The file in question:")
-                logger.error(f"{fpath}")
-                logger.error("Continuing ...")
+                logger.warning(e)
+                logger.warning(f"The file in question:")
+                logger.warning(f"{fpath}")
+                logger.warning("Continuing ...")
             except OSError as e:
-                logger.error(e)
-                logger.error(f"The file in question:")
-                logger.error(f"{fpath}")
-                logger.error("Continuing ...")
+                logger.warning(e)
+                logger.warning(f"The file in question:")
+                logger.warning(f"{fpath}")
+                logger.warning("Continuing ...")
     if clear_prune_dir:
         try:
             shutil.rmtree(pruned_data_dir)
         except OSError as e:
-            logger.error(e)
-            logger.error(f"The directory in question:")
-            logger.error(f"{pruned_data_dir}")
-            logger.error("Continuing ...")
+            logger.warning(e)
+            logger.warning(f"The directory in question:")
+            logger.warning(f"{pruned_data_dir}")
+            logger.warning("Continuing ...")
     try:
         if check_empty(df, logger, 'create_df'):
             return None
@@ -168,16 +178,14 @@ def create_df(logger, stats_dir, pruned_data_dir, line_type, date_range,
         df.reset_index(drop=True, inplace=True)
         return df
     except UnboundLocalError as e:
-        logger.error(e)
-        logger.error(
+        logger.warning(e)
+        logger.warning(
             "Nonexistent dataframe. Check the logfile for more details."
         )
-        logger.error("Quitting ...")
-        sys.exit(1)
-
+        return None
 def filter_by_level_type(df, logger, verif_type):
     if df is None:
-        return None
+        return df
     if str(verif_type).lower() in ['pres', 'upper_air']:
         df = df[
             df['FCST_LEV'].str.startswith('P')
@@ -189,101 +197,80 @@
             ~(df['FCST_LEV'].str.startswith('P')
             | df['OBS_LEV'].str.startswith('P'))
         ]
-    if check_empty(df, logger, 'filter_by_level_type'):
-        return None
-    else:
-        return df
+    check_empty(df, logger, 'filter_by_level_type')
+    return df
 
 def filter_by_var_name(df, logger, fcst_var_names, obs_var_names):
     if df is None:
-        return None
+        return df
     df = df[
         df['FCST_VAR'].isin(fcst_var_names)
         & df['OBS_VAR'].isin(obs_var_names)
     ]
-    if check_empty(df, logger, 'filter_by_var_name'):
-        return None
-    else:
-        return df
+    check_empty(df, logger, 'filter_by_var_name')
+    return df
 
 def filter_by_interp(df, logger, interp):
     if df is None:
-        return None
-    df = df[df['INTERP_MTHD'].eq(str(interp).upper())]
-    if check_empty(df, logger, 'filter_by_interp'):
-        return None
-    else:
         return df
+    df = df[df['INTERP_MTHD'].eq(str(interp).upper())]
+    check_empty(df, logger, 'filter_by_interp')
+    return df
 
 def filter_by_obtype(df, logger, obtype):
     if df is None:
-        return None
-    df = df[df['OBTYPE'].eq(str(obtype))]
-    if check_empty(df, logger, 'filter_by_obtype'):
-        return None
-    else:
         return df
+    df = df[df['OBTYPE'].eq(str(obtype))]
+    check_empty(df, logger, 'filter_by_obtype')
+    return df
 
 def filter_by_domain(df, logger, domain):
     if df is None:
-        return None
-    df = df[df['VX_MASK'].eq(str(domain))]
-    if check_empty(df, logger, 'filter_by_domain'):
-        return None
-    else:
         return df
+    df = df[df['VX_MASK'].eq(str(domain))]
+    check_empty(df, logger, 'filter_by_domain')
+    return df
 
 def create_lead_hours(df, logger):
-    if df is None:
-        return None
     df['LEAD_HOURS'] = np.array([int(lead[:-4]) for lead in df['FCST_LEAD']])
-    if check_empty(df, logger, 'create_lead_hours'):
-        return None
-    else:
-        return df
+    check_empty(df, logger, 'create_lead_hours')
+    return df
 
 def create_valid_datetime(df, logger):
-    if df is None:
-        return None
     df['VALID'] = pd.to_datetime(df['FCST_VALID_END'], format='%Y%m%d_%H%M%S')
-    if check_empty(df, logger, 'create_valid_datetime'):
-        return None
-    else:
-        return df
+    check_empty(df, logger, 'create_valid_datetime')
+    return df
 
 def create_init_datetime(df, logger):
     if df is None:
-        return None
+        return df
     df.reset_index(drop=True, inplace=True)
     df['INIT'] = [
         df['VALID'][v] - pd.DateOffset(hours=int(hour))
        for v, hour in enumerate(df['LEAD_HOURS'])
     ]
-    if check_empty(df, logger, 'create_init_datetime'):
-        return None
-    else:
-        return df
+    check_empty(df, logger, 'create_init_datetime')
+    return df
 
 def filter_by_date_range(df, logger, date_type, date_range):
     if df is None:
-        return None
+        return df
     df = df.loc[
         (df[str(date_type).upper()] >= date_range[0])
         & (df[str(date_type).upper()] <= date_range[1])
     ]
-    if check_empty(df, logger, 'filter_by_date_range'):
-        return None
-    else:
-        return df
+    check_empty(df, logger, 'filter_by_date_range')
+    return df
 
 def filter_by_hour(df, logger, date_type, date_hours):
     if df is None:
-        return None
-    df = df.loc[[x in date_hours for x in df[str(date_type).upper()].dt.hour]]
+        return df
     if check_empty(df, logger, 'filter_by_hour'):
-        return None
-    else:
         return df
+    else:
+        df = df.loc[[x in date_hours for x in df[str(date_type).upper()].dt.hour]]
+        check_empty(df, logger, 'filter_by_hour')
+        return df
 
 def get_preprocessed_data(logger, stats_dir,
                           prune_dir, output_base_template, verif_case, verif_type, line_type, date_type,
@@ -300,12 +287,18 @@ def get_preprocessed_data(logger, stats_dir, prune_dir, output_base_template,
     )
     df = create_df(
         logger, stats_dir, pruned_data_dir, line_type, date_range, model_list,
-        met_version, clear_prune_dir
+        met_version, clear_prune_dir, verif_type, fcst_var_names, obs_var_names,
+        interp, domain, date_type, date_hours
     )
+    if df is not None and check_empty(df, logger, 'get_preprocessed_data'):
+        df = None
+    return df
+
+def run_filters(df, logger, verif_type, fcst_var_names, obs_var_names,
+                interp, domain, date_type, date_range, date_hours):
     df = filter_by_level_type(df, logger, verif_type)
     df = filter_by_var_name(df, logger, fcst_var_names, obs_var_names)
     df = filter_by_interp(df, logger, interp)
-    df = filter_by_obtype(df, logger, obtype)
     df = filter_by_domain(df, logger, domain)
     df = create_lead_hours(df, logger)
     df = create_valid_datetime(df, logger)
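For reviewers, a minimal standalone sketch of the filtering contract this patch moves to: each step now hands back the (possibly empty) DataFrame instead of collapsing it to None, and emptiness is only logged by check_empty, so the per-model chain in run_filters never aborts the run. The check_empty and filter_by_domain bodies mirror the patched code; the logger setup and toy rows are illustrative, not part of the patch.

```python
import logging
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("df_preprocessing_demo")

def check_empty(df, logger, called_from):
    # Same contract as the patched check_empty: warn rather than error,
    # and report which filter produced the empty subset.
    if df.empty:
        logger.warning(f"Called from {called_from}:")
        logger.warning("Empty Dataframe encountered while filtering a subset"
                       " of input statistics...")
        return True
    return False

def filter_by_domain(df, logger, domain):
    # Same shape as the patched filters: always return the DataFrame,
    # empty or not, so the chain in run_filters keeps going.
    if df is None:
        return df
    df = df[df['VX_MASK'].eq(str(domain))]
    check_empty(df, logger, 'filter_by_domain')
    return df

# Toy rows standing in for pruned MET .stat records (illustrative only).
df = pd.DataFrame({'VX_MASK': ['GLB', 'NATL'], 'STAT': [0.5, 0.7]})
subset = filter_by_domain(df, logger, 'SPAC')   # no matching domain
print(subset.empty)   # True: the caller sees an empty frame, not sys.exit(1)
```

Since run_filters is applied per model inside create_df before concatenation, one model having no rows for a given domain or variable now produces a warning and an empty subset rather than the old logger.error plus sys.exit(1) that killed the whole multi-model plot.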