Add a OnManualFlushScheduled callback in event listener #12631

Closed
228 changes: 196 additions & 32 deletions db/column_family_test.cc
@@ -23,6 +23,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
@@ -35,6 +36,14 @@

namespace ROCKSDB_NAMESPACE {

namespace {
std::string EncodeAsUint64(uint64_t v) {
std::string dst;
PutFixed64(&dst, v);
return dst;
}
} // namespace

static const int kValueSize = 1000;

// counts how many operations were performed
@@ -3674,21 +3683,17 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// No `full_history_ts_low` explicitly set by user, so the flush continues
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
@@ -3705,12 +3710,8 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
// All keys expired w.r.t. the configured `full_history_ts_low`, so the flush
// continues without the need for a re-schedule.
ASSERT_OK(Flush(0));
@@ -3719,13 +3720,13 @@
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffInMemtableSealCb) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
@@ -3735,12 +3736,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but the flush is continued without a re-schedule
// because of the risk of a write stall.
@@ -3752,24 +3749,194 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// The user selectively increases the cutoff timestamp in the `OnMemTableSealed`
// callback when it is invoked during a manual flush. This is suitable for when
// the user does not know an effective new cutoff timestamp upfront and relies
// on the callback to provide that info.
// The caveat of this approach is that the user needs to track when a manual
// flush is ongoing. In this example listener, the `manual_flush_count_`
// variable serves this purpose; it is a counter so that concurrent manual
// flushes can control the cutoff-increasing behavior independently.
// Also, many operations can indirectly cause a manual flush, such as manual
// compaction or file ingestion, and the user needs to explicitly track each
// such operation. So this callback is not ideal. Check out
// `ManualFlushScheduledEventListener` below for a different approach.
class MemtableSealEventListener : public EventListener {
private:
DB* db_;
std::vector<ColumnFamilyHandle*> handles_;
std::atomic<int> manual_flush_count_{0};

public:
std::atomic<int> memtable_seal_count_{0};
std::atomic<int> increase_cutoff_count_{0};

void OnMemTableSealed(const MemTableInfo& info) override {
memtable_seal_count_.fetch_add(1);
if (manual_flush_count_.load() == 0) {
return;
}
if (!info.newest_udt.empty()) {
uint64_t int_newest_udt = 0;
Slice udt_slice = info.newest_udt;
Status s = DecodeU64Ts(udt_slice, &int_newest_udt);
if (!s.ok()) {
return;
}
// An error indicates others have already set the cutoff to a higher
// point, so it's OK to proceed.
db_->IncreaseFullHistoryTsLow(handles_[0],
EncodeAsUint64(int_newest_udt + 1))
.PermitUncheckedError();
increase_cutoff_count_.fetch_add(1);
}
}

void PopulateDBAndHandles(DB* db, std::vector<ColumnFamilyHandle*> handles) {
db_ = db;
handles_ = handles;
}

void MarkManualFlushStart() { manual_flush_count_.fetch_add(1); }

void MarkManualFlushEnd() { manual_flush_count_.fetch_sub(1); }
};

TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnMemtableSealedCb) {
std::shared_ptr<MemtableSealEventListener> listener =
std::make_shared<MemtableSealEventListener>();
db_options_.listeners.push_back(listener);
const int kNumEntriesPerMemTable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
// Make sure there is no memory pressure that would prevent retaining UDTs.
column_family_options_.max_write_buffer_number = 8;
Open();

listener->PopulateDBAndHandles(db_, handles_);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// The event listener does not attempt to increase the cutoff timestamp if
// there is no manual flush going on.
ASSERT_EQ(listener->memtable_seal_count_.load(), 1);
ASSERT_EQ(listener->increase_cutoff_count_.load(), 0);

// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
listener->MarkManualFlushStart();
// Cutoff increased to 3 in `OnMemTableSealed` callback.
ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));
listener->MarkManualFlushEnd();

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Cutoff increased to 5 in `OnMemTableSealed` callback.
listener->MarkManualFlushStart();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));
listener->MarkManualFlushEnd();

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);

// There are two attempts to increase the cutoff timestamp, one during each
// manual flush/compaction.
ASSERT_EQ(listener->increase_cutoff_count_.load(), 2);
Close();
}

// The user explicitly increases the cutoff timestamp in the
// `OnManualFlushScheduled` callback. This is suitable for when the user
// already knows an effective cutoff timestamp that lets the flush proceed.
class ManualFlushScheduledEventListener : public EventListener {
private:
std::vector<ColumnFamilyHandle*> handles_;
// this is a workaround to get a meaningful cutoff timestamp to use.
std::atomic<uint64_t> counter{0};

public:
void OnManualFlushScheduled(
DB* db, const std::vector<ManualFlushInfo>& manual_flush_info) override {
// This vector should always have size 1 in the non-atomic flush case.
EXPECT_EQ(manual_flush_info.size(), 1);
EXPECT_EQ(manual_flush_info[0].cf_name, kDefaultColumnFamilyName);
if (counter.load() == 0) {
EXPECT_EQ(manual_flush_info[0].flush_reason, FlushReason::kManualFlush);
// An error indicates others have already set the cutoff to a higher
// point, so it's OK to proceed.
db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3))
.PermitUncheckedError();
} else if (counter.load() == 1) {
EXPECT_EQ(manual_flush_info[0].flush_reason,
FlushReason::kManualCompaction);
db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(5))
.PermitUncheckedError();
}
counter.fetch_add(1);
}

void PopulateHandles(std::vector<ColumnFamilyHandle*> handles) {
handles_ = handles;
}
};

TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnManualFlushScheduledCb) {
std::shared_ptr<ManualFlushScheduledEventListener> listener =
std::make_shared<ManualFlushScheduledEventListener>();
db_options_.listeners.push_back(listener);
const int kNumEntriesPerMemTable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
// Make sure there is no memory pressure that would prevent retaining UDTs.
column_family_options_.max_write_buffer_number = 8;
Open();

listener->PopulateHandles(handles_);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
// Cutoff increased to 3 in the `OnManualFlushScheduled` callback.
ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Cutoff increased to 5 in the `OnManualFlushScheduled` callback.
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so that all keys are expired after the
// initial FlushRequest is rescheduled
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(
db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
@@ -3780,11 +3947,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
SyncPoint::GetInstance()->EnableProcessing();

Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
Expand All @@ -3794,7 +3958,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
3 changes: 3 additions & 0 deletions db/db_impl/db_impl.h
@@ -1442,6 +1442,9 @@ class DBImpl : public DB {
Status RenameTempFileToOptionsFile(const std::string& file_name);
Status DeleteObsoleteOptionsFiles();

void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
FlushReason flush_reason);

void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, FlushReason flush_reason);
20 changes: 20 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -2310,6 +2310,23 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
}
}

void DBImpl::NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
FlushReason flush_reason) {
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
std::vector<ManualFlushInfo> info;
for (ColumnFamilyData* cfd : cfds) {
info.push_back({cfd->GetID(), cfd->GetName(), flush_reason});
}
for (const auto& listener : immutable_db_options_.listeners) {
listener->OnManualFlushScheduled(this, info);
}
}

Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason,
@@ -2426,6 +2443,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
}

NotifyOnManualFlushScheduled({cfd}, flush_reason);
Contributor:

The FlushRequest generated by FlushMemTable() specifies std::numeric_limits<uint64_t>::max() as the max memtable ID to persist. Is it possible the background flush includes memtables newer than the one in memtable_ids_to_wait? That would mean increasing the cutoff TS here would not guarantee the flush can happen.

Contributor Author:

That's a good question. Yes, if another memtable is sealed between the IncreaseFullHistoryTsLow here and the time the background flush checks whether the timestamps have expired, the cutoff TS set here may not be sufficient to guarantee the flush can happen.

If that sealed memtable is caused by another manual-flush type of event, this callback should also have been invoked to increase the cutoff TS to a higher point. If it is caused by regular writes filling up a memtable, this would be an issue, e.g. when the write rate is very high.

I think updating the memtable id in FlushRequest to be GetLatestMemTableID instead of std::numeric_limits<uint64_t>::max() can help make sure the flush can still proceed in this case. Do you have any concerns about making this change?

Contributor (@ajkr, Jun 6, 2024):

> I think updating the memtable id in FlushRequest to be GetLatestMemTableID instead of std::numeric_limits<uint64_t>::max() can help make sure the flush can still proceed in this case. Do you have any concerns about making this change?

Sorry for the delay. It's hard to say. I think that bounding the flushed memtable ID was introduced for atomic_flush. We didn't use it everywhere because it's more efficient (for write-amp, at least) to greedily pick as many memtables as possible at flush-time. That can make a difference when the flush queue is long, which is rare so it's a minor optimization. Also, one could argue that foreground flush latency is more important than write-amp in case of manual flush. So, introducing the limit is fine with me.

Still, would it be enough? There could be a case where manual flush does not generate any flush request because there is already one queued for automatic flush. If that one fails or is postponed, do we add a new flush request with unbounded memtable ID?

Contributor Author:

Thanks for the detailed context on this optimization. At flush job creation time, the flush job will check the latest memtable id again and use that to pick memtables to flush:

uint64_t max_memtable_id =

So I think even if we update this manual FlushRequest to use the latest memtable id, we will still have the optimization you mentioned.

If this manual flush's enqueuing effort didn't succeed because another auto flush request was already enqueued, then since those requests are generated with GenerateFlushRequest

void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
, that already enqueued request should have a memtable id that is equal to or smaller than the current latest memtable id. In theory, if the new cutoff timestamp is high enough to let the current latest memtable id proceed, that request can proceed too.

If the automatic flush fails, presumably that would trigger the error recovery flush, which goes through this manual flush path and enqueues another request. The RetryFlushesForErrorRecovery path does not go through this path, though, so I think I should also invoke this callback in that path.

Contributor:

It might be hard to enforce that the memtable picking is non-greedy. Alternatively you could add a post-wait callback. Then the user can (see the sketch after this list):

  • In the post-schedule, pre-wait callback:
    • IncreaseFullHistoryTsLow()
    • Bump an "in manual flush" counter
  • In the post-wait callback
    • Decrement an "in manual flush" counter
  • In the seal callback:
    • If the "in manual flush" counter is nonzero, call IncreaseFullHistoryTsLow()
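A rough sketch of what such a listener could look like is below, combining the callbacks from this PR with the proposed post-wait hook. The `OnManualFlushWaitFinished` method is hypothetical (no such post-wait callback exists in the EventListener API today); `OnManualFlushScheduled` added in this PR plays the post-schedule, pre-wait role, and the handle wiring and cutoff value are placeholders in the style of the tests above:

```cpp
#include <atomic>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "rocksdb/comparator.h"  // EncodeU64Ts / DecodeU64Ts helpers
#include "rocksdb/db.h"
#include "rocksdb/listener.h"

namespace ROCKSDB_NAMESPACE {

class CombinedCutoffListener : public EventListener {
 public:
  void PopulateDBAndHandles(DB* db, std::vector<ColumnFamilyHandle*> handles) {
    db_ = db;
    handles_ = std::move(handles);
  }

  // Post-schedule, pre-wait: remember that a manual flush is in flight and
  // raise the cutoff to a value the user already knows is sufficient.
  void OnManualFlushScheduled(
      DB* db,
      const std::vector<ManualFlushInfo>& /*manual_flush_info*/) override {
    in_manual_flush_.fetch_add(1);
    std::string ts_buf;
    EncodeU64Ts(known_sufficient_cutoff_, &ts_buf);
    // An error means someone already raised the cutoff further, which is fine.
    db->IncreaseFullHistoryTsLow(handles_[0], ts_buf).PermitUncheckedError();
  }

  // Hypothetical post-wait hook: to be called once the manual flush is done
  // waiting, so the listener stops reacting to memtable seals.
  void OnManualFlushWaitFinished() { in_manual_flush_.fetch_sub(1); }

  // If a memtable is sealed while a manual flush is in flight (e.g. regular
  // writes filled it up), raise the cutoff past its newest UDT so the pending
  // manual flush is not blocked by unexpired timestamps.
  void OnMemTableSealed(const MemTableInfo& info) override {
    if (in_manual_flush_.load() == 0 || info.newest_udt.empty()) {
      return;
    }
    uint64_t newest_udt = 0;
    if (DecodeU64Ts(info.newest_udt, &newest_udt).ok()) {
      std::string ts_buf;
      EncodeU64Ts(newest_udt + 1, &ts_buf);
      db_->IncreaseFullHistoryTsLow(handles_[0], ts_buf).PermitUncheckedError();
    }
  }

 private:
  DB* db_ = nullptr;
  std::vector<ColumnFamilyHandle*> handles_;
  std::atomic<int> in_manual_flush_{0};
  uint64_t known_sufficient_cutoff_ = 3;  // placeholder value for illustration
};

}  // namespace ROCKSDB_NAMESPACE
```

The counter is what makes this compose with concurrent manual flushes: a memtable sealed by regular writes while any manual flush is in flight still gets its newest UDT covered by the cutoff, which addresses the high-write-rate race discussed earlier in this thread.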

Contributor Author:

Thanks for this idea of using a combination of these callbacks. Let me implement such a flow in a follow-up as an example of handling this edge case.

TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
@@ -2570,6 +2589,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
}
NotifyOnManualFlushScheduled(cfds, flush_reason);
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
5 changes: 5 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -2195,6 +2195,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
memtable_info.num_entries = cfd->mem()->num_entries();
memtable_info.num_deletes = cfd->mem()->num_deletes();
if (!cfd->ioptions()->persist_user_defined_timestamps &&
cfd->user_comparator()->timestamp_size() > 0) {
const Slice& newest_udt = cfd->mem()->GetNewestUDT();
memtable_info.newest_udt.assign(newest_udt.data(), newest_udt.size());
}
// Log this later after lock release. It may be outdated, e.g., if background
// flush happens before logging, but that should be ok.
int num_imm_unflushed = cfd->imm()->NumNotFlushed();