Skip to content
Permalink
Browse files
incomplete merge implementation
  • Loading branch information
Jon committed Apr 20, 2022
1 parent 3373b06 commit 06331f5120763644e3da1a105044fa7e651bf876
Showing 7 changed files with 172 additions and 24 deletions.
@@ -40,6 +40,13 @@ template<typename A> using string = std::basic_string<char, std::char_traits<cha
static std::independent_bits_engine<std::mt19937, 1, uint32_t>
random_bit(static_cast<uint32_t>(std::chrono::system_clock::now().time_since_epoch().count()));

// common random declarations
namespace random_utils {
static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
static std::mt19937_64 rand(rd());
static std::uniform_real_distribution<> next_double(0.0, 1.0);
}


// utility function to hide unused compiler warning
// usually has no additional cost
@@ -20,7 +20,9 @@
#ifndef CLASS_TEST_TYPE_HPP_
#define CLASS_TEST_TYPE_HPP_

#include <cstring>
#include <iostream>
#include "memory_operations.hpp"

namespace datasketches {

@@ -494,6 +494,7 @@ class quantiles_sketch {
Level& buf_size_2k, bool apply_as_update,
quantiles_sketch<T,C,A>& sketch);
static void zip_buffer(Level& buf_in, Level& buf_out);
static void zip_buffer_with_stride(Level& buf_in, Level& buf_out, uint16_t stride);
static void merge_two_size_k_buffers(Level& arr_in_1, Level& arr_in_2, Level& arr_out);

template<typename SerDe>
@@ -513,6 +514,9 @@ class quantiles_sketch {
static uint32_t compute_valid_levels(uint64_t bit_pattern);
static uint8_t compute_levels_needed(uint16_t k, uint64_t n);

template<typename FwdT>
void downsampling_merge(FwdT&& other);

/**
* Returns the zero-based bit position of the lowest zero bit of <i>bits</i> starting at
* <i>startingBit</i>. If input is all ones, this returns 64.
@@ -29,6 +29,7 @@

#include "common_defs.hpp"
#include "count_zeros.hpp"
#include "conditional_forward.hpp"
#include "quantiles_sketch.hpp"

namespace datasketches {
@@ -164,6 +165,40 @@ void quantiles_sketch<T, C, A>::update(FwdT&& item) {
process_full_base_buffer();
}

template<typename T, typename C, typename A>
template<typename FwdT>
void quantiles_sketch<T, C, A>::merge(FwdT&& other) {
if (other.is_empty()) {
return; // nothing to do
} else if (!other.is_estimation_mode()) {
// other is exact, stream in regardless of k
for (auto item : other.base_buffer_) {
update(conditional_forward<FwdT>(item));
}
return; // we're done
}

if (is_empty()) {
std::cerr << "Copy, possibly downsampling" << std::endl;
if (k_ >= other.get_k()) {
std::cerr << "Copy other into self" << std::endl;
// empty, so copy other (since we can't change it) and replace self with copy
quantiles_sketch<T, C, A> sk_copy(k_, allocator_);
//sk_copy.merge(std::forward<FwdT>(other));
sk_copy.merge(other);
*this = std::move(sk_copy);
} else { // k_ < other.get_k()
std::cerr << "Downsampling merge other into self" << std::endl;
// copy, maybe with downsampling
downsampling_merge(std::forward<FwdT>(other));

}
} else {
// merge, maybe with downsampling
std::cerr << "Merge, possibly downsampling" << std::endl;
}
}

template<typename T, typename C, typename A>
template<typename SerDe>
void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde) const {
@@ -772,7 +807,7 @@ uint8_t quantiles_sketch<T, C, A>::compute_levels_needed(const uint16_t k, const

template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::check_k(uint16_t k) {
if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & k - 1) != 0) {
if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & (k - 1)) != 0) {
throw std::invalid_argument("k must be a power of 2 that is >= "
+ std::to_string(quantiles_constants::MIN_K) + " and <= "
+ std::to_string(quantiles_constants::MAX_K) + ". Found: " + std::to_string(k));
@@ -879,9 +914,9 @@ bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {

template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
Level& buf_size_k, Level& buf_size_2k,
bool apply_as_update,
quantiles_sketch<T,C,A>& sketch) {
Level& buf_size_k, Level& buf_size_2k,
bool apply_as_update,
quantiles_sketch<T,C,A>& sketch) {
const uint64_t bit_pattern = sketch.bit_pattern_;
const int k = sketch.k_;

@@ -893,7 +928,9 @@ void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
} else {
// merge_into version of computation
std::move(&buf_size_k[0], &buf_size_k[0] + k, &sketch.levels_[ending_level][0]);
for (uint16_t i = 0; i < k; ++i) {
sketch.levels_[ending_level].push_back(std::move(buf_size_k[i]));
}
}

for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
@@ -929,6 +966,22 @@ void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
buf_in.clear();
}

template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::zip_buffer_with_stride(Level& buf_in, Level& buf_out, uint16_t stride) {
// Random offset in range [0, stride)
std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
uint16_t rand_offset = dist(random_utils::rand);

assert(buf_in.size() == (1 << stride) * buf_out.capacity());
assert(buf_out.size() == 0);
size_t k = buf_out.capacity();
for (uint16_t i = rand_offset, o = 0; o < k; i += 2, ++o) {
buf_out.push_back(buf_in[i]);
}
// do not clear input buffer
}


template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2, Level& dst) {
assert(src_1.size() == src_2.size());
@@ -955,6 +1008,84 @@ void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& sr
}
}

/**
* Merges the other sketch into the current sketch with a smaller value of K.
* However, it is required that the ratio of the two K values be a power of 2.
* I.e., other.get_k() = this.get_k() * 2^(nonnegative integer).
* other is modified only if elements can be moved out of it
*/
template<typename T, typename C, typename A>
template<typename FwdT>
void quantiles_sketch<T, C, A>::downsampling_merge(FwdT&& other) {
if (other.get_k() % k_ != 0) {
throw std::invalid_argument("other.get_k() is not a multiple of k_");
}
assert(!other.is_empty());

const uint32_t downsample_factor = other.get_k() / k_;
const uint32_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);

uint64_t new_n = n_ + other.get_n();

// move items from other's base buffer
for (uint16_t i = 0; i < other.base_buffer_.size(); ++i) {
update(conditional_forward<FwdT>(other.base_buffer_[i]));
}

// check (after moving raw items) if we need to extetend levels array
uint8_t levels_needed = compute_levels_needed(k_, new_n);
if (levels_needed > levels_.size()) {
levels_.reserve(levels_needed);
while (levels_.size() < levels_needed) {
Level empty_level(allocator_);
empty_level.reserve(k_);
levels_.push_back(std::move(empty_level));
}
}

Level down_buf(allocator_);
down_buf.reserve(k_);

Level scratch_buf(allocator_);
scratch_buf.reserve(2 * k_);

uint64_t src_pattern = other.bit_pattern_;
for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
if ((src_pattern & 1) > 0) {
down_buf.clear();
scratch_buf.clear();

// zip with stride, leaving input buffer intact
zip_buffer_with_stride(other.levels_[src_lvl], down_buf, lg_sample_factor);

// propagate-carry
in_place_propagate_carry(src_lvl + lg_sample_factor,
down_buf, scratch_buf,
false, *this);
// update n_ at the end
}
}
n_ = new_n;
assert((n_ / (2 * k_)) == bit_pattern_); // internal consistency check

// update min/max values
// can't just check is_empty() since min/max might not have been set if
// there were no base buffer items added via update()
if (min_value_ == nullptr) {
min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
} else {
if (C()(*other.min_value_, *min_value_))
*min_value_ = conditional_forward<FwdT>(*other.min_value_);
}

if (max_value_ == nullptr) {
max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
} else {
if (C()(*max_value_, *other.max_value_))
*max_value_ = conditional_forward<FwdT>(*other.max_value_);
}
}

template<typename T, typename C, typename A>
uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
uint8_t pos = starting_bit & 0X3F;
@@ -18,6 +18,7 @@
add_executable(quantiles_test)

target_link_libraries(quantiles_test quantiles common_test)
#target_link_libraries(quantiles_test quantiles common)

set_target_properties(quantiles_test PROPERTIES
CXX_STANDARD 11
@@ -40,7 +41,5 @@ target_sources(quantiles_test
PRIVATE
quantiles_sketch_test.cpp
quantiles_compatibility_test.cpp
#quantiles_sketch_custom_type_test.cpp
#quantiles_sketch_validation.cpp
#kolmogorov_smirnov_test.cpp
#simple_test.cpp
)
@@ -474,15 +474,30 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
float split_points[1] = {std::numeric_limits<float>::quiet_NaN()};
REQUIRE_THROWS_AS(sketch.get_CDF(split_points, 1), std::invalid_argument);
}

SECTION("merge, manual testing") {
quantiles_float_sketch sk1(32, 0);
quantiles_float_sketch sk2(256, 0);
const int n = 10000;
for (int i = 0; i < n; i++) {
//sk1.update(static_cast<float>(i));
sk2.update(static_cast<float>((2 * n) - i - 1));
}

//std::cout << "Min: " << sk1.get_min_value() << std::endl;
//std::cout << "Max: " << sk1.get_max_value() << std::endl;
std::cout << "Merging..." << std::endl;
sk1.merge(sk2);
std::cout << "Min: " << sk1.get_min_value() << std::endl;
std::cout << "Max: " << sk1.get_max_value() << std::endl;

std::cout << "n: " << sk1.get_n() << std::endl;
}

/*
SECTION("merge") {
quantiles_float_sketch sketch1(128, 0);
quantiles_float_sketch sketch2(128, 0);
const int n = 10000;
for (int i = 0; i < n; i++) {
sketch1.update(static_cast<float>(i));
sketch2.update(static_cast<float>((2 * n) - i - 1));
}
REQUIRE(sketch1.get_min_value() == 0.0f);
REQUIRE(sketch1.get_max_value() == n - 1);
@@ -575,6 +590,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_max_value() == 999999.0f);
}
*/

SECTION("sketch of ints") {
quantiles_sketch<int> sketch;
REQUIRE_THROWS_AS(sketch.get_quantile(0), std::runtime_error);
@@ -669,7 +685,6 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_rank(std::to_string(n)) == sketch1.get_rank(std::to_string(n)));
}


SECTION("sketch of strings, single item, bytes") {
quantiles_string_sketch sketch1(64, 0);
sketch1.update("a");
@@ -1683,16 +1683,6 @@ bool var_opt_sketch<T, S, A>::iterator::get_mark() const {
return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
}



// ******************** MOVE TO COMMON UTILS AREA EVENTUALLY *********************

namespace random_utils {
static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
static std::mt19937_64 rand(rd());
static std::uniform_real_distribution<> next_double(0.0, 1.0);
}

/**
* Checks if target sampling allocation is more than 50% of max sampling size.
* If so, returns max sampling size, otherwise passes through target size.

0 comments on commit 06331f5

Please sign in to comment.