Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option for manual flush/compaction to ignore unexpired UDT #12585

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ ColumnFamilyData::ColumnFamilyData(
refs_(0),
initialized_(false),
dropped_(false),
manual_flush_asks_to_ignore_udt_(false),
internal_comparator_(cf_options.comparator),
initial_cf_options_(SanitizeOptions(db_options, cf_options)),
ioptions_(db_options, initial_cf_options_),
Expand Down
14 changes: 14 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ class ColumnFamilyData {
void SetDropped();
bool IsDropped() const { return dropped_.load(std::memory_order_relaxed); }

// Records that a manual flush requesting to ignore unexpired user-defined
// timestamps (UDT) is in progress for this column family.
// REQUIRES: DB mutex held (the flag is a plain, non-atomic bool).
void SetManualFlushAsksToIgnoreUDT() {
manual_flush_asks_to_ignore_udt_ = true;
}
// Returns true if a manual flush requesting to ignore unexpired user-defined
// timestamps (UDT) is currently marked as in progress.
// REQUIRES: DB mutex held (the flag is a plain, non-atomic bool).
bool GetManualFlushAsksToIgnoreUDT() const {
return manual_flush_asks_to_ignore_udt_;
}
// Resets the "manual flush asks to ignore unexpired UDT" marker.
// REQUIRES: DB mutex held (the flag is a plain, non-atomic bool).
// NOTE(review): the marker is a single bool rather than a count, so if two
// manual flushes on the same column family can overlap, the first one to
// finish would clear it for the other — confirm this cannot happen.
void ClearManualFlushAsksToIgnoreUDT() {
manual_flush_asks_to_ignore_udt_ = false;
}

// thread-safe
int NumberLevels() const { return ioptions_.num_levels; }

Expand Down Expand Up @@ -592,6 +602,10 @@ class ColumnFamilyData {
std::atomic<bool> initialized_;
std::atomic<bool> dropped_; // true if client dropped it

// This is accessed while holding db mutex.
// True if a manual flush requesting to ignore unexpired UDT is in progress.
bool manual_flush_asks_to_ignore_udt_;

const InternalKeyComparator internal_comparator_;
InternalTblPropCollFactories internal_tbl_prop_coll_factories_;

Expand Down
88 changes: 57 additions & 31 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@

namespace ROCKSDB_NAMESPACE {

namespace {
// Test helper: builds and returns a string holding the PutFixed64 encoding
// of `v`, so timestamps can be written inline at call sites instead of
// through a named temporary.
std::string EncodeAsUint64(uint64_t v) {
  std::string encoded;
  PutFixed64(&encoded, v);
  return encoded;
}
}  // namespace

static const int kValueSize = 1000;

// counts how many operations were performed
Expand Down Expand Up @@ -3674,21 +3682,17 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// No `full_history_ts_low` explicitly set by user, flush is continued
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3705,12 +3709,8 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
// All keys expired w.r.t the configured `full_history_ts_low`, flush continue
// without the need for a re-schedule.
ASSERT_OK(Flush(0));
Expand All @@ -3719,7 +3719,7 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3735,12 +3735,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but flush is continued without a re-schedule because
// of risk of write stall.
Expand All @@ -3752,24 +3748,57 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Verifies that a manual flush and a manual compaction issued with
// `strict_udt_retention = false` succeed even though the written keys have
// timestamps at or above the configured `full_history_ts_low`, and that
// `full_history_ts_low` is advanced afterwards.
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredUserAsksToIgnore) {
int num_entries_per_memtable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(num_entries_per_memtable - 1));
// Make sure there is no memory pressure to not retain udts.
column_family_options_.max_write_buffer_number = 8;
Open();
// Cutoff is 1, while every key below is written with timestamp 2, so none of
// them is expired with respect to the cutoff.
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
// Not all keys expired but user asks to ignore it.
FlushOptions fopt;
fopt.strict_udt_retention = false;
ASSERT_OK(dbfull()->Flush(fopt, handles_[0]));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` is increased after the flush: the newest flushed
// timestamp is 2, so the expected cutoff is 2 + 1.
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Not all keys expired but user asks to ignore it.
CompactRangeOptions copt;
copt.strict_udt_retention = false;
ASSERT_OK(dbfull()->CompactRange(copt, handles_[0], nullptr, nullptr));

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` is increased after the manual compaction
// (presumably by the memtable flush it triggers — TODO confirm): the newest
// timestamp written is 4, so the expected cutoff is 4 + 1.
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the initial
// FlushRequest is rescheduled
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(
db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
Expand All @@ -3780,11 +3809,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
SyncPoint::GetInstance()->EnableProcessing();

Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
Expand All @@ -3794,7 +3820,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand Down
18 changes: 18 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
mutex_.AssertHeld();
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
if (cfd->GetManualFlushAsksToIgnoreUDT()) {
return false;
}
uint64_t max_memtable_id =
flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
if (cfd->IsDropped() ||
Expand Down Expand Up @@ -1154,6 +1157,7 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
if (s.ok() && flush_needed) {
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
fo.strict_udt_retention = options.strict_udt_retention;
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
} else {
Expand Down Expand Up @@ -2340,6 +2344,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(
cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */));
if (!flush_options.strict_udt_retention) {
cfd->SetManualFlushAsksToIgnoreUDT();
}
}
if (immutable_db_options_.persist_stats_to_disk) {
ColumnFamilyData* cfd_stats =
Expand Down Expand Up @@ -2422,6 +2429,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */);
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* tmp_cfd : cfds) {
if (!flush_options.strict_udt_retention) {
// This is no-op for statistics cfd.
tmp_cfd->ClearManualFlushAsksToIgnoreUDT();
}
tmp_cfd->UnrefAndTryDelete();
}
}
Expand All @@ -2440,6 +2451,13 @@ Status DBImpl::AtomicFlushMemTables(
"Please try again later after writes are resumed";
return Status::TryAgain(oss.str());
}
// Reject a non-strict UDT retention request up front: per the message below,
// the "user-defined timestamps in Memtable only" feature is not compatible
// with atomic flush.
// NOTE(review): this looks like a permanent incompatibility rather than a
// transient condition, so NotSupported/InvalidArgument may describe it better
// than TryAgain — confirm before changing the status code.
if (!flush_options.strict_udt_retention) {
  std::ostringstream oss;
  // Adjacent string literals are concatenated verbatim, so every piece must
  // carry its own trailing space; otherwise the words run together.
  oss << "User-defined timestamps in Memtable only feature is not compatible "
         "with atomic flush. FlushOptions.strict_udt_retention should not be "
         "set to false.";
  return Status::TryAgain(oss.str());
}
Status s;
autovector<ColumnFamilyData*> candidate_cfds;
if (provided_candidate_cfds.empty()) {
Expand Down
14 changes: 13 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,13 @@ struct FlushOptions {
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
// If false, will not try to retain user-defined timestamps by rescheduling
// flush. This option only applies when user-defined timestamps are enabled
// and `persist_user_defined_timestamps` is set to false.
// Default: true
bool strict_udt_retention;
FlushOptions()
: wait(true), allow_write_stall(false), strict_udt_retention(true) {}
};

// Create a Logger from provided DBOptions
Expand Down Expand Up @@ -2021,6 +2027,12 @@ struct CompactRangeOptions {
// Set user-defined timestamp low bound, the data with older timestamp than
// low bound maybe GCed by compaction. Default: nullptr
const Slice* full_history_ts_low = nullptr;
Comment on lines 2027 to 2029
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this support setting a higher value than GetFullHistoryTsLow() without ever increasing it on the CF? If so I was thinking perhaps they could set it here to the max timestamp so just the CompactRange() can drop history.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this actually only supports setting a higher value than GetFullHistoryTsLow(); it will update the column family's cutoff timestamp, which is not allowed to go back down. So once it's set to the max timestamp, it is effectively setting no retention going forward, for auto flush too.

// If false, will not try to retain user-defined timestamps by rescheduling
// flush. This option only applies when user-defined timestamps are enabled
// and `persist_user_defined_timestamps` is set to false. Also see
// FlushOptions.strict_udt_retention.
// Default: true
bool strict_udt_retention = true;

// Allows cancellation of an in-progress manual compaction.
//
Expand Down