Skip to content
Permalink
Browse files
Merge pull request #278 from apache/quantile_type_conversion
Quantile type conversion
  • Loading branch information
jmalkin committed May 14, 2022
2 parents f0b7937 + 9f00f40 commit 644cb2987eff21d78bbfffd9f878170bfcfbcc2d
Showing 4 changed files with 122 additions and 16 deletions.
@@ -151,6 +151,7 @@ template <typename T,
class quantiles_sketch {
public:
using value_type = T;
using allocator_type = Allocator;
using comparator = Comparator;
using vector_double = std::vector<double, typename std::allocator_traits<Allocator>::template rebind_alloc<double>>;

@@ -161,6 +162,9 @@ class quantiles_sketch {
quantiles_sketch& operator=(const quantiles_sketch& other);
quantiles_sketch& operator=(quantiles_sketch&& other) noexcept;

template<typename From, typename FC, typename FA>
explicit quantiles_sketch(const quantiles_sketch<From, FC, FA>& other, const Allocator& allocator = Allocator());

/**
* Updates this sketch with the given data item.
* @param value an item from a stream of items
@@ -227,6 +231,12 @@ class quantiles_sketch {
*/
Comparator get_comparator() const;

/**
* Returns the allocator for this sketch.
* @return allocator
*/
allocator_type get_allocator() const;

/**
* Returns an approximation to the value of the data item
* that would be preceded by the given fraction of a hypothetical sorted
@@ -138,6 +138,66 @@ is_sorted_(is_sorted)
throw std::logic_error("Item count does not match value computed from k, n");
}

template<typename T, typename C, typename A>
template<typename From, typename FC, typename FA>
quantiles_sketch<T, C, A>::quantiles_sketch(const quantiles_sketch<From, FC, FA>& other, const A& allocator) :
allocator_(allocator),
k_(other.get_k()),
n_(other.get_n()),
bit_pattern_(compute_bit_pattern(other.get_k(), other.get_n())),
base_buffer_(allocator),
levels_(allocator),
min_value_(nullptr),
max_value_(nullptr),
is_sorted_(false)
{
static_assert(std::is_convertible<From, T>::value
|| std::is_constructible<From, T>::value,
"Copy constructor across types requires std::is_convertible or std::is_constructible");

base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k_));

if (!other.is_empty()) {
min_value_ = new (allocator_.allocate(1)) T(other.get_min_value());
max_value_ = new (allocator_.allocate(1)) T(other.get_max_value());

// reserve space in levels
const uint8_t num_levels = compute_levels_needed(k_, n_);
levels_.reserve(num_levels);
for (int i = 0; i < num_levels; ++i) {
Level level(allocator);
level.reserve(k_);
levels_.push_back(std::move(level));
}

// iterate through points, assigning to the correct level as needed
for (auto pair : other) {
const uint64_t wt = pair.second;
if (wt == 1) {
base_buffer_.push_back(pair.first);
// resize where needed as if adding points via update()
if (base_buffer_.size() + 1 > base_buffer_.capacity()) {
const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
base_buffer_.reserve(new_size);
}
}
else {
const uint8_t idx = count_trailing_zeros_in_u64(pair.second) - 1;
levels_[idx].push_back(pair.first);
}
}

// validate that ordering within each level is preserved
// base_buffer_ can be considered unsorted for this purpose
for (int i = 0; i < num_levels; ++i) {
if (!std::is_sorted(levels_[i].begin(), levels_[i].end(), C())) {
throw std::logic_error("Copy construction across types produces invalid sorting");
}
}
}
}


template<typename T, typename C, typename A>
quantiles_sketch<T, C, A>::~quantiles_sketch() {
if (min_value_ != nullptr) {
@@ -238,7 +298,7 @@ void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde)
);
write(os, flags_byte);
write(os, k_);
uint16_t unused = 0;
const uint16_t unused = 0;
write(os, unused);

if (!is_empty()) {
@@ -624,6 +684,11 @@ C quantiles_sketch<T, C, A>::get_comparator() const {
return C();
}

template<typename T, typename C, typename A>
A quantiles_sketch<T, C, A>::get_allocator() const {
return allocator_;
}

// implementation for fixed-size arithmetic types (integral and floating point)
template<typename T, typename C, typename A>
template<typename SerDe, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
@@ -783,9 +848,9 @@ auto quantiles_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size) co

template<typename T, typename C, typename A>
uint32_t quantiles_sketch<T, C, A>::compute_retained_items(const uint16_t k, const uint64_t n) {
uint32_t bb_count = compute_base_buffer_items(k, n);
uint64_t bit_pattern = compute_bit_pattern(k, n);
uint32_t valid_levels = compute_valid_levels(bit_pattern);
const uint32_t bb_count = compute_base_buffer_items(k, n);
const uint64_t bit_pattern = compute_bit_pattern(k, n);
const uint32_t valid_levels = compute_valid_levels(bit_pattern);
return bb_count + (k * valid_levels);
}

@@ -843,11 +908,11 @@ void quantiles_sketch<T, C, A>::check_family_id(uint8_t family_id) {

template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::check_header_validity(uint8_t preamble_longs, uint8_t flags_byte, uint8_t serial_version) {
bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
bool compact = (flags_byte & (1 << flags::IS_COMPACT)) > 0;
const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
const bool compact = (flags_byte & (1 << flags::IS_COMPACT)) > 0;

uint8_t sw = (compact ? 1 : 0) + (2 * (empty ? 1 : 0))
+ (4 * (serial_version & 0xF)) + (32 * (preamble_longs & 0x3F));
const uint8_t sw = (compact ? 1 : 0) + (2 * (empty ? 1 : 0))
+ (4 * (serial_version & 0xF)) + (32 * (preamble_longs & 0x3F));
bool valid = true;

switch (sw) { // exhaustive list and description of all valid cases
@@ -888,7 +953,7 @@ typename quantiles_sketch<T, C, A>::const_iterator quantiles_sketch<T, C, A>::en

template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::grow_base_buffer() {
size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
base_buffer_.reserve(new_size);
}

@@ -912,7 +977,7 @@ void quantiles_sketch<T, C, A>::process_full_base_buffer() {

template<typename T, typename C, typename A>
bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
uint8_t levels_needed = compute_levels_needed(k_, n_);
const uint8_t levels_needed = compute_levels_needed(k_, n_);
if (levels_needed == 0)
return false; // don't need levels and might have small base buffer. Possible during merges.

@@ -992,15 +1057,15 @@ template<typename FwdV>
void quantiles_sketch<T, C, A>::zip_buffer_with_stride(FwdV&& buf_in, Level& buf_out, uint16_t stride) {
// Random offset in range [0, stride)
std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
uint16_t rand_offset = dist(random_utils::rand);
const uint16_t rand_offset = dist(random_utils::rand);

if ((buf_in.size() != stride * buf_out.capacity())
|| (buf_out.size() > 0)) {
throw std::logic_error("zip_buffer_with_stride requires buf_in.size() == "
"stride*buf_out.capacity() and empty buf_out");
}

size_t k = buf_out.capacity();
const size_t k = buf_out.capacity();
for (uint16_t i = rand_offset, o = 0; o < k; i += stride, ++o) {
buf_out.push_back(conditional_forward<FwdV>(buf_in[i]));
}
@@ -1117,15 +1182,15 @@ void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch& tgt, FwdSk&
const uint16_t downsample_factor = src.get_k() / tgt.get_k();
const uint8_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);

uint64_t new_n = src.get_n() + tgt.get_n();
const uint64_t new_n = src.get_n() + tgt.get_n();

// move items from src's base buffer
for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
}

// check (after moving raw items) if we need to extend levels array
uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
const uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
if (levels_needed > tgt.levels_.size()) {
tgt.levels_.reserve(levels_needed);
while (tgt.levels_.size() < levels_needed) {
@@ -82,7 +82,7 @@ TEST_CASE("kolmogorov-smirnov slightly different distributions", "[quantiles_ske
const double delta = kolmogorov_smirnov::delta(sketch1, sketch2);
REQUIRE(delta == Approx(0.02).margin(0.01));
const double threshold = kolmogorov_smirnov::threshold(sketch1, sketch2, 0.05);
std::cout << "delta=" << delta << ", threshold=" << threshold << "\n";

REQUIRE_FALSE(delta > threshold);
REQUIRE_FALSE(kolmogorov_smirnov::test(sketch1, sketch2, 0.05));
}
@@ -102,7 +102,7 @@ TEST_CASE("kolmogorov-smirnov slightly different distributions high resolution",
const double delta = kolmogorov_smirnov::delta(sketch1, sketch2);
REQUIRE(delta == Approx(0.02).margin(0.01));
const double threshold = kolmogorov_smirnov::threshold(sketch1, sketch2, 0.05);
std::cout << "delta=" << delta << ", threshold=" << threshold << "\n";

REQUIRE(delta > threshold);
REQUIRE(kolmogorov_smirnov::test(sketch1, sketch2, 0.05));
}
@@ -903,6 +903,37 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
}
}

SECTION("Type converting copy constructor") {
const uint16_t k = 8;
const int n = 403;
quantiles_sketch<double> sk_double(k);

quantiles_sketch<float> sk_float(k, sk_double.get_allocator());
REQUIRE(sk_float.is_empty());

for (int i = 0; i < n; ++i) sk_double.update(i + .01);

quantiles_sketch<int> sk_int(sk_double);
REQUIRE(sk_double.get_n() == sk_int.get_n());
REQUIRE(sk_double.get_k() == sk_int.get_k());
REQUIRE(sk_double.get_num_retained() == sk_int.get_num_retained());

auto sv_double = sk_double.get_sorted_view(false);
std::vector<std::pair<double, uint64_t>> vec_double(sv_double.begin(), sv_double.end());

auto sv_int = sk_int.get_sorted_view(false);
std::vector<std::pair<int, uint64_t>> vec_int(sv_int.begin(), sv_int.end());

REQUIRE(vec_double.size() == vec_int.size());

for (size_t i = 0; i < vec_int.size(); ++i) {
// known truncation with conversion so approximate result
REQUIRE(vec_double[i].first == Approx(vec_int[i].first).margin(0.1));
// exact equality for weights
REQUIRE(vec_double[i].second == vec_int[i].second);
}
}

// cleanup
if (test_allocator_total_bytes != 0) {
REQUIRE(test_allocator_total_bytes == 0);

0 comments on commit 644cb29

Please sign in to comment.