From 59fb69365e98915fbab5abf6d5c1ddcd006a004a Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 7 May 2024 15:01:57 +0300 Subject: [PATCH 1/7] feat(tiering): Defragmentation Signed-off-by: Vladislav Oleshko --- src/server/tiered_storage.cc | 38 +++++++++++++++++- src/server/tiering/op_manager.cc | 5 ++- src/server/tiering/op_manager.h | 5 ++- src/server/tiering/small_bins.cc | 55 ++++++++++++++++++++++----- src/server/tiering/small_bins.h | 28 ++++++++++++-- src/server/tiering/small_bins_test.cc | 18 ++++++++- 6 files changed, 131 insertions(+), 18 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 50561b1d96b..78f8469aa00 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -45,6 +45,10 @@ bool OccupiesWholePages(size_t size) { return size >= TieredStorage::kMinOccupancySize; } +// Stashed bins no longer have bin ids. We use this id to differentiate with regular reads for the +// same segment when reading a full segment to trigger defragmentation +constexpr auto kFragmentedBin = tiering::SmallBins::kInvalidBin - 1; + } // anonymous namespace class TieredStorage::ShardOpManager : public tiering::OpManager { @@ -124,6 +128,22 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { return false; } + // Load all values from bin by their hashes + void Defragment(tiering::DiskSegment segment, string_view value) { + auto hashes = ts_->bins_->DeleteBin(segment, value); + for (auto [dbid, hash] : hashes) { + auto it = db_slice_->GetDBTable(dbid)->prime.Find(hash); + if (IsValid(it) && it->second.IsExternal()) { + tiering::DiskSegment entry_segment{it->second.GetExternalSlice()}; + if (entry_segment.FillPages().offset == segment.offset) { + SetInMemory(&it->second, + value.substr(entry_segment.offset - segment.offset, entry_segment.length), + entry_segment); + } + } + } + } + void ReportStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override { if (ec) { VLOG(1) << "Stash failed " << ec.message(); @@ -144,7 +164,23 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { } bool ReportDelete(tiering::DiskSegment segment) override { - return OccupiesWholePages(segment.length) || ts_->bins_->Delete(segment); + if (OccupiesWholePages(segment.length)) + return true; + + auto bin = ts_->bins_->Delete(segment); + if (bin.empty) + return true; + + if (bin.fragmented) { + // Trigger read to signal need for defragmentation. + // The underlying value is already in memory, so the read will be fulfilled immediately, + // but only after all other values from this segment have been fetched and possibly + // deleted. + VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset; + Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; }); + } + + return false; } private: diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index ebb5f38456a..c27b30759c1 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -102,11 +102,13 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment } void OpManager::ProcessRead(size_t offset, std::string_view value) { + util::FiberAtomicGuard guard; // atomically update items, no in-between states should be possible ReadOp* info = &pending_reads_.at(offset); bool deleting_full = false; std::string key_value; - for (auto& ko : info->key_ops) { + for (size_t i = 0; i < info->key_ops.size(); i++) { // more items can be read while iterating + auto& ko = info->key_ops[i]; key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length); bool modified = false; @@ -125,6 +127,7 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) { if (deleting_full) storage_.MarkAsFree(info->segment); + pending_reads_.erase(offset); } diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h index eba4ccb96d2..9d0f1e1edf0 100644 --- a/src/server/tiering/op_manager.h +++ b/src/server/tiering/op_manager.h @@ -45,8 +45,9 @@ class OpManager { void Close(); - // Enqueue callback to be executed once value is read. Triggers read if none is pending yet for - // this segment + // Enqueue callback to be executed once value is read. Trigger read if none is pending yet for + // this segment. Multiple entries can be obtained from a single segment, but every distinct id + // will have it's own indepentend callback loop that can safely modifly the underlying value void Enqueue(EntryId id, DiskSegment segment, ReadCallback cb); // Delete entry with pending io diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 956fa9e54d0..3e2fa8f627e 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -56,7 +56,7 @@ SmallBins::FilledBin SmallBins::FlushBin() { data += sizeof(uint64_t); } - // Store all values, n * x bytes + // Store all values with sizes, n * x bytes for (const auto& [key, value] : current_bin_) { pending_set[key] = {size_t(data - out.data()), value.size()}; @@ -74,13 +74,17 @@ SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment auto key_list = pending_bins_.extract(id); DCHECK_GT(key_list.mapped().size(), 0u); + uint16_t bytes = 0; SmallBins::KeySegmentList list; for (auto& [key, sub_segment] : key_list.mapped()) { - list.emplace_back(key.first, key.second, - DiskSegment{segment.offset + sub_segment.offset, sub_segment.length}); + bytes += sub_segment.length; + + DiskSegment real_segment{segment.offset + sub_segment.offset, sub_segment.length}; + list.emplace_back(key.first, key.second, real_segment); } stats_.total_stashed_entries += list.size(); + stashed_bins_[segment.offset] = {uint8_t(list.size()), bytes}; return list; } @@ -109,16 +113,26 @@ std::optional SmallBins::Delete(DbIndex dbid, std::string_view return std::nullopt; } -std::optional SmallBins::Delete(DiskSegment segment) { - segment = segment.FillPages(); - if (auto it = stashed_bins_.find(segment.offset); it != stashed_bins_.end()) { +SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { + auto full_segment = segment.FillPages(); + if (auto it = stashed_bins_.find(full_segment.offset); it != stashed_bins_.end()) { stats_.total_stashed_entries--; - if (--it->second == 0) { + auto& bin = it->second; + + DCHECK_LE(segment.length, bin.bytes); + bin.bytes -= segment.length; + + if (--bin.entries == 0) { stashed_bins_.erase(it); - return segment; + return {full_segment, false /* fragmented */, true /* empty */}; + } + + if (bin.bytes < kPageSize / 2) { + return {full_segment, true /* fragmented */, false /* empty */}; } } - return std::nullopt; + + return {segment}; } SmallBins::Stats SmallBins::GetStats() const { @@ -127,4 +141,27 @@ SmallBins::Stats SmallBins::GetStats() const { .current_bin_bytes = current_bin_bytes_}; } +SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) { + DCHECK_EQ(value.size(), kPageSize); + CHECK(stashed_bins_.erase(segment.offset)); + + const char* data = value.data(); + + uint16_t entries = absl::little_endian::Load16(data); + data += sizeof(uint16_t); + + KeyHashDbList out(entries); + for (size_t i = 0; i < entries; i++) { + DbIndex dbid = absl::little_endian::Load16(data); + data += sizeof(DbIndex); + + uint64_t hash = absl::little_endian::Load64(data); + data += sizeof(hash); + + out[i] = {dbid, hash}; + } + + return out; +} + } // namespace dfly::tiering diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index b4fa88c9f45..449303a8142 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -27,6 +27,12 @@ class SmallBins { }; using BinId = unsigned; + static const BinId kInvalidBin = std::numeric_limits::max(); + + struct BinInfo { + DiskSegment segment; + bool fragmented = false, empty = false; + }; // Bin filled with blob of serialized entries using FilledBin = std::pair; @@ -34,6 +40,9 @@ class SmallBins { // List of locations of values for corresponding keys of previously filled bin using KeySegmentList = std::vector>; + // List of item key db indices and hashes + using KeyHashDbList = std::vector>; + // Enqueue key/value pair for stash. Returns page to be stashed if it filled up. std::optional Stash(DbIndex dbid, std::string_view key, std::string_view value); @@ -46,8 +55,13 @@ class SmallBins { // Delete a key with pending io. Returns entry id if needs to be deleted. std::optional Delete(DbIndex dbid, std::string_view key); - // Delete a stored segment. Returns page segment if it became emtpy and needs to be deleted. - std::optional Delete(DiskSegment segment); + // Delete a stored segment. Returns information about the current bin, which might indicate + // the need for external actions like deleting empty segments or triggering defragmentation + BinInfo Delete(DiskSegment segment); + + // Delete stashed bin. Returns list of recovered item key hashes and db indices. + // Mainly used for defragmentation + KeyHashDbList DeleteBin(DiskSegment segment, std::string_view value); Stats GetStats() const; @@ -56,6 +70,12 @@ class SmallBins { FilledBin FlushBin(); private: + struct StashInfo { + uint8_t entries = 0; + uint16_t bytes = 0; + }; + static_assert(sizeof(StashInfo) == sizeof(unsigned)); + BinId last_bin_id_ = 0; unsigned current_bin_bytes_ = 0; @@ -66,8 +86,8 @@ class SmallBins { absl::flat_hash_map /* key*/, DiskSegment>> pending_bins_; - // Map of bins that were stashed and should be deleted when refcount reaches 0 - absl::flat_hash_map stashed_bins_; + // Map of bins that were stashed and should be deleted when number of entries reaches 0 + absl::flat_hash_map stashed_bins_; struct { size_t total_stashed_entries = 0; diff --git a/src/server/tiering/small_bins_test.cc b/src/server/tiering/small_bins_test.cc index cb4cd1c2b1e..82bc4c261fc 100644 --- a/src/server/tiering/small_bins_test.cc +++ b/src/server/tiering/small_bins_test.cc @@ -57,7 +57,7 @@ TEST(SmallBins, SimpleDeleteAbort) { } } -TEST(SmallBins, PartialStash) { +TEST(SmallBins, PartialStashDelete) { SmallBins bins; // Fill single bin @@ -77,6 +77,22 @@ TEST(SmallBins, PartialStash) { for (auto& [dbid, key, segment] : segments) { EXPECT_EQ(key, "k"s + bin->second.substr(segment.offset, segment.length).substr(1)); } + + // Delete all stashed values + while (!segments.empty()) { + auto segment = std::get<2>(segments.back()); + segments.pop_back(); + auto bin = bins.Delete(segment); + + EXPECT_EQ(bin.segment.offset, 0u); + EXPECT_EQ(bin.segment.length, 4_KB); + + if (segments.empty()) { + EXPECT_TRUE(bin.empty); + } else { + EXPECT_TRUE(bin.fragmented); // half of the values were deleted + } + } } } // namespace dfly::tiering From a1768b783ee63c605813dfdd4b2d626398e090de Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 9 May 2024 09:34:41 +0300 Subject: [PATCH 2/7] feat: Use FindFirst for defrag Signed-off-by: Vladislav Oleshko --- src/server/tiered_storage.cc | 27 +++++++++++++-------------- src/server/tiered_storage_test.cc | 28 ++++++++++++++++++++++++++++ src/server/tiering/op_manager.cc | 7 +++++++ src/server/tiering/small_bins.cc | 24 +++++++++++++++++++----- src/server/tiering/small_bins.h | 2 +- 5 files changed, 68 insertions(+), 20 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 78f8469aa00..019bf1d9a9b 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -130,17 +130,19 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Load all values from bin by their hashes void Defragment(tiering::DiskSegment segment, string_view value) { - auto hashes = ts_->bins_->DeleteBin(segment, value); - for (auto [dbid, hash] : hashes) { - auto it = db_slice_->GetDBTable(dbid)->prime.Find(hash); - if (IsValid(it) && it->second.IsExternal()) { - tiering::DiskSegment entry_segment{it->second.GetExternalSlice()}; - if (entry_segment.FillPages().offset == segment.offset) { - SetInMemory(&it->second, - value.substr(entry_segment.offset - segment.offset, entry_segment.length), - entry_segment); - } - } + for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) { + // Search for key with the same hash and value pointing to the same segment. + // If it still exists, it must correspond to the value stored in this bin + auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) { + return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment; + }; + auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate); + if (!IsValid(it)) + continue; + + // Cut out relevant part of value and restore it to memory + string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length); + SetInMemory(&it->second, sub_value, sub_segment); } } @@ -173,9 +175,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { if (bin.fragmented) { // Trigger read to signal need for defragmentation. - // The underlying value is already in memory, so the read will be fulfilled immediately, - // but only after all other values from this segment have been fetched and possibly - // deleted. VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset; Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; }); } diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index e9e23dbfba0..ca4bfe12352 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -135,6 +135,34 @@ TEST_F(TieredStorageTest, MultiDb) { } } +TEST_F(TieredStorageTest, Defrag) { + for (char k = 'a'; k < 'a' + 8; k++) { + Run({"SET", string(1, k), string(512, k)}); + } + + ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 1; }); + + // 7 out 8 are in one bin, the last one made if flush and is now filling + auto metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u); + EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 7u); + EXPECT_EQ(metrics.tiered_stats.small_bins_filling_bytes, 512 + 12); + + // Reading 3 values still leaves the bin more than half occupied + Run({"GET", string(1, 'a')}); + Run({"GET", string(1, 'b')}); + Run({"GET", string(1, 'c')}); + metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u); + EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 4u); + + // This tirggers defragmentation + Run({"GET", string(1, 'd')}); + metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u); + EXPECT_EQ(metrics.tiered_stats.allocated_bytes, 0u); +} + TEST_F(TieredStorageTest, BackgroundOffloading) { absl::FlagSaver saver; absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index c27b30759c1..bb0533b020f 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -105,6 +105,13 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) { util::FiberAtomicGuard guard; // atomically update items, no in-between states should be possible ReadOp* info = &pending_reads_.at(offset); + // most generic page must be last + for (size_t i = 0; i + 1 < info->key_ops.size(); i++) { + if (info->key_ops[i].segment.offset == 0) { + std::swap(info->key_ops[i], info->key_ops.back()); + } + } + bool deleting_full = false; std::string key_value; for (size_t i = 0; i < info->key_ops.size(); i++) { // more items can be read while iterating diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 3e2fa8f627e..40687fcaacd 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -22,7 +22,7 @@ std::optional SmallBins::Stash(DbIndex dbid, std::string_v DCHECK_LT(value.size(), 2_KB); // See FlushBin() for format details - size_t value_bytes = 2 /* dbid */ + 8 /* hash */ + value.size(); + size_t value_bytes = 2 /* dbid */ + 8 /* hash */ + 2 /* strlen*/ + value.size(); std::optional filled_bin; if (2 /* num entries */ + current_bin_bytes_ + value_bytes >= kPageSize) { @@ -56,10 +56,12 @@ SmallBins::FilledBin SmallBins::FlushBin() { data += sizeof(uint64_t); } - // Store all values with sizes, n * x bytes + // Store all values with sizes, n * (2 + x) bytes for (const auto& [key, value] : current_bin_) { - pending_set[key] = {size_t(data - out.data()), value.size()}; + absl::little_endian::Store16(data, value.size()); + data += sizeof(uint16_t); + pending_set[key] = {size_t(data - out.data()), value.size()}; memcpy(data, value.data(), value.size()); data += value.size(); } @@ -123,6 +125,7 @@ SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { bin.bytes -= segment.length; if (--bin.entries == 0) { + DCHECK_EQ(bin.bytes, 0u); stashed_bins_.erase(it); return {full_segment, false /* fragmented */, true /* empty */}; } @@ -143,7 +146,7 @@ SmallBins::Stats SmallBins::GetStats() const { SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) { DCHECK_EQ(value.size(), kPageSize); - CHECK(stashed_bins_.erase(segment.offset)); + stats_.total_stashed_entries -= stashed_bins_.extract(segment.offset).mapped().entries; const char* data = value.data(); @@ -151,6 +154,8 @@ SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_v data += sizeof(uint16_t); KeyHashDbList out(entries); + + // Recover dbids and hashes for (size_t i = 0; i < entries; i++) { DbIndex dbid = absl::little_endian::Load16(data); data += sizeof(DbIndex); @@ -158,7 +163,16 @@ SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_v uint64_t hash = absl::little_endian::Load64(data); data += sizeof(hash); - out[i] = {dbid, hash}; + out[i] = {dbid, hash, {0, 0}}; + } + + // Recover segments + for (size_t i = 0; i < entries; i++) { + uint16_t length = absl::little_endian::Load16(data); + data += sizeof(uint16_t); + + std::get(out[i]) = {segment.offset + (data - value.data()), length}; + data += length; } return out; diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index 449303a8142..6f6eb76a407 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -41,7 +41,7 @@ class SmallBins { using KeySegmentList = std::vector>; // List of item key db indices and hashes - using KeyHashDbList = std::vector>; + using KeyHashDbList = std::vector>; // Enqueue key/value pair for stash. Returns page to be stashed if it filled up. std::optional Stash(DbIndex dbid, std::string_view key, std::string_view value); From 461f8cb95b152d7d0a3f7227b976e0bb1988578d Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 28 May 2024 21:13:10 +0300 Subject: [PATCH 3/7] fix: fixes --- src/server/tiered_storage.cc | 13 ++++++++----- src/server/tiering/op_manager.cc | 7 ++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 019bf1d9a9b..7fa2534fead 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -45,8 +45,7 @@ bool OccupiesWholePages(size_t size) { return size >= TieredStorage::kMinOccupancySize; } -// Stashed bins no longer have bin ids. We use this id to differentiate with regular reads for the -// same segment when reading a full segment to trigger defragmentation +// Stashed bins no longer have bin ids, so this sentinel is used to differentiate from regular reads constexpr auto kFragmentedBin = tiering::SmallBins::kInvalidBin - 1; } // anonymous namespace @@ -142,7 +141,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Cut out relevant part of value and restore it to memory string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length); - SetInMemory(&it->second, sub_value, sub_segment); + SetInMemory(&it->second, dbid, sub_value, sub_segment); } } @@ -157,7 +156,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment, bool modified) override { - DCHECK(holds_alternative(id)); // we never issue reads for bins + if (id == EntryId{kFragmentedBin}) { + Defragment(segment, value); + return true; + } + if (!modified && !cache_fetched_) return false; @@ -174,7 +177,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { return true; if (bin.fragmented) { - // Trigger read to signal need for defragmentation. + // Trigger read to signal need for defragmentation. ReportFetched will handle it. VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset; Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; }); } diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index bb0533b020f..dd36d097162 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -54,7 +54,7 @@ void OpManager::Delete(EntryId id) { void OpManager::Delete(DiskSegment segment) { EntryOps* pending_op = nullptr; - if (auto it = pending_reads_.find(segment.offset); it != pending_reads_.end()) + if (auto it = pending_reads_.find(segment.FillPages().offset); it != pending_reads_.end()) pending_op = it->second.Find(segment); if (pending_op) { @@ -105,10 +105,11 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) { util::FiberAtomicGuard guard; // atomically update items, no in-between states should be possible ReadOp* info = &pending_reads_.at(offset); - // most generic page must be last + // Reorder base read (offset 0) to be last, so reads for defragmentation are handled last for (size_t i = 0; i + 1 < info->key_ops.size(); i++) { - if (info->key_ops[i].segment.offset == 0) { + if (info->key_ops[i].segment.offset % kPageSize == 0) { std::swap(info->key_ops[i], info->key_ops.back()); + break; } } From 6de1bd7293c5ba9eb44fd38c0f9e264341e4a974 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 28 May 2024 21:27:53 +0300 Subject: [PATCH 4/7] chore: stats --- src/server/common.cc | 3 ++- src/server/common.h | 1 + src/server/server_family.cc | 1 + src/server/tiered_storage.cc | 4 ++++ src/server/tiered_storage_test.cc | 3 ++- src/server/tiering/small_bins.cc | 8 ++++---- src/server/tiering/small_bins.h | 2 +- 7 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index ac6e8af94a1..a277683a20b 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -258,12 +258,13 @@ bool ParseDouble(string_view src, double* value) { #define ADD(x) (x) += o.x TieredStats& TieredStats::operator+=(const TieredStats& o) { - static_assert(sizeof(TieredStats) == 88); + static_assert(sizeof(TieredStats) == 96); ADD(total_stashes); ADD(total_fetches); ADD(total_cancels); ADD(total_deletes); + ADD(total_defrags); ADD(allocated_bytes); ADD(capacity_bytes); diff --git a/src/server/common.h b/src/server/common.h index ab4988507ae..a20dcd2b123 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -65,6 +65,7 @@ struct TieredStats { size_t total_fetches = 0; size_t total_cancels = 0; size_t total_deletes = 0; + size_t total_defrags = 0; size_t allocated_bytes = 0; size_t capacity_bytes = 0; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 762f8ed12e8..e5cadf57edf 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2107,6 +2107,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("tiered_total_fetches", m.tiered_stats.total_fetches); append("tiered_total_cancels", m.tiered_stats.total_cancels); append("tiered_total_deletes", m.tiered_stats.total_deletes); + append("tiered_total_deletes", m.tiered_stats.total_defrags); append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes); append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes); diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 7fa2534fead..33ba4d3329f 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -139,6 +139,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { if (!IsValid(it)) continue; + stats_.total_defrags++; + // Cut out relevant part of value and restore it to memory string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length); SetInMemory(&it->second, dbid, sub_value, sub_segment); @@ -199,6 +201,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { struct { size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0; + size_t total_defrags = 0; // part of total_fetches } stats_; TieredStorage* ts_; @@ -312,6 +315,7 @@ TieredStats TieredStorage::GetStats() const { stats.total_fetches = shard_stats.total_fetches; stats.total_stashes = shard_stats.total_stashes; stats.total_cancels = shard_stats.total_cancels; + stats.total_defrags = shard_stats.total_defrags; } { // OpManager stats diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index ca4bfe12352..c93216599af 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -156,9 +156,10 @@ TEST_F(TieredStorageTest, Defrag) { EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u); EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 4u); - // This tirggers defragmentation + // This tirggers defragmentation, as only 3 < 7/2 remain left Run({"GET", string(1, 'd')}); metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.total_defrags, 3u); EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u); EXPECT_EQ(metrics.tiered_stats.allocated_bytes, 0u); } diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 40687fcaacd..f02776e80dd 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -85,7 +85,7 @@ SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment list.emplace_back(key.first, key.second, real_segment); } - stats_.total_stashed_entries += list.size(); + stats_.stashed_entries_cnt += list.size(); stashed_bins_[segment.offset] = {uint8_t(list.size()), bytes}; return list; } @@ -118,7 +118,7 @@ std::optional SmallBins::Delete(DbIndex dbid, std::string_view SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { auto full_segment = segment.FillPages(); if (auto it = stashed_bins_.find(full_segment.offset); it != stashed_bins_.end()) { - stats_.total_stashed_entries--; + stats_.stashed_entries_cnt--; auto& bin = it->second; DCHECK_LE(segment.length, bin.bytes); @@ -140,13 +140,13 @@ SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { SmallBins::Stats SmallBins::GetStats() const { return Stats{.stashed_bins_cnt = stashed_bins_.size(), - .stashed_entries_cnt = stats_.total_stashed_entries, + .stashed_entries_cnt = stats_.stashed_entries_cnt, .current_bin_bytes = current_bin_bytes_}; } SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) { DCHECK_EQ(value.size(), kPageSize); - stats_.total_stashed_entries -= stashed_bins_.extract(segment.offset).mapped().entries; + stats_.stashed_entries_cnt -= stashed_bins_.extract(segment.offset).mapped().entries; const char* data = value.data(); diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index 6f6eb76a407..50481a1d475 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -90,7 +90,7 @@ class SmallBins { absl::flat_hash_map stashed_bins_; struct { - size_t total_stashed_entries = 0; + size_t stashed_entries_cnt = 0; } stats_; }; From 8a49b0e4db909b50ca1914304fc639c22b66fee3 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 28 May 2024 22:04:33 +0300 Subject: [PATCH 5/7] fix: small comments --- src/server/tiered_storage.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 33ba4d3329f..2936a2e2933 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -158,9 +158,9 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment, bool modified) override { - if (id == EntryId{kFragmentedBin}) { + if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag Defragment(segment, value); - return true; + return true; // delete } if (!modified && !cache_fetched_) @@ -201,7 +201,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { struct { size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0; - size_t total_defrags = 0; // part of total_fetches + size_t total_defrags = 0; // included in total_fetches } stats_; TieredStorage* ts_; From 21a0c4969cad13f825aa32b78e9ce0b8305172a4 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 29 May 2024 16:50:34 +0300 Subject: [PATCH 6/7] fix: small fixes --- src/server/tiering/common.h | 2 +- src/server/tiering/op_manager.cc | 8 +++++--- src/server/tiering/op_manager.h | 2 +- src/server/tiering/small_bins.cc | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/server/tiering/common.h b/src/server/tiering/common.h index f9403fc6700..13de4fe9992 100644 --- a/src/server/tiering/common.h +++ b/src/server/tiering/common.h @@ -27,7 +27,7 @@ constexpr size_t kPageSize = 4_KB; // Location on the offloaded blob, measured in bytes struct DiskSegment { - DiskSegment FillPages() const { + DiskSegment ContainingPages() const { return {offset / kPageSize * kPageSize, (length + kPageSize - 1) / kPageSize * kPageSize}; } diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index dd36d097162..541e4183d5c 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -43,7 +43,9 @@ void OpManager::Close() { void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) { // Fill pages for prepared read as it has no penalty and potentially covers more small segments - PrepareRead(segment.FillPages()).ForSegment(segment, id).callbacks.emplace_back(std::move(cb)); + PrepareRead(segment.ContainingPages()) + .ForSegment(segment, id) + .callbacks.emplace_back(std::move(cb)); } void OpManager::Delete(EntryId id) { @@ -54,13 +56,13 @@ void OpManager::Delete(EntryId id) { void OpManager::Delete(DiskSegment segment) { EntryOps* pending_op = nullptr; - if (auto it = pending_reads_.find(segment.FillPages().offset); it != pending_reads_.end()) + if (auto it = pending_reads_.find(segment.ContainingPages().offset); it != pending_reads_.end()) pending_op = it->second.Find(segment); if (pending_op) { pending_op->deleting = true; } else if (ReportDelete(segment)) { - storage_.MarkAsFree(segment.FillPages()); + storage_.MarkAsFree(segment.ContainingPages()); } } diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h index 9d0f1e1edf0..7a55c7f82de 100644 --- a/src/server/tiering/op_manager.h +++ b/src/server/tiering/op_manager.h @@ -47,7 +47,7 @@ class OpManager { // Enqueue callback to be executed once value is read. Trigger read if none is pending yet for // this segment. Multiple entries can be obtained from a single segment, but every distinct id - // will have it's own indepentend callback loop that can safely modifly the underlying value + // will have it's own independent callback loop that can safely modify the underlying value void Enqueue(EntryId id, DiskSegment segment, ReadCallback cb); // Delete entry with pending io diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index f02776e80dd..d8700886ebf 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -116,7 +116,7 @@ std::optional SmallBins::Delete(DbIndex dbid, std::string_view } SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { - auto full_segment = segment.FillPages(); + auto full_segment = segment.ContainingPages(); if (auto it = stashed_bins_.find(full_segment.offset); it != stashed_bins_.end()) { stats_.stashed_entries_cnt--; auto& bin = it->second; From 01b8c39a01280b0fc134b323e0198d4945d68cda Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 29 May 2024 19:12:45 +0300 Subject: [PATCH 7/7] chore: commnets --- src/server/tiering/op_manager.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index 541e4183d5c..7d23fa16bd2 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -107,7 +107,10 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) { util::FiberAtomicGuard guard; // atomically update items, no in-between states should be possible ReadOp* info = &pending_reads_.at(offset); - // Reorder base read (offset 0) to be last, so reads for defragmentation are handled last + // Reorder base read (offset 0) to be last, so reads for defragmentation are handled last. + // If we already have a page read for defragmentation pending and some other read for the + // sub-segment is enqueued, we first must handle the sub-segment read, only then the full page + // read for (size_t i = 0; i + 1 < info->key_ops.size(); i++) { if (info->key_ops[i].segment.offset % kPageSize == 0) { std::swap(info->key_ops[i], info->key_ops.back()); @@ -117,7 +120,9 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) { bool deleting_full = false; std::string key_value; - for (size_t i = 0; i < info->key_ops.size(); i++) { // more items can be read while iterating + + // Report functions in the loop may append items to info->key_ops during the traversal + for (size_t i = 0; i < info->key_ops.size(); i++) { auto& ko = info->key_ops[i]; key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length);