secondary instance: add support for WAL tailing on OpenAsSecondary
Summary: PR #4899 implemented the general framework for RocksDB secondary instances. This PR adds support for WAL tailing in `OpenAsSecondary`: after the `OpenAsSecondary` call, the secondary can see the primary's writes that have not yet been flushed. The secondary sees the primary's writes in the WAL up to the moment the `OpenAsSecondary` call starts.
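
A minimal usage sketch (not part of this diff) of what the new behavior enables. The database paths, key, and value are placeholders, and the max_open_files = -1 setting is an assumption about what secondary instances require at this point:

#include <cassert>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  // Assumption: secondary instances at this point require max_open_files = -1.
  options.max_open_files = -1;

  // Assume a primary is running on /tmp/primary_db and has written
  // Put("foo", "bar") without flushing its memtable.
  rocksdb::DB* secondary = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenAsSecondary(
      options, "/tmp/primary_db", "/tmp/secondary_path", &secondary);
  assert(s.ok());

  // With this PR, Recover() also tails the primary's WAL, so the unflushed
  // write is already visible to reads on the secondary.
  std::string value;
  s = secondary->Get(rocksdb::ReadOptions(), "foo", &value);
  assert(s.ok() && value == "bar");

  // Writes made by the primary after this open require a catch-up call,
  // e.g. TryCatchUpWithPrimary() from PR #4899.
  delete secondary;
  return 0;
}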

Differential Revision: D15059905

Pulled By: miasantreble

fbshipit-source-id: 44f71f548a30b38179a7940165e138f622de1f10
miasantreble authored and facebook-github-bot committed Apr 24, 2019
1 parent 1c8cbf3 commit aa56b7e
Showing 6 changed files with 298 additions and 20 deletions.
4 changes: 2 additions & 2 deletions db/db_impl.cc
@@ -157,6 +157,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)),
seq_per_batch_(seq_per_batch),
batch_per_txn_(batch_per_txn),
db_lock_(nullptr),
shutting_down_(false),
bg_cv_(&mutex_),
@@ -202,8 +204,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
opened_successfully_(false),
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(seq_per_batch),
batch_per_txn_(batch_per_txn),
// last_sequence_ is always maintained by the main queue that also writes
// to the memtable. When two_write_queues_ is disabled last seq in
// memtable is the same as last seq published to the readers. When it is
37 changes: 20 additions & 17 deletions db/db_impl.h
@@ -803,6 +803,23 @@ class DBImpl : public DB {
// Additional options for compaction and flush
EnvOptions env_options_for_compaction_;

std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;

// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when disable_memtable is
// set, the seq is not increased at all.
//
// Default: false
const bool seq_per_batch_;
// This determines during recovery whether we expect one writebatch per
// recovered transaction, or potentially multiple writebatches per
// transaction. For WriteUnprepared, this is set to false, since multiple
// batches can exist per transaction.
//
// Default: true
const bool batch_per_txn_;

// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@@ -1036,8 +1053,8 @@ class DBImpl : public DB {
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);

// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
virtual Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);

// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
@@ -1294,7 +1311,6 @@ class DBImpl : public DB {
// expensive mutex_ lock during WAL write, which updates log_empty_.
bool log_empty_;

std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; }
@@ -1689,20 +1705,7 @@ class DBImpl : public DB {
// In 2PC these are the writes at Prepare phase.
const bool two_write_queues_;
const bool manual_wal_flush_;
// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when disable_memtable is
// set, the seq is not increased at all.
//
// Default: false
const bool seq_per_batch_;
// This determines during recovery whether we expect one writebatch per
// recovered transaction, or potentially multiple writebatches per
// transaction. For WriteUnprepared, this is set to false, since multiple
// batches can exist per transaction.
//
// Default: true
const bool batch_per_txn_;

// LastSequence also indicates last published sequence visible to the
// readers. Otherwise LastPublishedSequence should be used.
const bool last_seq_same_as_publish_seq_;
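The relocated seq_per_batch_ comment above can be made concrete with a small sketch (not part of this diff; the database path is a placeholder). Under the default write path every key in a WriteBatch consumes its own sequence number, while seq_per_batch_ (set internally by write-prepared/write-unprepared transaction DBs) makes the whole batch consume one:

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/seq_per_batch_demo", &db);
  assert(s.ok());

  const rocksdb::SequenceNumber before = db->GetLatestSequenceNumber();

  rocksdb::WriteBatch batch;
  batch.Put("k1", "v1");
  batch.Put("k2", "v2");
  batch.Put("k3", "v3");
  s = db->Write(rocksdb::WriteOptions(), &batch);
  assert(s.ok());

  // Default (seq_per_batch_ == false): the three Puts consume three sequence
  // numbers. With seq_per_batch_ == true the batch would consume only one.
  assert(db->GetLatestSequenceNumber() == before + 3);

  delete db;
  return 0;
}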
170 changes: 169 additions & 1 deletion db/db_impl_secondary.cc
@@ -4,6 +4,12 @@
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_impl_secondary.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>

#include "db/db_iter.h"
#include "db/merge_context.h"
#include "monitoring/perf_context_imp.h"
@@ -52,12 +58,174 @@ Status DBImplSecondary::Recover(
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
single_column_family_mode_ =
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

// Recover from all newer log files than the ones named in the
// descriptor.
std::vector<std::string> filenames;
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
if (s.IsNotFound()) {
return Status::InvalidArgument("Failed to open wal_dir",
immutable_db_options_.wal_dir);
} else if (!s.ok()) {
return s;
}

std::vector<uint64_t> logs;
// if log_readers_ is non-empty, it means we have applied all logs with log
// numbers smaller than the smallest log in log_readers_, so there is no
// need to pass these logs to RecoverLogFiles
uint64_t log_number_min = 0;
if (log_readers_.size() > 0) {
log_number_min = log_readers_.begin()->first;
}
for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number;
FileType type;
if (ParseFileName(filenames[i], &number, &type) && type == kLogFile &&
number >= log_number_min) {
logs.push_back(number);
}
}

if (!logs.empty()) {
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
SequenceNumber next_sequence(kMaxSequenceNumber);
s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/);
}
}

// TODO: attempt to recover from WAL files.
// TODO: update options_file_number_ needed?

return s;
}

// try to find log reader using log_number from log_readers_ map, initialize
// if it doesn't exist
Status DBImplSecondary::MaybeInitLogReader(
uint64_t log_number, log::FragmentBufferedReader** log_reader) {
auto iter = log_readers_.find(log_number);
// make sure the log file is still present
if (iter == log_readers_.end() ||
iter->second->reader_->GetLogNumber() != log_number) {
// delete the obsolete log reader if log number mismatch
if (iter != log_readers_.end()) {
log_readers_.erase(iter);
}
// initialize log reader from log_number
// TODO: min_log_number_to_keep_2pc check needed?
// Open the log file
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode));

std::unique_ptr<SequentialFileReader> file_reader;
{
std::unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(
fname, &file, env_->OptimizeForLogRead(env_options_));
if (!status.ok()) {
*log_reader = nullptr;
return status;
}
file_reader.reset(new SequentialFileReader(std::move(file), fname));
}

// Create the log reader.
LogReaderContainer* log_reader_container = new LogReaderContainer(
env_, immutable_db_options_.info_log, std::move(fname),
std::move(file_reader), log_number);
log_readers_.insert(std::make_pair(
log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
}
iter = log_readers_.find(log_number);
assert(iter != log_readers_.end());
*log_reader = iter->second->reader_;
return Status::OK();
}

// After manifest recovery, replay WALs and refresh log_readers_ if necessary
// REQUIRES: log_numbers are sorted in ascending order
Status DBImplSecondary::RecoverLogFiles(
const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
bool /*read_only*/) {
mutex_.AssertHeld();
Status status;
for (auto log_number : log_numbers) {
log::FragmentBufferedReader* reader = nullptr;
status = MaybeInitLogReader(log_number, &reader);
if (!status.ok()) {
return status;
}
assert(reader != nullptr);
}
for (auto log_number : log_numbers) {
auto it = log_readers_.find(log_number);
assert(it != log_readers_.end());
log::FragmentBufferedReader* reader = it->second->reader_;
// Manually update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(log_number);

// Determine if we should tolerate incomplete records at the tail end of the log.
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;

while (reader->ReadRecord(&record, &scratch,
immutable_db_options_.wal_recovery_mode) &&
status.ok()) {
if (record.size() < WriteBatchInternal::kHeader) {
reader->GetReporter()->Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
// do not check sequence number because user may toggle disableWAL
// between writes which breaks sequence number continuity guarantee

// If a column family was not found, it might mean that the WAL write
// batch references a column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update. That's why we set
// ignore_missing_column_families to true.
// Passing a null flush_scheduler disables memtable flushing, which is
// what secondary instances need.
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), nullptr /* flush_scheduler */,
true, log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
// blocks that do not form coherent data
reader->GetReporter()->Corruption(record.size(), status);
continue;
}
}

if (!status.ok()) {
return status;
}

auto last_sequence = *next_sequence - 1;
if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) {
versions_->SetLastAllocatedSequence(last_sequence);
versions_->SetLastPublishedSequence(last_sequence);
versions_->SetLastSequence(last_sequence);
}
}
// remove logreaders from map after successfully recovering the WAL
if (log_readers_.size() > 1) {
auto eraseIter = log_readers_.begin();
std::advance(eraseIter, log_readers_.size() - 1);
log_readers_.erase(log_readers_.begin(), eraseIter);
}
return status;
}

// Implementation of the DB interface
Status DBImplSecondary::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
61 changes: 61 additions & 0 deletions db/db_impl_secondary.h
@@ -13,6 +13,55 @@

namespace rocksdb {

class LogReaderContainer {
public:
LogReaderContainer()
: reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
std::string fname,
std::unique_ptr<SequentialFileReader>&& file_reader,
uint64_t log_number) {
LogReporter* reporter = new LogReporter();
status_ = new Status();
reporter->env = env;
reporter->info_log = info_log.get();
reporter->fname = std::move(fname);
reporter->status = status_;
reporter_ = reporter;
// We intentionally make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
reporter, true /*checksum*/,
log_number);
}
log::FragmentBufferedReader* reader_;
log::Reader::Reporter* reporter_;
Status* status_;
~LogReaderContainer() {
delete reader_;
delete reporter_;
delete status_;
}
private:
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
std::string fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
fname.c_str(), static_cast<int>(bytes),
s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) {
*this->status = s;
}
}
};
};

class DBImplSecondary : public DBImpl {
public:
DBImplSecondary(const DBOptions& options, const std::string& dbname);
@@ -133,6 +182,9 @@ class DBImplSecondary : public DBImpl {
// method can take a long time due to all the I/O and CPU costs.
Status TryCatchUpWithPrimary() override;

Status MaybeInitLogReader(uint64_t log_number,
log::FragmentBufferedReader** log_reader);

private:
friend class DB;

@@ -142,10 +194,19 @@

using DBImpl::Recover;

Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence,
bool read_only) override;

std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;

// cache log readers for each log number, used to continue WAL replay
// after recovery
std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
};

} // namespace rocksdb

#endif // !ROCKSDB_LITE
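
The log_readers_ map above keeps one FragmentBufferedReader per WAL so that later catch-up calls can resume tailing where recovery left off instead of reopening every log file. A hedged sketch of how an application might drive that, assuming TryCatchUpWithPrimary() re-runs the recovery path shown in db_impl_secondary.cc; the poll interval is a placeholder:

#include <atomic>
#include <chrono>
#include <thread>

#include "rocksdb/db.h"

// Periodically ask the secondary to catch up with the primary until stopped.
void TailPrimary(rocksdb::DB* secondary, std::atomic<bool>* stop) {
  while (!stop->load()) {
    rocksdb::Status s = secondary->TryCatchUpWithPrimary();
    if (!s.ok()) {
      // Surface the error to the application; readers continue to see the
      // last successfully applied state.
      break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
}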