diff --git a/src/server/common.cc b/src/server/common.cc index ac6e8af94a1..a277683a20b 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -258,12 +258,13 @@ bool ParseDouble(string_view src, double* value) { #define ADD(x) (x) += o.x TieredStats& TieredStats::operator+=(const TieredStats& o) { - static_assert(sizeof(TieredStats) == 88); + static_assert(sizeof(TieredStats) == 96); ADD(total_stashes); ADD(total_fetches); ADD(total_cancels); ADD(total_deletes); + ADD(total_defrags); ADD(allocated_bytes); ADD(capacity_bytes); diff --git a/src/server/common.h b/src/server/common.h index ab4988507ae..a20dcd2b123 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -65,6 +65,7 @@ struct TieredStats { size_t total_fetches = 0; size_t total_cancels = 0; size_t total_deletes = 0; + size_t total_defrags = 0; size_t allocated_bytes = 0; size_t capacity_bytes = 0; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 762f8ed12e8..e5cadf57edf 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2107,6 +2107,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("tiered_total_fetches", m.tiered_stats.total_fetches); append("tiered_total_cancels", m.tiered_stats.total_cancels); append("tiered_total_deletes", m.tiered_stats.total_deletes); + append("tiered_total_defrags", m.tiered_stats.total_defrags); append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes); append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes); diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 50561b1d96b..2936a2e2933 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -45,6 +45,9 @@ bool OccupiesWholePages(size_t size) { return size >= TieredStorage::kMinOccupancySize; } +// Stashed bins no longer have bin ids, so this sentinel is used to differentiate from regular reads +constexpr auto kFragmentedBin = tiering::SmallBins::kInvalidBin - 1; + }
// anonymous namespace class TieredStorage::ShardOpManager : public tiering::OpManager { @@ -124,6 +127,26 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { return false; } + // Load all values from bin by their hashes + void Defragment(tiering::DiskSegment segment, string_view value) { + for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) { + // Search for key with the same hash and value pointing to the same segment. + // If it still exists, it must correspond to the value stored in this bin + auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) { + return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment; + }; + auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate); + if (!IsValid(it)) + continue; + + stats_.total_defrags++; + + // Cut out relevant part of value and restore it to memory + string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length); + SetInMemory(&it->second, dbid, sub_value, sub_segment); + } + } + void ReportStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override { if (ec) { VLOG(1) << "Stash failed " << ec.message(); @@ -135,7 +158,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment, bool modified) override { - DCHECK(holds_alternative(id)); // we never issue reads for bins + if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag + Defragment(segment, value); + return true; // delete + } + if (!modified && !cache_fetched_) return false; @@ -144,7 +171,20 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { } bool ReportDelete(tiering::DiskSegment segment) override { - return OccupiesWholePages(segment.length) || ts_->bins_->Delete(segment); + if (OccupiesWholePages(segment.length)) + return true; + + auto bin = 
ts_->bins_->Delete(segment); + if (bin.empty) + return true; + + if (bin.fragmented) { + // Trigger read to signal need for defragmentation. ReportFetched will handle it. + VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset; + Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; }); + } + + return false; } private: @@ -161,6 +201,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { struct { size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0; + size_t total_defrags = 0; // included in total_fetches } stats_; TieredStorage* ts_; @@ -274,6 +315,7 @@ TieredStats TieredStorage::GetStats() const { stats.total_fetches = shard_stats.total_fetches; stats.total_stashes = shard_stats.total_stashes; stats.total_cancels = shard_stats.total_cancels; + stats.total_defrags = shard_stats.total_defrags; } { // OpManager stats diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index e9e23dbfba0..c93216599af 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -135,6 +135,35 @@ TEST_F(TieredStorageTest, MultiDb) { } } +TEST_F(TieredStorageTest, Defrag) { + for (char k = 'a'; k < 'a' + 8; k++) { + Run({"SET", string(1, k), string(512, k)}); + } + + ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 1; }); + + // 7 out of 8 are in one bin, the last one made it flush and is now filling + auto metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u); + EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 7u); + EXPECT_EQ(metrics.tiered_stats.small_bins_filling_bytes, 512 + 12); + + // Reading 3 values still leaves the bin more than half occupied + Run({"GET", string(1, 'a')}); + Run({"GET", string(1, 'b')}); + Run({"GET", string(1, 'c')}); + metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u); + EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 4u);
+ + // This triggers defragmentation, as only 3 < 7/2 remain left + Run({"GET", string(1, 'd')}); + metrics = GetMetrics(); + EXPECT_EQ(metrics.tiered_stats.total_defrags, 3u); + EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u); + EXPECT_EQ(metrics.tiered_stats.allocated_bytes, 0u); +} + TEST_F(TieredStorageTest, BackgroundOffloading) { absl::FlagSaver saver; absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values diff --git a/src/server/tiering/common.h b/src/server/tiering/common.h index f9403fc6700..13de4fe9992 100644 --- a/src/server/tiering/common.h +++ b/src/server/tiering/common.h @@ -27,7 +27,7 @@ constexpr size_t kPageSize = 4_KB; // Location on the offloaded blob, measured in bytes struct DiskSegment { - DiskSegment FillPages() const { + DiskSegment ContainingPages() const { return {offset / kPageSize * kPageSize, (length + kPageSize - 1) / kPageSize * kPageSize}; } diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index ebb5f38456a..7d23fa16bd2 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -43,7 +43,9 @@ void OpManager::Close() { void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) { // Fill pages for prepared read as it has no penalty and potentially covers more small segments - PrepareRead(segment.FillPages()).ForSegment(segment, id).callbacks.emplace_back(std::move(cb)); + PrepareRead(segment.ContainingPages()) + .ForSegment(segment, id) + .callbacks.emplace_back(std::move(cb)); } void OpManager::Delete(EntryId id) { @@ -54,13 +56,13 @@ void OpManager::Delete(DiskSegment segment) { EntryOps* pending_op = nullptr; - if (auto it = pending_reads_.find(segment.offset); it != pending_reads_.end()) + if (auto it = pending_reads_.find(segment.ContainingPages().offset); it != pending_reads_.end()) pending_op = it->second.Find(segment); if (pending_op) { pending_op->deleting = true; } else if
(ReportDelete(segment)) { - storage_.MarkAsFree(segment.FillPages()); + storage_.MarkAsFree(segment.ContainingPages()); } } @@ -102,11 +104,26 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment } void OpManager::ProcessRead(size_t offset, std::string_view value) { + util::FiberAtomicGuard guard; // atomically update items, no in-between states should be possible ReadOp* info = &pending_reads_.at(offset); + // Reorder base read (offset 0) to be last, so reads for defragmentation are handled last. + // If we already have a page read for defragmentation pending and some other read for the + // sub-segment is enqueued, we first must handle the sub-segment read, only then the full page + // read + for (size_t i = 0; i + 1 < info->key_ops.size(); i++) { + if (info->key_ops[i].segment.offset % kPageSize == 0) { + std::swap(info->key_ops[i], info->key_ops.back()); + break; + } + } + bool deleting_full = false; std::string key_value; - for (auto& ko : info->key_ops) { + + // Report functions in the loop may append items to info->key_ops during the traversal + for (size_t i = 0; i < info->key_ops.size(); i++) { + auto& ko = info->key_ops[i]; key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length); bool modified = false; @@ -125,6 +142,7 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) { if (deleting_full) storage_.MarkAsFree(info->segment); + pending_reads_.erase(offset); } diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h index eba4ccb96d2..7a55c7f82de 100644 --- a/src/server/tiering/op_manager.h +++ b/src/server/tiering/op_manager.h @@ -45,8 +45,9 @@ class OpManager { void Close(); - // Enqueue callback to be executed once value is read. Triggers read if none is pending yet for - // this segment + // Enqueue callback to be executed once value is read. Trigger read if none is pending yet for + // this segment. 
Multiple entries can be obtained from a single segment, but every distinct id + // will have its own independent callback loop that can safely modify the underlying value void Enqueue(EntryId id, DiskSegment segment, ReadCallback cb); // Delete entry with pending io diff --git a/src/server/tiering/small_bins.cc b/src/server/tiering/small_bins.cc index 956fa9e54d0..d8700886ebf 100644 --- a/src/server/tiering/small_bins.cc +++ b/src/server/tiering/small_bins.cc @@ -22,7 +22,7 @@ std::optional SmallBins::Stash(DbIndex dbid, std::string_v DCHECK_LT(value.size(), 2_KB); // See FlushBin() for format details - size_t value_bytes = 2 /* dbid */ + 8 /* hash */ + value.size(); + size_t value_bytes = 2 /* dbid */ + 8 /* hash */ + 2 /* strlen*/ + value.size(); std::optional filled_bin; if (2 /* num entries */ + current_bin_bytes_ + value_bytes >= kPageSize) { @@ -56,10 +56,12 @@ SmallBins::FilledBin SmallBins::FlushBin() { data += sizeof(uint64_t); } - // Store all values, n * x bytes + // Store all values with sizes, n * (2 + x) bytes for (const auto& [key, value] : current_bin_) { - pending_set[key] = {size_t(data - out.data()), value.size()}; + absl::little_endian::Store16(data, value.size()); + data += sizeof(uint16_t); + pending_set[key] = {size_t(data - out.data()), value.size()}; memcpy(data, value.data(), value.size()); data += value.size(); } @@ -74,13 +76,17 @@ SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment auto key_list = pending_bins_.extract(id); DCHECK_GT(key_list.mapped().size(), 0u); + uint16_t bytes = 0; SmallBins::KeySegmentList list; for (auto& [key, sub_segment] : key_list.mapped()) { - list.emplace_back(key.first, key.second, - DiskSegment{segment.offset + sub_segment.offset, sub_segment.length}); + bytes += sub_segment.length; + + DiskSegment real_segment{segment.offset + sub_segment.offset, sub_segment.length}; + list.emplace_back(key.first, key.second, real_segment); } - stats_.total_stashed_entries += list.size(); +
stats_.stashed_entries_cnt += list.size(); + stashed_bins_[segment.offset] = {uint8_t(list.size()), bytes}; return list; } @@ -109,22 +115,67 @@ std::optional SmallBins::Delete(DbIndex dbid, std::string_view return std::nullopt; } -std::optional SmallBins::Delete(DiskSegment segment) { - segment = segment.FillPages(); - if (auto it = stashed_bins_.find(segment.offset); it != stashed_bins_.end()) { - stats_.total_stashed_entries--; - if (--it->second == 0) { +SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) { + auto full_segment = segment.ContainingPages(); + if (auto it = stashed_bins_.find(full_segment.offset); it != stashed_bins_.end()) { + stats_.stashed_entries_cnt--; + auto& bin = it->second; + + DCHECK_LE(segment.length, bin.bytes); + bin.bytes -= segment.length; + + if (--bin.entries == 0) { + DCHECK_EQ(bin.bytes, 0u); stashed_bins_.erase(it); - return segment; + return {full_segment, false /* fragmented */, true /* empty */}; + } + + if (bin.bytes < kPageSize / 2) { + return {full_segment, true /* fragmented */, false /* empty */}; } } - return std::nullopt; + + return {segment}; } SmallBins::Stats SmallBins::GetStats() const { return Stats{.stashed_bins_cnt = stashed_bins_.size(), - .stashed_entries_cnt = stats_.total_stashed_entries, + .stashed_entries_cnt = stats_.stashed_entries_cnt, .current_bin_bytes = current_bin_bytes_}; } +SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) { + DCHECK_EQ(value.size(), kPageSize); + stats_.stashed_entries_cnt -= stashed_bins_.extract(segment.offset).mapped().entries; + + const char* data = value.data(); + + uint16_t entries = absl::little_endian::Load16(data); + data += sizeof(uint16_t); + + KeyHashDbList out(entries); + + // Recover dbids and hashes + for (size_t i = 0; i < entries; i++) { + DbIndex dbid = absl::little_endian::Load16(data); + data += sizeof(DbIndex); + + uint64_t hash = absl::little_endian::Load64(data); + data += sizeof(hash); + + out[i] = {dbid, 
hash, {0, 0}}; + } + + // Recover segments + for (size_t i = 0; i < entries; i++) { + uint16_t length = absl::little_endian::Load16(data); + data += sizeof(uint16_t); + + std::get(out[i]) = {segment.offset + (data - value.data()), length}; + data += length; + } + + return out; +} + } // namespace dfly::tiering diff --git a/src/server/tiering/small_bins.h b/src/server/tiering/small_bins.h index b4fa88c9f45..50481a1d475 100644 --- a/src/server/tiering/small_bins.h +++ b/src/server/tiering/small_bins.h @@ -27,6 +27,12 @@ class SmallBins { }; using BinId = unsigned; + static const BinId kInvalidBin = std::numeric_limits::max(); + + struct BinInfo { + DiskSegment segment; + bool fragmented = false, empty = false; + }; // Bin filled with blob of serialized entries using FilledBin = std::pair; @@ -34,6 +40,9 @@ class SmallBins { // List of locations of values for corresponding keys of previously filled bin using KeySegmentList = std::vector>; + // List of item key db indices and hashes + using KeyHashDbList = std::vector>; + // Enqueue key/value pair for stash. Returns page to be stashed if it filled up. std::optional Stash(DbIndex dbid, std::string_view key, std::string_view value); @@ -46,8 +55,13 @@ class SmallBins { // Delete a key with pending io. Returns entry id if needs to be deleted. std::optional Delete(DbIndex dbid, std::string_view key); - // Delete a stored segment. Returns page segment if it became emtpy and needs to be deleted. - std::optional Delete(DiskSegment segment); + // Delete a stored segment. Returns information about the current bin, which might indicate + // the need for external actions like deleting empty segments or triggering defragmentation + BinInfo Delete(DiskSegment segment); + + // Delete stashed bin. Returns list of recovered item key hashes and db indices. 
+ // Mainly used for defragmentation + KeyHashDbList DeleteBin(DiskSegment segment, std::string_view value); Stats GetStats() const; @@ -56,6 +70,12 @@ class SmallBins { FilledBin FlushBin(); private: + struct StashInfo { + uint8_t entries = 0; + uint16_t bytes = 0; + }; + static_assert(sizeof(StashInfo) == sizeof(unsigned)); + BinId last_bin_id_ = 0; unsigned current_bin_bytes_ = 0; @@ -66,11 +86,11 @@ class SmallBins { absl::flat_hash_map /* key*/, DiskSegment>> pending_bins_; - // Map of bins that were stashed and should be deleted when refcount reaches 0 - absl::flat_hash_map stashed_bins_; + // Map of bins that were stashed and should be deleted when number of entries reaches 0 + absl::flat_hash_map stashed_bins_; struct { - size_t total_stashed_entries = 0; + size_t stashed_entries_cnt = 0; } stats_; }; diff --git a/src/server/tiering/small_bins_test.cc b/src/server/tiering/small_bins_test.cc index cb4cd1c2b1e..82bc4c261fc 100644 --- a/src/server/tiering/small_bins_test.cc +++ b/src/server/tiering/small_bins_test.cc @@ -57,7 +57,7 @@ TEST(SmallBins, SimpleDeleteAbort) { } } -TEST(SmallBins, PartialStash) { +TEST(SmallBins, PartialStashDelete) { SmallBins bins; // Fill single bin @@ -77,6 +77,22 @@ TEST(SmallBins, PartialStash) { for (auto& [dbid, key, segment] : segments) { EXPECT_EQ(key, "k"s + bin->second.substr(segment.offset, segment.length).substr(1)); } + + // Delete all stashed values + while (!segments.empty()) { + auto segment = std::get<2>(segments.back()); + segments.pop_back(); + auto bin = bins.Delete(segment); + + EXPECT_EQ(bin.segment.offset, 0u); + EXPECT_EQ(bin.segment.length, 4_KB); + + if (segments.empty()) { + EXPECT_TRUE(bin.empty); + } else { + EXPECT_TRUE(bin.fragmented); // half of the values were deleted + } + } } } // namespace dfly::tiering