Skip to content
Permalink
Browse files
support serde instance
  • Loading branch information
AlexanderSaydakov committed Mar 25, 2022
1 parent 9d64c7a commit 68f5013a6827f5366bba8cf46f0fc6b9408b425e
Showing 3 changed files with 87 additions and 40 deletions.
@@ -161,7 +161,7 @@ namespace kll_constants {
template <
typename T,
typename C = std::less<T>, // strict weak ordering function (see C++ named requirements: Compare)
typename S = serde<T>,
typename S = serde<T>, // deprecated, to be removed in the next major version
typename A = std::allocator<T>
>
class kll_sketch {
@@ -386,16 +386,17 @@ class kll_sketch {
* This version is for fixed-size arithmetic types (integral and floating point).
* @return size in bytes needed to serialize this sketch
*/
template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
size_t get_serialized_size_bytes() const;
template<typename TT = T, typename SerDe = S, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
size_t get_serialized_size_bytes(const SerDe& sd = SerDe()) const;

/**
* Computes size needed to serialize the current state of the sketch.
* This version is for all other types and can be expensive since every item needs to be looked at.
* @param instance of a SerDe
* @return size in bytes needed to serialize this sketch
*/
template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
size_t get_serialized_size_bytes() const;
template<typename TT = T, typename SerDe = S, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
size_t get_serialized_size_bytes(const SerDe& sd = SerDe()) const;

/**
* Returns upper bound on the serialized size of a sketch given a parameter <em>k</em> and stream
@@ -427,8 +428,11 @@ class kll_sketch {
/**
* This method serializes the sketch into a given stream in a binary form
* @param os output stream
* @param instance of a SerDe
*
*/
void serialize(std::ostream& os) const;
template<typename SerDe = S>
void serialize(std::ostream& os, const SerDe& sd = SerDe()) const;

// This is a convenience alias for users
// The type returned by the following serialize method
@@ -440,24 +444,53 @@ class kll_sketch {
* It is a blank space of a given size.
* This header is used in Datasketches PostgreSQL extension.
* @param header_size_bytes space to reserve in front of the sketch
* @param instance of a SerDe
*/
vector_bytes serialize(unsigned header_size_bytes = 0) const;
template<typename SerDe = S>
vector_bytes serialize(unsigned header_size_bytes = 0, const SerDe& sd = SerDe()) const;

/**
* This method deserializes a sketch from a given stream.
* @param is input stream
* @param instance of an Allocator
* @return an instance of a sketch
*
* Deprecated, to be removed in the next major version
*/
static kll_sketch<T, C, S, A> deserialize(std::istream& is, const A& allocator = A());

/**
* This method deserializes a sketch from a given stream.
* @param is input stream
* @param instance of a SerDe
* @param instance of an Allocator
* @return an instance of a sketch
*/
template<typename SerDe = S>
static kll_sketch<T, C, S, A> deserialize(std::istream& is, const SerDe& sd = SerDe(), const A& allocator = A());

/**
* This method deserializes a sketch from a given array of bytes.
* @param bytes pointer to the array of bytes
* @param size the size of the array
* @param instance of an Allocator
* @return an instance of a sketch
*
* Deprecated, to be removed in the next major version
*/
static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size, const A& allocator = A());

/**
* This method deserializes a sketch from a given array of bytes.
* @param bytes pointer to the array of bytes
* @param size the size of the array
* @param instance of a SerDe
* @param instance of an Allocator
* @return an instance of a sketch
*/
template<typename SerDe = S>
static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size, const SerDe& sd = SerDe(), const A& allocator = A());

/*
* Gets the normalized rank error given k and pmf.
* k - the configuration parameter
@@ -334,8 +334,8 @@ double kll_sketch<T, C, S, A>::get_normalized_rank_error(bool pmf) const {

// implementation for fixed-size arithmetic types (integral and floating point)
template<typename T, typename C, typename S, typename A>
template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes(const SerDe&) const {
if (is_empty()) { return EMPTY_SIZE_BYTES; }
if (num_levels_ == 1 && get_num_retained() == 1) {
return DATA_START_SINGLE_ITEM + sizeof(TT);
@@ -346,17 +346,17 @@ size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {

// implementation for all other types
template<typename T, typename C, typename S, typename A>
template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes(const SerDe& sd) const {
if (is_empty()) { return EMPTY_SIZE_BYTES; }
if (num_levels_ == 1 && get_num_retained() == 1) {
return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
return DATA_START_SINGLE_ITEM + sd.size_of_item(items_[levels_[0]]);
}
// the last integer in the levels_ array is not serialized because it can be derived
size_t size = DATA_START + num_levels_ * sizeof(uint32_t);
size += S().size_of_item(*min_value_);
size += S().size_of_item(*max_value_);
for (auto it: *this) size += S().size_of_item(it.first);
size += sd.size_of_item(*min_value_);
size += sd.size_of_item(*max_value_);
for (auto it: *this) size += sd.size_of_item(it.first);
return size;
}

@@ -381,7 +381,8 @@ size_t kll_sketch<T, C, S, A>::get_max_serialized_size_bytes(uint16_t k, uint64_
}

template<typename T, typename C, typename S, typename A>
void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
template<typename SerDe>
void kll_sketch<T, C, S, A>::serialize(std::ostream& os, const SerDe& sd) const {
const bool is_single_item = n_ == 1;
const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
write(os, preamble_ints);
@@ -406,16 +407,17 @@ void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
write(os, num_levels_);
write(os, unused);
write(os, levels_.data(), sizeof(levels_[0]) * num_levels_);
S().serialize(os, min_value_, 1);
S().serialize(os, max_value_, 1);
sd.serialize(os, min_value_, 1);
sd.serialize(os, max_value_, 1);
}
S().serialize(os, &items_[levels_[0]], get_num_retained());
sd.serialize(os, &items_[levels_[0]], get_num_retained());
}

template<typename T, typename C, typename S, typename A>
vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
template<typename SerDe>
vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const {
const bool is_single_item = n_ == 1;
const size_t size = header_size_bytes + get_serialized_size_bytes();
const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
vector_u8<A> bytes(size, 0, allocator_);
uint8_t* ptr = bytes.data() + header_size_bytes;
const uint8_t* end_ptr = ptr + size;
@@ -441,11 +443,11 @@ vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const
ptr += copy_to_mem(num_levels_, ptr);
ptr += sizeof(uint8_t); // unused
ptr += copy_to_mem(levels_.data(), ptr, sizeof(levels_[0]) * num_levels_);
ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
ptr += sd.serialize(ptr, end_ptr - ptr, min_value_, 1);
ptr += sd.serialize(ptr, end_ptr - ptr, max_value_, 1);
}
const size_t bytes_remaining = end_ptr - ptr;
ptr += S().serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
ptr += sd.serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
}
const size_t delta = ptr - bytes.data();
if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
@@ -454,6 +456,12 @@ vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const

template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is, const A& allocator) {
return deserialize(is, S(), allocator);
}

template<typename T, typename C, typename S, typename A>
template<typename SerDe>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
const auto preamble_ints = read<uint8_t>(is);
const auto serial_version = read<uint8_t>(is);
const auto family_id = read<uint8_t>(is);
@@ -501,17 +509,17 @@ kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is, con
std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
if (!is_single_item) {
S().deserialize(is, min_value_buffer.get(), 1);
sd.deserialize(is, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
S().deserialize(is, max_value_buffer.get(), 1);
sd.deserialize(is, max_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
const auto num_items = levels[num_levels] - levels[0];
S().deserialize(is, &items_buffer.get()[levels[0]], num_items);
sd.deserialize(is, &items_buffer.get()[levels[0]], num_items);
// serde call did not throw, repackage with destrtuctors
std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
@@ -531,6 +539,12 @@ kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is, con

template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
return deserialize(bytes, size, S(), allocator);
}

template<typename T, typename C, typename S, typename A>
template<typename SerDe>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const A& allocator) {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
uint8_t preamble_ints;
@@ -587,17 +601,17 @@ kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, si
std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
if (!is_single_item) {
ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
ptr += sd.deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
ptr += sd.deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
// serde call did not throw, repackage with destrtuctor
max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
}
auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
const auto num_items = levels[num_levels] - levels[0];
ptr += S().deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
ptr += sd.deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
// serde call did not throw, repackage with destrtuctors
std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
const size_t delta = ptr - static_cast<const char*>(bytes);
@@ -316,7 +316,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
SECTION("bytes serialize deserialize empty") {
kll_float_sketch sketch(200, 0);
auto bytes = sketch.serialize();
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
@@ -334,7 +334,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = kll_float_sketch::deserialize(s, serde<float>(), 0);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE_FALSE(sketch2.is_empty());
@@ -353,7 +353,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
sketch.update(1.0f);
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE_FALSE(sketch2.is_empty());
REQUIRE_FALSE(sketch2.is_estimation_mode());
@@ -370,7 +370,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
std::ifstream is;
is.exceptions(std::ios::failbit | std::ios::badbit);
is.open(testBinaryInputPath + "kll_sketch_float_one_item_v1.sk", std::ios::binary);
auto sketch = kll_float_sketch::deserialize(is, test_allocator<float>(0));
auto sketch = kll_float_sketch::deserialize(is, serde<float>(), 0);
REQUIRE_FALSE(sketch.is_empty());
REQUIRE_FALSE(sketch.is_estimation_mode());
REQUIRE(sketch.get_n() == 1);
@@ -387,7 +387,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = kll_float_sketch::deserialize(s, serde<float>(), 0);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE_FALSE(sketch2.is_empty());
@@ -405,7 +405,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
sketch.update(3.0f);
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE_FALSE(sketch2.is_empty());
REQUIRE_FALSE(sketch2.is_estimation_mode());
@@ -422,7 +422,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(s);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
auto sketch2 = kll_float_sketch::deserialize(s, serde<float>(), 0);
REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
REQUIRE(s.tellg() == s.tellp());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
@@ -444,7 +444,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
for (int i = 0; i < n; i++) sketch.update(static_cast<float>(i));
auto bytes = sketch.serialize();
REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), serde<float>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE(sketch2.is_empty() == sketch.is_empty());
REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
@@ -701,7 +701,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {

auto bytes = sketch1.serialize();
REQUIRE(bytes.size() == sketch1.get_serialized_size_bytes());
auto sketch2 = kll_string_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = kll_string_sketch::deserialize(bytes.data(), bytes.size(), serde<std::string>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
REQUIRE(sketch2.is_empty() == sketch1.is_empty());
REQUIRE(sketch2.is_estimation_mode() == sketch1.is_estimation_mode());
@@ -722,7 +722,7 @@ TEST_CASE("kll sketch", "[kll_sketch]") {
sketch1.update("a");
auto bytes = sketch1.serialize();
REQUIRE(bytes.size() == sketch1.get_serialized_size_bytes());
auto sketch2 = kll_string_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch2 = kll_string_sketch::deserialize(bytes.data(), bytes.size(), serde<std::string>(), 0);
REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
}

0 comments on commit 68f5013

Please sign in to comment.