Skip to content
Permalink
Browse files
deserialize void*
  • Loading branch information
jmalkin committed Mar 24, 2022
1 parent 41caf63 commit 9801d4346d11357ed4ce7047d631e950c8631b27
Showing 3 changed files with 123 additions and 53 deletions.
@@ -495,7 +495,7 @@ class quantiles_sketch {
static void merge_two_size_k_buffers(Level& arr_in_1, Level& arr_in_2, Level& arr_out);

static Level deserialize_array(std::istream& is, uint32_t num_items, uint32_t capcacity, const S& serde, const Allocator& allocator);
static Level deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capcacity, const S& serde, const Allocator& allocator);
static std::pair<Level, size_t> deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capcacity, const S& serde, const Allocator& allocator);

static void check_serial_version(uint8_t serial_version);
static void check_header_validity(uint8_t preamble_longs, uint8_t flags_byte, uint8_t serial_version);
@@ -298,7 +298,7 @@ auto quantiles_sketch<T, C, S, A>::deserialize(std::istream &is, const S& serde,
// load base buffer
const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
uint32_t items_to_read = is_compact ? bb_items : 2 * k;
Level base_buffer = deserialize_array(is, allocator, items_to_read, 2 * k);
Level base_buffer = deserialize_array(is, items_to_read, 2 * k, serde, allocator);

// populate vector of Levels directly
VectorLevels levels(allocator);
@@ -308,7 +308,7 @@ auto quantiles_sketch<T, C, S, A>::deserialize(std::istream &is, const S& serde,
for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {

if ((working_pattern & 0x01) == 1) {
Level level = deserialize_array(is, allocator, k, k);
Level level = deserialize_array(is, k, k, serde, allocator);
levels.push_back(std::move(level));
} else {
Level level(allocator);
@@ -337,32 +337,115 @@ auto quantiles_sketch<T, C, S, A>::deserialize_array(std::istream& is, uint32_t
level.insert(level.begin(),
std::make_move_iterator(items.get()),
std::make_move_iterator(items.get() + num_items));
return level;
return std::move(level);
}

/*
template<typename T, typename C, typename S, typename A>
auto quantiles_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const S& serde, const A &allocator) -> quantiles_sketch {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
const char* end_ptr = static_cast<const char*>(bytes) + size;

uint8_t preamble_longs;
ptr += copy_from_mem(ptr, preamble_longs);
uint8_t serial_version;
ptr += copy_from_mem(ptr, serial_version);
uint8_t family_id;
ptr += copy_from_mem(ptr, family_id);
uint8_t flags_byte;
ptr += copy_from_mem(ptr, flags_byte);
uint16_t k;
ptr += copy_from_mem(ptr, k);
uint16_t unused;
ptr += copy_from_mem(ptr, unused);

check_serial_version(serial_version); // a little redundant with the next line, but explicit checks
check_family_id(family_id);
check_header_validity(preamble_longs, flags_byte, serial_version);

const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
if (is_empty) {
return quantiles_sketch(k, allocator);
}

ensure_minimum_memory(size, 16);
uint64_t items_seen;
ptr += copy_from_mem(ptr, items_seen);

const bool is_compact = (flags_byte & (1 << flags::IS_COMPACT)) > 0;
const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;

A alloc(allocator);
auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));

ptr += serde.deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destructor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
ptr += serde.deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
// serde call did not throw, repackage with destructor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));

// allocate buffers as needed
const uint8_t levels_needed = compute_levels_needed(k, items_seen);
const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);

// Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
// does not currently operate sketches in compact mode, but will only serialize as compact
// to avoid complications around serialization of empty values for generic type T. We also need
// to be able to ingest either serialized format from Java.

// load base buffer
const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
uint32_t items_to_read = is_compact ? bb_items : 2 * k;
auto base_buffer_pair = deserialize_array(ptr, end_ptr - ptr, items_to_read, 2 * k, serde, allocator);
ptr += base_buffer_pair.second;

// populate vector of Levels directly
VectorLevels levels(allocator);
levels.reserve(levels_needed);
if (levels_needed > 0) {
uint64_t working_pattern = bit_pattern;
for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {

if ((working_pattern & 0x01) == 1) {
auto pair = deserialize_array(ptr, end_ptr - ptr, k, k, serde, allocator);
ptr += pair.second;
levels.push_back(std::move(pair.first));
} else {
Level level(allocator);
level.reserve(k);
levels.push_back(std::move(level));
}
}
}

return quantiles_sketch(k, items_seen, bit_pattern,
std::move(base_buffer_pair.first), std::move(levels), std::move(min_value), std::move(max_value), is_sorted, allocator);
}
*/

template<typename T, typename C, typename S, typename A>
auto quantiles_sketch<T, C, S, A>::deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capacity, const S& serder, const A& allocator) -> Level {
auto quantiles_sketch<T, C, S, A>::deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capacity, const S& serde, const A& allocator)
-> std::pair<Level, size_t> {
const char* ptr = static_cast<const char*>(bytes);
const char* end_ptr = static_cast<const char*>(bytes) + size;
A alloc(allocator);
std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
serde.deserialize(is, items.get(), num);
std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num_items);
// serde did not throw, enable destructors
items.get_deleter().set_destroy(true);
if (!is.good()) throw std::runtime_error("error reading from std::istream");


// successfully read, now put into a Level
Level level(allocator);
level.reserve(capacity);
level.insert(level.begin(),
std::make_move_iterator(items.get()),
std::make_move_iterator(items.get() + num_items));
return level;

return std::pair<Level, size_t>(std::move(level), ptr - static_cast<const char*>(bytes));
}

template<typename T, typename C, typename S, typename A>
@@ -295,7 +295,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = quantiles_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = quantiles_float_sketch::deserialize(s, serde<float>(), test_allocator<float>(0));
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
@@ -307,11 +307,11 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
}
/*

SECTION("bytes serialize deserialize empty") {
kll_float_sketch sketch(200, 0);
quantiles_float_sketch sketch(200, 0);
auto bytes = sketch.serialize();
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = quantiles_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
@@ -322,14 +322,14 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
}
*/

SECTION("stream serialize deserialize one item") {
quantiles_float_sketch sketch(200, 0);
sketch.update(1.0f);
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = quantiles_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = quantiles_float_sketch::deserialize(s, serde<float>(), test_allocator<float>(0));
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE_FALSE(sketch2.is_empty());
@@ -342,13 +342,13 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_rank(1) == 0.0);
REQUIRE(sketch2.get_rank(2) == 1.0);
}
/*

SECTION("bytes serialize deserialize one item") {
kll_float_sketch sketch(200, 0);
quantiles_float_sketch sketch(200, 0);
sketch.update(1.0f);
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = quantiles_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE_FALSE(sketch2.is_empty());
REQUIRE_FALSE(sketch2.is_estimation_mode());
@@ -360,12 +360,12 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_rank(1) == 0.0);
REQUIRE(sketch2.get_rank(2) == 1.0);
}
/*
SECTION("deserialize one item v1") {
std::ifstream is;
is.exceptions(std::ios::failbit | std::ios::badbit);
is.open(testBinaryInputPath + "kll_sketch_float_one_item_v1.sk", std::ios::binary);
auto sketch = kll_float_sketch::deserialize(is, test_allocator<float>(0));
auto sketch = quantiles_float_sketch::deserialize(is, serde<float>(), test_allocator<float>(0));
REQUIRE_FALSE(sketch.is_empty());
REQUIRE_FALSE(sketch.is_estimation_mode());
REQUIRE(sketch.get_n() == 1);
@@ -382,7 +382,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = quantiles_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = quantiles_float_sketch::deserialize(s, serde<float>(), test_allocator<float>(0));
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE_FALSE(sketch2.is_empty());
@@ -392,15 +392,15 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_min_value() == 1.0);
REQUIRE(sketch2.get_max_value() == 3.0);
}
/*

SECTION("bytes serialize deserialize three items") {
kll_float_sketch sketch(200, 0);
quantiles_float_sketch sketch(200, 0);
sketch.update(1.0f);
sketch.update(2.0f);
sketch.update(3.0f);
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = quantiles_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE_FALSE(sketch2.is_empty());
REQUIRE_FALSE(sketch2.is_estimation_mode());
@@ -409,15 +409,15 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_min_value() == 1.0);
REQUIRE(sketch2.get_max_value() == 3.0);
}
*/

SECTION("stream serialize deserialize many floats") {
quantiles_float_sketch sketch(200, 0);
const int n = 1000;
for (int i = 0; i < n; i++) sketch.update(static_cast<float>(i));
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = quantiles_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = quantiles_float_sketch::deserialize(s, serde<float>(), test_allocator<float>(0));
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
@@ -432,14 +432,13 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
REQUIRE(sketch2.get_rank(static_cast<float>(n)) == sketch.get_rank(static_cast<float>(n)));
}
/*
SECTION("bytes serialize deserialize many floats") {
kll_float_sketch sketch(200, 0);
quantiles_float_sketch sketch(200, 0);
const int n = 1000;
for (int i = 0; i < n; i++) sketch.update(static_cast<float>(i));
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = quantiles_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
@@ -452,18 +451,18 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_quantile(0.5) == sketch.get_quantile(0.5));
REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
REQUIRE(sketch2.get_rank(static_cast<float>(n)) == sketch.get_rank(static_cast<float>(n)));
REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 7), std::out_of_range);
REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 15), std::out_of_range);
REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
REQUIRE_THROWS_AS(quantiles_sketch<int>::deserialize(bytes.data(), 7), std::out_of_range);
REQUIRE_THROWS_AS(quantiles_sketch<int>::deserialize(bytes.data(), 15), std::out_of_range);
REQUIRE_THROWS_AS(quantiles_sketch<int>::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
}

SECTION("bytes serialize deserialize many ints") {
kll_sketch<int> sketch;
quantiles_sketch<int> sketch;
const int n = 1000;
for (int i = 0; i < n; i++) sketch.update(i);
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_sketch<int>::deserialize(bytes.data(), bytes.size());
auto sketch2 = quantiles_sketch<int>::deserialize(bytes.data(), bytes.size());
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
@@ -476,23 +475,11 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_quantile(0.5) == sketch.get_quantile(0.5));
REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
REQUIRE(sketch2.get_rank(n) == sketch.get_rank(n));
REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 7), std::out_of_range);
REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 15), std::out_of_range);
REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
}
SECTION("floor of log2 of fraction") {
REQUIRE(kll_helper::floor_of_log2_of_fraction(0, 1) == 0);
REQUIRE(kll_helper::floor_of_log2_of_fraction(1, 2) == 0);
REQUIRE(kll_helper::floor_of_log2_of_fraction(2, 2) == 0);
REQUIRE(kll_helper::floor_of_log2_of_fraction(3, 2) == 0);
REQUIRE(kll_helper::floor_of_log2_of_fraction(4, 2) == 1);
REQUIRE(kll_helper::floor_of_log2_of_fraction(5, 2) == 1);
REQUIRE(kll_helper::floor_of_log2_of_fraction(6, 2) == 1);
REQUIRE(kll_helper::floor_of_log2_of_fraction(7, 2) == 1);
REQUIRE(kll_helper::floor_of_log2_of_fraction(8, 2) == 2);
REQUIRE_THROWS_AS(quantiles_sketch<int>::deserialize(bytes.data(), 7), std::out_of_range);
REQUIRE_THROWS_AS(quantiles_sketch<int>::deserialize(bytes.data(), 15), std::out_of_range);
REQUIRE_THROWS_AS(quantiles_sketch<int>::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
}
*/

SECTION("out of order split points, float") {
quantiles_float_sketch sketch(200, 0);
sketch.update(0); // has to be non-empty to reach the check

0 comments on commit 9801d43

Please sign in to comment.