
Add support to flush multiple CFs atomically #4262

Closed · wants to merge 33 commits
Changes from 5 commits
Commits (33)
6c278e9
Add a new flush job for atomic flush
riversand963 Aug 10, 2018
136bb10
Adjust format.
riversand963 Aug 11, 2018
dd8e664
Refactor, add test and comments
riversand963 Aug 11, 2018
19e5501
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Aug 21, 2018
32f8c50
Mark version edits as atomic group
riversand963 Aug 21, 2018
3884b80
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Aug 27, 2018
70e538a
Change function signatures of a few functions
riversand963 Aug 27, 2018
c834396
Remove anonymous variable from a func call
riversand963 Aug 27, 2018
45a9f5c
Fix memory leak in unit test
riversand963 Aug 28, 2018
a311d59
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Aug 30, 2018
5691a73
Flush memtables of multi-CFs in order
riversand963 Sep 1, 2018
0a138df
Add check to return value of fsync
riversand963 Sep 2, 2018
f987492
Update memtable_list_test and flush_job_test
riversand963 Sep 5, 2018
97e5123
Improve inline comments
riversand963 Sep 10, 2018
ca961e7
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Sep 17, 2018
2b69378
Address review comments
riversand963 Sep 17, 2018
4144f2b
Address review comments
riversand963 Sep 17, 2018
5e774cb
Rename a variable
riversand963 Sep 23, 2018
a003137
Fix a linting error
riversand963 Sep 24, 2018
51c8ccd
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Sep 26, 2018
036afce
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Sep 27, 2018
477ddfa
Add test for atomic flush of multi column families
riversand963 Sep 28, 2018
4cf4872
Merge remote-tracking branch 'upstream/master' into atomic_flush_job
riversand963 Oct 10, 2018
348553a
Address review comments
riversand963 Oct 10, 2018
411d6ae
Rename function and add comments
riversand963 Oct 11, 2018
c23a1df
Address review comments
riversand963 Oct 11, 2018
6c0e51f
Address comments
riversand963 Oct 12, 2018
e3e9f7a
Address comments
riversand963 Oct 12, 2018
06e566f
Add a check for list being non-empty
riversand963 Oct 12, 2018
8d7b8cc
Add a new test and some comments
riversand963 Oct 14, 2018
3ae9f54
Fix memory leaks in MemTableListTest.HasOlderAtomicFlush
riversand963 Oct 15, 2018
8a2daf2
Continue to fix memory leak in HasOlderAtomicFlush
riversand963 Oct 15, 2018
893c0da
Fix a lint warning
riversand963 Oct 16, 2018
6 changes: 6 additions & 0 deletions db/db_impl.h
@@ -890,6 +890,12 @@ class DBImpl : public DB {
bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);

Status AtomicFlushMemTablesToOutputFiles(
const autovector<ColumnFamilyData*>& cfds,
const autovector<MutableCFOptions>& mutable_cf_options_list,
const autovector<uint64_t>& memtable_ids, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);

// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
176 changes: 172 additions & 4 deletions db/db_impl_compaction_flush.cc
@@ -119,11 +119,13 @@ Status DBImpl::FlushMemTableToOutputFile(
}
FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
- env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
- snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
- job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
+ nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
+ &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
+ snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
+ GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
- &event_logger_, mutable_cf_options.report_bg_io_stats);
+ &event_logger_, mutable_cf_options.report_bg_io_stats,
+ true /* sync_output_directory */, true /* write_manifest */);

FileMetaData file_meta;

@@ -200,6 +202,172 @@ Status DBImpl::FlushMemTableToOutputFile(
return s;
}

/*
* Atomically flushes multiple column families.
*
* For each column family, all memtables with ID smaller than or equal to the
* specified ID will be flushed. Only after all column families finish flush
* will this function commit to MANIFEST. If any of the column families are not
* flushed successfully, this function does not have any side-effect on the
* state of the database.
*/
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const autovector<ColumnFamilyData*>& cfds,
const autovector<MutableCFOptions>& mutable_cf_options_list,
const autovector<uint64_t>& flush_memtable_ids, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
#ifndef NDEBUG
for (const auto cfd : cfds) {
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
}
#endif /* !NDEBUG */

SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);

auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
autovector<Directory*> distinct_output_dirs;
std::vector<FlushJob> jobs;
int num_cfs = static_cast<int>(cfds.size());
// If we reach here, there must be more than one column family in the flush
// request. Therefore, we need more superversion_contexts in job_context.
// Since superversion_contexts[0] has already created a new superversion, we
// create superversions for other elements in superversion_contexts.
for (int i = 1; i != num_cfs; ++i) {
job_context->superversion_contexts.emplace_back(SuperVersionContext(true));
}
for (int i = 0; i < num_cfs; ++i) {
auto cfd = cfds[i];
Directory* data_dir = GetDataDir(cfd, 0U);

// Add to distinct output directories if eligible. Use linear search. Since
// the number of elements in the vector is not large, performance should be
// tolerable.
bool found = false;
for (const auto dir : distinct_output_dirs) {
if (dir == data_dir) {
found = true;
break;
}
}
if (!found) {
distinct_output_dirs.emplace_back(data_dir);
}

jobs.emplace_back(
dbname_, cfds[i], immutable_db_options_, mutable_cf_options_list[i],
&flush_memtable_ids[i], env_options_for_compaction_, versions_.get(),
&mutex_, &shutting_down_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker, job_context,
log_buffer, directories_.GetDbDir(), data_dir,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options_list[i]),
stats_, &event_logger_, mutable_cf_options_list[i].report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */);
}

autovector<FileMetaData> file_meta;
Status s;
// TODO (yanqin): parallelize jobs with threads.
for (int i = 0; i != num_cfs; ++i) {
auto& job = jobs[i];
job.PickMemTable();
file_meta.emplace_back(FileMetaData());

#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options_list[i],
job_context->job_id, job.GetTableProperties());
#endif /* !ROCKSDB_LITE */
if (logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
Contributor: When would there be zero column families?

Contributor Author (riversand963, Sep 17, 2018): Sorry for the typo. It should be > 1, per https://github.com/facebook/rocksdb/blob/master/db/db_impl_compaction_flush.cc#L147. That also means there may be unnecessary log syncing; I'll submit a separate PR to check whether that is indeed the case. Update: #4416 tries to address this issue, and this PR will remove this check.

s = SyncClosedLogs(job_context);
Contributor: I'm lost here. Are we still holding the DB mutex here?

Contributor Author: Yes.

Contributor: IMO it looks fine, since the calls doing I/O (like SyncClosedLogs) appear to release and reacquire the lock.

}
if (!s.ok()) {
break;
}
s = job.Run(&logs_with_prep_tracker_, &file_meta[i]);
if (!s.ok()) {
break;
}
}

if (s.ok()) {
// Sync on all distinct output directories.
for (auto dir : distinct_output_dirs) {
if (dir != nullptr) {
dir->Fsync();
}
}

autovector<const autovector<MemTable*>*> mems_list;
autovector<MemTableList*> imm_lists;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
mems_list.emplace_back(&mems);
imm_lists.emplace_back(cfds[i]->imm());
}
s = MemTableList::InstallMemtableFlushResults(
imm_lists, cfds, mutable_cf_options_list, mems_list,
&logs_with_prep_tracker_, versions_.get(), &mutex_, file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
}

if (s.ok()) {
for (int i = 0; i != num_cfs; ++i) {
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
mutable_cf_options_list[i]);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
cfds[i]->current()->storage_info()->LevelSummary(&tmp));
}
if (made_progress) {
*made_progress = 1;
}
#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
for (int i = 0; i != num_cfs; ++i) {
NotifyOnFlushCompleted(cfds[i], &file_meta[i], mutable_cf_options_list[i],
job_context->job_id, jobs[i].GetTableProperties());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
sfm->OnAddFile(file_path);
if (sfm->IsMaxAllowedSpaceReached()) {
Status new_bg_error =
Status::SpaceLimit("Max allowed space was reached");
error_handler_.SetBGError(new_bg_error,
BackgroundErrorReason::kFlush);
break;
}
}
}
#endif // ROCKSDB_LITE
}

if (!s.ok()) {
for (int i = 0; i != num_cfs; ++i) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber());
jobs[i].Cancel();
}
if (!s.IsShutdownInProgress()) {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}

return s;
}

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
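For readers skimming the long hunk above: the new AtomicFlushMemTablesToOutputFiles follows a run-all, commit-once, roll-back-on-failure pattern. Below is a minimal, self-contained sketch of that pattern only; SketchStatus, flush_one_cf, commit_all_to_manifest, and rollback_all are stand-in names for illustration, not RocksDB APIs.

#include <functional>
#include <string>
#include <vector>

// Minimal status type for the sketch; RocksDB's real Status class is richer.
struct SketchStatus {
  bool ok = true;
  std::string msg;
};

// Run every per-CF flush, commit to the MANIFEST exactly once, and roll
// everything back if any step fails, so no partial state survives.
SketchStatus AtomicFlushSketch(
    const std::vector<std::function<SketchStatus()>>& flush_one_cf,
    const std::function<SketchStatus()>& commit_all_to_manifest,
    const std::function<void()>& rollback_all) {
  SketchStatus s;
  for (const auto& flush : flush_one_cf) {
    s = flush();  // write one column family's memtables to an SST file
    if (!s.ok) {
      break;      // stop early; nothing has been committed yet
    }
  }
  if (s.ok) {
    s = commit_all_to_manifest();  // a single group commit makes it atomic
  }
  if (!s.ok) {
    rollback_all();  // leave the database state unchanged on failure
  }
  return s;
}

In the actual hunk, the per-CF flush corresponds to job.Run(), the group commit to MemTableList::InstallMemtableFlushResults(), and the rollback to RollbackMemtableFlush() plus jobs[i].Cancel().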
17 changes: 10 additions & 7 deletions db/flush_job.cc
@@ -83,24 +83,25 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
}
}


FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
- const EnvOptions env_options, VersionSet* versions,
- InstrumentedMutex* db_mutex,
+ const uint64_t* memtable_id, const EnvOptions env_options,
+ VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
- EventLogger* event_logger, bool measure_io_stats)
+ EventLogger* event_logger, bool measure_io_stats,
+ const bool sync_output_directory, const bool write_manifest)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
+ memtable_id_(memtable_id),
env_options_(env_options),
versions_(versions),
db_mutex_(db_mutex),
@@ -116,6 +117,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
stats_(stats),
event_logger_(event_logger),
measure_io_stats_(measure_io_stats),
sync_output_directory_(sync_output_directory),
write_manifest_(write_manifest),
edit_(nullptr),
base_(nullptr),
pick_memtable_called(false) {
@@ -160,7 +163,7 @@ void FlushJob::PickMemTable() {
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
- cfd_->imm()->PickMemtablesToFlush(&mems_);
+ cfd_->imm()->PickMemtablesToFlush(memtable_id_, &mems_);
if (mems_.empty()) {
return;
}
@@ -224,7 +227,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,

if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
- } else {
+ } else if (write_manifest_) {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults(
@@ -371,7 +374,7 @@ Status FlushJob::WriteLevel0Table() {
s.ToString().c_str(),
meta_.marked_for_compaction ? " (needs compaction)" : "");

- if (output_file_directory_ != nullptr) {
+ if (output_file_directory_ != nullptr && sync_output_directory_) {
output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
33 changes: 30 additions & 3 deletions db/flush_job.h
@@ -59,14 +59,16 @@ class FlushJob {
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
- const EnvOptions env_options, VersionSet* versions,
- InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
+ const uint64_t* memtable_id, const EnvOptions env_options,
+ VersionSet* versions, InstrumentedMutex* db_mutex,
+ std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression,
- Statistics* stats, EventLogger* event_logger, bool measure_io_stats);
+ Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
+ const bool sync_output_directory, const bool write_manifest);

~FlushJob();

@@ -77,16 +79,24 @@ class FlushJob {
FileMetaData* file_meta = nullptr);
void Cancel();
TableProperties GetTableProperties() const { return table_properties_; }
const autovector<MemTable*>& GetMemTables() const { return mems_; }

private:
void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table();

const std::string& dbname_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
// Pointer to a variable storing the largest memtable id to flush in this
// flush job. RocksDB uses this variable to select the memtables to flush in
// this job. All memtables in this column family with an ID smaller than or
// equal to *memtable_id_ will be selected for flush. If null, then all
// memtables in the column family will be selected.
const uint64_t* memtable_id_;
const EnvOptions env_options_;
VersionSet* versions_;
InstrumentedMutex* db_mutex_;
@@ -103,6 +113,23 @@ class FlushJob {
EventLogger* event_logger_;
TableProperties table_properties_;
bool measure_io_stats_;
// True if this flush job should call fsync on the output directory. False
// otherwise.
// Usually sync_output_directory_ is true. A flush job needs to call sync on
// the output directory before committing to the MANIFEST.
// However, an individual flush job does not have to call sync on the output
// directory if it is part of an atomic flush. After all flush jobs in the
// atomic flush succeed, call sync once on each distinct output directory.
const bool sync_output_directory_;
// True if this flush job should write to MANIFEST after successfully
// flushing memtables. False otherwise.
// Usually write_manifest_ is true. A flush job commits to the MANIFEST after
// flushing the memtables.
// However, an individual flush job cannot rashly write to the MANIFEST
// immediately after it finishes the flush if it is part of an atomic flush.
// In this case, only after all flush jobs succeed in flush can RocksDB
// commit to the MANIFEST.
const bool write_manifest_;

// Variables below are set by PickMemTable():
FileMetaData meta_;
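The memtable_id_ comment in the hunk above describes a selection rule: flush every immutable memtable whose ID is less than or equal to *memtable_id_, or all of them when the pointer is null. The following is a self-contained sketch of that rule, using a stand-in FakeMemTable type and a hypothetical PickUpToId helper rather than RocksDB's MemTable and PickMemtablesToFlush, so it illustrates the rule, not the actual implementation.

#include <cstdint>
#include <vector>

// Stand-in for a memtable; only the ID matters for this sketch.
struct FakeMemTable {
  uint64_t id;
};

// Pick immutable memtables oldest-first, stopping at the first one whose ID
// exceeds *max_memtable_id. A null pointer means "pick everything", which
// matches the existing single-column-family flush behavior.
std::vector<FakeMemTable*> PickUpToId(const uint64_t* max_memtable_id,
                                      const std::vector<FakeMemTable*>& imm) {
  std::vector<FakeMemTable*> picked;
  for (FakeMemTable* m : imm) {  // imm is assumed ordered oldest to newest
    if (max_memtable_id != nullptr && m->id > *max_memtable_id) {
      break;  // newer memtables are left for a later flush job
    }
    picked.push_back(m);
  }
  return picked;
}

In the diff, the atomic-flush path passes &flush_memtable_ids[i] for each column family so every job flushes a well-defined prefix of that CF's immutable memtables, while the existing single-CF path passes nullptr /* memtable_id */ to keep its old behavior.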
23 changes: 16 additions & 7 deletions db/flush_job_test.cc
@@ -94,11 +94,14 @@ TEST_F(FlushJobTest, Empty) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
- FlushJob flush_job(
- dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
- *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_,
- &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context,
- nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false);
+ FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
+ db_options_, *cfd->GetLatestMutableCFOptions(),
+ nullptr /* memtable_id */,
+ env_options_, versions_.get(), &mutex_, &shutting_down_,
+ {}, kMaxSequenceNumber, snapshot_checker, &job_context,
+ nullptr, nullptr, nullptr, kNoCompression, nullptr,
+ &event_logger, false, true /* sync_output_directory */,
+ true /* write_manifest */);
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
@@ -141,10 +144,13 @@ TEST_F(FlushJobTest, NonEmpty) {
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
+ nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_,
{}, kMaxSequenceNumber, snapshot_checker, &job_context,
nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true);
+ db_options_.statistics.get(), &event_logger, true,
+ true /* sync_output_directory */,
+ true /* write_manifest */);

HistogramData hist;
FileMetaData file_meta;
@@ -215,10 +221,13 @@ TEST_F(FlushJobTest, Snapshots) {
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
+ nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_,
snapshots, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression,
- db_options_.statistics.get(), &event_logger, true);
+ db_options_.statistics.get(), &event_logger, true,
+ true /* sync_output_directory */,
+ true /* write_manifest */);
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());