Fix/cleanup SeqnoToTimeMapping (#12253)
Summary:
The SeqnoToTimeMapping class (RocksDB internal) used by the preserve_internal_time_seconds / preclude_last_level_data_seconds options was essentially in a prototype state with some significant flaws that would risk biting us some day. This is a big, complicated change because both the implementation and the behavioral requirements of the class needed to be upgraded together. In short, this makes SeqnoToTimeMapping more internally responsible for maintaining good invariants, so that callers don't easily encounter dangerous scenarios.

* Some API functions were confusingly named and structured, so I fully refactored the APIs to use clear naming (e.g. `DecodeFrom` and `CopyFromSeqnoRange`), object states, function preconditions, etc.
  * Previously the object could informally be sorted / compacted or not, and there was limited checking or enforcement on these states. Now there's a well-defined "enforced" state that is consistently checked in debug mode for applicable operations. (I attempted to create a separate "builder" class for unenforced states, but IIRC found that more cumbersome for existing uses than it was worth.)
* Previously operations would coalesce data in a way that was better for `GetProximalTimeBeforeSeqno` than for `GetProximalSeqnoBeforeTime` which is odd because the latter is the only one used by DB code currently (what is the seqno cut-off for data definitely older than this given time?). This is now reversed to consistently favor `GetProximalSeqnoBeforeTime`, with that logic concentrated in one place: `SeqnoToTimeMapping::SeqnoTimePair::Merge()`. Unfortunately, a lot of unit test logic was specifically testing the old, suboptimal behavior.
* Previously, the natural behavior of SeqnoToTimeMapping was to THROW AWAY data needed to get reasonable answers to the important `GetProximalSeqnoBeforeTime` queries. This is because SeqnoToTimeMapping only had a FIFO policy for staying within the entry capacity (except in aggregate+sort+serialize mode). If the DB wasn't extremely careful to avoid gathering too many time mappings, it could lose track of where the seqno cutoff was for cold data (`GetProximalSeqnoBeforeTime()` returning 0), preventing all further data migration to the cold tier until enough time passed for new mappings to catch up with the FIFO purging. (The problem is not so acute because SST files contain relevant snapshots of the mappings, but it would apply to long-lived memtables.)
  * Now the SeqnoToTimeMapping class has fully-integrated smarts for keeping a sufficiently complete history, within capacity limits, to give good answers to `GetProximalSeqnoBeforeTime` queries.
  * Fixes old `// FIXME: be smarter about how we erase to avoid data falling off the front prematurely.`
* Fix an apparent bug in how entries are selected for storing into SST files. Previously, only entries within the seqno range of the file were selected, which could easily leave a gap at the beginning of the timeline for the file's data, hurting the accuracy of GetProximalXXX queries. This could lead to the same problem discussed above with naively throwing away entries in FIFO order in the old SeqnoToTimeMapping. The updated testing of GetProximalSeqnoBeforeTime in BasicSeqnoToTimeMapping relies on the fixed behavior.
* Fix a potential compaction CPU efficiency/scaling issue in which each compaction output file would iterate over and sort all seqno-to-time mappings from all compaction input files. Now we distill the input file entries to a constant size before processing each compaction output file.

Intended follow-up (me or others):
* Expand some direct testing of SeqnoToTimeMapping APIs. Here I've focused on updating existing tests to make sense.
* There are likely more gaps in availability of needed SeqnoToTimeMapping data when the DB shuts down and is restarted, at least with WAL.
* The data tracked in the DB could be kept more accurate and limited if it used the oldest seqno of unflushed data. This might require some more API refactoring.

Pull Request resolved: #12253

Test Plan: unit tests updated

Reviewed By: jowlyzhang

Differential Revision: D52913733

Pulled By: pdillinger

fbshipit-source-id: 020737fcbbe6212f6701191a6ab86565054c9593
pdillinger authored and facebook-github-bot committed Jan 20, 2024
1 parent d982260 commit cb08a68
Showing 17 changed files with 749 additions and 534 deletions.
13 changes: 8 additions & 5 deletions db/builder.cc
@@ -36,6 +36,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "seqno_to_time_mapping.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/format.h"
#include "table/internal_iterator.h"
@@ -299,12 +300,14 @@ Status BuildTable(
if (!s.ok() || empty) {
builder->Abandon();
} else {
std::string seqno_to_time_mapping_str;
seqno_to_time_mapping.Encode(
seqno_to_time_mapping_str, meta->fd.smallest_seqno,
meta->fd.largest_seqno, meta->file_creation_time);
SeqnoToTimeMapping relevant_mapping;
relevant_mapping.CopyFromSeqnoRange(seqno_to_time_mapping,
meta->fd.smallest_seqno,
meta->fd.largest_seqno);
relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
relevant_mapping.Enforce(tboptions.file_creation_time);
builder->SetSeqnoTimeTableProperties(
seqno_to_time_mapping_str,
relevant_mapping,
ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO
? meta->file_creation_time
: meta->oldest_ancester_time);
40 changes: 24 additions & 16 deletions db/compaction/compaction_job.cc
@@ -288,39 +288,37 @@ void CompactionJob::Prepare() {

if (preserve_time_duration > 0) {
const ReadOptions read_options(Env::IOActivity::kCompaction);
// setup seqno_to_time_mapping_
seqno_to_time_mapping_.SetMaxTimeDuration(preserve_time_duration);
// Setup seqno_to_time_mapping_ with relevant time range.
seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration);
for (const auto& each_level : *c->inputs()) {
for (const auto& fmd : each_level.files) {
std::shared_ptr<const TableProperties> tp;
Status s =
cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr);
if (s.ok()) {
seqno_to_time_mapping_.Add(tp->seqno_to_time_mapping)
.PermitUncheckedError();
seqno_to_time_mapping_.Add(fmd->fd.smallest_seqno,
fmd->oldest_ancester_time);
s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
}
if (!s.ok()) {
ROCKS_LOG_WARN(
db_options_.info_log,
"Problem reading or processing seqno-to-time mapping: %s",
s.ToString().c_str());
}
}
}

auto status = seqno_to_time_mapping_.Sort();
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Invalid sequence number to time mapping: Status: %s",
status.ToString().c_str());
}
int64_t _current_time = 0;
status = db_options_.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
Status s = db_options_.clock->GetCurrentTime(&_current_time);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time in compaction: Status: %s",
status.ToString().c_str());
s.ToString().c_str());
// preserve all time information
preserve_time_min_seqno_ = 0;
preclude_last_level_min_seqno_ = 0;
seqno_to_time_mapping_.Enforce();
} else {
seqno_to_time_mapping_.TruncateOldEntries(_current_time);
seqno_to_time_mapping_.Enforce(_current_time);
uint64_t preserve_time =
static_cast<uint64_t>(_current_time) > preserve_time_duration
? _current_time - preserve_time_duration
@@ -344,6 +342,16 @@ void CompactionJob::Prepare() {
1;
}
}
// For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
// limit the capacity after them.
// If we set capacity to the per-SST limit, we could be throwing away
// fidelity when a compaction output file has a narrower seqno range than
// all the inputs. If we only limit capacity for each compaction output, we
// could be doing a lot of unnecessary recomputation in a large compaction
// (up to quadratic in number of files). Thus, we do something in the
// middle: enforce a reasonably large constant size limit substantially
// larger than kMaxSeqnoTimePairsPerSST.
seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
}
}

10 changes: 5 additions & 5 deletions db/compaction/compaction_outputs.cc
@@ -25,11 +25,11 @@ Status CompactionOutputs::Finish(
assert(meta != nullptr);
Status s = intput_status;
if (s.ok()) {
std::string seqno_to_time_mapping_str;
seqno_to_time_mapping.Encode(
seqno_to_time_mapping_str, meta->fd.smallest_seqno,
meta->fd.largest_seqno, meta->file_creation_time);
builder_->SetSeqnoTimeTableProperties(seqno_to_time_mapping_str,
SeqnoToTimeMapping relevant_mapping;
relevant_mapping.CopyFromSeqnoRange(
seqno_to_time_mapping, meta->fd.smallest_seqno, meta->fd.largest_seqno);
relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
builder_->SetSeqnoTimeTableProperties(relevant_mapping,
meta->oldest_ancester_time);
s = builder_->Finish();

5 changes: 2 additions & 3 deletions db/compaction/tiered_compaction_test.cc
@@ -1575,9 +1575,8 @@ TEST_F(PrecludeLastLevelTest, SmallPrecludeTime) {
ASSERT_EQ(tables_props.size(), 1);
ASSERT_FALSE(tables_props.begin()->second->seqno_to_time_mapping.empty());
SeqnoToTimeMapping tp_mapping;
ASSERT_OK(
tp_mapping.Add(tables_props.begin()->second->seqno_to_time_mapping));
ASSERT_OK(tp_mapping.Sort());
ASSERT_OK(tp_mapping.DecodeFrom(
tables_props.begin()->second->seqno_to_time_mapping));
ASSERT_FALSE(tp_mapping.Empty());
auto seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_FALSE(seqs.empty());
45 changes: 23 additions & 22 deletions db/db_impl/db_impl.cc
@@ -834,9 +834,15 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
}
}
if (min_preserve_seconds == std::numeric_limits<uint64_t>::max()) {
seqno_to_time_mapping_.Resize(0, 0);
// Don't track
seqno_to_time_mapping_.SetCapacity(0);
seqno_to_time_mapping_.SetMaxTimeSpan(UINT64_MAX);
} else {
seqno_to_time_mapping_.Resize(min_preserve_seconds, max_preserve_seconds);
uint64_t cap = std::min(kMaxSeqnoToTimeEntries,
max_preserve_seconds * kMaxSeqnoTimePairsPerCF /
min_preserve_seconds);
seqno_to_time_mapping_.SetCapacity(cap);
seqno_to_time_mapping_.SetMaxTimeSpan(max_preserve_seconds);
}
mapping_was_empty = seqno_to_time_mapping_.Empty();
}
@@ -845,9 +851,8 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
if (min_preserve_seconds != std::numeric_limits<uint64_t>::max()) {
// round up to 1 when the time_duration is smaller than
// kMaxSeqnoTimePairsPerCF
seqno_time_cadence = (min_preserve_seconds +
SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF - 1) /
SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF;
seqno_time_cadence = (min_preserve_seconds + kMaxSeqnoTimePairsPerCF - 1) /
kMaxSeqnoTimePairsPerCF;
}

TEST_SYNC_POINT_CALLBACK(
@@ -884,7 +889,7 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
assert(mapping_was_empty);

// We can simply modify these, before writes are allowed
constexpr uint64_t kMax = SeqnoToTimeMapping::kMaxSeqnoTimePairsPerSST;
constexpr uint64_t kMax = kMaxSeqnoTimePairsPerSST;
versions_->SetLastAllocatedSequence(kMax);
versions_->SetLastPublishedSequence(kMax);
versions_->SetLastSequence(kMax);
@@ -6639,28 +6644,24 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) {
immutable_db_options_.clock->GetCurrentTime(&unix_time_signed)
.PermitUncheckedError(); // Ignore error
uint64_t unix_time = static_cast<uint64_t>(unix_time_signed);
bool appended = false;
{
InstrumentedMutexLock l(&mutex_);
if (populate_historical_seconds > 0) {
if (populate_historical_seconds > 0) {
bool success = true;
{
InstrumentedMutexLock l(&mutex_);
if (seqno > 1 && unix_time > populate_historical_seconds) {
// seqno=0 is reserved
SequenceNumber from_seqno = 1;
appended = seqno_to_time_mapping_.PrePopulate(
success = seqno_to_time_mapping_.PrePopulate(
from_seqno, seqno, unix_time - populate_historical_seconds,
unix_time);
} else {
// One of these will fail
assert(seqno > 1);
assert(unix_time > populate_historical_seconds);
success = false;
}
} else {
// FIXME: assert(seqno > 0);
appended = seqno_to_time_mapping_.Append(seqno, unix_time);
}
}
if (populate_historical_seconds > 0) {
if (appended) {
if (success) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Pre-populated sequence number to time entries: [1,%" PRIu64
@@ -6673,11 +6674,11 @@
"] -> [%" PRIu64 ",%" PRIu64 "]",
seqno, unix_time - populate_historical_seconds, unix_time);
}
} else if (!appended) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to insert sequence number to time entry: %" PRIu64
" -> %" PRIu64,
seqno, unix_time);
} else {
InstrumentedMutexLock l(&mutex_);
// FIXME: assert(seqno > 0);
// Always successful assuming seqnos never go backwards
seqno_to_time_mapping_.Append(seqno, unix_time);
}
}

Expand Down
2 changes: 1 addition & 1 deletion db/event_helpers.cc
@@ -145,7 +145,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
jwriter << "N/A";
} else {
SeqnoToTimeMapping tmp;
Status status = tmp.Add(table_properties.seqno_to_time_mapping);
Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
if (status.ok()) {
jwriter << tmp.ToHumanString();
} else {
7 changes: 3 additions & 4 deletions db/flush_job.cc
@@ -852,10 +852,9 @@ Status FlushJob::WriteLevel0Table() {

SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber();
if (!db_impl_seqno_to_time_mapping_.Empty()) {
// make a local copy, as the seqno_to_time_mapping from db_impl is not
// thread safe, which will be used while not holding the db_mutex.
seqno_to_time_mapping_ =
db_impl_seqno_to_time_mapping_.Copy(smallest_seqno);
// make a local copy to use while not holding the db_mutex.
seqno_to_time_mapping_.CopyFromSeqnoRange(db_impl_seqno_to_time_mapping_,
smallest_seqno);
}

std::vector<BlobFileAddition> blob_file_additions;