# QAQC flag counts 

## Environment set-up

In [None]:
import boto3
import numpy as np
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
from io import BytesIO, StringIO
from functools import reduce

import inspect

import logging
# Create a simple logger that just prints to the console.
# basicConfig sets up the root logger at INFO level; getLogger() with no name
# returns that root logger, which is what the functions below receive.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Render matplotlib figures at 300 dpi
plt.rcParams["figure.dpi"] = 300

In [None]:
# Create S3 handles. No credentials are set explicitly here; boto3 is
# presumably left to resolve them from its default configuration -- confirm
# for your environment.
s3 = boto3.resource("s3")
s3_cl = boto3.client("s3")  # for lower-level processes

# Name of the S3 bucket that the exported flag-count CSVs are read back from.
bucket_name = "wecc-historical-wx"

In [None]:
def merge_ds_to_df(ds):
    """Convert the xarray Dataset for one station into a flat pandas DataFrame.

    The dataset is converted with ``ds.to_dataframe()``, instrument-height
    columns are added when missing, duplicated index entries are dropped, and
    the index is flattened so that ``time`` and ``station`` become columns.

    Parameters
    ----------
    ds : xr.Dataset
        Data object with information about each network and station.
        NOTE(review): the code assumes ``ds.to_dataframe()`` yields a
        two-level index whose first level is the station -- confirm.

    Returns
    -------
    df : pd.DataFrame
        Table with a default integer index; the remaining index level
        (presumably ``time``) and ``station`` are regular columns.
    MultiIndex : pd.Index
        The de-duplicated, sorted index of the converted frame (before it is
        flattened), to be used on conversion back to a Dataset.
    attrs : dict
        Dataset-level attributes, saved so the final merged file can inherit them.
    var_attrs : dict
        Per-variable attributes keyed by variable name, saved so the final
        merged file can inherit them.
    """

    # Save attributes so the final merged file can inherit them
    attrs = ds.attrs
    var_attrs = {var: ds[var].attrs for var in ds.data_vars}

    df = ds.to_dataframe()

    # Save instrumentation heights. When the value is not available on the
    # dataset (or cannot be broadcast onto the frame) the column is filled
    # with NaN instead. `Exception` rather than a bare `except` so that
    # KeyboardInterrupt / SystemExit are not swallowed by this best-effort step.
    for height_var in ("anemometer_height_m", "thermometer_height_m"):
        if height_var in df.columns:
            continue
        try:
            df[height_var] = np.ones(ds["time"].shape) * getattr(ds, height_var)
        except Exception:
            logger.info(f"Filling {height_var} with NaN.")
            df[height_var] = np.full(len(df), np.nan)

    # De-duplicate time axis
    df = df[~df.index.duplicated()].sort_index()

    # Save station/time multiindex
    MultiIndex = df.index

    # Copy the first index level (station) into a regular column so that it
    # survives dropping that level below
    df["station"] = df.index.get_level_values(0)

    # Drop the station level and turn the remaining index into a column
    df = df.droplevel(0).reset_index()

    return df, MultiIndex, attrs, var_attrs

## Set the stage

### Load Data

In [None]:
# Zarr store to load. The commented-out line is an alternative station;
# swap the two `url` lines to switch.
# url = "s3://wecc-historical-wx/3_qaqc_wx/VALLEYWATER/VALLEYWATER_6001.zarr"
url = "s3://wecc-historical-wx/3_qaqc_wx/ASOSAWOS/ASOSAWOS_72493023230.zarr"
ds = xr.open_zarr(url)

In [None]:
# Flatten the station dataset into a dataframe, keeping the original index and
# the dataset / variable attributes alongside it
df, MultiIndex, attrs, var_attrs = merge_ds_to_df(ds)

### Perform hourly standardization

In [None]:
# -----------------------------------------------------------------------------
def qaqc_flag_fcn(flags: pd.Series) -> str:
    """
    Used for resampling QAQC flag columns. Ensures that the final standardized dataframe
    does not contain any empty strings by returning 'nan' when given an empty input (i.e. in time gaps).

    Parameters
    -----------
    flags : pd.Series
        flag values of one QAQC column that fall inside a single resampling bin

    Returns
    -------
    str
        'nan' when the bin is empty, otherwise the unique flag values in order of
        first appearance, joined by commas

    """
    if len(flags) == 0:
        return "nan"

    # str() makes the join safe for columns that were not cast to string
    # beforehand; values that are already strings pass through unchanged
    return ",".join(map(str, flags.unique()))


# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def _modify_infill(df: pd.DataFrame, constant_vars: list) -> pd.DataFrame:
    """
    This function does two things:
    1. Flags rows that were infilled by resampling in the hourly standardization process, where
        there were time gaps in the input dataframe. These infilled rows will NOT count towards
        the total observations count when calculating flag rates for the success report
    2. Infills constant variables (ie those in "constant_vars") observations that were left empty because
        they were in a time gap. They are infilled with the first non-nan value of each column, and set to
        np.nan if there are no non-nan values.

    Parameters
    -----------
    df : pd.Dataframe
        hourly standardized dataframe
    constant_vars: list
        variables that are constant throughout time

    Returns
    -------
    df : pd.Dataframe
        dataframe with updates added to rows infilled by hourly standardization

    """
    # Mask for rows where station is None (or np.nan)
    mask = df["station"].isnull()

    # Initialize dict to hold first non-NaN values
    first_valids = {}

    # Populate first_valids only for existing columns
    for col in constant_vars:
        if col not in df.columns or col == "time":
            # skip if constant var not in df cols or if var == "time"
            continue

        first_valids[col] = (
            df[col].dropna().iloc[0] if df[col].notna().any() else np.nan
        )

    # Update values in masked rows for existing columns
    for col, val in first_valids.items():
        df.loc[mask, col] = val

    # Add or update 'standardized_infill' column
    df["standardized_infill"] = np.where(mask, "y", "n")

    return df


# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def merge_hourly_standardization(
    df: pd.DataFrame, var_attrs: dict, logger: logging.Logger
) -> tuple[pd.DataFrame, dict]:
    """Resamples meteorological variables to hourly timestep according to standard conventions.

    Parameters
    -----------
    df : pd.DataFrame
        station dataset converted to dataframe through QAQC pipeline; must contain a "time" column
    var_attrs: dict
        attributes for each variable, keyed by variable name. Updated in place.
    logger : logging.Logger
        Logger instance. Accepted for interface parity with the other pipeline
        functions; this function does not currently emit any log messages.

    Returns
    -------
    df : pd.DataFrame
        returns a dataframe with all columns resampled to one hour (column name retained),
        plus a "standardized_infill" column marking rows created inside time gaps
    var_attrs : dict
        the same dictionary, updated to note that sub-hourly variables are now hourly

    Raises
    ------
    ValueError
        if `df` contains none of the variables this function knows how to resample

    Notes
    -----
    Rules:
    1. Top of the hour: take the first value in each hour. Standard convention for temperature, dewpoint, wind speed, direction, relative humidity, air pressure.
    2. Summation across the hour: sum observations within each hour. Standard convention for precipitation and solar radiation.
    3. Constant across the hour: take the first value in each hour. This applied to variables that do not change.
    4. QAQC columns: the unique flag values within each hour are joined with commas.
    """

    # Variables that remain constant within each hour
    constant_vars = [
        "time",
        "station",
        "lat",
        "lon",
        "elevation",
        "anemometer_height_m",
        "thermometer_height_m",
    ]

    # Aggregation across hour variables, standard meteorological convention: precipitation and solar radiation
    sum_vars = [
        "time",
        "pr",
        "pr_localmid",
        "pr_24h",
        "pr_1h",
        "pr_15min",
        "pr_5min",
        "rsds",
    ]

    # Top of the hour variables, standard meteorological convention: temperature, dewpoint temperature, pressure, humidity, winds
    instant_vars = [
        "hurs_derived",
        "time",
        "tas",
        "tas_derived",
        "tdps",
        "tdps_derived",
        "ps",
        "psl",
        "ps_altimeter",
        "ps_derived",
        "hurs",
        "sfcWind",
        "sfcWind_dir",
    ]

    # QAQC variable suffixes
    # the QAQC variables contain these words (ex: ps_eraqc)
    qaqc_var_suffixes = [
        "qc",
        "eraqc",
        "duration",
        "method",
        "flag",
        "depth",
        "process",
    ]

    # QAQC variables, which we will concatenate within each hour
    qaqc_vars = [
        var
        for var in df.columns
        if var != "time" and any(item in var for item in qaqc_var_suffixes)
    ]

    # Subset the dataframe according to rules
    constant_df = df[[col for col in constant_vars if col in df.columns]]
    instant_df = df[[col for col in instant_vars if col in df.columns]]
    sum_df = df[[col for col in sum_vars if col in df.columns]]

    # Flags are cast to strings so they can be joined within each hour
    qaqc_df = df[qaqc_vars].astype(str)
    qaqc_df.insert(0, "time", df["time"])

    # Performing hourly aggregation, only if subset contains more than one (ie more than the 'time') column
    # This is to account for input dataframes that do not contain ALL subsets of variables defined above - just a subset of them.
    result_list = []
    if len(constant_df.columns) > 1:
        result_list.append(constant_df.resample("1h", on="time").first())

    if len(instant_df.columns) > 1:
        result_list.append(instant_df.resample("1h", on="time").first())

    if len(sum_df.columns) > 1:
        # min_count=1 -> an hour with no non-NaN observation (including an
        # empty hour inside a time gap) yields NaN rather than 0
        result_list.append(sum_df.resample("1h", on="time").sum(min_count=1))

    if len(qaqc_df.columns) > 1:
        # concatenating unique flags
        result_list.append(qaqc_df.resample("1h", on="time").apply(qaqc_flag_fcn))

    if not result_list:
        # Without this guard, reduce() below fails with an opaque
        # "reduce() of empty iterable" TypeError
        raise ValueError(
            "Input dataframe contains no variables that can be standardized to an hourly timestep"
        )

    # Aggregate and output reduced dataframe - this merges all dataframes defined
    # The resampled frames are indexed by "time"; reset index to return to a plain index
    result = reduce(
        lambda left, right: pd.merge(left, right, on=["time"], how="outer"),
        result_list,
    )
    result.reset_index(inplace=True)  # Convert time index --> column

    # Infill constant values and flag rows added through resampling
    result = _modify_infill(result, constant_vars)

    # Update attributes for sub-hourly variables
    sub_hourly_vars = [i for i in df.columns if "min" in i and "qc" not in i]
    for var in sub_hourly_vars:
        # setdefault: a sub-hourly column without an attribute entry gets one
        # created instead of raising KeyError
        var_attrs.setdefault(var, {})["standardization"] = (
            "{} has been standardized to an hourly timestep, but will retain its original name".format(
                var
            )
        )

    return result, var_attrs

In [None]:
# subsetting data to speed things up
# NOTE(review): df_sub is not used by the standardization call in the next
# cell, which is given the full `df`. The subset also has no "station" column,
# which the infill step of the standardization indexes directly.
df_sub = df[['time','ps_eraqc','pr_eraqc']]

In [None]:
# Standardize the full station dataframe to an hourly timestep
df_st, var_attrs_st = merge_hourly_standardization(df, var_attrs, logger)

## Development

### Final Functions

In [None]:
# -----------------------------------------------------------------------------
def eraqc_counts_original_timestep(
    df: pd.DataFrame, network: str, station: str, logger: logging.Logger
) -> None:
    """
    Count how often each QAQC flag value occurs per variable at the native
    (pre-standardization) timestep and export the table as a csv to AWS.

    Only columns whose name contains "_eraqc" are considered. Missing flag
    values are counted under the label "no_flag", so each column's counts
    add up to the number of rows in `df`.

    Parameters
    ----------
    df : pd.DataFrame
        station dataset converted to dataframe through QAQC pipeline
    network: str
        network name, used in the output path
    station: str
        station name, used in the output file name
    logger : logging.Logger
        Logger instance for recording messages during processing.

    Returns
    -------
    None

    Raises
    ------
    Exception
        any error raised while counting or uploading is logged and re-raised
    """
    fn_name = inspect.currentframe().f_code.co_name
    logger.info(f"{fn_name}: Starting...")

    try:
        # keep only the flag columns, labelling missing flags explicitly
        qc_columns = [name for name in df.columns if "_eraqc" in name]
        flags_only = df[qc_columns].fillna("no_flag")

        # one row per flag value, one column per variable; a flag value that
        # never occurs for a variable comes out as NaN and is set to 0
        counts = flags_only.apply(pd.Series.value_counts).fillna(0)

        # label columns by variable name and name the index after its content
        counts.columns = counts.columns.str.replace("_eraqc", "", regex=True)
        counts = counts.rename_axis("eraqc_flag_values").astype(int)

        # write the table to AWS
        csv_s3_filepath = f"s3://wecc-historical-wx/4_merge_wx/{network}/eraqc_counts/{station}_flag_counts_native_timestep.csv"
        counts.to_csv(csv_s3_filepath, index=True)

        logger.info(f"Uploaded file to: {csv_s3_filepath}")
        logger.info(f"{fn_name}: Completed successfully")

    except Exception as err:
        logger.error(f"{fn_name}: Failed")
        raise err


# -----------------------------------------------------------------------------
def eraqc_counts_hourly_timestep(
    df: pd.DataFrame, network: str, station: str, logger: logging.Logger
) -> None:
    """
    Count how often each QAQC flag value occurs per variable after hourly
    standardization and export the table, including a total observation
    count, as a CSV to AWS.

    Rows marked as infilled ("standardized_infill" != "n") are excluded.
    Each value in an "_eraqc" column is split on commas and every piece is
    counted separately. The string "nan" is reported as "no_flag".

    Parameters
    ----------
    df : pd.DataFrame
        hourly standardized dataframe; must contain a "standardized_infill"
        column and string-valued "_eraqc" columns
    network: str
        network name, used in the output path
    station: str
        station name, used in the output file name
    logger : logging.Logger
        Logger instance for recording messages during processing.

    Returns
    -------
    None

    Raises
    ------
    Exception
        any error raised while counting or uploading is logged and re-raised
    """
    fn_name = inspect.currentframe().f_code.co_name
    logger.info(f"{fn_name}: Starting...")

    try:
        # drop the rows that hourly standardization added inside time gaps
        observed = df[df["standardized_infill"] == "n"]

        qc_columns = [name for name in observed.columns if "_eraqc" in name]

        def _count_flags(column):
            # an hourly value may hold several comma-separated flags;
            # split them apart and count each one on its own
            return column.str.split(",", expand=True).stack().value_counts()

        # a flag value that never occurs for a variable comes out as NaN -> 0
        counts = observed[qc_columns].apply(_count_flags).fillna(0)

        # label columns by variable name and name the index after its content
        counts.columns = counts.columns.str.replace("_eraqc", "", regex=True)
        counts = counts.rename_axis("eraqc_flag_values")

        # 'nan' (a string) means no flag was set
        counts = counts.rename(index={"nan": "no_flag"})

        # extra row: number of non-infilled rows, repeated for every variable
        counts.loc["total_obs_count"] = [len(observed)] * counts.shape[1]

        # integers for readability
        counts = counts.astype(int)

        # write the table to AWS
        csv_s3_filepath = f"s3://wecc-historical-wx/4_merge_wx/{network}/eraqc_counts/{station}_flag_counts_hourly_standardized.csv"
        counts.to_csv(csv_s3_filepath, index=True)

        logger.info(f"Uploaded file to: {csv_s3_filepath}")
        logger.info(f"{fn_name}: Completed successfully")

    except Exception as err:
        logger.error(f"{fn_name}: Failed")
        raise err

## Testing

In [None]:
# Network and station identifiers used to build the S3 paths of the exported CSVs
network = "ASOSAWOS"
station = "ASOSAWOS_72493023230"

In [None]:
# Export flag counts for the hourly-standardized frame and for the
# native-timestep frame; both calls write a CSV to S3
eraqc_counts_hourly_timestep(df_st, network, station, logger)
eraqc_counts_original_timestep(df, network, station, logger)

In [None]:
# Read the native-timestep flag counts back from S3 to inspect the export
key = f"4_merge_wx/{network}/eraqc_counts/{station}_flag_counts_native_timestep.csv"

list_import = s3_cl.get_object(
    Bucket=bucket_name,
    Key=key,
)

# The response body is read fully into memory and parsed as CSV
flag_counts_table = pd.read_csv(BytesIO(list_import["Body"].read()))

# Display the table as the cell output
flag_counts_table

In [None]:
# Read the hourly-standardized flag counts back from S3 to inspect the export
key = f"4_merge_wx/{network}/eraqc_counts/{station}_flag_counts_hourly_standardized.csv"

list_import = s3_cl.get_object(
    Bucket=bucket_name,
    Key=key,
)

# The response body is read fully into memory and parsed as CSV
flag_counts_table = pd.read_csv(BytesIO(list_import["Body"].read()))

# Display the table as the cell output
flag_counts_table