# Process Target Variables

The catchment attributes were updated using revised catchment bounds from USGS and WSC where available, and using bounds delineated from processed rasters otherwise. The target variables are the last input data to process before training the gradient boosted decision tree (GBDT) models, which take catchment attributes as input and predict the various target variables.

## Shannon entropy processing

Compute the Shannon entropy of individual streamflow time series.  The Shannon entropy is given by: 

$$H(X) = -\sum_{i=1}^n P(x_i) \log_2 P(x_i)$$

The entropy is computed for various quantization bit depths (`bitrate` parameter, $n=2^{bitrate}$ in the above summation).  No prior is applied here.
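
A minimal sketch of the entropy computation in plain Python, for illustration only (the notebook itself relies on `scipy.stats.entropy`). The helper name `shannon_entropy_bits` and the example distributions are hypothetical. It shows that a uniform distribution over $2^b$ states attains the upper bound of $b$ bits, and that a skewed distribution falls below it.

```python
import math

def shannon_entropy_bits(pmf):
    """Shannon entropy in bits; states with zero probability contribute nothing."""
    return -sum(p * math.log2(p) for p in pmf if p > 0)

# a uniform distribution over 2**b states has the maximum entropy of b bits
for b in (2, 4, 8):
    n_states = 2**b
    uniform = [1 / n_states] * n_states
    print(f'{b} bits: H = {shannon_entropy_bits(uniform):.3f}')

# a skewed distribution over four states (hypothetical values)
skewed = [0.7, 0.1, 0.1, 0.1]
print(f'skewed: H = {shannon_entropy_bits(skewed):.3f}')
```

The entropy of an observed record at a given bitrate can therefore be read against its ceiling of `bitrate` bits.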

From the scipy.stats [docs](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.entropy.html):
>*"If messages consisting of sequences of symbols from a set are to be encoded and transmitted over a noiseless channel, then the Shannon entropy H(pk) gives a tight lower bound for the average number of units of information [bits] needed per symbol if the symbols occur with frequencies governed by the discrete distribution pk."*

Let's run through an example computation to see the difference between 4, 6, and 8 bit quantization, how each represents the total measurement range, and how each quantization aligns with your own expectation of heteroscedastic rating curve uncertainty.
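
Before running the example, a rough way to relate bit depth to measurement uncertainty is to look at the relative width of a single log-spaced bin. The sketch below assumes $2^b$ equal-width bins in log space over a hypothetical observed range of 0.1 to 1000 m³/s; `relative_bin_width` is an illustrative helper, not part of `data_processing_functions`.

```python
def relative_bin_width(q_min, q_max, bitrate):
    """Width of one log-spaced bin expressed as a fraction of its lower edge."""
    n_bins = 2**bitrate
    # adjacent edges of equal-width log bins differ by a constant ratio
    ratio = (q_max / q_min) ** (1 / n_bins)
    return ratio - 1

# hypothetical record spanning four orders of magnitude
q_min, q_max = 0.1, 1000.0
for b in (4, 6, 8):
    width = relative_bin_width(q_min, q_max, b)
    print(f'{b} bits: each bin spans about {100 * width:.1f}% of its lower edge')
```

Because the ratio between adjacent edges is constant, the absolute bin width grows with flow magnitude, which is the sense in which log-spaced bins mirror a heteroscedastic error. Comparing the printed widths with the relative error you expect from a rating curve gives a first indication of which bit depth is defensible.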

In [None]:
import os
from time import time
from scipy.stats import entropy
import multiprocessing as mp
import data_processing_functions as dpf
import numpy as np
import pandas as pd
import geopandas as gpd

# visualize the catchment centroid locations
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
from bokeh.io import output_notebook
from bokeh.palettes import Colorblind, Sunset10
output_notebook()

BASE_DIR = os.getcwd()

In [None]:
def quantize_signal(df, b, label, stn):
    df.dropna(subset=[stn], inplace=True)
    # add a very small margin on the range to ensure the values are contained 
    # within the specified dictionary size because the right edge is closed
    # by default and will return 2**b + 1 for values equal to the max
    min_q, max_q = df[stn].min() - 1e-9, df[stn].max() + 1e-9
    print(f'observed range: {min_q:.3f}-{max_q:.3f}')
    assert min_q > 0
    # use equal width bins in log10 space
    log_edges = np.linspace(np.log10(min_q), np.log10(max_q), 2**b)
    linear_edges = np.linspace(min_q, max_q, 2**b)
    edges = np.power(10, log_edges)

    data = df[stn].values
    sorted_data = np.sort(data)
    ecdf_values = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
    pmf = dpf.interpolate_ecdf(edges, sorted_data, ecdf_values)
    return pmf, linear_edges, edges

In [None]:
test_stn = '05010500'
test_df = dpf.get_timeseries_data(test_stn)

n_years = len(test_df)/365

test_fig = figure(title=f'Sample Distribution by Dictionary Size (Station ID {test_stn}, N=~{n_years:.1f} years record)',
                 width=800, height=300)#, x_axis_type='log')
n = 0
bitrates_even = [4, 6, 8, 10, 12]
for b in bitrates_even:
    t0 = time()

    normed_freqs, lin_space_edges, log_space_edges = quantize_signal(test_df, b, f'{b}_bits_quantized', test_stn)
    
    t1 = time()
    H = entropy(normed_freqs, base=2)
    # bin midpoints are the average of each pair of adjacent edges
    lin_bin_midpoints = (lin_space_edges[1:] + lin_space_edges[:-1]) / 2
    log_bin_midpoints = (log_space_edges[1:] + log_space_edges[:-1]) / 2
    bottoms = [0 for _ in normed_freqs]
    test_fig.quad(left=log_space_edges[:-1], right=log_space_edges[1:], top=normed_freqs, bottom=bottoms, 
                  legend_label=f'{b} bits (H={H:.2f})', color=Sunset10[n], fill_alpha=0.5)
    
    test_fig.xaxis.axis_label = r'$$\text{Flow} \left[ m^3/s \right]$$'
    test_fig.yaxis.axis_label = r'P(X)'
    test_fig.legend.location = 'top_right'
    test_fig.legend.click_policy = 'hide'
    n += 2
    t2 = time()
    print(f'{t1-t0:.3f}s to compute PMF, {t2-t1:.3f}s to compute entropy and plot')
    
test_fig.legend.background_fill_alpha = 0.5

In [None]:
show(test_fig)

In [None]:
# create a new output filename 
attributes_filename = 'BCUB_watershed_attributes_updated.csv'
attributes_fpath = os.path.join(os.getcwd(), 'data', attributes_filename)
attr_df = pd.read_csv(attributes_fpath)
attr_df.columns = [e.lower() for e in attr_df.columns]
# df.columns

In [None]:
filtered_stns = sorted(list(set(attr_df['official_id'].values)))

## Compute the "distortion" of an assumed error distribution as a prior

Rating curve uncertainty is a hard problem in hydrology.  Instead of treating daily flow observations as discrete measurements with the fixed (often overzealous) precision at which governing agencies publish them, we can assume a basic error model and test how much that error model distorts the information in the distribution.  In other words, how much noise/uncertainty does a given error model add?
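
As a sketch of what "distortion" means here, the snippet below computes the KL divergence (in bits per sample) between a PMF built from whole-observation counts and a PMF in which an error model has spread some probability mass into neighbouring bins. Both PMFs are hypothetical, and `kl_divergence_bits` is an illustrative helper that mirrors the masking used in `compute_error_bias` further down.

```python
import numpy as np

def kl_divergence_bits(p, q):
    """D_KL(P||Q) in bits, summed over states where both P and Q are non-zero."""
    p = np.asarray(p, dtype=float)
    q = np.asarray(q, dtype=float)
    mask = (p > 0) & (q > 0)
    return float(np.sum(p[mask] * np.log2(p[mask] / q[mask])))

# hypothetical PMFs over four bins
p_simple = np.array([0.0, 0.5, 0.5, 0.0])    # whole-observation counts
p_adjusted = np.array([0.1, 0.4, 0.4, 0.1])  # error model leaks mass to the outer bins

print(f'distortion: {kl_divergence_bits(p_simple, p_adjusted):.3f} bits/sample')
print(f'no error model: {kl_divergence_bits(p_simple, p_simple):.3f} bits/sample')
```

A distortion of zero means the error model left the distribution unchanged, which is what we expect when the assumed error is much narrower than the bins.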

Below we'll test a range of uniform error distributions as models for the observations.  We'll take an example streamflow record, and we'll quantize it to a range of dictionary sizes in two ways:  
1. **"idealized observations":** bin the observations as they are,
2. **"constant proportion error"**: assume a uniform error on each observation equal to a constant proportion, and bin the observations by counting the fraction of the error interval covering each bin.  In other words, we'll count partial observations in proportion to how they overlap the binning intervals as opposed to counting a whole observation based on the interval alone.

The quantization takes a bitrate $b$, log-transforms the measured interval to $(\log(x_\text{min}),\log(x_\text{max}))$, and divides it into $2^b$ equal-width (log-spaced) bins.  
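
The "constant proportion error" counting can be sketched as follows. This is an illustration under stated assumptions, not the implementation in `dpf.error_adjusted_fractional_bin_counts`: each observation $x$ is spread uniformly (in linear space) over $[x(1-e), x(1+e)]$, the interval is clipped to the binned range, and each bin receives the fraction of the interval it overlaps. The function name `fractional_bin_counts` and the example edges are hypothetical.

```python
import numpy as np

def fractional_bin_counts(values, bin_edges, error=0.1):
    """Distribute each observation over the bins in proportion to the overlap
    between the bins and a uniform error interval around the observation."""
    edges = np.asarray(bin_edges, dtype=float)
    counts = np.zeros(len(edges) - 1)
    for x in values:
        # uniform error interval, clipped to the binned range (an assumption)
        lo = max(x * (1 - error), edges[0])
        hi = min(x * (1 + error), edges[-1])
        if hi <= lo:
            # degenerate interval: count the whole observation in one bin
            idx = np.clip(np.searchsorted(edges, x, side='right') - 1, 0, len(counts) - 1)
            counts[idx] += 1.0
            continue
        overlap = np.minimum(edges[1:], hi) - np.maximum(edges[:-1], lo)
        counts += np.clip(overlap, 0, None) / (hi - lo)
    return counts

edges = [1.0, 10.0, 100.0]
# 5.0 sits well inside the first bin; 10.0 sits exactly on the shared edge
print(fractional_bin_counts([5.0, 10.0], edges, error=0.1))
```

An observation far from any bin edge is still counted whole, while an observation near an edge is split between neighbours. Each observation contributes a total of one count either way, so the result normalizes to a PMF just as the simple counts do.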

In [None]:
def compute_log_uniform_bins(df, stn, bitrate):
    n_bins = 2**bitrate - 2
    # set the bin edges to be evenly spaced between the
    # observed range of the proxy/donor series
    # np.digitize will assign 0 for out-of-range values at left
    # and n_bins + 1 for out-of-range values at right
    log_bin_edges = np.linspace(
        np.log10(df[stn].min()),
        np.log10(df[stn].max()),
        n_bins + 1,
    ).flatten()
    # convert back to linear space
    bin_edges = [10**e for e in log_bin_edges]
    return bin_edges

In [None]:
def apply_error_to_observations(df, stn, bitrate=None, error=0.1):
    min_q, max_q = df[stn].min() - 1e-9, df[stn].max() + 1e-9
    assert min_q > 0
    # use equal width bins in log10 space
    bin_edges = compute_log_uniform_bins(df, stn, bitrate)
    # df[f'{bitrate}_bits_quantized'] = np.digitize(df[stn], bin_edges)
    fractional_obs_counts = dpf.error_adjusted_fractional_bin_counts(
        df[stn], np.array(bin_edges), bitrate, error_factor=error
    )
    label = f'{stn}_{int(100*error)}_error'
    count_df = pd.DataFrame(index=range(2**bitrate))
    count_df[label] = 0
    count_df[label] += fractional_obs_counts
    count_df.fillna(0, inplace=True)
    n_obs = np.nansum(count_df[label])
    # normalize p_obs and p_sim
    return count_df[label].values / n_obs
    

In [None]:
def compute_unadjusted_counts(df, stn, bitrate):
    bin_edges = compute_log_uniform_bins(df, stn, bitrate)
    label = f'{stn}_simple_{bitrate}bits'
    df[label] = np.digitize(df[stn], bin_edges)
    # print(df[[stn, f'{stn}_quantized_{bitrate}bits']].head(4))
    # count the occurrences of each quantized value
    # the "simulated" series is the proxy/donor series
    # and the "observed" series is the target location
    obs_count_df = df.groupby(label).count()
    count_df = pd.DataFrame(index=range(2**bitrate))
    count_df[label] = 0
    count_df[label] += obs_count_df[stn]
    count_df.fillna(0, inplace=True)
    adjusted_p = count_df / obs_count_df[stn].sum()
    return adjusted_p.values.flatten()

In [None]:
def compute_error_bias(inputs):
    df, stn, b, err = inputs
    t0 = time()
    simple_frequencies = compute_unadjusted_counts(df, stn, b)
    error_adjusted_frequencies = apply_error_to_observations(df, stn, bitrate=b, error=err)
    # compute KL divergence between the simple and adjusted frequencies
    # this represents the distortion due to the error model
    mask = (simple_frequencies > 0) & (error_adjusted_frequencies > 0)
    distortion = np.zeros_like(simple_frequencies)
    distortion[mask] = simple_frequencies[mask] * np.log2(simple_frequencies[mask] / error_adjusted_frequencies[mask])
    kld = sum(distortion)
    return stn, kld, b, err

In [None]:
n = 0
distortions = {}
distortion_fig = figure(width=600, height=400, y_axis_type='log')
error_models = [1e-12, 1e-4, 1e-3, 2.5e-3, 5e-3, 1e-2, 2.5e-2, 5e-2, 1e-1, 2e-1]
stn_df = dpf.get_timeseries_data(test_stn)
for b in range(3, 13):
    distortions[b] = []
    print(f'computing {b} bits')
    for err in error_models:
        t1 = time()
        # only the divergence is needed here; avoid rebinding the loop variables
        _, kld, _, _ = compute_error_bias((stn_df, test_stn, b, err))
        distortions[b].append(kld)
    
    err_labels = [100*e for e in error_models]
    distortion_fig.line(err_labels, distortions[b], line_width=2, line_color=Sunset10[n],
                   legend_label=f'{b} bits')
    n += 1

distortion_fig.legend.location = 'top_left'
distortion_fig.legend.click_policy = 'hide'
distortion_fig.xaxis.axis_label = r'$$\text{Error Model } [\%]$$'
distortion_fig.yaxis.axis_label = r'$$\text{Error Bias } [\text{bits}/\text{sample}]$$'
distortion_fig.add_layout(distortion_fig.legend[0], 'right')

In [None]:
distortion_fig = dpf.format_fig_fonts(distortion_fig)
show(distortion_fig)

### Compute the distortion on all samples

Above we looked at one example station.  There are 1325 stations in the dataset.  Next, evaluate the distribution of distortion across the dataset for a range of error models.

In [None]:
pcts = [2.5, 25, 50, 75, 98.5]
dfigs = []
distortions = {b: {} for b in bitrates_even}
for b in bitrates_even:
    distortions[b] = {e: [] for e in error_models}
i = 0
t0 = time()
all_results = []
n_stns = len(attr_df)
error_model_bias_fname = 'data/error_model_distortion/error_model_distortion_test.csv'
if not os.path.exists(error_model_bias_fname):
    for stn in attr_df['official_id'].values:
        stn_df = dpf.get_timeseries_data(stn)
        inputs = [
            (stn_df, stn, b, err)
            for b in bitrates_even
            for err in error_models
        ]
        with mp.Pool() as pool:
            results = pool.map(compute_error_bias, inputs)
            rdf = pd.DataFrame(results, columns=['official_id', 'value', 'bitrate', 'err'])
            all_results.append(rdf)
        i += 1
        if i % 50 == 0:
            et = time() - t0
            print(f'{i}/{n_stns} stns processed in {et/60:.1f}min')
        # if i % 100 == 0:
        #     break
    results_df = pd.concat(all_results, axis=0)
    results_df.to_csv(error_model_bias_fname, index=False)
else:
    results_df = pd.read_csv(error_model_bias_fname)


In [None]:
bound_dict = {}
for b in bitrates_even:
    if b in [3, 5, 7, 9, 11]:
        continue
    err_bounds = []
    for err in error_models:
        # get all values for the bitrate and error model
        data = results_df[(results_df['bitrate'] == b) & (results_df['err'] == err)].copy()
        # get percentiles of the sample
        err_bounds += [np.percentile(data['value'].values, pcts)]
    bound_dict[b] = pd.DataFrame(err_bounds, columns=pcts)
    bound_dict[b]['err'] = error_models

In [None]:
dfigs = []
for b in bitrates_even:
    fig_kwargs = dict(width=500, height=400, y_axis_type='log')
    if len(dfigs) > 0:
        # share the y-range with the previous panel
        fig_kwargs['y_range'] = dfigs[-1].y_range
    dfig = figure(**fig_kwargs)

    data = bound_dict[b].copy()
    # plot the error model in percent to match the axis label
    err_pct = 100 * data['err']
    lo_pct, hi_pct = pcts[0], pcts[-1]
    dfig.varea(err_pct, y1=data[lo_pct], y2=data[hi_pct],
               color='grey', fill_alpha=0.4, legend_label=f'{lo_pct}-{hi_pct} percentile')
    dfig.varea(err_pct, y1=data[25], y2=data[75],
               color='black', fill_alpha=0.4, legend_label='IQR')
    dfig.line(err_pct, data[50], line_width=2, line_color='red',
              legend_label='Median')
    dfig.legend.location = 'top_left'
    dfig.xaxis.axis_label = r'$$\text{RC Error Model } [\%]$$'

    # label the y-axis on the first panel of each row only
    if len(dfigs) in (0, 3):
        dfig.legend.background_fill_alpha = 0.5
        dfig.yaxis.axis_label = r'$$\text{Distortion } [\text{bps}]$$'
    if len(dfigs) > 0:
        dfig.legend.visible = False
    dfig = dpf.format_fig_fonts(dfig)
    dfigs.append(dfig)

In [None]:
layout = gridplot(dfigs, ncols=3, width=400, height=350)
show(layout)

In [None]:
# the bitrate dictates the number of quantization levels = 2**b, i.e. 4 bits = 16 levels
quant_labels = []
bitrates = list(range(3, 13))
entropy_fpath = os.path.join(BASE_DIR, 'data', 'processed_divergence_inputs', attributes_filename)
if not os.path.exists(entropy_fpath):
    # start from the attribute table loaded above
    df = attr_df.copy()
    for bitrate in bitrates:
        label = f'H_{bitrate}_bits'
        print(f'Processing {bitrate} bit entropy')
        df[label] = df.apply(lambda row: dpf.compute_observed_series_entropy(row, bitrate), axis=1)
        quant_labels.append(label)
    # save the results
    df.to_csv(entropy_fpath, index=False)
else:
    df = pd.read_csv(entropy_fpath)
    quant_labels = [e for e in df.columns if e.startswith('H_')]

In [None]:
# plot the CDFs of entropy by quantization
fig = figure(width=600, height=350, x_axis_label=r'$$H(X)$$', y_axis_label=r'$$P(H)$$')
n = 0
for l in quant_labels:
    x, y = dpf.compute_cdf(df[l])
    fig.line(x, y, legend_label=' '.join(l.split('_')[1:]), line_width=2, color=Sunset10[n])
    n += 1
fig.legend.location = 'top_left'
fig.legend.background_fill_alpha = 0.5
show(fig)

## Parametric Distribution Fitting

For each time series, generate the sufficient statistics based on an assumed parametric distribution.  

>*Sufficient statistics are a set of scalar (or vector) valued summaries that, combined with a statistical model, contain all the information needed to estimate the parameters of a probability distribution.*
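
To make the definition concrete, the sketch below assumes a log-normal model for daily flows. That choice is an assumption made here for illustration; this section does not name the distribution. For a log-normal model the triple $(n, \sum \ln x_i, \sum (\ln x_i)^2)$ is sufficient: the maximum likelihood estimates of $\mu$ and $\sigma$ can be recovered from it without revisiting the series. The helper names are hypothetical.

```python
import math

def lognormal_sufficient_stats(flows):
    """Return (n, sum of log-flows, sum of squared log-flows)."""
    logs = [math.log(q) for q in flows]
    return len(logs), sum(logs), sum(v * v for v in logs)

def lognormal_mle(n, s1, s2):
    """Maximum likelihood mu and sigma of the log-flows from the sufficient statistics."""
    mu = s1 / n
    var = s2 / n - mu**2
    # guard against tiny negative values caused by floating point rounding
    return mu, math.sqrt(max(var, 0.0))

# hypothetical flows [m^3/s]
flows = [1.2, 3.4, 0.8, 15.0, 6.1]
n, s1, s2 = lognormal_sufficient_stats(flows)
mu, sigma = lognormal_mle(n, s1, s2)
print(f'n={n}, mu={mu:.3f}, sigma={sigma:.3f}')
```

Storing the three summaries per station is enough to reconstruct the fitted distribution later, which keeps the pairwise comparisons cheap.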



## Pairwise f-divergence processing

There are roughly 900K pairings.  To speed up the processing and avoid losing progress, we process these in parallel in batches and save the results intermittently.
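
The batching idea can be sketched with the standard library alone. The example is deliberately serial and uses a toy worker; in the notebook the per-pair work is done by `process_batch` through a multiprocessing pool. The names `process_in_batches` and `toy_worker`, and the batch file naming, are hypothetical. Batches whose output file already exists are skipped, so an interrupted run resumes where it stopped.

```python
import csv
import os
import tempfile

def process_in_batches(pairs, batch_size, out_dir, worker):
    """Process pairs in batches, writing one CSV per batch and skipping
    any batch whose output file already exists."""
    os.makedirs(out_dir, exist_ok=True)
    batch_files = []
    for start in range(0, len(pairs), batch_size):
        batch_no = start // batch_size + 1
        fpath = os.path.join(out_dir, f'batch_{batch_no:03d}.csv')
        batch_files.append(fpath)
        if os.path.exists(fpath):
            continue  # saved by an earlier run
        rows = [worker(a, b) for a, b in pairs[start:start + batch_size]]
        with open(fpath, 'w', newline='') as f:
            csv.writer(f).writerows(rows)
    return batch_files

def toy_worker(proxy, target):
    # stand-in for the real pairwise computation
    return [proxy, target, len(proxy) + len(target)]

pairs = [('A', 'B'), ('A', 'C'), ('B', 'C')]
with tempfile.TemporaryDirectory() as tmp:
    first = process_in_batches(pairs, 2, tmp, toy_worker)
    second = process_in_batches(pairs, 2, tmp, toy_worker)  # nothing is recomputed
    print(len(first), first == second)
```

Smaller batches lose less work when a run is interrupted but create more files to concatenate at the end.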

In [None]:
# load the attributes file with catchment geometries
geom_file = 'BCUB_watershed_attributes_updated.geojson'
bcub_gdf = gpd.read_file(os.path.join(os.getcwd(), 'data', geom_file))
bcub_gdf.columns = [c.lower() for c in bcub_gdf.columns]

In [None]:
# ensure that all stations appear in both datasets
gdf_ids = list(set(bcub_gdf['official_id'].values))
df_ids = list(set(df['official_id'].values))
diff_ids = np.setdiff1d(gdf_ids, df_ids)
assert len(diff_ids) == 0

In [None]:
import itertools

unique_stations = list(set(df['official_id'].values))
# generate all combinations of pairs of station ids
id_pairs = list(itertools.combinations(unique_stations, 2))
print(f' There are {len(id_pairs)} unique pairings in the dataset')
# shuffle the pairs to make testing smaller batches more robust
np.random.seed(42)
np.random.shuffle(id_pairs)

Review, organize, and separate the attribute and metadata columns.

In [None]:
attr_cols = [
    'drainage_area_km2', 'elevation_m', 'slope_deg', 'aspect_deg', 
    'land_use_forest_frac_2010','land_use_grass_frac_2010', 'land_use_wetland_frac_2010',
    'land_use_water_frac_2010', 'land_use_urban_frac_2010', 'land_use_shrubs_frac_2010', 
    'land_use_crops_frac_2010', 'land_use_snow_ice_frac_2010', 'logk_ice_x100', 'porosity_x100'
]

climate_cols = [
    'tmax', 'tmin', 'prcp', 'srad', 'swe', 'vp', 
    'high_prcp_freq', 'low_prcp_freq', 'high_prcp_duration', 'low_prcp_duration',
]

flag_cols = ['flag_shape_extraction', 'flag_terrain_extraction', 'flag_subsoil_extraction', 'flag_gsim_boundaries', 'flag_artificial_boundaries', 'flag_land_use_extraction']
metadata_cols = [e for e in df.columns if e not in climate_cols + attr_cols]

### Define input variables

In [None]:
# set a revision date for the results output file
revision_date = '20241023'

# how many pairs to compute in each batch
# (use a much smaller value, e.g. 10, for a quick test run)
batch_size = 5000

# # what percentage of 365 observations in a year counts as a "complete" year
# completeness_threshold = 0.9
# min_observations = 365 * 0.9

# station pairs with less than min_years concurrent years of data are excluded (for concurrent analysis),
# stations with less than min_years are excluded (for non-concurrent analysis),
min_years = 1 #[2, 3, 4, 5, 10]

# a prior is applied to q in the form of a uniform array of 10**c pseudo-counts "c"
# this prior is used to test the effect of the choice of prior on the model
pseudo_counts = [-5, -4, -3, -2, -1, -0.5, -0.2, -0.1, 0, 0.1, 0.2, 0.5, 1, 2, 3, 4, 5]

# set the number of quantization levels to test, equal to 2^bitrate
bitrates = [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

## Process the data 


```{note}
This step is very time consuming. You can skip it by downloading the processed files as described at the [top of the page](https://dankovacek.github.io/divergence_measures/notebooks/1_data.html).
```
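
The cells below apply a uniform prior to the simulated distribution $Q$ by adding $10^c$ pseudo-counts to every bin before normalizing. A minimal sketch of that step, using hypothetical bin counts and an illustrative helper name (`posterior_from_pseudo_counts`):

```python
import numpy as np

def posterior_from_pseudo_counts(sim_counts, exponent):
    """Add 10**exponent pseudo-counts to every bin and normalize to a PMF."""
    counts = np.asarray(sim_counts, dtype=float)
    adjusted = counts + 10.0**exponent
    return adjusted / adjusted.sum()

# hypothetical simulated counts with one empty bin
sim_counts = [6, 0, 2]
for c in (-2, 0, 2):
    print(c, np.round(posterior_from_pseudo_counts(sim_counts, c), 4))
```

A small exponent leaves the distribution almost untouched while still making every bin non-zero, which keeps the KL divergence finite. A large exponent pulls the posterior towards the uniform distribution, so the choice of prior is itself a source of distortion.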


In [None]:
def input_batch_generator(df, id_pairs_filtered, bitrate, error_df,
                          min_years, use_partial_counts, attr_cols, 
                          climate_cols, pseudo_counts):
    
    # Preload all records into a dictionary for fast lookup
    records_dict = df.copy().set_index('official_id').to_dict(orient='index')

    # every station in the batch must have a catchment record
    unique_ids = list(set(id_pairs_filtered.flatten()))
    missing_ids = np.setdiff1d(unique_ids, df['official_id'].values)
    assert len(missing_ids) == 0, f'No catchment record for {missing_ids}'
    
    batch_inputs = []
    for proxy, target in id_pairs_filtered:
        
        proxy_dict = records_dict.get(proxy, {})
        target_dict = records_dict.get(target, {})

        proxy_dict['official_id'] = proxy
        target_dict['official_id'] = target

        assert 'geometry' in proxy_dict.keys(), proxy_dict.keys()
        assert 'geometry' in target_dict.keys(), target_dict.keys()
        
        batch = [
            proxy_dict, target_dict, bitrate, 
            error_df, min_years, attr_cols, climate_cols, pseudo_counts
        ]
        batch_inputs.append(batch)
    return batch_inputs

In [None]:
temp_dir = os.path.join(os.getcwd(), 'data/', 'temp')
if not os.path.exists(temp_dir):
    os.makedirs(temp_dir)

In [None]:
# load the distortion error dataframe
error_model_distortion_fname = 'data/error_model_distortion/error_model_distortion_test.csv'
error_model_df = pd.read_csv(error_model_distortion_fname)
error_model_df.head()

In [None]:
def compute_posterior_Q_probabilities(
    simple_count_df, partial_count_df, target, bitrate, pseudo_counts, concurrent_data, bin_edges, p_errors
):
    n_obs = np.nansum(simple_count_df[target.obs_label])
    n_sim = np.nansum(simple_count_df[target.sim_label])

    if concurrent_data:
        assert round(n_obs, 0) == round(n_sim, 0), f"Number of observations and simulations do not match. n_obs={n_obs}, n_sim={n_sim}"

    # normalize p_obs and p_sim
    p_obs = simple_count_df[target.obs_label].values / n_obs
    p_sim = simple_count_df[target.sim_label].values / n_sim
    assert round(np.sum(p_sim), 2) == 1.0, "p_sim does not sum to 1.0"

    p_df = pd.DataFrame()
    p_df['p_simple_counts'] = p_obs
    p_df['p_partial_counts'] = partial_count_df[target.obs_label].values / n_obs

    # create a dataframe to store the posterior Q after assuming different priors
    q_df = pd.DataFrame()
    # first save Q, the model (simulated) distribution
    q_df["q_sim_no_prior"] = p_sim
    

    uniform_p = [1.0 / 2.0**bitrate for _ in range(2**bitrate)]
    q_df["q_uniform"] = test_probability_distribution_sums_to_one(uniform_p, bitrate)

    # compute the posterior probabilities based on
    # a wide range of priors to test sensitivity
    # c is the exponent of the uniform pseudo-count added to every bin
    for c in pseudo_counts:
        adjusted_counts = np.array([x + 10**c for x in simple_count_df[target.sim_label]])
        tot_adjusted_counts = np.nansum(adjusted_counts)
        q_df[f"q_post_{c}R"] = adjusted_counts / tot_adjusted_counts
        assert (
            np.round(q_df[f"q_post_{c}R"].sum(), 5) == 1
        ), "Posterior probabilities do not sum to 1."

    return p_obs, q_df

In [None]:
def process_probabilities(
    df, proxy, target, bitrate, concurrent_data, pseudo_counts, p_errors
):
    # compute the bin edges based on equal width in log space
    bin_edges = dpf.uniform_log_bins(df, proxy, bitrate)

    # if partial_counts == False:
        # computes the observed P and simulation Q distribution probabilities
        # as dicts by bin number, probability key-value pairs
        # test a wide range of uniform priors via pseudo counts
    simple_count_df = dpf.compute_unadjusted_counts(
        df, target, bin_edges, bitrate, concurrent_data
    )
    # add a uniformly distributed error to the observed data
    # and compute probabilities from partial observation counts
    # where counts are divided based on the proportion of the bin
    # that the measurement error falls within
    t0 = time()
    fractional_obs_counts = dpf.error_adjusted_fractional_bin_counts(
        df[target.obs_label], np.array(bin_edges), bitrate, error_factor=0.1
    )
    fractional_sim_counts = dpf.error_adjusted_fractional_bin_counts(
        df[target.sim_label], np.array(bin_edges), bitrate, error_factor=0.1
    )
    t1 = time()
    # print(f' {t1-t0:.2f}s to process fractional bin counts')

    partial_count_df = pd.DataFrame(index=range(2**bitrate))
    partial_count_df[target.obs_label] = 0
    partial_count_df[target.sim_label] = 0
    partial_count_df[target.obs_label] += fractional_obs_counts
    partial_count_df[target.sim_label] += fractional_sim_counts
    partial_count_df.fillna(0, inplace=True)

    # p_obs is P, the observed target binned using the
    # observed range of the model to "optimize" the model for P.
    # This is because DKL is the extra bits per sample caused by
    # encoding a set of observations from P with a sub-optimal model Q
    p_obs, p_sim = compute_posterior_Q_probabilities(
        simple_count_df, partial_count_df, target, bitrate, pseudo_counts, concurrent_data, bin_edges, p_errors,
    )
    
    # the prior adds some amount of noise to the distribution;
    # we should compute this noise via the KL divergence
    # between the model q and the posterior r
    
    # set a flag where the simulation counts zero in any state
    # where the a posteriori observation counts are > 0.
    # it's these cases where the prior should have the greatest 
    # influence on the KL divergence.
    underspecified_flag = (
        (simple_count_df[target.sim_label] == 0) & 
        (simple_count_df[target.obs_label] > 0)
    ).any()
    
    return p_obs, p_sim, bin_edges, underspecified_flag

In [None]:
def process_divergences(result, p_obs, p_sim, bin_edges, bitrate, concurrent_data):
    
    dkl_by_prior = dpf.process_KL_divergence(p_obs, p_sim, bitrate, concurrent_data)

    p = p_obs
    q = p_sim["q_sim_no_prior"].values
    q_uniform = p_sim["q_uniform"].values

    tvd_result = dpf.compute_tvd(p, q, q_uniform, concurrent_data)
    wd_result = dpf.compute_wasserstein_distance(
        bin_edges, p, q, q_uniform, concurrent_data
    )

    result.update(tvd_result)
    result.update(wd_result)
    result.update(dkl_by_prior.to_dict())

    return result

In [None]:
def process_KL_divergence(p_obs, p_sim, bitrate, concurrent_data):
    # dkl_df = uf.compute_kl_divergence(p_obs, p_sim, bitrate, concurrent_data)
    df = pd.DataFrame()
    df["bin"] = range(1, 2**bitrate + 1)
    df.set_index("bin", inplace=True)

    for c in p_sim.columns:
        if c == "q_sim_no_prior":
            continue

        # explicitly set data types before vectorization
        p = np.array(p_obs, dtype=np.float64)
        q = np.array(p_sim["q_sim_no_prior"].values, dtype=np.float64)
        r = np.array(p_sim[c].values, dtype=np.float64) # the posterior
        
        # Raise error if any r[i] == 0, as log(p/r) is undefined for those cases
        if np.any(r == 0):
            raise ValueError("Posterior R contains zero entries, which is not allowed.")

        # ensure that the probabilities sum to 1
        check_distribution(p, r, c)

        label = "dkl_nonconcurrent_" + "_".join(c.split("_")[1:])        
        if concurrent_data is True:
            label = "dkl_concurrent_" + "_".join(c.split("_")[1:])

        # compute DKL(P||R), the divergence of the observed P from the posterior R
        mask = (p > 0) & (r > 0)
        kld_array = np.zeros_like(p)
        kld_array[mask] = p[mask] * np.log2(p[mask] / r[mask])
        df[label] = kld_array
            
        # compute DKL(Q||R), or the distortion of Q 
        # due to the assumed prior
        mask = (q > 0) & (r > 0)
        distortion = np.zeros_like(q)
        distortion[mask] = q[mask] * np.log2(q[mask] / r[mask])
        label = "dkl_prior_distortion_" + "_".join(c.split("_")[1:])
        df[label] = distortion

    sum_dkl = df.sum()

    # flag any summed divergence that is zero or negative
    if (sum_dkl.values <= 0).any():
        print("negative or zero dkl")
        print(sum_dkl.values)

    return sum_dkl

In [None]:
def process_divergences(result, p_obs, p_sim, bin_edges, bitrate, concurrent_data):
    dkl_by_prior = process_KL_divergence(p_obs, p_sim, bitrate, concurrent_data)

    p = p_obs
    q = p_sim["q_sim_no_prior"].values
    q_uniform = p_sim["q_uniform"].values

    tvd_result = dpf.compute_tvd(p, q, q_uniform, concurrent_data)
    wd_result = dpf.compute_wasserstein_distance(
        bin_edges, p, q, q_uniform, concurrent_data
    )

    result.update(tvd_result)
    result.update(wd_result)
    result.update(dkl_by_prior.to_dict())

    return result

In [None]:
def process_batch(inputs):    
    (
        proxy,
        target,
        bitrate,
        error_df,
        min_concurrent_years,
        attr_cols,
        climate_cols,
        pseudo_counts,
    ) = inputs

    proxy_id, target_id = proxy['official_id'], target['official_id']
    bitrate = int(bitrate)

    # create a result dict object for tracking results of the batch comparison
    result = {
        "proxy": proxy_id,
        "target": target_id,
        "bitrate": bitrate,
        "min_concurrent_years": min_concurrent_years,
    }

    station_info = {"proxy": proxy, "target": target}

    # check if the polygons are nested
    result["nested_catchments"] = dpf.check_if_nested(
        proxy, target
    )

    # for stn in pair:
    proxy = dpf.Station(station_info["proxy"], bitrate)
    target = dpf.Station(station_info["target"], bitrate)

    # compute spatial distance
    p1, p2 = (
        station_info["proxy"]["geometry"].centroid,
        station_info["target"]["geometry"].centroid,
    )
    # compute the distance between catchment centroids (km)
    centroid_distance = p1.distance(p2) / 1000
    result["centroid_distance"] = round(centroid_distance, 2)
    if centroid_distance > 1000:
        return None

    if np.isnan(target.drainage_area_km2):
        raise ValueError(f"No drainage area for {target_id}")
    if np.isnan(proxy.drainage_area_km2):
        raise ValueError(f"No drainage area for {proxy_id}")

    # Retrieve the data for both stations
    # this is all data, including non-concurrent
    adf = dpf.retrieve_nonconcurrent_data(proxy_id, target_id)

    # note: `~adf.empty` is always truthy for a Python bool, so use `not`
    assert not adf.empty, "No data returned."

    for stn in [proxy, target]:
        adf = dpf.transform_and_jitter(adf, stn)

    # simulate flow at the target based on equal unit area runoff scaling
    adf[target.sim_label] = adf[proxy.id] * (
        target.drainage_area_km2 / proxy.drainage_area_km2
    )

    # filter for the concurrent data
    df = adf.copy().dropna(subset=[proxy_id, target_id], how="any")
    result["num_concurrent_obs"] = len(df)
    
    if df.empty:
        num_complete_concurrent_years = 0
    else:
        df.reset_index(inplace=True)
        num_complete_concurrent_years = dpf.count_complete_years(df, 'time', proxy_id)
        
    # count valid observations in the full (non-concurrent) record
    counts = adf.count(axis=0)
    proxy.n_obs, target.n_obs = counts[proxy_id], counts[target_id]

    # check for empty records before dividing by the observation counts
    if (proxy.n_obs == 0) or (target.n_obs == 0):
        print("   Zero observations.  Skipping.")
        return None

    result["proxy_n_obs"] = proxy.n_obs
    result["target_n_obs"] = target.n_obs
    result["proxy_frac_concurrent"] = len(df) / proxy.n_obs
    result["target_frac_concurrent"] = len(df) / target.n_obs

    # for each of the error models, look up the model error distortion
    # for the proxy (distortion of P) and add it to the results
    p_errors = error_df[error_df['official_id'] == proxy.official_id]

    # process the PMFs and divergences for concurrent data
    # using a range of uniform priors via pseudo counts
    underspecified_flag = None
    if num_complete_concurrent_years > min_concurrent_years:
        # compute coefficient of determination
        result["cod"] = dpf.compute_cod(df, proxy, target)

        # compute Nash-Sutcliffe efficiency
        result["nse"] = dpf.compute_nse(df, proxy, target)

        # compute the Kling-Gupta efficiency
        result["kge"] = dpf.compute_kge(df, proxy, target)

        # df is concurrent data, so the results
        # are updating concurrent data here
        concurrent_data = True
        p_obs, p_sim, bin_edges, underspecified_flag = process_probabilities(
            df, proxy, target, bitrate, concurrent_data, pseudo_counts, p_errors
        )
        result = dpf.process_divergences(
            result, p_obs, p_sim, bin_edges, bitrate, concurrent_data
        )
    if (target.n_obs > 365 * 0.9) and (proxy.n_obs > 365 * 0.9):
        # adf is all data (includes non-concurrent), so the results
        # are updated if both series meet the minimum length
        concurrent_data = False
        p_obs, p_sim, bin_edges, underspecified_flag = process_probabilities(
            adf, proxy, target, bitrate, concurrent_data, pseudo_counts, p_errors
        )
        result = dpf.process_divergences(
            result, p_obs, p_sim, bin_edges, bitrate, concurrent_data
        )

    # neither record-length criterion was met, so there is nothing to report
    if underspecified_flag is None:
        return None
    result['underspecified_model_flag'] = underspecified_flag
    return result


In [None]:
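# the 'process' variable is here so jupyter doesn't go computing 
# a million rows per iteration when the book is built for pushing to github pages.
reordered_bitrates = [4, 6, 8, 10, 12, 3, 5, 7, 9, 11]
# this flag is only logged and passed to input_batch_generator, which does not use it
partial_counts = False
process = True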
if process: 
    for bitrate in reordered_bitrates:
        print(f'Processing pairs at {bitrate} bits quantization (partial counts={partial_counts})')
        results_fname = f'KL_results_{bitrate}bits_{revision_date}.csv'
        # if partial_counts == True:
        #     results_fname = results_fname.replace('.csv', '_partial_counts.csv')

        out_fpath = os.path.join('data/', 'processed_divergence_inputs', results_fname)
        if os.path.exists(out_fpath):
            continue

        n_batches = max(len(id_pairs) // batch_size, 1)
        batches = np.array_split(np.array(id_pairs, dtype=object), n_batches)
        n_pairs = len(id_pairs)
        print(
            f"    Processing {n_pairs} pairs in {n_batches} batches at {bitrate} bits"
        )
        batch_no = 1
        batch_files = []
        t0 = time()
        error_df = error_model_df[error_model_df['bitrate'] == bitrate].copy()
        for batch_ids in batches:
            print(f'Starting batch {batch_no}/{len(batches)} processing.')
            batch_fname = results_fname.replace('.csv', f'_batch_{batch_no:03d}.csv')
            batch_output_fpath = os.path.join(temp_dir, batch_fname)
            if os.path.exists(batch_output_fpath):
                batch_files.append(batch_output_fpath)
                batch_no += 1
                continue
            
            # define the input array for multiprocessing
            inputs = input_batch_generator(bcub_gdf, batch_ids, bitrate, error_df,
                     min_years, partial_counts, attr_cols, climate_cols, pseudo_counts)

            with mp.Pool(1) as pool:
                results = pool.map(process_batch, inputs)
                results = [r for r in results if r is not None]

            batch_result = pd.DataFrame(results)
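            if batch_result.empty:
                print('Empty batch.  Skipping')
            else:
                batch_result.to_csv(batch_output_fpath, index=False)
                # only track batch files that were actually written, so the
                # concatenation step never tries to read a missing file
                batch_files.append(batch_output_fpath)
                print(f"    Saved {len(batch_result)} new results to file.")
            
            t2 = time()
            # t0 is set once per bitrate, so this is cumulative elapsed time
            print(f'    Processed {len(batch_ids)} pairs at ({bitrate} bits); {t2 - t0:.1f} seconds elapsed')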
            batch_no += 1
            
        print(f'    Concatenating {len(batch_files)} batch files.')
        if len(batch_files) > 0:
            all_results = pd.concat([pd.read_csv(f, engine='pyarrow') for f in batch_files], axis=0)
            all_results.to_csv(out_fpath, index=False)
            if os.path.exists(out_fpath):
                for f in batch_files:
                    os.remove(f)
            print(f'    Wrote {len(all_results)} results to {out_fpath}')
        else:
            print('    No new results to write to file.')
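
The batch-splitting step used above can be looked at in isolation. The following is a minimal sketch, assuming a hypothetical list of station ID pairs and an arbitrary `batch_size`; `split_into_batches` is an illustrative helper and is not part of `data_processing_functions`.

```python
import numpy as np

def split_into_batches(id_pairs, batch_size):
    """Split a list of (proxy, target) pairs into roughly equal batches.

    Hypothetical helper mirroring the floor-division logic in the loop above.
    """
    # at least one batch, even when there are fewer pairs than batch_size
    n_batches = max(len(id_pairs) // batch_size, 1)
    # array_split tolerates lengths that are not evenly divisible
    return np.array_split(np.array(id_pairs, dtype=object), n_batches)

# toy station ID pairs (made up for illustration)
toy_pairs = [(f"stn{i:02d}", f"stn{i + 1:02d}") for i in range(10)]

batches = split_into_batches(toy_pairs, batch_size=3)
batch_sizes = [len(b) for b in batches]
print(batch_sizes)

single = split_into_batches(toy_pairs, batch_size=20)
print(len(single))
```

Because the number of batches comes from floor division, individual batches can be slightly larger than `batch_size`; here 10 pairs with `batch_size=3` yield three batches, the first holding four pairs.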

## Compare Parametric vs. Nonparametric KL Divergence Estimates

In [None]:
b = 10
rdate = '20241025'

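# KL divergence results computed from parametric (fitted distribution) estimates
pfile = f'KL_parametric_fits_{b}bits_{rdate}.csv'
param_fpath = os.path.join('data/', 'parametric_divergence_test', pfile)
param_df = pd.read_csv(param_fpath)

# KL divergence results computed from the nonparametric (quantized) estimates
rdate = '20241016'
npfile = f'KL_results_{b}bits_{rdate}.csv'
np_fpath = os.path.join('data/', 'processed_divergence_inputs', npfile)
np_df = pd.read_csv(np_fpath)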


In [None]:
param_df_sorted = param_df.sort_values(['proxy', 'target']).reset_index(drop=True)
np_df_sorted = np_df.sort_values(['proxy', 'target']).reset_index(drop=True)
# Compare row-by-row to ensure the pairings are identical
if param_df_sorted[['proxy', 'target']].equals(np_df_sorted[['proxy', 'target']]):
    print("All pairings match after sorting!")
else:
    print("Mismatch(es) found in pairings.")

In [None]:
# Optional: find rows in the parametric results whose (proxy, target)
# pair is missing from the nonparametric results
mismatches = param_df_sorted[~param_df_sorted[['proxy', 'target']].apply(tuple, axis=1)
                         .isin(np_df_sorted[['proxy', 'target']].apply(tuple, axis=1))]

if not mismatches.empty:
    print("Mismatched rows:")
    print(len(mismatches))

# Perform an inner join to find common rows based on (proxy, target)
common_rows = pd.merge(param_df_sorted, np_df_sorted, on=['proxy', 'target'], how='inner', suffixes=('_param', '_np'))
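
As an alternative to the row-wise tuple comparison, the same bookkeeping can be done in a single merge. This is a minimal sketch on made-up data, assuming only that both tables share `proxy` and `target` columns; the toy frames and the `dkl` column are hypothetical stand-ins for `param_df` and `np_df`.

```python
import pandas as pd

# toy stand-ins for the parametric and nonparametric result tables
param_toy = pd.DataFrame({
    'proxy':  ['A', 'A', 'B'],
    'target': ['B', 'C', 'C'],
    'dkl':    [0.10, 0.20, 0.30],
})
np_toy = pd.DataFrame({
    'proxy':  ['A', 'B', 'C'],
    'target': ['B', 'C', 'A'],
    'dkl':    [0.15, 0.25, 0.35],
})

# an outer merge with indicator=True labels each pair by where it was found
merged = pd.merge(
    param_toy, np_toy, on=['proxy', 'target'], how='outer',
    suffixes=('_param', '_np'), indicator=True,
)

only_param = merged[merged['_merge'] == 'left_only']   # missing from nonparametric
only_np = merged[merged['_merge'] == 'right_only']     # missing from parametric
common = merged[merged['_merge'] == 'both']            # same rows as an inner join

print(len(only_param), len(only_np), len(common))
```

The `both` subset is equivalent to the inner join, while the `left_only` and `right_only` subsets report mismatches in each direction without building tuples row by row.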

In [None]:
pcts = np.linspace(0, 100, 200)
comp_df = pd.DataFrame()
comp_df['pcts'] = pcts / 100
comp_cols = [c for c in np_df.columns if c.startswith('dkl_concurrent_post')]
for c in comp_cols:
    prior = c.split('_')[-1].split('R')[0]
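    data = common_rows[[c, 'dkl_nonconcurrent_sim_lognorm_cdf']].copy()
    data.dropna(how='any', inplace=True)
    # ratio of the lognormal (parametric) divergence to the nonparametric one
    ratios = data['dkl_nonconcurrent_sim_lognorm_cdf'] / data[c]
    # bound the ratios to [0, 100] before computing percentiles
    ratios = ratios.clip(lower=0, upper=100)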
    comp_df[f'lognorm_vs_{b}bit_{prior}_np'] = np.percentile(ratios.values, pcts)
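
The percentile step above amounts to evaluating the empirical quantile function of the ratios on a fixed probability grid. A minimal sketch with synthetic values follows; the lognormal samples are made up for illustration and stand in for the divergence columns of `common_rows`.

```python
import numpy as np

rng = np.random.default_rng(0)

# synthetic, strictly positive stand-ins for the two divergence estimates
dkl_param_toy = rng.lognormal(mean=0.0, sigma=1.0, size=1000)
dkl_np_toy = rng.lognormal(mean=0.0, sigma=0.5, size=1000)

# ratio of parametric to nonparametric divergence, bounded to [0, 100]
ratios_toy = np.clip(dkl_param_toy / dkl_np_toy, 0, 100)

# evaluate the quantile function on an evenly spaced percentile grid
pcts_toy = np.linspace(0, 100, 200)
quantiles = np.percentile(ratios_toy, pcts_toy)

# plotting quantiles (x) against pcts_toy / 100 (y) traces the empirical CDF
cdf_y = pcts_toy / 100
print(quantiles[0], quantiles[-1])
```

Since every prior shares the same percentile grid, each resulting column has the same length and can be stored side by side in one frame, which is what makes the single `comp_df` convenient for plotting.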

In [None]:
from bokeh.palettes import Category20

pal = Category20[17]

ratio_cdf = figure(title=f'{b} bits', width=700, height=400)#, y_axis_type='log')
stn_df = dpf.get_timeseries_data(test_stn)
for n, c in enumerate(comp_cols):
    prior = c.split('_')[-1].split('R')[0]
    ratio_cdf.line(comp_df[f'lognorm_vs_{b}bit_{prior}_np'], comp_df['pcts'], line_width=2, line_color=pal[n],
                   legend_label=f'10^{prior}')
ratio_cdf.legend.ncols = 2
ratio_cdf.legend.click_policy='hide'
ratio_cdf.xaxis.axis_label = r'$$ D_\text{KL} \text{ ratio}\text{ lognorm} / \text{nonparametric fits}$$'
ratio_cdf.yaxis.axis_label = r'$$Pr(x \leq X)$$'
ratio_cdf.add_layout(ratio_cdf.legend[0], 'right')
ratio_cdf = dpf.format_fig_fonts(ratio_cdf, font_size=16)

In [None]:
show(ratio_cdf)

## Citations

```{bibliography}
:filter: docname in docnames
```