Skip to content
Permalink
Browse files
Merge pull request #258 from apache/kll_inclusive_rank
KLL inclusive rank
  • Loading branch information
AlexanderSaydakov committed Jan 13, 2022
2 parents 753bd38 + 310da28 commit 7b42551ee0d594382e4e612a987047a2d5aa3d00
Showing 6 changed files with 110 additions and 39 deletions.
@@ -35,7 +35,7 @@ namespace datasketches {
* and nearly optimal accuracy per retained item.
* See <a href="https://arxiv.org/abs/1603.05346v2">Optimal Quantile Approximation in Streams</a>.
*
* <p>This is a stochastic streaming sketch that enables near-real time analysis of the
* <p>This is a stochastic streaming sketch that enables near real-time analysis of the
* approximate distribution of values from a very large stream in a single pass, requiring only
* that the values are comparable.
* The analysis is obtained using <i>get_quantile()</i> or <i>get_quantiles()</i> functions or the
@@ -157,7 +157,12 @@ namespace kll_constants {
const uint16_t DEFAULT_K = 200;
}

template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
template <
typename T,
typename C = std::less<T>, // strict weak ordering function (see C++ named requirements: Compare)
typename S = serde<T>,
typename A = std::allocator<T>
>
class kll_sketch {
public:
using value_type = T;
@@ -309,6 +314,9 @@ class kll_sketch {
/**
* Returns an approximation to the normalized (fractional) rank of the given value from 0 to 1,
* inclusive.
* With the template parameter inclusive=true the weight of the given value is included into the rank.
* Otherwise the rank equals the sum of the weights of all values that are less than the given value
* according to the comparator C.
*
* <p>The resulting approximation has a probabilistic guarantee that can be obtained from the
* get_normalized_rank_error(false) function.
@@ -318,6 +326,7 @@ class kll_sketch {
* @param value to be ranked
* @return an approximate rank of the given value
*/
template<bool inclusive = false>
double get_rank(const T& value) const;

/**
@@ -338,9 +347,12 @@ class kll_sketch {
*
* @return an array of m+1 doubles each of which is an approximation
* to the fraction of the input stream values (the mass) that fall into one of those intervals.
* The definition of an "interval" is inclusive of the left split point and exclusive of the right
* split point, with the exception that the last interval will include maximum value.
* If the template parameter inclusive=false, the definition of an "interval" is inclusive of the left split point and exclusive of the right
* split point, with the exception that the last interval will include the maximum value.
* If the template parameter inclusive=true, the definition of an "interval" is exclusive of the left split point and inclusive of the right
* split point.
*/
template<bool inclusive = false>
vector_d<A> get_PMF(const T* split_points, uint32_t size) const;

/**
@@ -364,6 +376,7 @@ class kll_sketch {
* CDF array is the sum of the returned values in positions 0 through j of the returned PMF
* array.
*/
template<bool inclusive = false>
vector_d<A> get_CDF(const T* split_points, uint32_t size) const;

/**
@@ -536,11 +549,16 @@ class kll_sketch {
void add_empty_top_level_to_completely_full_sketch();
void sort_level_zero();
std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> get_quantile_calculator();

template<bool inclusive>
vector_d<A> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
template<bool inclusive>
void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const;
template<bool inclusive>
void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const;

template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
@@ -320,6 +320,7 @@ std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(uint32_t num) const {
}

template<typename T, typename C, typename S, typename A>
template<bool inclusive>
double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
uint8_t level = 0;
@@ -329,7 +330,7 @@ double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
const auto from_index(levels_[level]);
const auto to_index(levels_[level + 1]); // exclusive
for (uint32_t i = from_index; i < to_index; i++) {
if (C()(items_[i], value)) {
if (inclusive ? !C()(value, items_[i]) : C()(items_[i], value)) {
total += weight;
} else if ((level > 0) || is_level_zero_sorted_) {
break; // levels above 0 are sorted, no point comparing further
@@ -342,13 +343,15 @@ double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
}

template<typename T, typename C, typename S, typename A>
template<bool inclusive>
vector_d<A> kll_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const {
return get_PMF_or_CDF(split_points, size, false);
return get_PMF_or_CDF<inclusive>(split_points, size, false);
}

template<typename T, typename C, typename S, typename A>
template<bool inclusive>
vector_d<A> kll_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const {
return get_PMF_or_CDF(split_points, size, true);
return get_PMF_or_CDF<inclusive>(split_points, size, true);
}

template<typename T, typename C, typename S, typename A>
@@ -798,6 +801,7 @@ std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantil
}

template<typename T, typename C, typename S, typename A>
template<bool inclusive>
vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
if (is_empty()) return vector_d<A>(allocator_);
kll_helper::validate_values<T, C>(split_points, size);
@@ -808,9 +812,9 @@ vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32
const auto from_index = levels_[level];
const auto to_index = levels_[level + 1]; // exclusive
if ((level == 0) && !is_level_zero_sorted_) {
increment_buckets_unsorted_level(from_index, to_index, weight, split_points, size, buckets.data());
increment_buckets_unsorted_level<inclusive>(from_index, to_index, weight, split_points, size, buckets.data());
} else {
increment_buckets_sorted_level(from_index, to_index, weight, split_points, size, buckets.data());
increment_buckets_sorted_level<inclusive>(from_index, to_index, weight, split_points, size, buckets.data());
}
level++;
weight *= 2;
@@ -831,13 +835,14 @@ vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32
}

template<typename T, typename C, typename S, typename A>
template<bool inclusive>
void kll_sketch<T, C, S, A>::increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const
{
for (uint32_t i = from_index; i < to_index; i++) {
uint32_t j;
for (j = 0; j < size; j++) {
if (C()(items_[i], split_points[j])) {
if (inclusive ? !C()(split_points[j], items_[i]) : C()(items_[i], split_points[j])) {
break;
}
}
@@ -846,13 +851,14 @@ void kll_sketch<T, C, S, A>::increment_buckets_unsorted_level(uint32_t from_inde
}

template<typename T, typename C, typename S, typename A>
template<bool inclusive>
void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
const T* split_points, uint32_t size, double* buckets) const
{
uint32_t i = from_index;
uint32_t j = 0;
while ((i < to_index) && (j < size)) {
if (C()(items_[i], split_points[j])) {
if (inclusive ? !C()(split_points[j], items_[i]) : C()(items_[i], split_points[j])) {
buckets[j] += weight; // this sample goes into this bucket
i++; // move on to next sample and see whether it also goes into this bucket
} else {
@@ -90,7 +90,9 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
REQUIRE(sketch.get_n() == 1);
REQUIRE(sketch.get_num_retained() == 1);
REQUIRE(sketch.get_rank(1.0f) == 0.0);
REQUIRE(sketch.get_rank<true>(1.0f) == 1.0);
REQUIRE(sketch.get_rank(2.0f) == 1.0);
REQUIRE(sketch.get_rank(std::numeric_limits<float>::infinity()) == 1.0);
REQUIRE(sketch.get_min_value() == 1.0);
REQUIRE(sketch.get_max_value() == 1.0);
REQUIRE(sketch.get_quantile(0.5) == 1.0);
@@ -142,8 +144,10 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
REQUIRE(quantiles[2] == n - 1 );

for (uint32_t i = 0; i < n; i++) {
const double trueRank = (double) i / n;
REQUIRE(sketch.get_rank(static_cast<float>(i)) == trueRank);
const double true_rank = (double) i / n;
REQUIRE(sketch.get_rank(static_cast<float>(i)) == true_rank);
const double true_rank_inclusive = (double) (i + 1) / n;
REQUIRE(sketch.get_rank<true>(static_cast<float>(i)) == true_rank_inclusive);
}

// the alternative method must produce the same result
@@ -241,20 +245,38 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
sketch.update(static_cast<float>(i));
values[i] = static_cast<float>(i);
}

const auto ranks(sketch.get_CDF(values, n));
const auto pmf(sketch.get_PMF(values, n));

double subtotal_pmf(0);
for (int i = 0; i < n; i++) {
if (sketch.get_rank(values[i]) != ranks[i]) {
std::cerr << "checking rank vs CDF for value " << i << std::endl;
REQUIRE(sketch.get_rank(values[i]) == ranks[i]);
{ // inclusive=false (default)
const auto ranks(sketch.get_CDF(values, n));
const auto pmf(sketch.get_PMF(values, n));

double subtotal_pmf = 0;
for (int i = 0; i < n; i++) {
if (sketch.get_rank(values[i]) != ranks[i]) {
std::cerr << "checking rank vs CDF for value " << i << std::endl;
REQUIRE(sketch.get_rank(values[i]) == ranks[i]);
}
subtotal_pmf += pmf[i];
if (abs(ranks[i] - subtotal_pmf) > NUMERIC_NOISE_TOLERANCE) {
std::cerr << "CDF vs PMF for value " << i << std::endl;
REQUIRE(ranks[i] == Approx(subtotal_pmf).margin(NUMERIC_NOISE_TOLERANCE));
}
}
subtotal_pmf += pmf[i];
if (abs(ranks[i] - subtotal_pmf) > NUMERIC_NOISE_TOLERANCE) {
std::cerr << "CDF vs PMF for value " << i << std::endl;
REQUIRE(ranks[i] == Approx(subtotal_pmf).margin(NUMERIC_NOISE_TOLERANCE));
}
{ // inclusive=true
const auto ranks(sketch.get_CDF<true>(values, n));
const auto pmf(sketch.get_PMF<true>(values, n));

double subtotal_pmf = 0;
for (int i = 0; i < n; i++) {
if (sketch.get_rank<true>(values[i]) != ranks[i]) {
std::cerr << "checking rank vs CDF for value " << i << std::endl;
REQUIRE(sketch.get_rank(values[i]) == ranks[i]);
}
subtotal_pmf += pmf[i];
if (abs(ranks[i] - subtotal_pmf) > NUMERIC_NOISE_TOLERANCE) {
std::cerr << "CDF vs PMF for value " << i << std::endl;
REQUIRE(ranks[i] == Approx(subtotal_pmf).margin(NUMERIC_NOISE_TOLERANCE));
}
}
}
}
@@ -50,6 +50,14 @@ double kll_sketch_generic_normalized_rank_error(uint16_t k, bool pmf) {
return kll_sketch<T>::get_normalized_rank_error(k, pmf);
}

template<typename T>
double kll_sketch_get_rank(const kll_sketch<T>& sk, const T& item, bool inclusive) {
if (inclusive)
return sk.template get_rank<true>(item);
else
return sk.template get_rank<false>(item);
}

template<typename T>
py::list kll_sketch_get_quantiles(const kll_sketch<T>& sk,
std::vector<double>& fractions) {
@@ -67,9 +75,12 @@ py::list kll_sketch_get_quantiles(const kll_sketch<T>& sk,

template<typename T>
py::list kll_sketch_get_pmf(const kll_sketch<T>& sk,
std::vector<T>& split_points) {
std::vector<T>& split_points,
bool inclusive) {
size_t nPoints = split_points.size();
auto result = sk.get_PMF(&split_points[0], nPoints);
auto result = inclusive ?
sk.template get_PMF<true>(&split_points[0], nPoints)
: sk.template get_PMF<false>(&split_points[0], nPoints);

py::list list(nPoints + 1);
for (size_t i = 0; i <= nPoints; ++i) {
@@ -81,9 +92,12 @@ py::list kll_sketch_get_pmf(const kll_sketch<T>& sk,

template<typename T>
py::list kll_sketch_get_cdf(const kll_sketch<T>& sk,
std::vector<T>& split_points) {
std::vector<T>& split_points,
bool inclusive) {
size_t nPoints = split_points.size();
auto result = sk.get_CDF(&split_points[0], nPoints);
auto result = inclusive ?
sk.template get_CDF<true>(&split_points[0], nPoints)
: sk.template get_CDF<false>(&split_points[0], nPoints);

py::list list(nPoints + 1);
for (size_t i = 0; i <= nPoints; ++i) {
@@ -159,34 +173,40 @@ void bind_kll_sketch(py::module &m, const char* name) {
"a single query. It is strongly recommend that this method be used instead of multiple calls "
"to get_quantile().\n"
"If the sketch is empty this returns an empty vector.")
.def("get_rank", &kll_sketch<T>::get_rank, py::arg("value"),
.def("get_rank", &dspy::kll_sketch_get_rank<T>, py::arg("value"), py::arg("inclusive")=false,
"Returns an approximation to the normalized (fractional) rank of the given value from 0 to 1, inclusive.\n"
"The resulting approximation has a probabilistic guarantee that can be obtained from the "
"get_normalized_rank_error(False) function.\n"
"With the parameter inclusive=true the weight of the given value is included into the rank."
"Otherwise the rank equals the sum of the weights of values less than the given value.\n"
"If the sketch is empty this returns nan.")
.def("get_pmf", &dspy::kll_sketch_get_pmf<T>, py::arg("split_points"),
.def("get_pmf", &dspy::kll_sketch_get_pmf<T>, py::arg("split_points"), py::arg("inclusive")=false,
"Returns an approximation to the Probability Mass Function (PMF) of the input stream "
"given a set of split points (values).\n"
"The resulting approximations have a probabilistic guarantee that can be obtained from the "
"get_normalized_rank_error(True) function.\n"
"If the sketch is empty this returns an empty vector.\n"
"split_points is an array of m unique, monotonically increasing float values "
"that divide the real number line into m+1 consecutive disjoint intervals.\n"
"The definition of an 'interval' is inclusive of the left split point (or minimum value) and "
"If the parameter inclusive=false, the definition of an 'interval' is inclusive of the left split point (or minimum value) and "
"exclusive of the right split point, with the exception that the last interval will include "
"the maximum value.\n"
"If the parameter inclusive=true, the definition of an 'interval' is exclusive of the left split point (or minimum value) and "
"inclusive of the right split point.\n"
"It is not necessary to include either the min or max values in these split points.")
.def("get_cdf", &dspy::kll_sketch_get_cdf<T>, py::arg("split_points"),
.def("get_cdf", &dspy::kll_sketch_get_cdf<T>, py::arg("split_points"), py::arg("inclusive")=false,
"Returns an approximation to the Cumulative Distribution Function (CDF), which is the "
"cumulative analog of the PMF, of the input stream given a set of split points (values).\n"
"The resulting approximations have a probabilistic guarantee that can be obtained from the "
"get_normalized_rank_error(True) function.\n"
"If the sketch is empty this returns an empty vector.\n"
"split_points is an array of m unique, monotonically increasing float values "
"that divide the real number line into m+1 consecutive disjoint intervals.\n"
"The definition of an 'interval' is inclusive of the left split point (or minimum value) and "
"If the parameter inclusive=false, the definition of an 'interval' is inclusive of the left split point (or minimum value) and "
"exclusive of the right split point, with the exception that the last interval will include "
"the maximum value.\n"
"If the parameter inclusive=true, the definition of an 'interval' is exclusive of the left split point (or minimum value) and "
"inclusive of the right split point.\n"
"It is not necessary to include either the min or max values in these split points.")
.def("normalized_rank_error", (double (kll_sketch<T>::*)(bool) const) &kll_sketch<T>::get_normalized_rank_error,
py::arg("as_pmf"),

0 comments on commit 7b42551

Please sign in to comment.