Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tiering): Defragmentation #3021

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,13 @@ bool ParseDouble(string_view src, double* value) {
#define ADD(x) (x) += o.x

TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 88);
static_assert(sizeof(TieredStats) == 96);

ADD(total_stashes);
ADD(total_fetches);
ADD(total_cancels);
ADD(total_deletes);
ADD(total_defrags);

ADD(allocated_bytes);
ADD(capacity_bytes);
Expand Down
1 change: 1 addition & 0 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct TieredStats {
size_t total_fetches = 0;
size_t total_cancels = 0;
size_t total_deletes = 0;
size_t total_defrags = 0;

size_t allocated_bytes = 0;
size_t capacity_bytes = 0;
Expand Down
1 change: 1 addition & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("tiered_total_fetches", m.tiered_stats.total_fetches);
append("tiered_total_cancels", m.tiered_stats.total_cancels);
append("tiered_total_deletes", m.tiered_stats.total_deletes);
append("tiered_total_defrags", m.tiered_stats.total_defrags);

append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes);
append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes);
Expand Down
46 changes: 44 additions & 2 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ bool OccupiesWholePages(size_t size) {
return size >= TieredStorage::kMinOccupancySize;
}

// Stashed bins no longer have bin ids, so this sentinel is used to differentiate from regular reads
constexpr auto kFragmentedBin = tiering::SmallBins::kInvalidBin - 1;

} // anonymous namespace

class TieredStorage::ShardOpManager : public tiering::OpManager {
Expand Down Expand Up @@ -124,6 +127,26 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
return false;
}

// Load all values from bin by their hashes.
// Called when the read of a whole fragmented bin completes: every entry that
// is still referenced from the in-memory table is restored to memory, after
// which the caller frees the bin's disk pages.
void Defragment(tiering::DiskSegment segment, string_view value) {
  // DeleteBin parses the bin's serialized layout and yields, for each entry it
  // held, the owning db index, the key hash and the entry's disk sub-segment.
  for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
    // Search for key with the same hash and value pointing to the same segment.
    // If it still exists, it must correspond to the value stored in this bin
    auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) {
      return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment;
    };
    auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
    if (!IsValid(it))
      continue;  // entry was deleted or overwritten since the bin was stashed

    stats_.total_defrags++;

    // Cut out relevant part of value and restore it to memory
    string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length);
    SetInMemory(&it->second, dbid, sub_value, sub_segment);
  }
}

void ReportStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override {
if (ec) {
VLOG(1) << "Stash failed " << ec.message();
Expand All @@ -135,7 +158,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
bool modified) override {
DCHECK(holds_alternative<OpManager::KeyRef>(id)); // we never issue reads for bins
if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag
Defragment(segment, value);
return true; // delete
}

if (!modified && !cache_fetched_)
return false;

Expand All @@ -144,7 +171,20 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

// Handles deletion of an offloaded value. Returns true if the segment's disk
// space can be freed immediately by the caller.
bool ReportDelete(tiering::DiskSegment segment) override {
  // Values occupying whole pages own their disk space exclusively.
  if (OccupiesWholePages(segment.length))
    return true;

  // Small values share pages via SmallBins; remove just this entry.
  auto bin = ts_->bins_->Delete(segment);
  if (bin.empty)
    return true;  // last entry gone - the whole bin's pages can be freed

  if (bin.fragmented) {
    // Trigger read to signal need for defragmentation. ReportFetched will handle it.
    VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset;
    Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; });
  }

  // Bin still holds live entries; keep its pages on disk.
  return false;
}

private:
Expand All @@ -161,6 +201,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

struct {
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0;
size_t total_defrags = 0; // included in total_fetches
} stats_;

TieredStorage* ts_;
Expand Down Expand Up @@ -274,6 +315,7 @@ TieredStats TieredStorage::GetStats() const {
stats.total_fetches = shard_stats.total_fetches;
stats.total_stashes = shard_stats.total_stashes;
stats.total_cancels = shard_stats.total_cancels;
stats.total_defrags = shard_stats.total_defrags;
}

{ // OpManager stats
Expand Down
29 changes: 29 additions & 0 deletions src/server/tiered_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,35 @@ TEST_F(TieredStorageTest, MultiDb) {
}
}

TEST_F(TieredStorageTest, Defrag) {
  // Write 8 values of 512 bytes; 7 fit into a single small bin, the 8th
  // overflows it and triggers a flush of the filled bin.
  for (char k = 'a'; k < 'a' + 8; k++) {
    Run({"SET", string(1, k), string(512, k)});
  }

  ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 1; });

  // 7 out of 8 are in one bin, the last one made it flush and is now filling
  auto metrics = GetMetrics();
  EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u);
  EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 7u);
  // 12 = per-entry overhead: 2 (dbid) + 8 (hash) + 2 (value length)
  EXPECT_EQ(metrics.tiered_stats.small_bins_filling_bytes, 512 + 12);

  // Reading 3 values still leaves the bin more than half occupied
  Run({"GET", string(1, 'a')});
  Run({"GET", string(1, 'b')});
  Run({"GET", string(1, 'c')});
  metrics = GetMetrics();
  EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u);
  EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 4u);

  // This triggers defragmentation, as only 3 < 7/2 entries remain in the bin
  Run({"GET", string(1, 'd')});
  metrics = GetMetrics();
  EXPECT_EQ(metrics.tiered_stats.total_defrags, 3u);
  EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u);
  EXPECT_EQ(metrics.tiered_stats.allocated_bytes, 0u);
}

TEST_F(TieredStorageTest, BackgroundOffloading) {
absl::FlagSaver saver;
absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values
Expand Down
2 changes: 1 addition & 1 deletion src/server/tiering/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ constexpr size_t kPageSize = 4_KB;

// Location on the offloaded blob, measured in bytes
struct DiskSegment {
DiskSegment FillPages() const {
// Segment aligned outward to page boundaries: offset rounded down, length
// rounded up to a multiple of kPageSize.
// NOTE(review): length is rounded up independently of the in-page offset, so
// a segment that starts mid-page and crosses a page boundary may not be fully
// covered by the result — presumably callers only pass segments whose
// offset+length stay within the rounded range; confirm.
DiskSegment ContainingPages() const {
  return {offset / kPageSize * kPageSize, (length + kPageSize - 1) / kPageSize * kPageSize};
}

Expand Down
26 changes: 22 additions & 4 deletions src/server/tiering/op_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ void OpManager::Close() {

// Registers a read callback for `segment` under entry `id`, starting a disk
// read if none is already pending for the containing pages.
void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
  // Fill pages for prepared read as it has no penalty and potentially covers more small segments
  PrepareRead(segment.ContainingPages())
      .ForSegment(segment, id)
      .callbacks.emplace_back(std::move(cb));
}

void OpManager::Delete(EntryId id) {
Expand All @@ -54,13 +56,13 @@ void OpManager::Delete(EntryId id) {

// Deletes the entry stored at `segment`. If a read for it is in flight, the
// deletion is deferred until that read completes.
void OpManager::Delete(DiskSegment segment) {
  EntryOps* pending_op = nullptr;
  // Pending reads are keyed by the page-aligned offset of the containing pages.
  if (auto it = pending_reads_.find(segment.ContainingPages().offset); it != pending_reads_.end())
    pending_op = it->second.Find(segment);

  if (pending_op) {
    // A read is in flight for this entry: mark it for deletion on completion.
    pending_op->deleting = true;
  } else if (ReportDelete(segment)) {
    // No pending io and the owner confirmed the space is no longer needed.
    storage_.MarkAsFree(segment.ContainingPages());
  }
}
}

Expand Down Expand Up @@ -102,11 +104,26 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment
}

void OpManager::ProcessRead(size_t offset, std::string_view value) {
util::FiberAtomicGuard guard; // atomically update items, no in-between states should be possible
ReadOp* info = &pending_reads_.at(offset);

// Reorder base read (offset 0) to be last, so reads for defragmentation are handled last.
// If we already have a page read for defragmentation pending and some other read for the
// sub-segment is enqueued, we first must handle the sub-segment read, only then the full page
// read
for (size_t i = 0; i + 1 < info->key_ops.size(); i++) {
if (info->key_ops[i].segment.offset % kPageSize == 0) {
std::swap(info->key_ops[i], info->key_ops.back());
break;
}
}

bool deleting_full = false;
std::string key_value;
for (auto& ko : info->key_ops) {

// Report functions in the loop may append items to info->key_ops during the traversal
for (size_t i = 0; i < info->key_ops.size(); i++) {
auto& ko = info->key_ops[i];
key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length);

bool modified = false;
Expand All @@ -125,6 +142,7 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) {

if (deleting_full)
storage_.MarkAsFree(info->segment);

pending_reads_.erase(offset);
}

Expand Down
5 changes: 3 additions & 2 deletions src/server/tiering/op_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class OpManager {

void Close();

// Enqueue callback to be executed once value is read. Triggers read if none is pending yet for
// this segment
// Enqueue callback to be executed once value is read. Triggers a read if none is pending yet
// for this segment. Multiple entries can be obtained from a single segment, but every distinct
// id will have its own independent callback loop that can safely modify the underlying value
void Enqueue(EntryId id, DiskSegment segment, ReadCallback cb);

// Delete entry with pending io
Expand Down
79 changes: 65 additions & 14 deletions src/server/tiering/small_bins.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ std::optional<SmallBins::FilledBin> SmallBins::Stash(DbIndex dbid, std::string_v
DCHECK_LT(value.size(), 2_KB);

// See FlushBin() for format details
size_t value_bytes = 2 /* dbid */ + 8 /* hash */ + value.size();
size_t value_bytes = 2 /* dbid */ + 8 /* hash */ + 2 /* strlen*/ + value.size();

std::optional<FilledBin> filled_bin;
if (2 /* num entries */ + current_bin_bytes_ + value_bytes >= kPageSize) {
Expand Down Expand Up @@ -56,10 +56,12 @@ SmallBins::FilledBin SmallBins::FlushBin() {
data += sizeof(uint64_t);
}

// Store all values, n * x bytes
// Store all values with sizes, n * (2 + x) bytes
for (const auto& [key, value] : current_bin_) {
pending_set[key] = {size_t(data - out.data()), value.size()};
absl::little_endian::Store16(data, value.size());
data += sizeof(uint16_t);

pending_set[key] = {size_t(data - out.data()), value.size()};
memcpy(data, value.data(), value.size());
data += value.size();
}
Expand All @@ -74,13 +76,17 @@ SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment
auto key_list = pending_bins_.extract(id);
DCHECK_GT(key_list.mapped().size(), 0u);

uint16_t bytes = 0;
SmallBins::KeySegmentList list;
for (auto& [key, sub_segment] : key_list.mapped()) {
list.emplace_back(key.first, key.second,
DiskSegment{segment.offset + sub_segment.offset, sub_segment.length});
bytes += sub_segment.length;

DiskSegment real_segment{segment.offset + sub_segment.offset, sub_segment.length};
list.emplace_back(key.first, key.second, real_segment);
}

stats_.total_stashed_entries += list.size();
stats_.stashed_entries_cnt += list.size();
stashed_bins_[segment.offset] = {uint8_t(list.size()), bytes};
return list;
}

Expand Down Expand Up @@ -109,22 +115,67 @@ std::optional<SmallBins::BinId> SmallBins::Delete(DbIndex dbid, std::string_view
return std::nullopt;
}

std::optional<DiskSegment> SmallBins::Delete(DiskSegment segment) {
segment = segment.FillPages();
if (auto it = stashed_bins_.find(segment.offset); it != stashed_bins_.end()) {
stats_.total_stashed_entries--;
if (--it->second == 0) {
// Removes the entry at `segment` from its stashed bin and reports the bin's
// resulting state: empty (its pages can be freed), fragmented (worth
// rewriting its surviving entries), or neither.
SmallBins::BinInfo SmallBins::Delete(DiskSegment segment) {
  auto full_segment = segment.ContainingPages();
  if (auto it = stashed_bins_.find(full_segment.offset); it != stashed_bins_.end()) {
    stats_.stashed_entries_cnt--;
    auto& bin = it->second;

    DCHECK_LE(segment.length, bin.bytes);
    bin.bytes -= segment.length;

    if (--bin.entries == 0) {
      DCHECK_EQ(bin.bytes, 0u);
      stashed_bins_.erase(it);
      return {full_segment, false /* fragmented */, true /* empty */};
    }

    // Less than half a page of live data remains: report as fragmented so the
    // caller can trigger defragmentation and reclaim the page.
    if (bin.bytes < kPageSize / 2) {
      return {full_segment, true /* fragmented */, false /* empty */};
    }
  }

  // Not a tracked bin, or still densely packed: report the segment unchanged.
  return {segment};
}

// Returns a snapshot of the current small-bins statistics.
SmallBins::Stats SmallBins::GetStats() const {
  Stats snapshot{.stashed_bins_cnt = stashed_bins_.size(),
                 .stashed_entries_cnt = stats_.stashed_entries_cnt,
                 .current_bin_bytes = current_bin_bytes_};
  return snapshot;
}

// Parses a whole serialized bin (layout written by FlushBin: 2-byte entry
// count, then per-entry 2-byte dbid + 8-byte hash headers, then per-entry
// 2-byte length + payload) and removes the bin from the stashed set.
// Returns (dbid, hash, disk segment) for every entry so the caller can
// restore surviving values to memory.
SmallBins::KeyHashDbList SmallBins::DeleteBin(DiskSegment segment, std::string_view value) {
  DCHECK_EQ(value.size(), kPageSize);
  // Drop the bin's bookkeeping and account for all of its entries at once.
  stats_.stashed_entries_cnt -= stashed_bins_.extract(segment.offset).mapped().entries;

  const char* data = value.data();

  // Header: number of entries, 2 bytes little-endian.
  uint16_t entries = absl::little_endian::Load16(data);
  data += sizeof(uint16_t);

  KeyHashDbList out(entries);

  // Recover dbids and hashes
  for (size_t i = 0; i < entries; i++) {
    DbIndex dbid = absl::little_endian::Load16(data);
    data += sizeof(DbIndex);

    uint64_t hash = absl::little_endian::Load64(data);
    data += sizeof(hash);

    // Segment is filled in on the second pass once value offsets are known.
    out[i] = {dbid, hash, {0, 0}};
  }

  // Recover segments: each value is prefixed with its 2-byte length.
  for (size_t i = 0; i < entries; i++) {
    uint16_t length = absl::little_endian::Load16(data);
    data += sizeof(uint16_t);

    // Offset is relative to the page start, translated to an absolute one.
    std::get<DiskSegment>(out[i]) = {segment.offset + (data - value.data()), length};
    data += length;
  }

  return out;
}

} // namespace dfly::tiering