diff --git a/include/cascade/detail/delta_store_core.hpp b/include/cascade/detail/delta_store_core.hpp index f1184679..3c3a9a6a 100644 --- a/include/cascade/detail/delta_store_core.hpp +++ b/include/cascade/detail/delta_store_core.hpp @@ -29,6 +29,33 @@ class DeltaCascadeStoreCore : public mutils::ByteRepresentable, #endif public: + /** + * @class DeltaType + * @brief the delta type that are stored in a delta. + * 1) The first sizeof(std::vector::size_type) bytes is the number of VT objects in the + * delta, followed by specified number of + * 2) Serialized VT objects. + */ + class DeltaType : public mutils::ByteRepresentable { + public: + /* The objects */ + std::unordered_map objects; + /** @fn DeltaType + * @brief Constructor + */ + DeltaType(); + virtual std::size_t to_bytes(uint8_t*) const override; + virtual void post_object(const std::function&) const override; + virtual std::size_t bytes_size() const override; + virtual void ensure_registered(mutils::DeserializationManager&); + static std::unique_ptr from_bytes(mutils::DeserializationManager*,const uint8_t* const); + static mutils::context_ptr from_bytes_noalloc( + mutils::DeserializationManager*, + const uint8_t* const); + static mutils::context_ptr from_bytes_noalloc_const( + mutils::DeserializationManager*, + const uint8_t* const); + }; /** The delta is a list of keys for the objects that are changed by put or remove. */ std::vector delta; /** The KV map */ diff --git a/include/cascade/detail/delta_store_core_impl.hpp b/include/cascade/detail/delta_store_core_impl.hpp index a7566d9b..91cc93fe 100644 --- a/include/cascade/detail/delta_store_core_impl.hpp +++ b/include/cascade/detail/delta_store_core_impl.hpp @@ -22,11 +22,86 @@ namespace derecho { namespace cascade { +template +DeltaCascadeStoreCore::DeltaType::DeltaType() {} + +template +std::size_t DeltaCascadeStoreCore::DeltaType::to_bytes(uint8_t*) const { + dbg_default_warn("{} should not be called. It is not designed for serialization.",__PRETTY_FUNCTION__); + return 0; +} + +template +void DeltaCascadeStoreCore::DeltaType::post_object( + const std::function&) const { + dbg_default_warn("{} should not be called. It is not designed for serialization.",__PRETTY_FUNCTION__); +} + +template +std::size_t DeltaCascadeStoreCore::DeltaType::bytes_size() const { + dbg_default_warn("{} should not be called. It is not designed for serialization.",__PRETTY_FUNCTION__); + return 0; +} + +template +void DeltaCascadeStoreCore::DeltaType::ensure_registered(mutils::DeserializationManager&) {} + +template +std::unique_ptr::DeltaType> +DeltaCascadeStoreCore::DeltaType::from_bytes( + mutils::DeserializationManager* dsm,const uint8_t* const v) { + size_t pos = 0; + std::size_t num_objects = *mutils::from_bytes_noalloc(dsm,v + pos); + pos += mutils::bytes_size(num_objects); + auto pdelta = std::make_unique::DeltaType>(); + while (num_objects--) { + auto po = mutils::from_bytes(dsm,v + pos); + pos += mutils::bytes_size(*po); + KT key = po->get_key_ref(); + pdelta->objects.emplace(std::move(key),std::move(*po)); + } + return pdelta; +} + +template +mutils::context_ptr::DeltaType> +DeltaCascadeStoreCore::DeltaType::from_bytes_noalloc( + mutils::DeserializationManager* dsm,const uint8_t* const v) { + size_t pos = 0; + std::size_t num_objects = *mutils::from_bytes_noalloc(dsm,v + pos); + pos += mutils::bytes_size(num_objects); + auto* pdelta = new DeltaCascadeStoreCore::DeltaType(); + while (num_objects--) { + auto po = mutils::from_bytes_noalloc(dsm,const_cast(v) + pos); + pos += mutils::bytes_size(*po); + KT key = po->get_key_ref(); + pdelta->objects.emplace(std::move(key),std::move(*po)); + } + return mutils::context_ptr::DeltaType>(pdelta); +} + +template +mutils::context_ptr::DeltaType> +DeltaCascadeStoreCore::DeltaType::from_bytes_noalloc_const( + mutils::DeserializationManager* dsm,const uint8_t* const v) { + size_t pos = 0; + std::size_t num_objects = *mutils::from_bytes_noalloc(dsm,v + pos); + pos += mutils::bytes_size(num_objects); + auto* pdelta = new DeltaCascadeStoreCore::DeltaType(); + while (num_objects--) { + auto po = mutils::from_bytes_noalloc(dsm,const_cast(v) + pos); + pos += mutils::bytes_size(*po); + KT key = po->get_key_ref(); + pdelta->objects.emplace(std::move(key),std::move(*po)); + } + return mutils::context_ptr::DeltaType>(pdelta); +} + template size_t DeltaCascadeStoreCore::currentDeltaSize() { size_t delta_size = 0; if (delta.size() > 0) { - delta_size += mutils::bytes_size(delta.size()); + delta_size += mutils::bytes_size(static_cast(delta.size())); for (const auto& k:delta) { delta_size+=mutils::bytes_size(this->kv_map[k]); } @@ -42,19 +117,17 @@ size_t DeltaCascadeStoreCore::currentDeltaToBytes(uint8_t * cons dbg_default_error("{}: failed because we need {} bytes for delta, but only a buffer with {} bytes given.\n", __PRETTY_FUNCTION__, delta_size, buf_size); } - size_t offset = mutils::to_bytes(delta.size(),buf); + size_t offset = mutils::to_bytes(static_cast(delta.size()),buf); for(const auto& k:delta) { offset += mutils::to_bytes(this->kv_map[k],buf+offset); } + delta.clear(); return offset; } template void DeltaCascadeStoreCore::applyDelta(uint8_t const* const serialized_delta) { - auto num_objects = - *mutils::from_bytes< - std::result_of_t::size)(std::vector)> - >(nullptr,serialized_delta); + std::size_t num_objects = *mutils::from_bytes(nullptr,serialized_delta); size_t offset = mutils::bytes_size(num_objects); while (num_objects--) { offset += diff --git a/include/cascade/detail/persistent_store_impl.hpp b/include/cascade/detail/persistent_store_impl.hpp index 9713906b..88d0465b 100644 --- a/include/cascade/detail/persistent_store_impl.hpp +++ b/include/cascade/detail/persistent_store_impl.hpp @@ -130,15 +130,17 @@ const VT PersistentCascadeStore::get(const KT& key, const pe #endif return persistent_core->lockless_get(key); } else { - return persistent_core.template getDelta(requested_version, exact, [this, key, requested_version, exact, ver](const VT& v) { - if(key == v.get_key_ref()) { + return persistent_core.template getDelta::DeltaType>(requested_version, exact, + [this, key, requested_version, exact, ver](const typename DeltaCascadeStoreCore::DeltaType& delta) { + if(delta.objects.find(key) != delta.objects.cend()) { debug_leave_func_with_value("key:{} is found at version:0x{:x}", key, requested_version); #if __cplusplus > 201703L LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_END, group,*IV,ver); #else LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver); #endif - return v; + // This return is a copy to make sure returned value does not rely on the data in the delta log. + return delta.objects.at(key); } else { if(exact) { // return invalid object for EXACT search. @@ -158,9 +160,9 @@ const VT PersistentCascadeStore::get(const KT& key, const pe persistent::version_t target_version = o.version; while (target_version > requested_version) { target_version = - persistent_core.template getDelta(target_version,true, - [](const VT& v){ - return v.previous_version_by_key; + persistent_core.template getDelta::DeltaType>(target_version,true, + [&key](const typename DeltaCascadeStoreCore::DeltaType& delta){ + return delta.objects.at(key).previous_version_by_key; }); } if (target_version == persistent::INVALID_VERSION) { @@ -177,9 +179,11 @@ const VT PersistentCascadeStore::get(const KT& key, const pe #else LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver); #endif - return persistent_core.template getDelta(target_version,true, - [](const VT& v){ - return v; + return persistent_core.template getDelta::DeltaType>(target_version,true, + [&key](const typename DeltaCascadeStoreCore::DeltaType& delta){ + // This return is a copy, which make sure the returned value does not rely on the data in + // the delta log. + return delta.objects.at(key); }); } } @@ -300,10 +304,10 @@ uint64_t PersistentCascadeStore::get_size(const KT& key, con #endif return rvo_val; } else { - return persistent_core.template getDelta(requested_version, exact, [this, key, requested_version, exact, ver](const VT& v) -> uint64_t { - if(key == v.get_key_ref()) { + return persistent_core.template getDelta::DeltaType>(requested_version, exact, [this, &key, requested_version, exact, ver](const typename DeltaCascadeStoreCore::DeltaType& delta) -> uint64_t { + if(delta.objects.find(key)!=delta.objects.cend()) { debug_leave_func_with_value("key:{} is found at version:0x{:x}", key, requested_version); - uint64_t size = mutils::bytes_size(v); + uint64_t size = mutils::bytes_size(delta.objects.at(key)); #if __cplusplus > 201703L LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); #else @@ -322,24 +326,44 @@ uint64_t PersistentCascadeStore::get_size(const KT& key, con return 0ull; } else { // fall back to the slow path. - auto versioned_state_ptr = persistent_core.get(requested_version); - if(versioned_state_ptr->kv_map.find(key) != versioned_state_ptr->kv_map.end()) { - debug_leave_func_with_value("Reconstructed version:0x{:x} for key:{}", requested_version, key); - uint64_t size = mutils::bytes_size(versioned_state_ptr->kv_map.at(key)); + // following the backward chain until its version is behind requested_version. + // TODO: We can introduce a per-key version index to achieve a better performance + // with a 64bit per log entry memory overhead. + VT o = persistent_core->lockless_get(key); + persistent::version_t target_version = o.version; + while (target_version > requested_version) { + target_version = + persistent_core.template getDelta::DeltaType>(target_version,true, + [&key](const typename DeltaCascadeStoreCore::DeltaType& delta){ + if (delta.objects.find(key) != delta.objects.cend()) { + return delta.objects.at(key).previous_version_by_key; + } + return persistent::INVALID_VERSION; + }); + } + if (target_version == persistent::INVALID_VERSION) { #if __cplusplus > 201703L LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); #else LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); #endif - return size; - } - debug_leave_func_with_value("No data found for key:{} before version:0x{:x}", key, requested_version); + debug_leave_func_with_value("No data found for key:{} before version:0x{:x}", key, requested_version); + return 0ull; + } else { + auto size = persistent_core.template getDelta::DeltaType>(target_version,true, + [&key](const typename DeltaCascadeStoreCore::DeltaType& delta){ + if (delta.objects.find(key) != delta.objects.cend()) { + return static_cast(mutils::bytes_size(delta.objects.at(key))); + } + return static_cast(0ull); + }); #if __cplusplus > 201703L - LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); + LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); #else - LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); + LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver); #endif - return 0ull; + return size; + } } } });