diff --git a/db/log_reader.cc b/db/log_reader.cc
index 110eb2c27c8..ce5476fb5ba 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -43,7 +43,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       compression_type_record_read_(false),
       uncompress_(nullptr),
       hash_state_(nullptr),
-      uncompress_hash_state_(nullptr){}
+      uncompress_hash_state_(nullptr),
+      skipped_(false) {}
 
 Reader::~Reader() {
   delete[] backing_store_;
@@ -112,6 +113,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         *record = fragment;
         last_record_offset_ = prospective_record_offset;
         first_record_read_ = true;
+        skipped_ = false;
         return true;
 
       case kFirstType:
@@ -130,13 +132,16 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         prospective_record_offset = physical_record_offset;
         scratch->assign(fragment.data(), fragment.size());
         in_fragmented_record = true;
+        skipped_ = false;
         break;
 
      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
-          ReportCorruption(fragment.size(),
-                           "missing start of fragmented record(1)");
+          if (!skipped_) {
+            ReportCorruption(fragment.size(),
+                             "missing start of fragmented record(1)");
+          }
        } else {
          if (record_checksum != nullptr) {
            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
@@ -148,8 +153,10 @@
      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
-          ReportCorruption(fragment.size(),
-                           "missing start of fragmented record(2)");
+          if (!skipped_) {
+            ReportCorruption(fragment.size(),
+                             "missing start of fragmented record(2)");
+          }
        } else {
          if (record_checksum != nullptr) {
            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
diff --git a/db/log_reader.h b/db/log_reader.h
index 6e4eded0916..d4f0734c16c 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -16,6 +16,7 @@
 #include "db/log_format.h"
 #include "file/sequence_file_reader.h"
+#include "rocksdb/io_status.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
@@ -123,6 +124,20 @@ class Reader {
     return !first_record_read_ && compression_type_record_read_;
   }
 
+  IOStatus Skip(uint64_t n) {
+    if (n == 0) {
+      return IOStatus::OK();
+    }
+    auto s = file_->Skip(n);
+    if (!s.ok()) {
+      return s;
+    }
+    end_of_buffer_offset_ += n;
+    skipped_ = true;
+    buffer_.clear();
+    return s;
+  }
+
  protected:
   std::shared_ptr<Logger> info_log_;
   const std::unique_ptr<SequentialFileReader> file_;
@@ -170,6 +185,9 @@ class Reader {
   // is only for WAL logs.
   UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;
 
+  // Set when Skip() was called; fragments are dropped silently until the start of the next record is found
+  bool skipped_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc
index 8841b8cf3b2..7247e573630 100644
--- a/db/transaction_log_impl.cc
+++ b/db/transaction_log_impl.cc
@@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
     const TransactionLogIterator::ReadOptions& read_options,
     const EnvOptions& soptions, const SequenceNumber seq,
     std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
-    const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
+    const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
+    const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache)
     : dir_(dir),
       options_(options),
       read_options_(read_options),
@@ -32,6 +33,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
       started_(false),
       is_valid_(false),
       current_file_index_(0),
+      transaction_log_seq_cache_(transaction_log_seq_cache),
       current_batch_seq_(0),
       current_last_seq_(0) {
   assert(files_ != nullptr);
@@ -113,8 +115,13 @@
   } else if (!current_status_.ok()) {
     return;
   }
-  Status s =
-      OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
+  auto& file = files_->at(static_cast<size_t>(start_file_index));
+  uint64_t hint_block_index{0};
+  if (read_options_.with_cache_) {
+    transaction_log_seq_cache_->Lookup(
+        file->LogNumber(), starting_sequence_number_, &hint_block_index);
+  }
+  Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
   if (!s.ok()) {
     current_status_ = s;
     reporter_.Info(current_status_.ToString().c_str());
@@ -207,7 +214,13 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
     // Open the next file
     if (current_file_index_ < files_->size() - 1) {
       ++current_file_index_;
-      Status s = OpenLogReader(files_->at(current_file_index_).get());
+      auto& file = files_->at(static_cast<size_t>(current_file_index_));
+      uint64_t hint_block_index{0};
+      if (read_options_.with_cache_) {
+        transaction_log_seq_cache_->Lookup(
+            file->LogNumber(), starting_sequence_number_, &hint_block_index);
+      }
+      Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
       if (!s.ok()) {
         is_valid_ = false;
         current_status_ = s;
@@ -276,12 +289,25 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
   // currentBatchSeq_ can only change here
   assert(current_last_seq_ <= versions_->LastSequence());
 
+  if (read_options_.with_cache_) {
+    // Cache the mapping of sequence number to log block index when seeking to
+    // the start or end sequence.
+    if ((current_batch_seq_ <= starting_sequence_number_ &&
+         current_last_seq_ >= starting_sequence_number_) ||
+        current_last_seq_ == versions_->LastSequence()) {
+      transaction_log_seq_cache_->Insert(
+          current_log_reader_->GetLogNumber(), current_batch_seq_,
+          current_log_reader_->LastRecordOffset() / log::kBlockSize);
+    }
+  }
+
   current_batch_ = std::move(batch);
   is_valid_ = true;
   current_status_ = Status::OK();
 }
 
-Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
+Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file,
+                                                 uint64_t hint_offset) {
   std::unique_ptr<SequentialFileReader> file;
   Status s = OpenLogFile(log_file, &file);
   if (!s.ok()) {
@@ -291,6 +317,9 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
   current_log_reader_.reset(
       new log::Reader(options_->info_log, std::move(file), &reporter_,
                       read_options_.verify_checksums_, log_file->LogNumber()));
+  if (hint_offset > 0) {
+    return current_log_reader_->Skip(hint_offset);
+  }
   return Status::OK();
 }
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h
index eb700036110..14ff75d9077 100644
--- a/db/transaction_log_impl.h
+++ b/db/transaction_log_impl.h
@@ -16,6 +16,7 @@
 #include "rocksdb/options.h"
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/types.h"
+#include "util/random.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -54,6 +55,59 @@ class LogFileImpl : public LogFile {
   uint64_t sizeFileBytes_;
 };
 
+class TransactionLogSeqCache {
+ public:
+  TransactionLogSeqCache(size_t size)
+      : size_(size), rand_(std::random_device{}()) {}
+  struct SeqWithFileBlockIdx {
+    uint64_t log_number;
+    uint64_t seq_number;
+    uint64_t block_index;
+    SeqWithFileBlockIdx(uint64_t log_num, uint64_t seq_num, uint64_t block_idx)
+        : log_number(log_num), seq_number(seq_num), block_index(block_idx) {}
+    bool operator<(const SeqWithFileBlockIdx& other) const {
+      return std::tie(other.log_number, other.seq_number, other.block_index) <
+             std::tie(log_number, seq_number, block_index);
+    }
+    bool operator==(const SeqWithFileBlockIdx& other) const {
+      return std::tie(log_number, seq_number, block_index) ==
+             std::tie(other.log_number, other.seq_number, other.block_index);
+    }
+  };
+
+  void Insert(uint64_t log_number, uint64_t seq_number, uint64_t block_index) {
+    std::lock_guard<std::mutex> lk{mutex_};
+    if (cache_.size() > size_) {
+      auto iter = cache_.begin();
+      std::advance(iter, rand_.Next() % cache_.size());
+      cache_.erase(iter);
+    }
+    cache_.emplace(log_number, seq_number, block_index);
+  }
+
+  bool Lookup(uint64_t log_number, uint64_t seq_number, uint64_t* block_index) {
+    std::lock_guard<std::mutex> lk{mutex_};
+    const static uint64_t max_block_index =
+        std::numeric_limits<uint64_t>::max();
+    auto iter = cache_.lower_bound(
+        SeqWithFileBlockIdx{log_number, seq_number, max_block_index});
+    if (iter == cache_.end()) {
+      return false;
+    }
+    if (log_number == iter->log_number && seq_number >= iter->seq_number) {
+      *block_index = iter->block_index;
+      return true;
+    }
+    return false;
+  }
+
+ private:
+  size_t size_;
+  std::mutex mutex_;
+  Random32 rand_;
+  std::set<SeqWithFileBlockIdx> cache_;
+};
+
 class TransactionLogIteratorImpl : public TransactionLogIterator {
  public:
   TransactionLogIteratorImpl(
@@ -61,7 +115,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
       const TransactionLogIterator::ReadOptions& read_options,
       const EnvOptions& soptions, const SequenceNumber seqNum,
       std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
-      const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);
+      const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
+      const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache);
 
   bool Valid() override;
 
@@ -92,6 +147,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
   std::unique_ptr<WriteBatch> current_batch_;
   std::unique_ptr<log::Reader> current_log_reader_;
   std::string scratch_;
+  std::shared_ptr<TransactionLogSeqCache> transaction_log_seq_cache_;
 
   Status OpenLogFile(const LogFile* log_file,
                      std::unique_ptr<SequentialFileReader>* file);
@@ -123,6 +179,6 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
   bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
   // Update current batch if a continuous batch is found.
   void UpdateCurrentWriteBatch(const Slice& record);
-  Status OpenLogReader(const LogFile* file);
+  Status OpenLogReader(const LogFile* file, uint64_t hint_offset);
 };
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/wal_manager.cc b/db/wal_manager.cc
index 1f8190b93a9..4c90354c3e6 100644
--- a/db/wal_manager.cc
+++ b/db/wal_manager.cc
@@ -125,7 +125,7 @@ Status WalManager::GetUpdatesSince(
   }
   iter->reset(new TransactionLogIteratorImpl(
       wal_dir_, &db_options_, read_options, file_options_, seq,
-      std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
+      std::move(wal_files), version_set, seq_per_batch_, io_tracer_, transaction_log_seq_cache_));
   return (*iter)->status();
 }
 
diff --git a/db/wal_manager.h b/db/wal_manager.h
index d8acba8afa3..758f3f451d6 100644
--- a/db/wal_manager.h
+++ b/db/wal_manager.h
@@ -17,6 +17,7 @@
 #include 
 #include 
 
+#include "db/transaction_log_impl.h"
 #include "db/version_set.h"
 #include "file/file_util.h"
 #include "options/db_options.h"
@@ -47,7 +48,8 @@ class WalManager {
         seq_per_batch_(seq_per_batch),
         wal_dir_(db_options_.GetWalDir()),
         wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
-        io_tracer_(io_tracer) {}
+        io_tracer_(io_tracer),
+        transaction_log_seq_cache_(std::make_shared<TransactionLogSeqCache>(128)) {}
 
   Status GetSortedWalFiles(VectorLogPtr& files);
 
@@ -132,6 +134,9 @@
   static constexpr uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
 
   std::shared_ptr<IOTracer> io_tracer_;
+
+  // Cache of sequence number to WAL block index.
+  std::shared_ptr<TransactionLogSeqCache> transaction_log_seq_cache_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h
index e13ad8f80a5..6e6a4fcafcc 100644
--- a/include/rocksdb/transaction_log.h
+++ b/include/rocksdb/transaction_log.h
@@ -113,7 +113,13 @@ class TransactionLogIterator {
     // Default: true
     bool verify_checksums_;
 
-    ReadOptions() : verify_checksums_(true) {}
+    // If true, the mapping from sequence number to WAL block index is cached,
+    // so GetUpdatesSince() can skip ahead in the target WAL file instead of
+    // scanning it from the beginning.
+    // Default: true
+    bool with_cache_ = true;
+
+    ReadOptions() : verify_checksums_(true), with_cache_(true) {}
 
     explicit ReadOptions(bool verify_checksums)
         : verify_checksums_(verify_checksums) {}
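
Usage sketch (not part of the patch): the new ReadOptions::with_cache_ flag defaults to true, so callers of DB::GetUpdatesSince() pick up the sequence-to-block-index cache automatically; setting it to false restores the old behavior of scanning the target WAL file from its first block. The snippet below is a minimal illustration against the public API; the function name TailUpdates and its error handling are illustrative only.

    #include <memory>

    #include "rocksdb/db.h"
    #include "rocksdb/transaction_log.h"

    // Replay all write batches with sequence numbers >= since_seq.
    rocksdb::Status TailUpdates(rocksdb::DB* db, rocksdb::SequenceNumber since_seq) {
      rocksdb::TransactionLogIterator::ReadOptions ro;
      ro.verify_checksums_ = true;
      ro.with_cache_ = true;  // default; false forces a scan from the start of the WAL file
      std::unique_ptr<rocksdb::TransactionLogIterator> it;
      rocksdb::Status s = db->GetUpdatesSince(since_seq, &it, ro);
      if (!s.ok()) {
        return s;
      }
      for (; it->Valid(); it->Next()) {
        rocksdb::BatchResult batch = it->GetBatch();
        // batch.sequence is the first sequence number in batch.writeBatchPtr;
        // process the write batch here.
        (void)batch;
      }
      return it->status();
    }

With the cache in place, repeated calls to GetUpdatesSince() for nearby sequence numbers can open the WAL file and Skip() directly to a cached block index rather than reading every record before the target sequence.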