Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove random writes from SST file ingestion #4172

Closed
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ class CompactionPickerTest : public testing::Test {
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->smallest_seqno = smallest_seq;
f->largest_seqno = largest_seq;
f->fd.smallest_seqno = smallest_seq;
f->fd.largest_seqno = largest_seq;
f->compensated_file_size = file_size;
f->refs = 0;
vstorage_->AddFile(level, f);
Expand Down
18 changes: 9 additions & 9 deletions db/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,17 @@ void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
SequenceNumber* largest_seqno) {
bool is_first = true;
for (FileMetaData* f : files) {
assert(f->smallest_seqno <= f->largest_seqno);
assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
if (is_first) {
is_first = false;
*smallest_seqno = f->smallest_seqno;
*largest_seqno = f->largest_seqno;
*smallest_seqno = f->fd.smallest_seqno;
*largest_seqno = f->fd.largest_seqno;
} else {
if (f->smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->smallest_seqno;
if (f->fd.smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->fd.smallest_seqno;
}
if (f->largest_seqno > *largest_seqno) {
*largest_seqno = f->largest_seqno;
if (f->fd.largest_seqno > *largest_seqno) {
*largest_seqno = f->fd.largest_seqno;
}
}
}
Expand Down Expand Up @@ -365,11 +365,11 @@ Compaction* UniversalCompactionPicker::PickCompaction(
size_t level_index = 0U;
if (c->start_level() == 0) {
for (auto f : *c->inputs(0)) {
assert(f->smallest_seqno <= f->largest_seqno);
assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
if (is_first) {
is_first = false;
}
prev_smallest_seqno = f->smallest_seqno;
prev_smallest_seqno = f->fd.smallest_seqno;
}
level_index = 1U;
}
Expand Down
14 changes: 7 additions & 7 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
Expand Down Expand Up @@ -281,8 +281,8 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
Expand Down Expand Up @@ -885,7 +885,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
Expand Down Expand Up @@ -1804,8 +1804,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->smallest_seqno, f->largest_seqno,
f->marked_for_compaction);
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction);

ROCKS_LOG_BUFFER(
log_buffer,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.DeleteFile(0, f->fd.GetNumber());
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
if (s.ok() && meta.fd.GetFileSize() > 0) {
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction);
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ TEST_F(DBRangeDelTest, SentinelsOmittedFromOutputFile) {

std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
ASSERT_GT(files[0][0].smallest_seqno, 0);
ASSERT_GT(files[0][0].fd.smallest_seqno, 0);

db_->ReleaseSnapshot(snapshot);
}
Expand Down
41 changes: 22 additions & 19 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,9 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
const SequenceNumber level_largest_seqno =
(*max_element(level_files.begin(), level_files.end(),
[](FileMetaData* f1, FileMetaData* f2) {
return f1->largest_seqno < f2->largest_seqno;
return f1->fd.largest_seqno < f2->fd.largest_seqno;
}))
->largest_seqno;
->fd.largest_seqno;
// should only assign seqno to current level's largest seqno when
// the file fits
if (level_largest_seqno != 0 &&
Expand Down Expand Up @@ -522,7 +522,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
// at some upper level
for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
for (auto file : vstorage->LevelFiles(lvl)) {
if (file->smallest_seqno == 0) {
if (file->fd.smallest_seqno == 0) {
return Status::InvalidArgument(
"Can't ingest_behind file as despite allow_ingest_behind=true "
"there are files with 0 seqno in database at upper levels!");
Expand All @@ -547,24 +547,27 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
"field");
}

std::unique_ptr<RandomRWFile> rwfile;
Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
&rwfile, env_options_);
if (!status.ok()) {
return status;
if (ingestion_options_.write_global_seqno) {
// Try to write global_seqno into the file at its recorded offset.
// If the Env cannot open the file for random writes (NotSupported), skip
// the write and just set assigned_seqno; any other error is returned.
std::unique_ptr<RandomRWFile> rwfile;
Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
&rwfile, env_options_);
if (status.ok()) {
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
if (!status.ok()) {
return status;
}
} else if (!status.IsNotSupported()) {
return status;
}
}

// Write the new seqno in the global sequence number field in the file
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
if (status.ok()) {
status = rwfile->Fsync();
}
if (status.ok()) {
file_to_ingest->assigned_seqno = seqno;
}
return status;
file_to_ingest->assigned_seqno = seqno;
return Status::OK();
}

bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ Status FlushJob::WriteLevel0Table() {
// Add file to L0
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.smallest_seqno, meta_.largest_seqno,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction);
}

Expand Down
15 changes: 8 additions & 7 deletions db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,20 @@ TEST_F(FlushJobTest, NonEmpty) {
db_options_.statistics.get(), &event_logger, true);

HistogramData hist;
FileMetaData fd;
FileMetaData file_meta;
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run(nullptr, &fd));
ASSERT_OK(flush_job.Run(nullptr, &file_meta));
mutex_.Unlock();
db_options_.statistics->histogramData(FLUSH_TIME, &hist);
ASSERT_GT(hist.average, 0.0);

ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
ASSERT_EQ("9999a",
fd.largest.user_key().ToString()); // range tombstone end key
ASSERT_EQ(1, fd.smallest_seqno);
ASSERT_EQ(10000, fd.largest_seqno); // range tombstone seqnum 10000
ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
ASSERT_EQ(
"9999a",
file_meta.largest.user_key().ToString()); // range tombstone end key
ASSERT_EQ(1, file_meta.fd.smallest_seqno);
ASSERT_EQ(10000, file_meta.fd.largest_seqno); // range tombstone seqnum 10000
mock_table_factory_->AssertSingleFile(inserted_keys);
job_context.Clean();
}
Expand Down
2 changes: 1 addition & 1 deletion db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Status TableCache::GetTableReader(
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, prefix_extractor, env_options,
internal_comparator, skip_filters, immortal_tables_,
level),
level, fd.largest_seqno),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
Expand Down
32 changes: 17 additions & 15 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
namespace rocksdb {

bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
if (a->largest_seqno != b->largest_seqno) {
return a->largest_seqno > b->largest_seqno;
if (a->fd.largest_seqno != b->fd.largest_seqno) {
return a->fd.largest_seqno > b->fd.largest_seqno;
}
if (a->smallest_seqno != b->smallest_seqno) {
return a->smallest_seqno > b->smallest_seqno;
if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
return a->fd.smallest_seqno > b->fd.smallest_seqno;
}
// Break ties by file number
return a->fd.GetNumber() > b->fd.GetNumber();
Expand Down Expand Up @@ -162,22 +162,24 @@ class VersionBuilder::Rep {
abort();
}

if (f2->smallest_seqno == f2->largest_seqno) {
if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
// This is an external file that we ingested
SequenceNumber external_file_seqno = f2->smallest_seqno;
if (!(external_file_seqno < f1->largest_seqno ||
SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
if (!(external_file_seqno < f1->fd.largest_seqno ||
external_file_seqno == 0)) {
fprintf(stderr, "L0 file with seqno %" PRIu64 " %" PRIu64
" vs. file with global_seqno %" PRIu64 "\n",
f1->smallest_seqno, f1->largest_seqno,
fprintf(stderr,
"L0 file with seqno %" PRIu64 " %" PRIu64
" vs. file with global_seqno %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
external_file_seqno);
abort();
}
} else if (f1->smallest_seqno <= f2->smallest_seqno) {
fprintf(stderr, "L0 files seqno %" PRIu64 " %" PRIu64
" vs. %" PRIu64 " %" PRIu64 "\n",
f1->smallest_seqno, f1->largest_seqno, f2->smallest_seqno,
f2->largest_seqno);
} else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
fprintf(stderr,
"L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
" %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
f2->fd.smallest_seqno, f2->fd.largest_seqno);
abort();
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions db/version_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class VersionBuilderTest : public testing::Test {
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = GetInternalKey(smallest, smallest_seq);
f->largest = GetInternalKey(largest, largest_seq);
f->smallest_seqno = smallest_seqno;
f->largest_seqno = largest_seqno;
f->fd.smallest_seqno = smallest_seqno;
f->fd.largest_seqno = largest_seqno;
f->compensated_file_size = file_size;
f->refs = 0;
f->num_entries = num_entries;
Expand Down
29 changes: 19 additions & 10 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.fd.GetFileSize());
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutVarint64Varint64(dst, f.smallest_seqno, f.largest_seqno);
PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno);
if (has_customized_fields) {
// Customized fields' format:
// +-----------------------------+
Expand Down Expand Up @@ -233,14 +233,16 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
uint64_t number;
uint32_t path_id = 0;
uint64_t file_size;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
// Since this is the only forward-compatible part of the code, we hack new
// extensions into this record. When we do, we set this boolean to distinguish
// the record from the normal NewFile records.
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
GetInternalKey(input, &f.largest) &&
GetVarint64(input, &f.smallest_seqno) &&
GetVarint64(input, &f.largest_seqno)) {
GetVarint64(input, &smallest_seqno) &&
GetVarint64(input, &largest_seqno)) {
// See comments in VersionEdit::EncodeTo() for format of customized fields
while (true) {
uint32_t custom_tag;
Expand Down Expand Up @@ -289,7 +291,8 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
} else {
return "new-file4 entry";
}
f.fd = FileDescriptor(number, path_id, file_size);
f.fd =
FileDescriptor(number, path_id, file_size, smallest_seqno, largest_seqno);
new_files_.push_back(std::make_pair(level, f));
return nullptr;
}
Expand Down Expand Up @@ -409,13 +412,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
case kNewFile2: {
uint64_t number;
uint64_t file_size;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
GetVarint64(&input, &file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest) &&
GetVarint64(&input, &f.smallest_seqno) &&
GetVarint64(&input, &f.largest_seqno)) {
f.fd = FileDescriptor(number, 0, file_size);
GetVarint64(&input, &smallest_seqno) &&
GetVarint64(&input, &largest_seqno)) {
f.fd = FileDescriptor(number, 0, file_size, smallest_seqno,
largest_seqno);
new_files_.push_back(std::make_pair(level, f));
} else {
if (!msg) {
Expand All @@ -429,13 +435,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
uint64_t number;
uint32_t path_id;
uint64_t file_size;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest) &&
GetVarint64(&input, &f.smallest_seqno) &&
GetVarint64(&input, &f.largest_seqno)) {
f.fd = FileDescriptor(number, path_id, file_size);
GetVarint64(&input, &smallest_seqno) &&
GetVarint64(&input, &largest_seqno)) {
f.fd = FileDescriptor(number, path_id, file_size, smallest_seqno,
largest_seqno);
new_files_.push_back(std::make_pair(level, f));
} else {
if (!msg) {
Expand Down
Loading