Skip to content

Commit

Permalink
Add option for manual flush/compaction to ignore unexpired UDT
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Apr 25, 2024
1 parent 1fca175 commit c6b463d
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 45 deletions.
91 changes: 60 additions & 31 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@

namespace ROCKSDB_NAMESPACE {

namespace {
// Encodes `v` as an 8-byte fixed-width string, the form user-defined
// timestamps take in these tests.
std::string EncodeAsUint64(uint64_t v) {
  std::string encoded;
  PutFixed64(&encoded, v);
  return encoded;
}
}  // namespace

static const int kValueSize = 1000;

// counts how many operations were performed
Expand Down Expand Up @@ -3674,21 +3682,17 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// No `full_history_ts_low` explicitly set by user, flush is continued
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3705,12 +3709,8 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
// All keys expired w.r.t the configured `full_history_ts_low`, flush continue
// without the need for a re-schedule.
ASSERT_OK(Flush(0));
Expand All @@ -3719,7 +3719,7 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3735,12 +3735,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but flush is continued without a re-schedule because
// of risk of write stall.
Expand All @@ -3752,9 +3748,46 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredUserAsksToIgnore) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();

Open();
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
// Not all keys expired but user asks to ignore it.
FlushOptions fopt;
fopt.ignore_unexpired_udt = true;
ASSERT_OK(dbfull()->Flush(fopt, handles_[0]));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` is increased after the flush
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Not all keys expired but user asks to ignore it.
CompactRangeOptions copt;
copt.ignore_unexpired_udt_for_flush = true;
ASSERT_OK(dbfull()->CompactRange(copt, handles_[0], nullptr, nullptr));

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` is increased after the flush triggered by the
// compaction
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand All @@ -3767,9 +3800,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the initial
// FlushRequest is rescheduled
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(
db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
Expand All @@ -3780,11 +3812,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
SyncPoint::GetInstance()->EnableProcessing();

Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
Expand All @@ -3794,7 +3823,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
Expand Down
9 changes: 8 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2139,6 +2139,12 @@ class DBImpl : public DB {
std::unordered_map<ColumnFamilyData*, uint64_t>
cfd_to_max_mem_id_to_persist;

// When this is true, FlushRequest will not be rescheduled to retain
// unexpired user-defined timestamps. This should only be set to true by
// manual Flush; the default is false, and the user needs to explicitly
// set it to true. All automatic flushes should have this set to false.
bool ignore_unexpired_udt;

#ifndef NDEBUG
int reschedule_count = 1;
#endif /* !NDEBUG */
Expand All @@ -2151,7 +2157,8 @@ class DBImpl : public DB {
//
// REQUIRES: mutex held
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);
FlushReason flush_reason, bool ignore_unexpired_udt,
FlushRequest* req);

void SchedulePendingFlush(const FlushRequest& req);

Expand Down
33 changes: 27 additions & 6 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
const FlushRequest& flush_req) {
mutex_.AssertHeld();
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
if (flush_req.ignore_unexpired_udt) {
return false;
}
ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
uint64_t max_memtable_id =
flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
Expand Down Expand Up @@ -1154,6 +1157,7 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
if (s.ok() && flush_needed) {
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
fo.ignore_unexpired_udt = options.ignore_unexpired_udt_for_flush;
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
} else {
Expand Down Expand Up @@ -2267,10 +2271,13 @@ Status DBImpl::RunManualCompaction(
}

void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req) {
FlushReason flush_reason,
bool ignore_unexpired_udt,
FlushRequest* req) {
assert(req != nullptr);
req->flush_reason = flush_reason;
req->cfd_to_max_mem_id_to_persist.reserve(cfds.size());
req->ignore_unexpired_udt = ignore_unexpired_udt;
for (const auto cfd : cfds) {
if (nullptr == cfd) {
// cfd may be null, see DBImpl::ScheduleFlushes
Expand Down Expand Up @@ -2328,7 +2335,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (s.ok()) {
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
FlushRequest req{flush_reason,
{{cfd, flush_memtable_id}},
flush_options.ignore_unexpired_udt};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(
cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */));
Expand Down Expand Up @@ -2356,7 +2365,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
"to avoid holding old logs",
cfd->GetName().c_str());
s = SwitchMemtable(cfd_stats, &context);
FlushRequest req{flush_reason, {{cfd_stats, flush_memtable_id}}};
FlushRequest req{flush_reason,
{{cfd_stats, flush_memtable_id}},
/*ignore_unexpired_udt=*/false};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(
cfd_stats->imm()->GetLatestMemTableID(
Expand Down Expand Up @@ -2432,6 +2443,13 @@ Status DBImpl::AtomicFlushMemTables(
"Please try again later after writes are resumed";
return Status::TryAgain(oss.str());
}
if (flush_options.ignore_unexpired_udt) {
std::ostringstream oss;
oss << "User-defined timestamps in Memtable only feature is not compatible "
       "with atomic flush. FlushOptions.ignore_unexpired_udt should not be "
       "set to true.";
return Status::TryAgain(oss.str());
}
Status s;
autovector<ColumnFamilyData*> candidate_cfds;
if (provided_candidate_cfds.empty()) {
Expand Down Expand Up @@ -2530,7 +2548,8 @@ Status DBImpl::AtomicFlushMemTables(
cfd->Ref();
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
GenerateFlushRequest(cfds, flush_reason, /*ignore_unexpired_udt=*/false,
&flush_req);
SchedulePendingFlush(flush_req);
MaybeScheduleFlushOrCompaction();
}
Expand Down Expand Up @@ -2584,7 +2603,8 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
autovector<uint64_t> flush_memtable_ids;
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, flush_reason, &flush_req);
GenerateFlushRequest(cfds, flush_reason, /*ignore_unexpired_udt=*/false,
&flush_req);
SchedulePendingFlush(flush_req);
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
flush_memtable_ids.push_back(iter.second);
Expand All @@ -2598,7 +2618,8 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
FlushRequest flush_req{
flush_reason,
{{cfd,
std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}}};
std::numeric_limits<uint64_t>::max() /* max_mem_id_to_persist */}},
/*ignore_unexpired_udt=*/true};
SchedulePendingFlush(flush_req);
}
}
Expand Down
17 changes: 11 additions & 6 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1742,13 +1742,15 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
GenerateFlushRequest({cfd}, FlushReason::kWalFull,
/*ignore_unexpired_udt=*/false, &flush_req);
SchedulePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
GenerateFlushRequest(cfds, FlushReason::kWalFull,
/*ignore_unexpired_udt=*/false, &flush_req);
SchedulePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
Expand Down Expand Up @@ -1834,13 +1836,14 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
&flush_req);
/*ignore_unexpired_udt=*/false, &flush_req);
SchedulePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager,
/*ignore_unexpired_udt=*/false, &flush_req);
SchedulePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
Expand Down Expand Up @@ -2116,12 +2119,14 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull,
/*ignore_unexpired_udt=*/false, &flush_req);
SchedulePendingFlush(flush_req);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull,
/*ignore_unexpired_udt=*/false, &flush_req);
SchedulePendingFlush(flush_req);
}
}
Expand Down
14 changes: 13 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1925,7 +1925,13 @@ struct FlushOptions {
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
// If true, will not try to retain user-defined timestamps by rescheduling
// flush. This option only applies when user-defined timestamps are enabled
// and `persist_user_defined_timestamps` is set to false.
// Default: false
bool ignore_unexpired_udt;
FlushOptions()
: wait(true), allow_write_stall(false), ignore_unexpired_udt(false) {}
};

// Create a Logger from provided DBOptions
Expand Down Expand Up @@ -2011,6 +2017,12 @@ struct CompactRangeOptions {
// Set user-defined timestamp low bound, the data with older timestamp than
// low bound maybe GCed by compaction. Default: nullptr
const Slice* full_history_ts_low = nullptr;
// If true, will not try to retain user-defined timestamps by rescheduling
// flush. This option only applies when user-defined timestamps are enabled
// and `persist_user_defined_timestamps` is set to false. Also see
// `FlushOptions.ignore_unexpired_udt`.
// Default: false
bool ignore_unexpired_udt_for_flush = false;

// Allows cancellation of an in-progress manual compaction.
//
Expand Down

0 comments on commit c6b463d

Please sign in to comment.