Cache the mapping of sequence to log block index in transaction log iterator #12538

Open · wants to merge 1 commit into base: main
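For context, the diff below keys the new cache by WAL log number and sequence number, and stores the block index of the record where that sequence begins (see the `Insert` and `Lookup` calls in db/transaction_log_impl.cc). A minimal sketch of such a structure, assuming a plain ordered map with a floor-lookup contract — the PR's actual `TransactionLogSeqCache` may differ in naming, eviction policy, and synchronization:

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical sketch of a sequence -> block-index cache keyed by WAL number.
// Illustrative only: the real TransactionLogSeqCache in this PR may differ.
// The lookup contract matches how the diff uses it: find a cached block index
// for the largest cached sequence <= the requested sequence.
class SeqToBlockCache {
 public:
  void Insert(uint64_t log_number, uint64_t seq, uint64_t block_index) {
    std::lock_guard<std::mutex> guard(mu_);
    map_[{log_number, seq}] = block_index;
  }

  // Returns true and sets *block_index if some sequence <= seq is cached for
  // this log; the caller may then skip to block_index * log::kBlockSize.
  bool Lookup(uint64_t log_number, uint64_t seq, uint64_t* block_index) const {
    std::lock_guard<std::mutex> guard(mu_);
    auto it = map_.upper_bound({log_number, seq});  // first entry > (log, seq)
    if (it == map_.begin()) {
      return false;
    }
    --it;  // largest entry <= (log, seq)
    if (it->first.first != log_number) {
      return false;  // floor entry belongs to a different WAL file
    }
    *block_index = it->second;
    return true;
  }

 private:
  mutable std::mutex mu_;
  std::map<std::pair<uint64_t, uint64_t>, uint64_t> map_;
};
```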
102 changes: 100 additions & 2 deletions db/db_log_iter_test.cc
@@ -10,10 +10,20 @@
// Introduction of SyncPoint effectively disabled building and running this test
// in Release build.
// which is a pity, it is a good test
#include <gtest/gtest.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>

#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "log_format.h"
#include "port/stack_trace.h"
#include "rocksdb/status.h"
#include "rocksdb/transaction_log.h"
#include "test_util/testharness.h"
#include "util/atomic.h"

namespace ROCKSDB_NAMESPACE {
@@ -24,9 +34,9 @@ class DBTestXactLogIterator : public DBTestBase {
: DBTestBase("db_log_iter_test", /*env_do_fsync=*/true) {}

std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
const SequenceNumber seq) {
const SequenceNumber seq, TransactionLogIterator::ReadOptions ro = {}) {
std::unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(seq, &iter);
Status status = dbfull()->GetUpdatesSince(seq, &iter, ro);
EXPECT_OK(status);
EXPECT_TRUE(iter->Valid());
return iter;
@@ -61,6 +71,37 @@ void ExpectRecords(const int expected_no_records,
ReadRecords(iter, num_records);
ASSERT_EQ(num_records, expected_no_records);
}

Status GetLogOffset(Options options, std::string wal_file, uint64_t log_num,
uint64_t seq, uint64_t* offset) {
const auto& fs = options.env->GetFileSystem();
std::unique_ptr<SequentialFileReader> wal_file_reader;
Status s = SequentialFileReader::Create(fs, wal_file, FileOptions(options),
&wal_file_reader, nullptr, nullptr);
if (!s.ok()) {
return s;
}
log::Reader reader(options.info_log, std::move(wal_file_reader), nullptr,
true, log_num);
Slice record;
WriteBatch batch;
std::string scratch;
while (reader.ReadRecord(&record, &scratch)) {
s = WriteBatchInternal::SetContents(&batch, record);
if (!s.ok()) {
break;
}
auto cur_seq = WriteBatchInternal::Sequence(&batch);
if (cur_seq > seq) {
break;
}
if (WriteBatchInternal::Sequence(&batch) == seq) {
*offset = reader.LastRecordOffset();
return Status::OK();
}
}
return Status::NotFound();
}
} // anonymous namespace

TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
@@ -330,6 +371,63 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
"Delete(0, key2)",
handler.seen);
}

TEST_F(DBTestXactLogIterator, TransactionIteratorCache) {
Member: This test would pass before this change too. Maybe check that the cache is effective by checking some I/O statistics.

Contributor Author: Updated, thanks for the review.

Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
ASSERT_OK(dbfull()->Put({}, "key1", DummyString(log::kBlockSize, 'a')));
ASSERT_OK(dbfull()->Put({}, "key2", DummyString(log::kBlockSize, 'b')));
ASSERT_OK(dbfull()->Put({}, "key3", DummyString(log::kBlockSize, 'c')));
std::atomic_bool hit_cache = false;
std::atomic_uint64_t sequence = 2;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TransactionLogIteratorImpl:OpenLogReader:Skip", [&](void* arg) {
hit_cache = true;
std::unique_ptr<WalFile> wal_file;
EXPECT_OK(dbfull()->GetCurrentWalFile(&wal_file));
uint64_t offset = 0;
EXPECT_OK(GetLogOffset(options, dbname_ + "/" + wal_file->PathName(),
wal_file->LogNumber(), sequence, &offset));
auto skipped_offset = *static_cast<uint64_t*>(arg);
EXPECT_EQ(skipped_offset, offset / log::kBlockSize * log::kBlockSize);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
TransactionLogIterator::ReadOptions ro{};
ro.with_cache_ = true;
std::string batch_data;
{
auto iter = OpenTransactionLogIter(sequence, ro);
ASSERT_TRUE(!hit_cache);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
batch_data = iter->GetBatch().writeBatchPtr->Data();
}
{
// cache should be hit at the start sequence
auto iter = OpenTransactionLogIter(sequence, ro);
ASSERT_TRUE(hit_cache);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->GetBatch().writeBatchPtr->Data(), batch_data);
// move on iterator to the end sequence
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto batch = iter->GetBatch();
sequence = batch.sequence;
batch_data = batch.writeBatchPtr->Data();
hit_cache = false;
}
{
// cache should be hit at the end sequence
auto iter = OpenTransactionLogIter(sequence, ro);
ASSERT_TRUE(hit_cache);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->GetBatch().writeBatchPtr->Data(), batch_data);
}
}

} // namespace ROCKSDB_NAMESPACE


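One detail worth calling out in the test above: the expected skip offset is the record's byte offset floored to a block boundary, since the cache stores block indices rather than byte offsets. A small worked example (`kBlockSize` is 32768 in db/log_format.h; the offset value is made up for illustration):

```cpp
// Offset 70000 falls inside block 2 (blocks span [65536, 98304)), so the
// reader skips to 2 * 32768 = 65536 and reads forward from there.
constexpr uint64_t kBlockSize = 32768;  // log::kBlockSize
constexpr uint64_t offset = 70000;      // hypothetical record offset
static_assert(offset / kBlockSize * kBlockSize == 65536,
              "floor to block boundary");
```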
19 changes: 14 additions & 5 deletions db/log_reader.cc
@@ -43,7 +43,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
compression_type_record_read_(false),
uncompress_(nullptr),
hash_state_(nullptr),
uncompress_hash_state_(nullptr){}
uncompress_hash_state_(nullptr),
may_skip_first_fragmented_record_(false) {}

Reader::~Reader() {
delete[] backing_store_;
@@ -112,6 +113,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
*record = fragment;
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
may_skip_first_fragmented_record_ = false;
return true;

case kFirstType:
@@ -130,13 +132,16 @@
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
may_skip_first_fragmented_record_ = false;
break;

case kMiddleType:
case kRecyclableMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
if (!may_skip_first_fragmented_record_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
}
} else {
if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
@@ -148,8 +153,11 @@
case kLastType:
case kRecyclableLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
if (!may_skip_first_fragmented_record_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
}
may_skip_first_fragmented_record_ = false;

Member: Can also clear skipped_ here.
Contributor Author: Done, thanks for the review.

} else {
if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
@@ -192,6 +200,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
scratch->size(),
"user-defined timestamp size record interspersed partial record");
}
may_skip_first_fragmented_record_ = false;
prospective_record_offset = physical_record_offset;
scratch->clear();
last_record_offset_ = prospective_record_offset;
19 changes: 19 additions & 0 deletions db/log_reader.h
@@ -16,6 +16,7 @@

#include "db/log_format.h"
#include "file/sequence_file_reader.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
@@ -123,6 +124,20 @@ class Reader {
return !first_record_read_ && compression_type_record_read_;
}

IOStatus Skip(uint64_t n) {
if (n == 0) {
return IOStatus::OK();
}
auto s = file_->Skip(n);
if (!s.ok()) {
return s;
}
end_of_buffer_offset_ += n;
may_skip_first_fragmented_record_ = true;
buffer_.clear();
return s;
}

protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
@@ -170,6 +185,10 @@
// is only for WAL logs.
UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;

// If the reader skipped ahead (see Skip()), the first fragments seen may
// belong to a record whose start was skipped over; drop them until the
// beginning of the next complete record.
bool may_skip_first_fragmented_record_;

// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
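To make the new `Skip` semantics concrete, here is a hedged sketch modeled on the `GetLogOffset` helper from the test above; `options`, `wal_path`, `log_number`, and the hint block index are assumed inputs, and the loop body is illustrative:

```cpp
// Illustration only: open a log::Reader, skip to a cached block boundary,
// then read. Fragments at the landing block that continue a record started
// in an earlier block would previously trigger "missing start of fragmented
// record"; with may_skip_first_fragmented_record_ set by Skip(), the reader
// drops them silently.
std::unique_ptr<SequentialFileReader> wal_reader;
Status s = SequentialFileReader::Create(options.env->GetFileSystem(), wal_path,
                                        FileOptions(options), &wal_reader,
                                        nullptr, nullptr);
if (s.ok()) {
  log::Reader reader(options.info_log, std::move(wal_reader),
                     /*reporter=*/nullptr, /*checksum=*/true, log_number);
  uint64_t hint_block_index = 2;  // e.g., from TransactionLogSeqCache::Lookup
  IOStatus io_s = reader.Skip(hint_block_index * log::kBlockSize);
  if (io_s.ok()) {
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch)) {
      // `record` is the first complete record at or after the boundary;
      // any leading partial fragments were dropped without a report.
    }
  }
}
```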
44 changes: 39 additions & 5 deletions db/transaction_log_impl.cc
@@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer,
const std::shared_ptr<TransactionLogSeqCache>& transaction_log_seq_cache)
: dir_(dir),
options_(options),
read_options_(read_options),
@@ -32,6 +33,7 @@
started_(false),
is_valid_(false),
current_file_index_(0),
transaction_log_seq_cache_(transaction_log_seq_cache),
current_batch_seq_(0),
current_last_seq_(0) {
assert(files_ != nullptr);
@@ -113,8 +115,13 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
} else if (!current_status_.ok()) {
return;
}
Status s =
OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
auto& file = files_->at(static_cast<size_t>(start_file_index));
uint64_t hint_block_index{0};
if (read_options_.with_cache_) {
transaction_log_seq_cache_->Lookup(
file->LogNumber(), starting_sequence_number_, &hint_block_index);
}
Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
if (!s.ok()) {
current_status_ = s;
reporter_.Info(current_status_.ToString().c_str());
@@ -207,7 +214,13 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
// Open the next file
if (current_file_index_ < files_->size() - 1) {
++current_file_index_;
Status s = OpenLogReader(files_->at(current_file_index_).get());
auto& file = files_->at(static_cast<size_t>(current_file_index_));
uint64_t hint_block_index{0};
if (read_options_.with_cache_) {
transaction_log_seq_cache_->Lookup(
file->LogNumber(), starting_sequence_number_, &hint_block_index);
}
Status s = OpenLogReader(file.get(), hint_block_index * log::kBlockSize);
if (!s.ok()) {
is_valid_ = false;
current_status_ = s;
@@ -276,12 +289,28 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
// currentBatchSeq_ can only change here
assert(current_last_seq_ <= versions_->LastSequence());

if (read_options_.with_cache_) {
// cache the mapping of sequence to log block index when seeking to the
// start or end sequence
if ((current_batch_seq_ <= starting_sequence_number_ &&
current_last_seq_ >= starting_sequence_number_) ||
current_last_seq_ == versions_->LastSequence()) {
transaction_log_seq_cache_->Insert(
current_log_reader_->GetLogNumber(), current_batch_seq_,
current_log_reader_->LastRecordOffset() / log::kBlockSize);
}
}

Member: What's the use case for caching at the start sequence?
Contributor Author: When there are multiple slaves catching up, the start sequence is not necessarily incremental; there are also scenarios that call GetUpdatesSince frequently without ever iterating to the end sequence. Caching at both the beginning and the end sounds reasonable to me.

TEST_SYNC_POINT_CALLBACK("UpdateCurrentWriteBatch:TransactionLogIteratorImpl",
current_log_reader_.get());

current_batch_ = std::move(batch);
is_valid_ = true;
current_status_ = Status::OK();
}

Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file) {
Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file,
uint64_t hint_offset) {
std::unique_ptr<SequentialFileReader> file;
Status s = OpenLogFile(log_file, &file);
if (!s.ok()) {
@@ -291,6 +320,11 @@ Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file) {
current_log_reader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, log_file->LogNumber()));
if (hint_offset > 0) {
TEST_SYNC_POINT_CALLBACK("TransactionLogIteratorImpl:OpenLogReader:Skip",
&hint_offset);
return current_log_reader_->Skip(hint_offset);
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
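
Finally, tying back to the review thread about caching at the start sequence: a hedged sketch of the consuming pattern that benefits, assuming a follower that polls `GetUpdatesSince`; `db`, `last_applied_seq`, and `ApplyToReplica` are illustrative stand-ins, not part of this PR:

```cpp
// Illustration only: a replica tails the WAL by re-opening the iterator at
// its last applied sequence. With with_cache_ set, the re-seek can skip the
// reader to the cached block instead of scanning the log from the start.
TransactionLogIterator::ReadOptions ro;
ro.with_cache_ = true;  // option added by this PR

SequenceNumber next_seq = last_applied_seq + 1;  // per-replica bookkeeping
std::unique_ptr<TransactionLogIterator> iter;
Status s = db->GetUpdatesSince(next_seq, &iter, ro);
for (; s.ok() && iter->Valid(); iter->Next()) {
  BatchResult batch = iter->GetBatch();
  ApplyToReplica(batch);  // hypothetical application-side helper
  next_seq = batch.sequence + batch.writeBatchPtr->Count();
}
// Each subsequent poll starts at next_seq; repeated seeks to already-visited
// sequences now hit the sequence-to-block-index cache.
```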