Skip to content

Commit

Permalink
Add option for manual flush/compaction to ignore unexpired UDT
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed May 1, 2024
1 parent 8b3d9e6 commit 3a8703a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 32 deletions.
1 change: 1 addition & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ ColumnFamilyData::ColumnFamilyData(
refs_(0),
initialized_(false),
dropped_(false),
manual_flush_asks_to_ignore_udt_(false),
internal_comparator_(cf_options.comparator),
initial_cf_options_(SanitizeOptions(db_options, cf_options)),
ioptions_(db_options, initial_cf_options_),
Expand Down
14 changes: 14 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ class ColumnFamilyData {
void SetDropped();
bool IsDropped() const { return dropped_.load(std::memory_order_relaxed); }

// Marks that a manual flush requesting to ignore unexpired user-defined
// timestamps (UDT) is in progress for this column family.
// Accessed while holding the db mutex (see `manual_flush_asks_to_ignore_udt_`).
void SetManualFlushAsksToIgnoreUDT() {
manual_flush_asks_to_ignore_udt_ = true;
}
// Returns true if a manual flush requesting to ignore unexpired user-defined
// timestamps (UDT) is in progress for this column family.
// Accessed while holding the db mutex (see `manual_flush_asks_to_ignore_udt_`).
bool GetManualFlushAsksToIgnoreUDT() const {
return manual_flush_asks_to_ignore_udt_;
}
// Clears the flag set by `SetManualFlushAsksToIgnoreUDT()` once the manual
// flush that asked to ignore unexpired UDTs has completed.
// Accessed while holding the db mutex (see `manual_flush_asks_to_ignore_udt_`).
void ClearManualFlushAsksToIgnoreUDT() {
manual_flush_asks_to_ignore_udt_ = false;
}

// thread-safe
int NumberLevels() const { return ioptions_.num_levels; }

Expand Down Expand Up @@ -592,6 +602,10 @@ class ColumnFamilyData {
std::atomic<bool> initialized_;
std::atomic<bool> dropped_; // true if client dropped it

// This is accessed while holding db mutex.
// True if a manual flush requesting to ignore unexpired UDT is in progress.
bool manual_flush_asks_to_ignore_udt_;

const InternalKeyComparator internal_comparator_;
InternalTblPropCollFactories internal_tbl_prop_coll_factories_;

Expand Down
86 changes: 55 additions & 31 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@

namespace ROCKSDB_NAMESPACE {

namespace {
// Test helper: returns the fixed-width 8-byte encoding of `v`, the format
// these tests use for uint64_t user-defined timestamps.
std::string EncodeAsUint64(uint64_t v) {
  std::string encoded;
  PutFixed64(&encoded, v);
  return encoded;
}
}  // namespace

static const int kValueSize = 1000;

// counts how many operations were performed
Expand Down Expand Up @@ -3674,21 +3682,17 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// No `full_history_ts_low` explicitly set by user, flush is continued
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3705,12 +3709,8 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
// All keys expired w.r.t the configured `full_history_ts_low`, flush continue
// without the need for a re-schedule.
ASSERT_OK(Flush(0));
Expand All @@ -3719,7 +3719,7 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3735,12 +3735,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but flush is continued without a re-schedule because
// of risk of write stall.
Expand All @@ -3752,24 +3748,55 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Verifies that when not all user-defined timestamps (UDT) have expired
// w.r.t. `full_history_ts_low`, a manual flush or manual compaction can still
// proceed immediately (no reschedule) when the user explicitly opts in via
// `FlushOptions.ignore_unexpired_udt` /
// `CompactRangeOptions.ignore_unexpired_udt_for_flush`.
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredUserAsksToIgnore) {
// Small memtable so the writes below fill it; extra write buffers so the
// flush decision is not forced by write-stall avoidance.
int num_entries_per_memtable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(num_entries_per_memtable - 1));
column_family_options_.max_write_buffer_number = 8;
Open();
// Writes carry timestamp 2, above the cutoff of 1, so they are unexpired.
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
// Not all keys expired but user asks to ignore it.
FlushOptions fopt;
fopt.ignore_unexpired_udt = true;
ASSERT_OK(dbfull()->Flush(fopt, handles_[0]));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` is increased after the flush to the effective
// cutoff: max write timestamp (2) + 1.
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

// Timestamp 4 is above the new cutoff of 3, so this key is unexpired too.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Not all keys expired but user asks to ignore it.
CompactRangeOptions copt;
copt.ignore_unexpired_udt_for_flush = true;
ASSERT_OK(dbfull()->CompactRange(copt, handles_[0], nullptr, nullptr));

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` is increased after the flush triggered by the
// manual compaction: max write timestamp (4) + 1.
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the initial
// FlushRequest is rescheduled
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(
db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
Expand All @@ -3780,11 +3807,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
SyncPoint::GetInstance()->EnableProcessing();

Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
Expand All @@ -3794,7 +3818,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand Down
18 changes: 18 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
mutex_.AssertHeld();
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
if (cfd->GetManualFlushAsksToIgnoreUDT()) {
return false;
}
uint64_t max_memtable_id =
flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
if (cfd->IsDropped() ||
Expand Down Expand Up @@ -1154,6 +1157,7 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
if (s.ok() && flush_needed) {
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
fo.ignore_unexpired_udt = options.ignore_unexpired_udt_for_flush;
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
} else {
Expand Down Expand Up @@ -2340,6 +2344,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(
cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */));
if (flush_options.ignore_unexpired_udt) {
cfd->SetManualFlushAsksToIgnoreUDT();
}
}
if (immutable_db_options_.persist_stats_to_disk) {
ColumnFamilyData* cfd_stats =
Expand Down Expand Up @@ -2422,6 +2429,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */);
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* tmp_cfd : cfds) {
if (flush_options.ignore_unexpired_udt) {
// This is no-op for statistics cfd.
tmp_cfd->ClearManualFlushAsksToIgnoreUDT();
}
tmp_cfd->UnrefAndTryDelete();
}
}
Expand All @@ -2440,6 +2451,13 @@ Status DBImpl::AtomicFlushMemTables(
"Please try again later after writes are resumed";
return Status::TryAgain(oss.str());
}
if (flush_options.ignore_unexpired_udt) {
std::ostringstream oss;
oss << "User-defined timestamps in Memtable only feature is not compatible"
"with atomic flush. FlushOptions.ignore_unexpired_udt should not be"
"set to true.";
return Status::TryAgain(oss.str());
}
Status s;
autovector<ColumnFamilyData*> candidate_cfds;
if (provided_candidate_cfds.empty()) {
Expand Down
14 changes: 13 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,13 @@ struct FlushOptions {
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
// If true, will not try to retain user-defined timestamps by rescheduling
// flush. This option only applies for when user-defined timestamps is enabled
// and `persist_user_defined_timestamps` is set to false.
// Default: false
bool ignore_unexpired_udt;
FlushOptions()
: wait(true), allow_write_stall(false), ignore_unexpired_udt(false) {}
};

// Create a Logger from provided DBOptions
Expand Down Expand Up @@ -2021,6 +2027,12 @@ struct CompactRangeOptions {
// Set user-defined timestamp low bound, the data with older timestamp than
// low bound maybe GCed by compaction. Default: nullptr
const Slice* full_history_ts_low = nullptr;
// If true, will not try to retain user-defined timestamps by rescheduling
// flush. This option only applies for when user-defined timestamps is enabled
// and `persist_user_defined_timestamps` is set to false. Also see
// FlushOptions.ignore_unexpired_udt
// Default: false
bool ignore_unexpired_udt_for_flush = false;

// Allows cancellation of an in-progress manual compaction.
//
Expand Down

0 comments on commit 3a8703a

Please sign in to comment.