Skip to content
Permalink
Browse files
Merge pull request #264 from apache/kll_forwarding
Reduced code duplication in KLL
  • Loading branch information
AlexanderSaydakov committed Mar 8, 2022
2 parents 7ee0c36 + 0caa001 commit e6fa0526520667d70e710211c5a37e7cc3e80655
Showing 2 changed files with 24 additions and 88 deletions.
@@ -184,31 +184,17 @@ class kll_sketch {

/**
* Updates this sketch with the given data item.
* This method takes lvalue.
* @param value an item from a stream of items
*/
void update(const T& value);

/**
* Updates this sketch with the given data item.
* This method takes rvalue.
* @param value an item from a stream of items
*/
void update(T&& value);

/**
* Merges another sketch into this one.
* This method takes lvalue.
* @param other sketch to merge into this one
*/
void merge(const kll_sketch& other);
template<typename FwdT>
void update(FwdT&& value);

/**
* Merges another sketch into this one.
* This method takes rvalue.
* @param other sketch to merge into this one
*/
void merge(kll_sketch&& other);
template<typename FwdSk>
void merge(FwdSk&& other);

/**
* Returns true if this sketch is empty.
@@ -567,8 +553,10 @@ class kll_sketch {
const T* split_points, uint32_t size, double* buckets) const;

template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);

template<typename FwdSk>
void populate_work_arrays(FwdSk&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);

void assert_correct_total_weight() const;
uint32_t safe_level_size(uint8_t level) const;
uint32_t get_num_retained_above_level_zero() const;
@@ -25,6 +25,7 @@
#include <sstream>
#include <stdexcept>

#include "conditional_forward.hpp"
#include "memory_operations.hpp"
#include "kll_helper.hpp"

@@ -147,19 +148,12 @@ kll_sketch<T, C, S, A>::~kll_sketch() {
}

template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::update(const T& value) {
template<typename FwdT>
void kll_sketch<T, C, S, A>::update(FwdT&& value) {
if (!check_update_value(value)) { return; }
update_min_max(value);
const uint32_t index = internal_update();
new (&items_[index]) T(value);
}

template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::update(T&& value) {
if (!check_update_value(value)) { return; }
update_min_max(value);
const uint32_t index = internal_update();
new (&items_[index]) T(std::move(value));
new (&items_[index]) T(std::forward<FwdT>(value));
}

template<typename T, typename C, typename S, typename A>
@@ -182,53 +176,30 @@ uint32_t kll_sketch<T, C, S, A>::internal_update() {
}

template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
template<typename FwdSk>
void kll_sketch<T, C, S, A>::merge(FwdSk&& other) {
if (other.is_empty()) return;
if (m_ != other.m_) {
throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
}
if (is_empty()) {
min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
min_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.min_value_));
max_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.max_value_));
} else {
if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
if (C()(*other.min_value_, *min_value_)) *min_value_ = conditional_forward<FwdSk>(*other.min_value_);
if (C()(*max_value_, *other.max_value_)) *max_value_ = conditional_forward<FwdSk>(*other.max_value_);
}
const uint64_t final_n = n_ + other.n_;
for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
const uint32_t index = internal_update();
new (&items_[index]) T(other.items_[i]);
new (&items_[index]) T(conditional_forward<FwdSk>(other.items_[i]));
}
if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
n_ = final_n;
if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
assert_correct_total_weight();
}

template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::merge(kll_sketch&& other) {
if (other.is_empty()) return;
if (m_ != other.m_) {
throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
}
if (is_empty()) {
min_value_ = new (allocator_.allocate(1)) T(std::move(*other.min_value_));
max_value_ = new (allocator_.allocate(1)) T(std::move(*other.max_value_));
} else {
if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
}
const uint64_t final_n = n_ + other.n_;
for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
const uint32_t index = internal_update();
new (&items_[index]) T(std::move(other.items_[i]));
}
if (other.num_levels_ >= 2) merge_higher_levels(std::forward<kll_sketch>(other), final_n);
n_ = final_n;
if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
assert_correct_total_weight();
}

template<typename T, typename C, typename S, typename A>
bool kll_sketch<T, C, S, A>::is_empty() const {
return n_ == 0;
@@ -922,9 +893,9 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
}

// this leaves items_ uninitialized (all objects moved out and destroyed)
// this version copies objects from the incoming sketch
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
template<typename FwdSk>
void kll_sketch<T, C, S, A>::populate_work_arrays(FwdSk&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
worklevels[0] = 0;

// the level zero data from "other" was already inserted into "this"
@@ -939,32 +910,9 @@ void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* wo
if ((self_pop > 0) && (other_pop == 0)) {
kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
} else if ((self_pop == 0) && (other_pop > 0)) {
kll_helper::copy_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl]);
} else if ((self_pop > 0) && (other_pop > 0)) {
kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
}
}
}

// this leaves items_ uninitialized (all objects moved out and destroyed)
// this version moves objects from the incoming sketch
template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
worklevels[0] = 0;

// the level zero data from "other" was already inserted into "this"
kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
worklevels[1] = safe_level_size(0);

for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
const uint32_t self_pop = safe_level_size(lvl);
const uint32_t other_pop = other.safe_level_size(lvl);
worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;

if ((self_pop > 0) && (other_pop == 0)) {
kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
} else if ((self_pop == 0) && (other_pop > 0)) {
kll_helper::move_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl], false);
for (auto i = other.levels_[lvl], j = worklevels[lvl]; i < other.levels_[lvl] + other_pop; ++i, ++j) {
new (&workbuf[j]) T(conditional_forward<FwdSk>(other.items_[i]));
}
} else if ((self_pop > 0) && (other_pop > 0)) {
kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
}

0 comments on commit e6fa052

Please sign in to comment.