Skip to content

Commit

Permalink
add serialization (self-consistent, not fully tested)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalkin committed Feb 18, 2022
1 parent 59f40ad commit addcaa6
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 19 deletions.
27 changes: 14 additions & 13 deletions quantiles/include/quantiles_sketch.hpp
Expand Up @@ -430,29 +430,29 @@ class quantiles_sketch {
using Level = std::vector<T, Allocator>;
using AllocLevel = typename std::allocator_traits<Allocator>::template rebind_alloc<Level>;

// TODO: FIX THIS!
/* Serialized sketch layout:
* Adr:
* || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
* 0 || unused | M |--------K--------| Flags | FamID | SerVer | PreambleInts |
* || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 |
* 1 ||-----------------------------------N------------------------------------------|
* || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 |
* 2 ||---------------data----------------|-unused-|numLevels|-------min K-----------|
* Long || Start Byte Adr:
* Adr:
* || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
* 0 || Preamble_Longs | SerVer | FamID | Flags |----- K ---------|---- unused -----|
*
* || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
* 1 ||---------------------------Items Seen Count (N)--------------------------------|
*
* Long 3 is the start of data, beginning with serialized min and max values, followed by
* the sketch data buffers.
*/

static const size_t EMPTY_SIZE_BYTES = 8;
static const size_t DATA_START_SINGLE_ITEM = 8;
static const size_t DATA_START = 20;

static const uint8_t SERIAL_VERSION_2 = 1;
static const uint8_t SERIAL_VERSION_3 = 2;
static const uint8_t FAMILY = 15;

enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
enum flags { RESERVED0, RESERVED1, IS_EMPTY, IS_COMPACT, IS_SORTED };

static const uint8_t PREAMBLE_LONGS_SHORT = 1; // for empty and single item
static const uint8_t PREAMBLE_LONGS_SHORT = 1; // for empty
static const uint8_t PREAMBLE_LONGS_FULL = 2;
static const size_t DATA_START = 16;

Allocator allocator_;
uint16_t k_;
Expand All @@ -462,6 +462,7 @@ class quantiles_sketch {
std::vector<Level, AllocLevel> levels_;
T* min_value_;
T* max_value_;
bool is_sorted_;

using QuantileCalculator = quantile_calculator<T, Comparator, Allocator>;
using AllocCalc = typename std::allocator_traits<Allocator>::template rebind_alloc<QuantileCalculator>;
Expand Down
106 changes: 101 additions & 5 deletions quantiles/include/quantiles_sketch_impl.hpp
Expand Up @@ -40,7 +40,8 @@ bit_pattern_(0),
base_buffer_(allocator_),
levels_(allocator_),
min_value_(nullptr),
max_value_(nullptr)
max_value_(nullptr),
is_sorted_(true)
{
if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K) {
throw std::invalid_argument("K must be >= " + std::to_string(quantiles_constants::MIN_K) + " and <= " + std::to_string(quantiles_constants::MAX_K) + ": " + std::to_string(k));
Expand All @@ -57,7 +58,8 @@ bit_pattern_(other.bit_pattern_),
base_buffer_(other.base_buffer_),
levels_(other.levels_),
min_value_(nullptr),
max_value_(nullptr)
max_value_(nullptr),
is_sorted_(other.is_sorted_)
{
if (other.min_value_ != nullptr) min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
if (other.max_value_ != nullptr) max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
Expand All @@ -72,7 +74,8 @@ bit_pattern_(other.bit_pattern_),
base_buffer_(std::move(other.base_buffer_)),
levels_(std::move(other.levels_)),
min_value_(other.min_value_),
max_value_(other.max_value_)
max_value_(other.max_value_),
is_sorted_(other.is_sorted_)
{
other.min_value_ = nullptr;
other.max_value_ = nullptr;
Expand All @@ -89,6 +92,7 @@ quantiles_sketch<T, C, S, A>& quantiles_sketch<T, C, S, A>::operator=(const quan
std::swap(levels_, copy.levels_);
std::swap(min_value_, copy.min_value_);
std::swap(max_value_, copy.max_value_);
std::swap(is_sorted_, copy.is_sorted_);
return *this;
}

Expand All @@ -102,6 +106,7 @@ quantiles_sketch<T, C, S, A>& quantiles_sketch<T, C, S, A>::operator=(quantiles_
std::swap(levels_, other.levels_);
std::swap(min_value_, other.min_value_);
std::swap(max_value_, other.max_value_);
std::swap(is_sorted_, other.is_sorted_);
return *this;
}

Expand Down Expand Up @@ -136,11 +141,98 @@ void quantiles_sketch<T, C, S, A>::update(FwdT&& item) {

base_buffer_.push_back(std::forward<FwdT>(item));
++n_;

if (base_buffer_.size() > 1)
is_sorted_ = false;

if (base_buffer_.size() == 2 * k_)
process_full_base_buffer();
}

template<typename T, typename C, typename S, typename A>
void quantiles_sketch<T, C, S, A>::serialize(std::ostream& os) const {
const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
write(os, preamble_longs);
const uint8_t ser_ver = SERIAL_VERSION_3;
write(os, ser_ver);
const uint8_t family = FAMILY;
write(os, family);

// empty, ordered, compact are valid flags
const uint8_t flags_byte(
(is_empty() ? 1 << flags::IS_EMPTY : 0)
| (is_sorted_ ? 1 << flags::IS_SORTED : 0)
| (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types?
);
write(os, flags_byte);
write(os, k_);
uint16_t unused = 0;
write(os, unused);

if (!is_empty()) {
write(os, n_);

// min and max
S().serialize(os, min_value_, 1);
S().serialize(os, max_value_, 1);

// base buffer items
S().serialize(os, base_buffer_.data(), base_buffer_.size());

// levels, only when data is present
for (Level lvl : levels_) {
if (lvl.size() > 0)
S().serialize(os, lvl.data(), lvl.size());
}
}
}

template<typename T, typename C, typename S, typename A>
auto quantiles_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
const size_t size = get_serialized_size_bytes() + header_size_bytes;
vector_bytes bytes(size, 0, allocator_);
uint8_t* ptr = bytes.data() + header_size_bytes;
const uint8_t* end_ptr = ptr + size;

const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
ptr += copy_to_mem(preamble_longs, ptr);
const uint8_t ser_ver = SERIAL_VERSION_3;
ptr += copy_to_mem(ser_ver, ptr);
const uint8_t family = FAMILY;
ptr += copy_to_mem(family, ptr);

// empty, ordered, compact are valid flags
const uint8_t flags_byte(
(is_empty() ? 1 << flags::IS_EMPTY : 0)
| (is_sorted_ ? 1 << flags::IS_SORTED : 0)
| (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types?
);
ptr += copy_to_mem(flags_byte, ptr);
ptr += copy_to_mem(k_, ptr);
ptr += sizeof(uint16_t); // 2 unused bytes

if (!is_empty()) {
ptr += copy_to_mem(n_, ptr);

// min and max
ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);

// base buffer items
if (base_buffer_.size() > 0)
ptr += S().serialize(ptr, end_ptr - ptr, base_buffer_.data(), base_buffer_.size());

// levels, only when data is present
for (Level lvl : levels_) {
if (lvl.size() > 0)
ptr += S().serialize(ptr, end_ptr - ptr, lvl.data(), lvl.size());
}
}

return bytes;
}


template<typename T, typename C, typename S, typename A>
string<A> quantiles_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items) const {
// Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
Expand Down Expand Up @@ -278,7 +370,10 @@ template<typename T, typename C, typename S, typename A>
template<bool inclusive>
auto quantiles_sketch<T, C, S, A>::get_quantile_calculator() const -> QuantileCalculatorPtr {
// allow side effect of sorting the base buffer
std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), C());
// can't set the sorted flag since this is a const method
if (!is_sorted_) {
std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), C());
}

AllocCalc ac(allocator_);
QuantileCalculatorPtr quantile_calculator_ptr(
Expand Down Expand Up @@ -462,7 +557,8 @@ void quantiles_sketch<T, C, S, A>::process_full_base_buffer() {
levels_[0], // unused here, but 0 is guaranteed to exist
base_buffer_,
true, *this);
base_buffer_.clear();
base_buffer_.clear();
is_sorted_ = true;
assert(n_ / (2 * k_) == bit_pattern_); // internal consistency check
}

Expand Down
2 changes: 1 addition & 1 deletion quantiles/test/quantiles_sketch_test.cpp
Expand Up @@ -276,7 +276,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
}
}
}
/*
/*
SECTION("deserialize from java") {
std::ifstream is;
is.exceptions(std::ios::failbit | std::ios::badbit);
Expand Down

0 comments on commit addcaa6

Please sign in to comment.