finish merge, update types to be able to handle const input sketches instead of just exact matches
Jon committed Apr 23, 2022
1 parent 06331f5 commit bf2a0bc0fa31d7bff0e7605b701dc108b97f812b
Showing 5 changed files with 209 additions and 95 deletions.
@@ -150,8 +150,8 @@ void bind_quantiles_sketch(py::module &m, const char* name) {
"Updates the sketch with the given value")
.def("update", &dspy::quantiles_sketch_update<T>, py::arg("array"),
"Updates the sketch with the values in the given array")
.def("merge", (void (quantiles_sketch<T>::*)(const quantiles_sketch<T>&)) &quantiles_sketch<T>::merge, py::arg("sketch"),
"Merges the provided sketch into this one")
.def("__str__", &quantiles_sketch<T>::to_string, py::arg("print_levels")=false, py::arg("print_items")=false,
"Produces a string summary of the sketch")
.def("to_string", &quantiles_sketch<T>::to_string, py::arg("print_levels")=false, py::arg("print_items")=false,
@@ -490,11 +490,11 @@ class quantiles_sketch {
bool grow_levels_if_needed();

// buffers should be pre-sized to target capacity as appropriate
template<typename FwdV>
static void in_place_propagate_carry(uint8_t starting_level, FwdV&& buf_size_k,
Level& buf_size_2k, bool apply_as_update,
quantiles_sketch<T,C,A>& sketch);
static void zip_buffer(Level& buf_in, Level& buf_out);
static void merge_two_size_k_buffers(Level& arr_in_1, Level& arr_in_2, Level& arr_out);

template<typename SerDe>
@@ -514,8 +514,24 @@ class quantiles_sketch {
static uint32_t compute_valid_levels(uint64_t bit_pattern);
static uint8_t compute_levels_needed(uint16_t k, uint64_t n);

/**
* Merges the src sketch into the tgt sketch with equal values of K.
* src is modified only if elements can be moved out of it.
*/
template<typename FwdSk>
static void standard_merge(quantiles_sketch<T,C,A>& tgt, FwdSk&& src);

/**
* Merges the src sketch into the tgt sketch, where tgt has the smaller value of K.
* It is required that the ratio of the two K values be a power of 2,
* i.e., src.get_k() = tgt.get_k() * 2^(nonnegative integer).
* src is modified only if elements can be moved out of it.
*/
template<typename FwdSk>
static void downsampling_merge(quantiles_sketch<T,C,A>& tgt, FwdSk&& src);
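As a concrete instance of the K-ratio rule documented above: src_k = 256 and tgt_k = 64 gives a downsample factor of 4 = 2^2, so merged levels shift up by 2. A standalone sketch of the validation arithmetic (illustrative only; the implementation below checks divisibility inside downsampling_merge):

```cpp
#include <cassert>
#include <cstdint>

int main() {
    uint16_t tgt_k = 64;
    uint16_t src_k = 256;
    assert(src_k % tgt_k == 0);               // ratio must be integral
    uint32_t factor = src_k / tgt_k;          // 4
    assert((factor & (factor - 1)) == 0);     // ...and a power of two
    uint32_t lg_factor = 0;                   // portable log2: 2
    while ((factor >> lg_factor) > 1) ++lg_factor;
    assert(lg_factor == 2);                   // levels shift up by this much
    return 0;
}
```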

template<typename FwdV>
static void zip_buffer_with_stride(FwdV&& buf_in, Level& buf_out, uint16_t stride);

/**
* Returns the zero-based bit position of the lowest zero bit of <i>bits</i> starting at
@@ -64,6 +64,11 @@ is_sorted_(other.is_sorted_)
{
if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
for (size_t i = 0; i < levels_.size(); ++i) {
if (levels_[i].capacity() != other.levels_[i].capacity()) {
levels_[i].reserve(other.levels_[i].capacity());
}
}
}

template<typename T, typename C, typename A>
@@ -166,37 +171,62 @@ void quantiles_sketch<T, C, A>::update(FwdT&& item) {
}

template<typename T, typename C, typename A>
template<typename FwdSk>
void quantiles_sketch<T, C, A>::merge(FwdSk&& other) {
if (other.is_empty()) {
return; // nothing to do
} else if (!other.is_estimation_mode()) {
// other is exact, stream in regardless of k
for (auto item : other.base_buffer_) {
update(conditional_forward<FwdSk>(item));
}
return; // we're done
}

// we know other has data and is in estimation mode
if (is_estimation_mode()) {
if (k_ == other.get_k()) {
standard_merge(*this, other);
} else if (k_ > other.get_k()) {
quantiles_sketch<T, C, A> sk_copy(other);
downsampling_merge(sk_copy, *this);
*this = sk_copy;
} else { // k_ < other.get_k()
downsampling_merge(*this, other);
}
} else {
// exact or empty
quantiles_sketch<T, C, A> sk_copy(other);
if (k_ <= other.get_k()) {
if (!is_empty()) {
for (uint16_t i = 0; i < base_buffer_.size(); ++i) {
sk_copy.update(std::move(base_buffer_[i]));
}
}
} else { // k_ > other.get_k()
downsampling_merge(sk_copy, *this);
}
*this = sk_copy;
}

/*
// update min/max values
// can't just check is_empty() since min/max might not have been set if
// there were no base buffer items added via update()
if (min_value_ == nullptr) {
min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
} else {
if (C()(*other.min_value_, *min_value_))
*min_value_ = conditional_forward<FwdSk>(*other.min_value_);
}
if (max_value_ == nullptr) {
max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
} else {
if (C()(*max_value_, *other.max_value_))
*max_value_ = conditional_forward<FwdSk>(*other.max_value_);
}
*/
}
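A usage sketch of the dispatch above, assuming the public datasketches-cpp API (header path and constructor shape approximate, so treat the details as illustrative): merging a sketch with larger K into one with smaller K takes the downsampling_merge(*this, other) branch, while equal K uses standard_merge.

```cpp
// Illustrative only: constructor taking k and header name assumed.
#include "quantiles_sketch.hpp"

int main() {
    datasketches::quantiles_sketch<double> small_k(64);   // k = 64
    datasketches::quantiles_sketch<double> large_k(256);  // k = 256, ratio 4 = 2^2
    for (int i = 0; i < 100000; ++i) {
        small_k.update(static_cast<double>(i));
        large_k.update(static_cast<double>(i));
    }
    // k_ < other.get_k(): takes the downsampling_merge(*this, other) branch;
    // the result keeps the smaller K (and its coarser accuracy guarantee).
    small_k.merge(large_k);
    return 0;
}
```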

template<typename T, typename C, typename A>
@@ -913,8 +943,9 @@ bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
}

template<typename T, typename C, typename A>
template<typename FwdV>
void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
FwdV&& buf_size_k, Level& buf_size_2k,
bool apply_as_update,
quantiles_sketch<T,C,A>& sketch) {
const uint64_t bit_pattern = sketch.bit_pattern_;
@@ -929,7 +960,7 @@ void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
} else {
// merge_into version of computation
for (uint16_t i = 0; i < k; ++i) {
sketch.levels_[ending_level].push_back(conditional_forward<FwdV>(buf_size_k[i]));
}
}

@@ -967,16 +998,17 @@ void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
}

template<typename T, typename C, typename A>
template<typename FwdV>
void quantiles_sketch<T, C, A>::zip_buffer_with_stride(FwdV&& buf_in, Level& buf_out, uint16_t stride) {
// Random offset in range [0, stride)
std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
uint16_t rand_offset = dist(random_utils::rand);

assert(buf_in.size() == stride * buf_out.capacity());
assert(buf_out.size() == 0);
size_t k = buf_out.capacity();
for (uint16_t i = rand_offset, o = 0; o < k; i += stride, ++o) {
buf_out.push_back(conditional_forward<FwdV>(buf_in[i]));
}
// do not clear input buffer
}
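A standalone illustration of the strided sampling above (hypothetical buffers, not library code): keep k of stride * k items by taking every stride-th element starting from a uniformly random offset, so no position within a stride group is favored.

```cpp
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Sample every stride-th element of `in` (sized stride * k) starting at a
// random offset in [0, stride), producing k items -- mirroring
// zip_buffer_with_stride above, but leaving the input untouched.
std::vector<int> zip_with_stride(const std::vector<int>& in, size_t k, uint16_t stride) {
    static std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
    uint16_t offset = dist(rng);

    std::vector<int> out;
    out.reserve(k);
    for (size_t i = offset, o = 0; o < k; i += stride, ++o) {
        out.push_back(in[i]);
    }
    return out;
}
```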
@@ -1008,84 +1040,143 @@ void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& sr
}
}

template<typename T, typename C, typename A>
template<typename FwdSk>
void quantiles_sketch<T, C, A>::standard_merge(quantiles_sketch<T,C,A>& tgt, FwdSk&& src) {
if (src.get_k() != tgt.get_k()) {
throw std::invalid_argument("src.get_k() != tgt.get_k()");
}
assert(!src.is_empty());

uint64_t new_n = src.get_n() + tgt.get_n();

// move items from src's base buffer
for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
}

// check (after moving raw items) if we need to extend the levels array
uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
if (levels_needed > tgt.levels_.size()) {
tgt.levels_.reserve(levels_needed);
while (tgt.levels_.size() < levels_needed) {
Level empty_level(tgt.allocator_);
empty_level.reserve(tgt.get_k());
tgt.levels_.push_back(std::move(empty_level));
}
}

Level scratch_buf(tgt.allocator_);
scratch_buf.reserve(2 * tgt.get_k());

uint64_t src_pattern = src.bit_pattern_;
for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
if ((src_pattern & 1) > 0) {
scratch_buf.clear();

// propagate-carry
in_place_propagate_carry(src_lvl,
src.levels_[src_lvl], scratch_buf,
false, tgt);
// update n_ at the end
}
}
tgt.n_ = new_n;
assert((tgt.get_n() / (2 * tgt.get_k())) == tgt.bit_pattern_); // internal consistency check

// update min/max values
// can't just check is_empty() since min/max might not have been set if
// there were no base buffer items added via update()
if (tgt.min_value_ == nullptr) {
tgt.min_value_ = new (tgt.allocator_.allocate(1)) T(*src.min_value_);
} else {
if (C()(*src.min_value_, *tgt.min_value_))
*tgt.min_value_ = conditional_forward<FwdSk>(*src.min_value_);
}

if (tgt.max_value_ == nullptr) {
tgt.max_value_ = new (tgt.allocator_.allocate(1)) T(*src.max_value_);
} else {
if (C()(*tgt.max_value_, *src.max_value_))
*tgt.max_value_ = conditional_forward<FwdSk>(*src.max_value_);
}
}
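A worked instance of the invariant asserted above: the bit pattern is just n / (2k) in binary, with bit j set exactly when level j holds a full buffer of k items. Illustrative numbers:

```cpp
#include <cassert>
#include <cstdint>

int main() {
    const uint64_t k = 128;
    const uint64_t n = 1280;            // five full blocks of 2k items
    uint64_t bit_pattern = n / (2 * k); // 5 == 0b101
    assert(bit_pattern == 5);
    // walk the pattern the same way the merge loops above walk src_pattern
    uint32_t occupied = 0;
    for (uint8_t lvl = 0; bit_pattern != 0; ++lvl, bit_pattern >>= 1) {
        if (bit_pattern & 1) ++occupied; // here: levels 0 and 2, level 1 empty
    }
    assert(occupied == 2);
    return 0;
}
```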


template<typename T, typename C, typename A>
template<typename FwdSk>
void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch<T,C,A>& tgt, FwdSk&& src) {
if (src.get_k() % tgt.get_k() != 0) {
throw std::invalid_argument("src.get_k() is not a multiple of tgt.get_k()");
}
assert(!src.is_empty());

const uint32_t downsample_factor = src.get_k() / tgt.get_k();
const uint32_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);

uint64_t new_n = src.get_n() + tgt.get_n();

// move items from src's base buffer
for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
}

// check (after moving raw items) if we need to extend the levels array
uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
if (levels_needed > tgt.levels_.size()) {
tgt.levels_.reserve(levels_needed);
while (tgt.levels_.size() < levels_needed) {
Level empty_level(tgt.allocator_);
empty_level.reserve(tgt.get_k());
tgt.levels_.push_back(std::move(empty_level));
}
}

Level down_buf(tgt.allocator_);
down_buf.reserve(tgt.get_k());

Level scratch_buf(tgt.allocator_);
scratch_buf.reserve(2 * tgt.get_k());

uint64_t src_pattern = src.bit_pattern_;
for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
if ((src_pattern & 1) > 0) {
down_buf.clear();
scratch_buf.clear();

// zip with stride, leaving input buffer intact
zip_buffer_with_stride(src.levels_[src_lvl], down_buf, downsample_factor);

// propagate-carry
in_place_propagate_carry(src_lvl + lg_sample_factor,
down_buf, scratch_buf,
false, tgt);
// update n_ at the end
}
}
tgt.n_ = new_n;
assert((tgt.get_n() / (2 * tgt.get_k())) == tgt.bit_pattern_); // internal consistency check

// update min/max values
// can't just check is_empty() since min/max might not have been set if
// there were no base buffer items added via update()
if (tgt.min_value_ == nullptr) {
tgt.min_value_ = new (tgt.allocator_.allocate(1)) T(*src.min_value_);
} else {
if (C()(*src.min_value_, *tgt.min_value_))
*tgt.min_value_ = conditional_forward<FwdSk>(*src.min_value_);
}

if (tgt.max_value_ == nullptr) {
tgt.max_value_ = new (tgt.allocator_.allocate(1)) T(*src.max_value_);
} else {
if (C()(*tgt.max_value_, *src.max_value_))
*tgt.max_value_ = conditional_forward<FwdSk>(*src.max_value_);
}
}
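A worked instance of the src_lvl + lg_sample_factor level shift used above (illustrative values): downsampling by 2^2 makes each kept item stand in for four inputs, so items from src level 3 carry the weight of tgt level 5.

```cpp
#include <cassert>
#include <cstdint>

int main() {
    const uint32_t downsample_factor = 4; // src_k / tgt_k
    const uint32_t lg_sample_factor = 2;  // log2(4)
    assert((1u << lg_sample_factor) == downsample_factor);
    const uint8_t src_lvl = 3;
    // each surviving item now represents 4x as many inputs, so it lands
    // lg_sample_factor levels higher in the target sketch
    const uint8_t tgt_lvl = src_lvl + lg_sample_factor;
    assert(tgt_lvl == 5);
    return 0;
}
```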


template<typename T, typename C, typename A>
uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
uint8_t pos = starting_bit & 0X3F;
@@ -41,5 +41,4 @@ target_sources(quantiles_test
PRIVATE
quantiles_sketch_test.cpp
quantiles_compatibility_test.cpp
)
