Skip to content
Permalink
Browse files
support serde instance
  • Loading branch information
AlexanderSaydakov committed Mar 23, 2022
1 parent c3784a5 commit b10854b46a0041ca12ef16c3b19fb2931edd7ed0
Showing 3 changed files with 81 additions and 7 deletions.
@@ -46,7 +46,7 @@ template<
typename W = uint64_t,
typename H = std::hash<T>,
typename E = std::equal_to<T>,
typename S = serde<T>,
typename S = serde<T>, // deprecated, to be removed in the next major version
typename A = std::allocator<T>
>
class frequent_items_sketch {
@@ -232,39 +232,89 @@ class frequent_items_sketch {
/**
* Serializes this sketch into the given stream in binary form.
* Items are written with a default-constructed instance of S
* (the class-level SerDe template parameter).
* @param os output stream
*
* @deprecated To be removed in the next major version.
* Use the overload that takes a SerDe instance instead.
*/
void serialize(std::ostream& os) const;

/**
* Serializes this sketch into the given stream in binary form.
* @tparam SerDe type of the item serializer/deserializer, defaults to serde<T>
* @param os output stream
* @param sd instance of a SerDe used to write the items
*/
template<typename SerDe = serde<T>>
void serialize(std::ostream& os, const SerDe& sd = SerDe()) const;

// Convenience alias for users: the byte-vector type returned by the
// serialize methods that produce a vector of bytes.
// It is a std::vector<uint8_t> whose allocator is A rebound to uint8_t.
using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<A>::template rebind_alloc<uint8_t>>;


/**
* Serializes this sketch as a vector of bytes.
* An optional header can be reserved in front of the sketch.
* It is a blank space of a given size.
* This header is used in Datasketches PostgreSQL extension.
* Items are written with a default-constructed instance of S
* (the class-level SerDe template parameter).
* @param header_size_bytes space to reserve in front of the sketch
* @return serialized sketch as a vector of bytes
*
* @deprecated To be removed in the next major version.
* Use the overload that takes a SerDe instance instead.
*/
vector_bytes serialize(unsigned header_size_bytes = 0) const;

/**
* Serializes this sketch as a vector of bytes.
* An optional header can be reserved in front of the sketch.
* It is a blank space of a given size.
* This header is used in Datasketches PostgreSQL extension.
* @tparam SerDe type of the item serializer/deserializer, defaults to serde<T>
* @param header_size_bytes space to reserve in front of the sketch
* @param sd instance of a SerDe used to write the items
* @return serialized sketch as a vector of bytes
*/
template<typename SerDe = serde<T>>
vector_bytes serialize(unsigned header_size_bytes = 0, const SerDe& sd = SerDe()) const;

/**
* Deserializes a sketch from the given stream.
* Items are read with a default-constructed instance of S
* (the class-level SerDe template parameter).
* @param is input stream
* @param allocator instance of an Allocator
* @return an instance of the sketch
*
* @note Pass the allocator as an object of type A. An argument that only
* converts to A (for example an integer literal) is an exact match for the
* SerDe parameter of the templated overload, so that overload is chosen instead.
*
* @deprecated To be removed in the next major version.
* Use the overload that takes a SerDe instance instead.
*/
static frequent_items_sketch deserialize(std::istream& is, const A& allocator = A());

/**
* Deserializes a sketch from the given stream.
* @tparam SerDe type of the item serializer/deserializer, defaults to serde<T>
* @param is input stream
* @param sd instance of a SerDe used to read the items
* @param allocator instance of an Allocator
* @return an instance of the sketch
*/
template<typename SerDe = serde<T>>
static frequent_items_sketch deserialize(std::istream& is, const SerDe& sd = SerDe(), const A& allocator = A());

/**
* Deserializes a sketch from the given array of bytes.
* Items are read with a default-constructed instance of S
* (the class-level SerDe template parameter).
* @param bytes pointer to the array of bytes
* @param size the size of the array
* @param allocator instance of an Allocator
* @return an instance of the sketch
*
* @note Pass the allocator as an object of type A. An argument that only
* converts to A (for example an integer literal) is an exact match for the
* SerDe parameter of the templated overload, so that overload is chosen instead.
*
* @deprecated To be removed in the next major version.
* Use the overload that takes a SerDe instance instead.
*/
static frequent_items_sketch deserialize(const void* bytes, size_t size, const A& allocator = A());

/**
* Deserializes a sketch from the given array of bytes.
* @tparam SerDe type of the item serializer/deserializer, defaults to serde<T>
* @param bytes pointer to the array of bytes
* @param size the size of the array
* @param sd instance of a SerDe used to read the items
* @param allocator instance of an Allocator
* @return an instance of the sketch
*/
template<typename SerDe = serde<T>>
static frequent_items_sketch deserialize(const void* bytes, size_t size, const SerDe& sd = SerDe(), const A& allocator = A());

/**
* Returns a human readable summary of this sketch
* @param print_items if true include the list of items retained by the sketch
@@ -162,6 +162,12 @@ frequent_items_sketch<T, W, H, E, S, A>::get_frequent_items(frequent_items_error

// Deprecated stream overload: delegates to the SerDe-aware overload,
// passing a default-constructed instance of the class-level serde type S.
template<typename T, typename W, typename H, typename E, typename S, typename A>
void frequent_items_sketch<T, W, H, E, S, A>::serialize(std::ostream& os) const {
  this->serialize(os, S());
}

template<typename T, typename W, typename H, typename E, typename S, typename A>
template<typename SerDe>
void frequent_items_sketch<T, W, H, E, S, A>::serialize(std::ostream& os, const SerDe& sd) const {
const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
write(os, preamble_longs);
const uint8_t serial_version = SERIAL_VERSION;
@@ -199,7 +205,7 @@ void frequent_items_sketch<T, W, H, E, S, A>::serialize(std::ostream& os) const
}
write(os, weights, sizeof(W) * num_items);
aw.deallocate(weights, num_items);
S().serialize(os, items, num_items);
sd.serialize(os, items, num_items);
for (i = 0; i < num_items; i++) items[i].~T();
alloc.deallocate(items, num_items);
}
@@ -215,6 +221,12 @@ size_t frequent_items_sketch<T, W, H, E, S, A>::get_serialized_size_bytes() cons

// Deprecated byte-vector overload: delegates to the SerDe-aware overload,
// passing a default-constructed instance of the class-level serde type S.
template<typename T, typename W, typename H, typename E, typename S, typename A>
auto frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
  return this->serialize(header_size_bytes, S());
}

template<typename T, typename W, typename H, typename E, typename S, typename A>
template<typename SerDe>
auto frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_bytes bytes(size, 0, map.get_allocator());
uint8_t* ptr = bytes.data() + header_size_bytes;
@@ -256,7 +268,7 @@ auto frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_size_byt
ptr += copy_to_mem(weights, ptr, sizeof(W) * num_items);
aw.deallocate(weights, num_items);
const size_t bytes_remaining = end_ptr - ptr;
ptr += S().serialize(ptr, bytes_remaining, items, num_items);
ptr += sd.serialize(ptr, bytes_remaining, items, num_items);
for (i = 0; i < num_items; i++) items[i].~T();
alloc.deallocate(items, num_items);
}
@@ -285,6 +297,12 @@ class frequent_items_sketch<T, W, H, E, S, A>::items_deleter {

// Deprecated stream overload: delegates to the SerDe-aware overload,
// passing a default-constructed instance of the class-level serde type S
// and forwarding the caller's allocator.
template<typename T, typename W, typename H, typename E, typename S, typename A>
frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(std::istream& is, const A& allocator) {
  return frequent_items_sketch::deserialize(is, S(), allocator);
}

template<typename T, typename W, typename H, typename E, typename S, typename A>
template<typename SerDe>
frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
const auto preamble_longs = read<uint8_t>(is);
const auto serial_version = read<uint8_t>(is);
const auto family_id = read<uint8_t>(is);
@@ -313,7 +331,7 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:
read(is, weights.data(), sizeof(W) * num_items);
A alloc(allocator);
std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
S().deserialize(is, items.get(), num_items);
sd.deserialize(is, items.get(), num_items);
items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
for (uint32_t i = 0; i < num_items; i++) {
sketch.update(std::move(items.get()[i]), weights[i]);
@@ -328,6 +346,12 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:

// Deprecated byte-array overload: delegates to the SerDe-aware overload,
// passing a default-constructed instance of the class-level serde type S
// and forwarding the caller's allocator.
template<typename T, typename W, typename H, typename E, typename S, typename A>
frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
  return frequent_items_sketch::deserialize(bytes, size, S(), allocator);
}

template<typename T, typename W, typename H, typename E, typename S, typename A>
template<typename SerDe>
frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const A& allocator) {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
const char* base = static_cast<const char*>(bytes);
@@ -371,7 +395,7 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:
A alloc(allocator);
std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
const size_t bytes_remaining = size - (ptr - base);
ptr += S().deserialize(ptr, bytes_remaining, items.get(), num_items);
ptr += sd.deserialize(ptr, bytes_remaining, items.get(), num_items);
items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
for (uint32_t i = 0; i < num_items; i++) {
sketch.update(std::move(items.get()[i]), weights[i]);
@@ -60,7 +60,7 @@ TEST_CASE("frequent items: custom type", "[frequent_items_sketch]") {
REQUIRE(sketch.get_maximum_error() == sketch2.get_maximum_error());

auto bytes = sketch.serialize();
auto sketch3 = frequent_test_type_sketch::deserialize(bytes.data(), bytes.size(), 0);
auto sketch3 = frequent_test_type_sketch::deserialize(bytes.data(), bytes.size(), alloc(0));
REQUIRE_FALSE(sketch3.is_empty());
REQUIRE(sketch3.get_total_weight() == 17);
REQUIRE(sketch3.get_estimate(1) == 10);

0 comments on commit b10854b

Please sign in to comment.