Skip to content
Permalink
Browse files
Merge branch 'master' into tuple_sketch
  • Loading branch information
AlexanderSaydakov committed Aug 14, 2020
2 parents 013b0d0 + fa95167 commit 67d7203595229704e7c0575dd9b1eef1f7e87b6e
Showing 20 changed files with 481 additions and 270 deletions.
@@ -44,7 +44,7 @@ jobs:
with:
submodules: true
- name: Configure
run: mkdir build && cd build && cmake ..
run: cd build && cmake ..
- name: Build C++ unit tests
run: cmake --build build --config Release
- name: Run C++ tests
@@ -24,7 +24,7 @@ jobs:
cd "lcov-$VERSION"
sudo make install
- name: Configure
run: mkdir build && cd build && cmake .. -DCOVERAGE=ON
run: cd build && cmake .. -DCOVERAGE=ON
- name: Build unit tests
run: cmake --build build
- name: Run tests
@@ -16,7 +16,6 @@
*.dll
*.dylib
bin/
build/
lib/
Default/

@@ -0,0 +1,7 @@
# build/ directory for convenience, but should remain empty

# Ignore everything in here
*

# Add an exception for this file
!.gitignore
@@ -50,7 +50,7 @@ struct serde<T, typename std::enable_if<std::is_arithmetic<T>::value>::type> {
void serialize(std::ostream& os, const T* items, unsigned num) const {
bool failure = false;
try {
os.write((char*)items, sizeof(T) * num);
os.write(reinterpret_cast<const char*>(items), sizeof(T) * num);
} catch (std::ostream::failure& e) {
failure = true;
}
@@ -114,17 +114,16 @@ struct serde<std::string> {
unsigned i = 0;
bool failure = false;
try {
for (; i < num && is.good(); i++) {
for (; i < num; i++) {
uint32_t length;
is.read((char*)&length, sizeof(length));
if (!is.good()) { break; }
std::string str;
str.reserve(length);
auto it = std::istreambuf_iterator<char>(is);
for (uint32_t j = 0; j < length; j++) {
str.push_back(*it);
++it;
str.push_back(is.get());
}
if (!is.good()) { break; }
new (&items[i]) std::string(std::move(str));
}
} catch (std::istream::failure& e) {
@@ -26,31 +26,38 @@ namespace datasketches {

template <typename T, typename C, typename A>
class kll_quantile_calculator {
typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
public:
// assumes that all levels are sorted including level 0
kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
~kll_quantile_calculator();
T get_quantile(double fraction) const;

private:
using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
using vector_u32 = std::vector<uint32_t, AllocU32>;
using Entry = std::pair<T, uint64_t>;
using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
using Container = std::vector<Entry, AllocEntry>;
uint64_t n_;
T* items_;
uint64_t* weights_;
uint32_t* levels_;
uint8_t levels_size_;
uint8_t num_levels_;
vector_u32 levels_;
Container entries_;

void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
void populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels);
T approximately_answer_positional_query(uint64_t pos) const;
static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
void convert_to_preceding_cummulative();
uint32_t chunk_containing_pos(uint64_t pos) const;
uint32_t search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const;
static void merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items);
static void merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static void merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static uint64_t pos_of_phi(double phi, uint64_t n);
static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);

template<typename Comparator>
struct compare_pair_by_first {
template<typename Entry1, typename Entry2>
bool operator()(Entry1&& a, Entry2&& b) const {
return Comparator()(std::forward<Entry1>(a).first, std::forward<Entry2>(b).first);
}
};
};

} /* namespace datasketches */
@@ -22,31 +22,22 @@

#include <memory>
#include <cmath>
#include <algorithm>

#include "kll_helper.hpp"

namespace datasketches {

template <typename T, typename C, typename A>
kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
n_ = n;
kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
n_(n), levels_(num_levels + 1)
{
const uint32_t num_items = levels[num_levels] - levels[0];
items_ = A().allocate(num_items);
weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
levels_size_ = num_levels + 1;
levels_ = AllocU32().allocate(levels_size_);
populate_from_sketch(items, num_items, levels, num_levels);
blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
convert_to_preceding_cummulative(weights_, num_items + 1);
}

template <typename T, typename C, typename A>
kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
const uint32_t num_items = levels_[num_levels_] - levels_[0];
for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
A().deallocate(items_, num_items);
AllocU64().deallocate(weights_, num_items + 1);
AllocU32().deallocate(levels_, levels_size_);
entries_.reserve(num_items);
populate_from_sketch(items, levels, num_levels);
merge_sorted_blocks(entries_, levels_.data(), levels_.size() - 1, num_items);
if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
convert_to_preceding_cummulative();
}

template <typename T, typename C, typename A>
@@ -55,42 +46,43 @@ T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
uint8_t src_level = 0;
uint8_t dst_level = 0;
void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) {
size_t src_level = 0;
size_t dst_level = 0;
uint64_t weight = 1;
uint32_t offset = levels[0];
while (src_level < num_levels) {
const uint32_t from_index(levels[src_level] - offset);
const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
if (from_index < to_index) { // skip empty levels
std::fill(&weights_[from_index], &weights_[to_index], weight);
for (uint32_t i = from_index; i < to_index; ++i) {
entries_.push_back(Entry(items[i + offset], weight));
}
levels_[dst_level] = from_index;
levels_[dst_level + 1] = to_index;
dst_level++;
}
src_level++;
weight *= 2;
}
weights_[num_items] = 0;
num_levels_ = dst_level;
if (levels_.size() > static_cast<size_t>(dst_level + 1)) levels_.resize(dst_level + 1);
}

template <typename T, typename C, typename A>
T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
if (pos >= n_) throw std::logic_error("position out of range");
const uint32_t weights_size(levels_[num_levels_] + 1);
const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
return items_[index];
const uint32_t num_items = levels_[levels_.size() - 1];
if (pos > entries_[num_items - 1].second) return entries_[num_items - 1].first;
const uint32_t index = chunk_containing_pos(pos);
return entries_[index].first;
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
uint64_t subtotal(0);
for (uint32_t i = 0; i < weights_size; i++) {
const uint32_t new_subtotal = subtotal + weights[i];
weights[i] = subtotal;
void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative() {
uint64_t subtotal = 0;
for (auto& entry: entries_) {
const uint32_t new_subtotal = subtotal + entry.second;
entry.second = subtotal;
subtotal = new_subtotal;
}
}
@@ -102,87 +94,74 @@ uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
}

template <typename T, typename C, typename A>
uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position
const uint32_t nominal_length(weights_size - 1);
if (pos < weights[0]) throw std::logic_error("position too small");
if (pos >= weights[nominal_length]) throw std::logic_error("position too large");
return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t pos) const {
if (entries_.size() < 1) throw std::logic_error("array too short");
if (pos < entries_[0].second) throw std::logic_error("position too small");
if (pos > entries_[entries_.size() - 1].second) throw std::logic_error("position too large");
return search_for_chunk_containing_pos(pos, 0, entries_.size());
}

template <typename T, typename C, typename A>
uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const {
if (l + 1 == r) {
return l;
}
const uint32_t m(l + (r - l) / 2);
if (arr[m] <= pos) {
return search_for_chunk_containing_pos(arr, pos, m, r);
if (entries_[m].second <= pos) {
return search_for_chunk_containing_pos(pos, m, r);
}
return search_for_chunk_containing_pos(arr, pos, l, m);
return search_for_chunk_containing_pos(pos, l, m);
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
if (num_levels == 1) return;

// move the input in preparation for the "ping-pong" reduction strategy
auto tmp_items_deleter = [num_items](T* ptr) {
for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
A().deallocate(ptr, num_items);
};
std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
std::copy(weights, &weights[num_items], tmp_weights.get());
blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
Container temporary;
temporary.reserve(num_items);
merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels,
uint8_t starting_level, uint8_t num_levels) {
if (num_levels == 1) return;
const uint8_t num_levels_1 = num_levels / 2;
const uint8_t num_levels_2 = num_levels - num_levels_1;
if (num_levels_1 < 1) throw std::logic_error("level above 0 expected");
if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels");
const uint8_t starting_level_1 = starting_level;
const uint8_t starting_level_2 = starting_level + num_levels_1;
// swap roles of src and dst
blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
const auto chunk_begin = temp.begin() + temp.size();
merge_sorted_blocks_reversed(orig, temp, levels, starting_level_1, num_levels_1);
merge_sorted_blocks_reversed(orig, temp, levels, starting_level_2, num_levels_2);
const uint32_t num_items_1 = levels[starting_level_1 + num_levels_1] - levels[starting_level_1];
std::merge(
std::make_move_iterator(chunk_begin), std::make_move_iterator(chunk_begin + num_items_1),
std::make_move_iterator(chunk_begin + num_items_1), std::make_move_iterator(temp.end()),
orig.begin() + levels[starting_level], compare_pair_by_first<C>()
);
temp.erase(chunk_begin, temp.end());
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
const auto from_index_1 = levels[starting_level_1];
const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
const auto from_index_2 = levels[starting_level_2];
const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
auto i_src_1 = from_index_1;
auto i_src_2 = from_index_2;
auto i_dst = from_index_1;

while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) {
if (C()(items_src[i_src_1], items_src[i_src_2])) {
items_dst[i_dst] = std::move(items_src[i_src_1]);
weights_dst[i_dst] = weights_src[i_src_1];
i_src_1++;
} else {
items_dst[i_dst] = std::move(items_src[i_src_2]);
weights_dst[i_dst] = weights_src[i_src_2];
i_src_2++;
}
i_dst++;
}
if (i_src_1 < to_index_1) {
std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
} else if (i_src_2 < to_index_2) {
std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels,
uint8_t starting_level, uint8_t num_levels) {
if (num_levels == 1) {
std::move(orig.begin() + levels[starting_level], orig.begin() + levels[starting_level + 1], std::back_inserter(temp));
return;
}
const uint8_t num_levels_1 = num_levels / 2;
const uint8_t num_levels_2 = num_levels - num_levels_1;
const uint8_t starting_level_1 = starting_level;
const uint8_t starting_level_2 = starting_level + num_levels_1;
merge_sorted_blocks_direct(orig, temp, levels, starting_level_1, num_levels_1);
merge_sorted_blocks_direct(orig, temp, levels, starting_level_2, num_levels_2);
std::merge(
std::make_move_iterator(orig.begin() + levels[starting_level_1]),
std::make_move_iterator(orig.begin() + levels[starting_level_1 + num_levels_1]),
std::make_move_iterator(orig.begin() + levels[starting_level_2]),
std::make_move_iterator(orig.begin() + levels[starting_level_2 + num_levels_2]),
std::back_inserter(temp),
compare_pair_by_first<C>()
);
}

} /* namespace datasketches */

0 comments on commit 67d7203

Please sign in to comment.