# Importance Weighted Functions can be Deduped and Even Approximated

## Installation Instructions

`conda env create -n dev python=3.7 numpy numba scikit-learn hypothesis`

`conda activate dev`

In [1]:
from typing import Dict, Callable, Iterable, Optional, Sequence, Tuple, TypeVar
import numpy as np

X = TypeVar("X")
Y = TypeVar("Y")
NpArray = TypeVar("NpArray", bound=np.ndarray)

## Motivating Example

The computation needed to compute a weighted sum can be reduced by first grouping by values and then summing the weights.  Notice the number of necessary operations is halved in this example.

$$ 
\begin{align*}
6.3 &= (0.15 \times 5 + 0.05 \times 5) + (0.1 \times 6 + 0.2 \times 6) + (0.25 \times 7 + 0.25 \times 7) \\
    &= (0.15 + 0.05) \times 5 + (0.1 + 0.2) \times 6 + (0.25 + 0.25) \times 7 \\
    &= 0.2 \times 5 + 0.3 \times 6 + 0.5 \times 7
\end{align*}
$$

In [2]:
# 6 multiplies, 5 additions

wt = np.array([0.15, 0.05,   0.10, 0.20,   0.25, 0.25])
x  = np.array([5,    5,      6,    6,      7,    7])

np.testing.assert_allclose(
    np.dot(wt, x),
    6.3
)

In [3]:
# 3 multiplies, 2 additions

wt = np.array([0.2, 0.3, 0.5])
x = np.array([5, 6, 7])

np.testing.assert_allclose(
    np.dot(wt, x), 
    6.3
)

In [4]:
# 3 multiplies, 2 additions

np.testing.assert_allclose(
    0.2 * 5 + 0.3 * 6 + 0.5 * 7,
    6.3
)

## Implementation

### Transformations Under Test

We want to test that an algorith is invariant to the 3 public methods, listed in the following cell.

In [5]:
def one_weighted(xs: np.ndarray, dtype: np.dtype=np.float64) -> np.ndarray:
    return np.ones(xs.shape[0], dtype=dtype)


def remove_zero_weights(xs: NpArray, wts: np.ndarray) -> Tuple[NpArray, np.ndarray]:
    nzi = np.nonzero(wts)[0]
    return(xs[nzi], wts[nzi])
    
# TODO: To enable numba.  May or may not help.
# NOTE: Eager compilation.
# import numba   # requires numba conda dependency.
# @numba.jit(
#     [
#         "int32[:](int64[:], int32[:])",
#         "int64[:](int64[:], int64[:])",
#         "float32[:](int64[:], float32[:])",
#         "float64[:](int64[:], float64[:])"
#     ],
#     nopython=True, nogil=True, cache=True
# )
def _compressed_weights(group_ind: np.ndarray, wt: np.ndarray) -> np.ndarray:
    # sum the weights in between indices group_ind[j] and group_ind[j+1]
    # and associate with index j in the output.
    
    n = wt.shape[0]
    m = group_ind.shape[0]
    comp_wt = np.zeros(shape=m, dtype=wt.dtype)
    for j in range(m - 1):
        comp_wt[j] = np.sum(wt[group_ind[j]:group_ind[j+1]])

    comp_wt[m - 1] = np.sum(wt[group_ind[-1]:n])

    return comp_wt


def _beginning_ind_contiguous_blocks(
    xs: np.ndarray, 
    key: Optional[Callable[[NpArray], np.ndarray]]
) -> np.ndarray:
    # Find the indices for the beginning of contiguous runs within xs.
    # Based on https://stackoverflow.com/a/19125898
    if key is not None:
        xs = key(xs)
    return np.concatenate([[0], 1 + np.nonzero(xs[:-1] != xs[1:])[0]])


def compressed(
    xs: NpArray, 
    wts: Optional[np.ndarray] = None, 
    xs_commute: bool = False,
    remove_zeros: bool = True,
    key: Optional[Callable[[NpArray], np.ndarray]] = None
) -> Tuple[NpArray, np.ndarray]:
    """Compress xs.  
    
    This can be weighted by wts.  If xs_commute, xs will be sorted prior
    to compression to increase compression.
    
    params:
        xs: NpArray (shape: (n,)) - Array to compress.
        wts: Optional[np.ndarray] (default None) - if supplied, shape is (n,).
            weights to aggregate.
        xs_commute: bool (default False) - whether the values in xs commute
            in the algorithm that uses them.
        remove_zeros: bool (default True) - Whether to remove zero-weighted
            rows prior to returning.
        key: Optional[Callable[[NpArray], np.ndarray]] (default None) -
            If provided, this will be used as the basis of comparison.
    returns: Tuple[NpArray, np.ndarray]
        compressed version of xs and wts
        
    usage:
    In [1]: compressed( 
       ...:     np.rec.fromarrays( 
       ...:         [ 
       ...:             np.array([1,   0,   1,   0]),  
       ...:             np.array([0.4, 0.3, 0.4, 0.3]) 
       ...:         ], 
       ...:         dtype=[('y', np.uint8), ('p', np.float32)] 
       ...:     ), 
       ...:     np.array([1,2,3,0]), 
       ...:     xs_commute=True 
       ...: )                                                                                                                                
    Out[1]: 
    (rec.array([(0, 0.3), (1, 0.4)],
               dtype=[('y', 'u1'), ('p', '<f4')]),
     array([2, 4]))
    In [2]: compressed( 
       ...:     np.array([0.4, 0.3, 0.4, 0.3]), 
       ...:     np.array([1,2,3,0]), 
       ...:     xs_commute=True 
       ...: )                                                                                                                                
    Out[2]: (array([0.3, 0.4]), array([2, 4]))
    In [3]: compressed(  
       ...:     np.rec.fromarrays(  
       ...:         [  
       ...:             np.array([1,   0,   1,    0]),   
       ...:             np.array([0.4, 0.3, 0.41, 0.3])  
       ...:         ],  
       ...:         dtype=[('y', np.uint8), ('p', np.float32)]  
       ...:     ),  
       ...:     np.array([1,2,3,0]),  
       ...:     xs_commute=True, 
       ...:     key=lambda x: x.y 
       ...: )                                                                                                                               
    Out[3]: 
    (rec.array([(0, 0.3), (1, 0.4)],
               dtype=[('y', 'u1'), ('p', '<f4')]),
     array([2, 4]))
    """
    # NOTE: If xs_commute, histograms may help improve speed and
    #       memory usage at the expense of precision.

    if wts is None:
        wts = np.ones(xs.shape[0], dtype=np.int32)

    # If the xs commute, then any permutation is OK.  Sorting the
    # array will put similar elements together, and maximize the
    # compression when _beginning_ind_contiguous_blocks is called.
    if xs_commute:
        ind = xs.argsort()
        xs = xs[ind]
        wts = wts[ind]

    group_ind = _beginning_ind_contiguous_blocks(xs, key)
    compressed_xs = xs[group_ind]
    if isinstance(xs, np.recarray):
        compressed_xs = compressed_xs.view(np.recarray)

    compressed_wts = _compressed_weights(group_ind, wts)

    if remove_zeros:
        compressed_xs, compressed_wts = \
            remove_zero_weights(compressed_xs, compressed_wts)
    return compressed_xs, compressed_wts

### Random Variate Creation

*Very basic!*  Hypothesis does it better.

In [6]:
def generate_random_example(
    rng: np.random.Generator, 
    xs_generator: Callable[[np.random.Generator, int], Iterable[X]],
    min_list_size:int = 5,
    max_list_size: int = 25,
    min_wt_value: int = 0,
    max_wt_value: int = 100,
    int_weights: bool = False
):
    n = rng.integers(min_list_size, max_list_size + 1)

    wts = (
        rng.integers(min_wt_value, max_wt_value + 1, size=n)
        if int_weights else
        min_wt_value + (max_wt_value - min_wt_value) * rng.random(size=n)
    )
    
    xs = xs_generator(rng, n)
    
    return xs, wts

### Properties to Test

These are generic tests that we can reuse across algorithms and types of equality assertions.

In [7]:
def assert_no_weight_like_one_weights(
    xs: NpArray,
    *,
    algo: Callable[[NpArray, Optional[np.ndarray]], Y],
    equality_assertion: Callable[[Y, Y], None]
) -> None:
    orig_no_weight_value = algo(xs, None)
    one_wts = algo(xs, one_weighted(xs))
    equality_assertion(one_wts, orig_no_weight_value)


def assert_zero_weights_have_no_effect(
    xs: NpArray,
    wts: np.ndarray,
    *,
    algo: Callable[[NpArray, Optional[np.ndarray]], Y],
    equality_assertion: Callable[[Y, Y], None]
) -> None:
    assert xs.shape[0] == wts.shape[0]
    orig_weighted_value = algo(xs, wts)
    pos_weights_only = algo(*remove_zero_weights(xs, wts))
    equality_assertion(pos_weights_only, orig_weighted_value)
    

def assert_weight_consolidation_yields_same_result(
    xs: NpArray,
    wts: np.ndarray,
    *,
    algo: Callable[[NpArray, Optional[np.ndarray]], Y],
    equality_assertion: Callable[[Y, Y], None],
    preprocess_xs: Optional[Callable[[NpArray], NpArray]] = None,
    xs_commute: bool = False
) -> None:
    assert xs.shape[0] == wts.shape[0]
    orig_weighted_value = algo(xs, wts)
    xs = preprocess_xs(xs) if preprocess_xs is not None else xs
    consolidated = algo(*compressed(xs, wts, xs_commute))
    equality_assertion(consolidated, orig_weighted_value)


def assert_importance_weighting_properties(
    xs: np.recarray,
    wts: np.ndarray,
    *,
    algo: Callable[[np.recarray, Optional[np.ndarray]], Y],
    equality_assertion: Callable[[Y, Y], None],
    preprocess_xs: Optional[Callable[[NpArray], NpArray]] = None,
    xs_commute: bool = False
) -> None:
    # Call the above three property tests.
    
    assert_no_weight_like_one_weights(
        xs, 
        algo=algo, equality_assertion=equality_assertion
    )
    assert_zero_weights_have_no_effect(
        xs, wts, 
        algo=algo, equality_assertion=equality_assertion
    )
    assert_weight_consolidation_yields_same_result(
        xs, wts, 
        algo=algo, 
        equality_assertion=equality_assertion, 
        preprocess_xs=preprocess_xs,
        xs_commute=xs_commute
    )

### Test Harness

*Very basic!*  Hypothesis has things like narrowing.

In [8]:
def run_importance_weighting_test(
    algo: Callable[[Iterable[X], Optional[Iterable[float]]], Y],
    xs_generator: Callable[[np.random.Generator, int], Iterable[X]],
    equality_assertion: Callable[[Y, Y], None],
    num_trials: int = 1000, 
    seed: Optional[int] = None, 
    report_every: Optional[int] = None,
    **kwargs
) -> None:

    # If seed is None, the rng will be unseeded.
    rng = np.random.Generator(np.random.PCG64(seed))
    for i in range(num_trials):
        if report_every is not None and i % report_every == report_every - 1:
            print(".", end="", flush=True)

        xs, wts = generate_random_example(rng, xs_generator, **kwargs)
        assert_importance_weighting_properties(xs, wts, algo=algo, equality_assertion=equality_assertion)

## Tests

### Test: Importance weighted sums

In [9]:
# Function we are testing.
def iw_sum(xs: np.ndarray, wts: Optional[np.ndarray] = None) -> float:
    return np.sum(xs) if wts is None else np.dot(xs, wts)


# Generator for xs that iw_sum takes.
def iw_sum_xs_generator(min_value: int = 0, max_value: int = 100):
    return lambda rng, n: rng.integers(min_value, max_value + 1, size=n)

In [10]:
%%time

# Note: no seed -> different every time.
run_importance_weighting_test(
    algo=iw_sum, 
    xs_generator=iw_sum_xs_generator(),
    equality_assertion=np.testing.assert_allclose,
    max_wt_value=1, 
    int_weights=True
)

CPU times: user 369 ms, sys: 14.2 ms, total: 383 ms
Wall time: 372 ms


### Test: AUC

In [11]:
def auc(xs: np.recarray, wts: Optional[np.ndarray] = None) -> float:
    from sklearn.metrics import roc_auc_score
    return roc_auc_score(xs.y, xs.p, sample_weight=wts)


def auc_example_generator(rng: np.random.Generator, n: int) -> np.recarray:
    # Basic version.
    import scipy.stats
    p = rng.random(size=n).astype(np.float32)
    y = scipy.stats.binom.rvs(n=1, p=p, random_state=rng).astype(np.uint8)
    return np.rec.fromarrays([y, p], dtype=[('y', np.uint8), ('p', np.float32)])

In [12]:
%%time

# Note: no seed -> different every time.
run_importance_weighting_test(
    algo=auc, 
    xs_generator=auc_example_generator,
    equality_assertion=np.testing.assert_allclose,
    max_wt_value=1,
    min_list_size=25,
    max_list_size=50,
    num_trials=100
)

CPU times: user 761 ms, sys: 73.6 ms, total: 835 ms
Wall time: 906 ms


## Hypothesis Property-Based Test Harness

You can see from the AUC code for `auc_example_generator` that the random number generation is rather well behaved.  The random variate represents a well calibrated, probabilistic classifier whose scores are uniformly distributed over \[0, 1\].

**[Hypothesis](https://hypothesis.readthedocs.io/en/latest/)** shines at [property-based testing](https://en.wikipedia.org/wiki/Property_testing), because it is actively strives to find edge cases during variate generation.  The nice thing is that Hypothesis provides a lot of control over how expected values are sampled with power to control the edge cases, like edges of intervals, NaN, &infin;, etc.  Since the assertion functions above are parametrized by algorithm, we can write simple facades around the assertions, and create Hypothesis `given`s to inject the algorithms and data into the test suite.

Note that the code below is a bit verbose, but contains some of the set up that would be beneficial to test environments like dev and CI test environments.

In [13]:
import os  # For loading profiles.
import hypothesis
from hypothesis import assume, given, settings
from hypothesis.strategies import (composite, floats, integers, just, one_of, sampled_from)
import hypothesis.extra.numpy

In [14]:
hypothesis.__version__

'4.54.2'

### Set up Hypothesis Settings Profiles

This doesn't really need to be here.  This shows how you might configure when run with pytest.

In [15]:
# See https://hypothesis.readthedocs.io/en/latest/settings.html
# for usage on registration and loading of profiles.

HYPOTHESIS_DEFAULT_PROFILE_NAME = "ci"  # "default"
HYPOTHESIS_DEBUG_PROFILE = {
    "max_examples": 10,
    "verbosity": hypothesis.Verbosity.verbose
}
HYPOTHESIS_DEV_PROFILE = { "max_examples": 10 }
HYPOTHESIS_CI_PROFILE = { "max_examples": 1000 }

# hypothesis.Verbosity.quiet, hypothesis.Verbosity.verbose, hypothesis.Verbosity.debug
settings.register_profile("debug", **HYPOTHESIS_DEBUG_PROFILE)
settings.register_profile("dev", **HYPOTHESIS_DEV_PROFILE)
settings.register_profile("ci", **HYPOTHESIS_CI_PROFILE)

# Change in CLI via:  pytest tests --hypothesis-profile <profile-name>
# settings.load_profile(os.getenv(u"HYPOTHESIS_PROFILE", HYPOTHESIS_DEFAULT_PROFILE_NAME))
settings.load_profile("ci")

### Hypothesis Example Generation for AUC Example

This is where hypothesis shines.

In [16]:
from typing import NamedTuple


Data = NamedTuple('Data', [
    ('xs', np.recarray),
    ('wts', np.ndarray),
])


@composite
def _auc_data(
    draw, 
    add_zero_one=True,
    min_list_size:int = 5,
    max_list_size: int = 25,
    min_value: float = 1e-7,
    max_value: float = 1-1e-7,
    min_wt_value: float = 0.00001,
    max_wt_value: float = 10,
    allow_nan: bool = False,
    allow_infinity: bool = False
):
    y_dtype = np.uint8
    p_dtype = np.float64
    wts_dtype = np.float64

    n = draw(integers(min_value=min_list_size, max_value=max_list_size))
    n = n if n >= 2 or not add_zero_one else 2

    p_values_strategy = floats(min_value=min_value, max_value=max_value, allow_nan=allow_nan)

    p_strategy = hypothesis.extra.numpy.arrays(
        p_dtype,
        shape=n,
        elements=one_of([just(0), just(1), p_values_strategy])
    )
    
    y_strategy = hypothesis.extra.numpy.arrays(
        y_dtype, 
        shape=n,
        elements=integers(min_value=0, max_value=1)
    )
    
    non_zero_wt_strategy = floats(
        min_value=min_wt_value,
        max_value=max_wt_value,
        allow_nan=False,
        allow_infinity=False,
    )
    
    no_extrema_strategy = floats(
        min_value=min_wt_value, 
        max_value=max_wt_value, 
        allow_nan=False, 
        allow_infinity=False, 
        exclude_min=True, 
        exclude_max=True,
    )

    p = draw(p_strategy) 
    y = draw(y_strategy)

    wts = draw(
        hypothesis.extra.numpy.arrays(
            wts_dtype, 
            shape=n,
            elements=one_of([just(0), non_zero_wt_strategy])
        )
    )

    # scikit-learn AUC requires one positive and one negative.
    if add_zero_one:
        y[0] = 0
        y[1] = 1
        wts[0] = draw(no_extrema_strategy)
        wts[1] = draw(no_extrema_strategy)

    xs = np.rec.fromarrays([y, p], dtype=[('y', y_dtype), ('p', p_dtype)])

    return Data(xs, wts)

### Hypothesis Versions of Property Based Tests

#### Hypothesis Test Assumptions

This shows that we can change the assumptions of the tests and that's all that really needs to change to parametrize the test with different algorithms (and assumptions in general).

In [17]:
# -----------------------------------------------------------------------------
#  Uncomment this to try these assumptions, where we round the input to auc
#  and group by the rounded version, then get the auc and compare against the
#  original where no rounding takes place.
#
#  The reason we might want to do this is to round the results to get further
#  compression.
# -----------------------------------------------------------------------------
#
def round_preprocess(n: int, debug: bool = False):
    def preprocess(xs: np.recarray):
        min_value = 10 ** -n
        max_value = 1 - (10 ** -n)
        y = xs.y
        p = xs.p.copy()  # necessary?
        ind = np.where((p != 0) & (p != 1))[0]
        p[ind] = np.clip(np.round(p[ind], decimals=n), min_value, max_value)
        a = np.rec.fromarrays([y, p], dtype=xs.dtype)
        if debug:
            print(f"in preprocess({id(xs)}): {id(a)}  xs.p == a.p:  {all(xs.p == a.p)}  {a.p}")
        return a
    return preprocess


def auc_equivalent(digits: int):
    # accurate to digits of precision.
    return lambda x, y: np.testing.assert_allclose(x, y, rtol=10**(-digits))

# This is the hard case.  Notice that round_preprocess makes the input rounded
# prior to grouping.  It's that value that's compared against the original 
# auc of the original input.

test_assumptions = {
    "data":_auc_data(),  # Parametrized this
    "algo": just(auc),
    "equality_assertion": just(auc_equivalent(3)),
    "preprocess_xs": sampled_from([round_preprocess(5, debug=False), None]),
    "xs_commute": just(True)
}

# -----------------------------------------------------------------------------
#  Fill in this section to change the algorithm and parameters being tested.
# -----------------------------------------------------------------------------

# This is the easy case when the input is not modified.

# test_assumptions = {
#     "data":_auc_data(),  # Parametrized this
#     "algo": just(auc),
#     "equality_assertion": just(np.testing.assert_allclose),
#     "preprocess_xs": just(None),
#     "xs_commute": just(True)
# }

These are the Hypothesis tests.  They do not need to change at all.  Just the `test_assumptions`.

In [18]:
@given(**test_assumptions)
def test_hypothesis__no_weight_like_one_weights(data, algo, equality_assertion, *args, **kwargs):
    assert_no_weight_like_one_weights(
        data.xs, 
        algo=algo, equality_assertion=equality_assertion
    )
    

@given(**test_assumptions)
def test_hypothesis__zero_weights_have_no_effect(data, algo, equality_assertion, *args, **kwargs):
    assert_zero_weights_have_no_effect(
        data.xs, data.wts, 
        algo=algo, equality_assertion=equality_assertion
    )


@given(**test_assumptions)
def test_hypothesis__weight_consolidation_yields_same_result(
    data, algo, equality_assertion, preprocess_xs, xs_commute, *args, **kwargs
):
    assert_weight_consolidation_yields_same_result(
        data.xs, data.wts, 
        algo=algo, 
        equality_assertion=equality_assertion, 
        preprocess_xs=preprocess_xs,
        xs_commute=xs_commute
    )


# All of the above tests at once:

    
@given(**test_assumptions)
def test_hypothesis__importance_weighting_properties(
    data, algo, equality_assertion, preprocess_xs, xs_commute, *args, **kwargs
):
    assert_importance_weighting_properties(
        data.xs, data.wts, 
        algo=algo,
        equality_assertion=equality_assertion,
        preprocess_xs=preprocess_xs,
        xs_commute=xs_commute
    )

### Execution of Hypothesis Tests

#### Hypothesis Settings Based on Global Defaults

These would normally be called in a pytest test suite, but called here to show that the functions can be called outside, using the current Hypothesis settings profile.

In [19]:
%%time 

test_hypothesis__weight_consolidation_yields_same_result()

CPU times: user 5 s, sys: 35.5 ms, total: 5.04 s
Wall time: 5.06 s


In [20]:
%%time 

test_hypothesis__zero_weights_have_no_effect()

CPU times: user 4.6 s, sys: 23.2 ms, total: 4.63 s
Wall time: 4.64 s


In [21]:
%%time 

test_hypothesis__no_weight_like_one_weights()

CPU times: user 4.31 s, sys: 17.8 ms, total: 4.33 s
Wall time: 4.33 s


In [22]:
%%time

# Notice that the time is reduced when running tests simultaneously.
test_hypothesis__importance_weighting_properties()

CPU times: user 7.42 s, sys: 19.9 ms, total: 7.44 s
Wall time: 7.43 s


#### Custom Hypothesis Environment Setting

In [23]:
%%time

# UNCOMMENT TO SEE DEBUG PROFILE
# settings.get_profile("debug")(test_hypothesis__no_weight_like_one_weights)()

# Note this wraps test_hypothesis__no_weight_like_one_weights
# so it can only be done once.  This probably shouldn't be done
# in code, but through environment settings and config files.

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.77 µs


## What These Tests Imply

If a function, $ f $ supports importance weighting, data passed to the function can be deduplicated by summing the weights associated with the duplicated rows and rows with zero weight can removed from the input.  This means that the input can be transformed to be less than or equal to the original input size.  Information is passed from the examples, to the associated weights when grouping occurs.

This further implies that if the groups are made larger, and hence the number of groups are made smaller, then the function, $ f $, through which the data passes can be sped up.  One way to achieve this is through rounding, which effectively reduces the numeric precision.  This can be thought of as passing the input to $ f $ through a step prior to the application of .

Note that under this definition of support for importance weighting, we are assuming that the algorithm is commutative in its input vectors, but this may not be the case in general (imagine average run length encoding).  Note that when the inputs of the function commute, the `xs_commute` value in `compressed` can be set to true.  This will likely both speed up the compression and increase the compression ratio.
    


## Why Shrink Data?

If $ f $ is $ \Omega(N) $ its input vector length, compressing the input will make the algorithm's ***runtime*** based on the compressed data rather than the original full sized data.  So while the asympototic runtime of $ f $ doesn't change, the algorithm is effectively speed up through the reduction of the dimensionality of the arguments passed to $ f $.

Additionally, it is often the case that the compression of data can occur in batches and be aggregated in a map/reduce style algorithm that can reduce the ***memory*** needed for the input of the function in question.