diff --git a/quantiles/include/quantiles_sketch.hpp b/quantiles/include/quantiles_sketch.hpp index 8d3c8a25..2db43570 100644 --- a/quantiles/include/quantiles_sketch.hpp +++ b/quantiles/include/quantiles_sketch.hpp @@ -430,29 +430,29 @@ class quantiles_sketch { using Level = std::vector; using AllocLevel = typename std::allocator_traits::template rebind_alloc; - // TODO: FIX THIS! /* Serialized sketch layout: - * Adr: - * || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | - * 0 || unused | M |--------K--------| Flags | FamID | SerVer | PreambleInts | - * || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 | - * 1 ||-----------------------------------N------------------------------------------| - * || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 | - * 2 ||---------------data----------------|-unused-|numLevels|-------min K-----------| + * Long || Start Byte Adr: + * Adr: + * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | + * 0 || Preamble_Longs | SerVer | FamID | Flags |----- K ---------|---- unused -----| + * + * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | + * 1 ||---------------------------Items Seen Count (N)--------------------------------| + * + * Long 3 is the start of data, beginning with serialized min and max values, followed by + * the sketch data buffers. */ static const size_t EMPTY_SIZE_BYTES = 8; - static const size_t DATA_START_SINGLE_ITEM = 8; - static const size_t DATA_START = 20; - static const uint8_t SERIAL_VERSION_2 = 1; static const uint8_t SERIAL_VERSION_3 = 2; static const uint8_t FAMILY = 15; - enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM }; + enum flags { RESERVED0, RESERVED1, IS_EMPTY, IS_COMPACT, IS_SORTED }; - static const uint8_t PREAMBLE_LONGS_SHORT = 1; // for empty and single item + static const uint8_t PREAMBLE_LONGS_SHORT = 1; // for empty static const uint8_t PREAMBLE_LONGS_FULL = 2; + static const size_t DATA_START = 16; Allocator allocator_; uint16_t k_; @@ -462,6 +462,7 @@ class quantiles_sketch { std::vector levels_; T* min_value_; T* max_value_; + bool is_sorted_; using QuantileCalculator = quantile_calculator; using AllocCalc = typename std::allocator_traits::template rebind_alloc; diff --git a/quantiles/include/quantiles_sketch_impl.hpp b/quantiles/include/quantiles_sketch_impl.hpp index bf090144..2dba12e8 100644 --- a/quantiles/include/quantiles_sketch_impl.hpp +++ b/quantiles/include/quantiles_sketch_impl.hpp @@ -40,7 +40,8 @@ bit_pattern_(0), base_buffer_(allocator_), levels_(allocator_), min_value_(nullptr), -max_value_(nullptr) +max_value_(nullptr), +is_sorted_(true) { if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K) { throw std::invalid_argument("K must be >= " + std::to_string(quantiles_constants::MIN_K) + " and <= " + std::to_string(quantiles_constants::MAX_K) + ": " + std::to_string(k)); @@ -57,7 +58,8 @@ bit_pattern_(other.bit_pattern_), base_buffer_(other.base_buffer_), levels_(other.levels_), min_value_(nullptr), -max_value_(nullptr) +max_value_(nullptr), +is_sorted_(other.is_sorted_) { if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_); if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_); @@ -72,7 +74,8 @@ bit_pattern_(other.bit_pattern_), base_buffer_(std::move(other.base_buffer_)), levels_(std::move(other.levels_)), min_value_(other.min_value_), -max_value_(other.max_value_) +max_value_(other.max_value_), +is_sorted_(other.is_sorted_) { other.min_value_ = nullptr; other.max_value_ = nullptr; @@ -89,6 +92,7 @@ quantiles_sketch& quantiles_sketch::operator=(const quan std::swap(levels_, copy.levels_); std::swap(min_value_, copy.min_value_); std::swap(max_value_, copy.max_value_); + std::swap(is_sorted_, copy.is_sorted_); return *this; } @@ -102,6 +106,7 @@ quantiles_sketch& quantiles_sketch::operator=(quantiles_ std::swap(levels_, other.levels_); std::swap(min_value_, other.min_value_); std::swap(max_value_, other.max_value_); + std::swap(is_sorted_, other.is_sorted_); return *this; } @@ -136,11 +141,98 @@ void quantiles_sketch::update(FwdT&& item) { base_buffer_.push_back(std::forward(item)); ++n_; + + if (base_buffer_.size() > 1) + is_sorted_ = false; if (base_buffer_.size() == 2 * k_) process_full_base_buffer(); } +template +void quantiles_sketch::serialize(std::ostream& os) const { + const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL; + write(os, preamble_longs); + const uint8_t ser_ver = SERIAL_VERSION_3; + write(os, ser_ver); + const uint8_t family = FAMILY; + write(os, family); + + // empty, ordered, compact are valid flags + const uint8_t flags_byte( + (is_empty() ? 1 << flags::IS_EMPTY : 0) + | (is_sorted_ ? 1 << flags::IS_SORTED : 0) + | (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types? + ); + write(os, flags_byte); + write(os, k_); + uint16_t unused = 0; + write(os, unused); + + if (!is_empty()) { + write(os, n_); + + // min and max + S().serialize(os, min_value_, 1); + S().serialize(os, max_value_, 1); + + // base buffer items + S().serialize(os, base_buffer_.data(), base_buffer_.size()); + + // levels, only when data is present + for (Level lvl : levels_) { + if (lvl.size() > 0) + S().serialize(os, lvl.data(), lvl.size()); + } + } +} + +template +auto quantiles_sketch::serialize(unsigned header_size_bytes) const -> vector_bytes { + const size_t size = get_serialized_size_bytes() + header_size_bytes; + vector_bytes bytes(size, 0, allocator_); + uint8_t* ptr = bytes.data() + header_size_bytes; + const uint8_t* end_ptr = ptr + size; + + const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL; + ptr += copy_to_mem(preamble_longs, ptr); + const uint8_t ser_ver = SERIAL_VERSION_3; + ptr += copy_to_mem(ser_ver, ptr); + const uint8_t family = FAMILY; + ptr += copy_to_mem(family, ptr); + + // empty, ordered, compact are valid flags + const uint8_t flags_byte( + (is_empty() ? 1 << flags::IS_EMPTY : 0) + | (is_sorted_ ? 1 << flags::IS_SORTED : 0) + | (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types? + ); + ptr += copy_to_mem(flags_byte, ptr); + ptr += copy_to_mem(k_, ptr); + ptr += sizeof(uint16_t); // 2 unused bytes + + if (!is_empty()) { + ptr += copy_to_mem(n_, ptr); + + // min and max + ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1); + ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1); + + // base buffer items + if (base_buffer_.size() > 0) + ptr += S().serialize(ptr, end_ptr - ptr, base_buffer_.data(), base_buffer_.size()); + + // levels, only when data is present + for (Level lvl : levels_) { + if (lvl.size() > 0) + ptr += S().serialize(ptr, end_ptr - ptr, lvl.data(), lvl.size()); + } + } + + return bytes; +} + + template string quantiles_sketch::to_string(bool print_levels, bool print_items) const { // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements. @@ -278,7 +370,10 @@ template template auto quantiles_sketch::get_quantile_calculator() const -> QuantileCalculatorPtr { // allow side effect of sorting the base buffer - std::sort(const_cast(base_buffer_).begin(), const_cast(base_buffer_).end(), C()); + // can't set the sorted flag since this is a const method + if (!is_sorted_) { + std::sort(const_cast(base_buffer_).begin(), const_cast(base_buffer_).end(), C()); + } AllocCalc ac(allocator_); QuantileCalculatorPtr quantile_calculator_ptr( @@ -462,7 +557,8 @@ void quantiles_sketch::process_full_base_buffer() { levels_[0], // unused here, but 0 is guaranteed to exist base_buffer_, true, *this); - base_buffer_.clear(); + base_buffer_.clear(); + is_sorted_ = true; assert(n_ / (2 * k_) == bit_pattern_); // internal consistency check } diff --git a/quantiles/test/quantiles_sketch_test.cpp b/quantiles/test/quantiles_sketch_test.cpp index 7f89da26..17a7eee0 100644 --- a/quantiles/test/quantiles_sketch_test.cpp +++ b/quantiles/test/quantiles_sketch_test.cpp @@ -276,7 +276,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") { } } } -/* + /* SECTION("deserialize from java") { std::ifstream is; is.exceptions(std::ios::failbit | std::ios::badbit);