Skip to content

Commit

Permalink
Refactor flush request queueing and processing (facebook#3952)
Browse files Browse the repository at this point in the history
Summary:
RocksDB currently queues individual column families for flushing. This is not sufficient to support the needs of some applications that want to enforce order/dependency between column families, given that multiple foreground and background activities can trigger flushing in RocksDB.

This PR aims to address this limitation. Each flush request is described as a `FlushRequest` that can contain multiple column families. A background flushing thread pops one flush request from the queue at a time and processes it.

This PR does not enable atomic_flush yet, but is a subset of [PR 3752](facebook#3752).
Pull Request resolved: facebook#3952

Differential Revision: D8529933

Pulled By: riversand963

fbshipit-source-id: 78908a21e389a3a3f7de2a79bae0cd13af5f3539
  • Loading branch information
riversand963 authored and facebook-github-bot committed Aug 24, 2018
1 parent 17f9a18 commit 7daae51
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 103 deletions.
9 changes: 6 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,12 @@ Status DBImpl::CloseHelper() {
flush_scheduler_.Clear();

while (!flush_queue_.empty()) {
auto cfd = PopFirstFromFlushQueue();
if (cfd->Unref()) {
delete cfd;
const FlushRequest& flush_req = PopFirstFromFlushQueue();
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (cfd->Unref()) {
delete cfd;
}
}
}
while (!compaction_queue_.empty()) {
Expand Down
59 changes: 51 additions & 8 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -884,12 +884,41 @@ class DBImpl : public DB {
Status SyncClosedLogs(JobContext* job_context);

// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
// log-file/memtable and writes a new descriptor iff successful. Then
// installs a new super version for the column family.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context,
SuperVersionContext* superversion_context,
LogBuffer* log_buffer);

// Per-column-family argument bundle consumed by the background flush thread.
struct BGFlushArg {
  // Default state: no column family, memtable id 0, no superversion context.
  BGFlushArg() {}

  BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
             SuperVersionContext* superversion_context)
      : cfd_(cfd),
        memtable_id_(memtable_id),
        superversion_context_(superversion_context) {}

  // The column family whose memtables are to be flushed.
  ColumnFamilyData* cfd_ = nullptr;

  // Upper bound on the memtable IDs covered by this flush. Memtables of this
  // column family with IDs smaller than this value must be flushed before
  // this flush completes.
  uint64_t memtable_id_ = 0;

  // SuperVersionContext used when RocksDB installs a new superversion for the
  // column family once the flush completes (the context object is currently
  // embedded in JobContext).
  SuperVersionContext* superversion_context_ = nullptr;
};

// Flush the memtables of (multiple) column families to multiple files on
// persistent storage.
Status FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);

// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
Expand All @@ -911,8 +940,7 @@ class DBImpl : public DB {

Status ScheduleFlushes(WriteContext* context);

Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
FlushReason flush_reason = FlushReason::kOthers);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);

// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
Expand All @@ -923,7 +951,13 @@ class DBImpl : public DB {
// gets flushed. Otherwise, wait until the column family doesn't have any
// memtable pending flush.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id = nullptr);
const uint64_t* flush_memtable_id = nullptr) {
return WaitForFlushMemTables({cfd}, {flush_memtable_id});
}
// Wait for memtables to be flushed for multiple column families.
Status WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids);

// REQUIRES: mutex locked
Status SwitchWAL(WriteContext* write_context);
Expand Down Expand Up @@ -979,7 +1013,17 @@ class DBImpl : public DB {
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);

void MaybeScheduleFlushOrCompaction();
void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason);

// A flush request specifies the column families to flush as well as the
// largest memtable id to persist for each column family. Once all the
// memtables whose IDs are smaller than or equal to this per-column-family
// specified value have been flushed, this flush request is considered to
// have completed its work of flushing this column family. After completing
// the work for all column families in this request, this flush is
// considered complete.
typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;

void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);

void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
Expand Down Expand Up @@ -1021,8 +1065,7 @@ class DBImpl : public DB {
// helper functions for adding and removing from flush & compaction queues
void AddToCompactionQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromCompactionQueue();
void AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason);
ColumnFamilyData* PopFirstFromFlushQueue();
FlushRequest PopFirstFromFlushQueue();

// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
Expand Down Expand Up @@ -1255,7 +1298,7 @@ class DBImpl : public DB {
// in MaybeScheduleFlushOrCompaction()
// invariant(column family present in flush_queue_ <==>
// ColumnFamilyData::pending_flush_ == true)
std::deque<ColumnFamilyData*> flush_queue_;
std::deque<FlushRequest> flush_queue_;
// invariant(column family present in compaction_queue_ <==>
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;
Expand Down
Loading

0 comments on commit 7daae51

Please sign in to comment.