Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy current WAL in consistent state during checkpoint #12671

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).
//


#include <algorithm>
#include <cstdint>
#include <memory>
Expand Down Expand Up @@ -317,6 +316,20 @@ Status DBImpl::GetLiveFilesStorageInfo(
const uint64_t options_size = versions_->options_file_size_;
const uint64_t min_log_num = MinLogNumberToKeep();

// If there is an active log writer, capture current log number and its
// current size (excluding incomplete records at the log tail), in order to
// return size of the current WAL file in a consistent state.
log_write_mutex_.Lock();
const uint64_t current_log_num = logfile_number_;
// With `manual_wal_flush` enabled, this function can return size of the file,
// including yet not flushed data.
// But we're calling `FlushWAL()` below, so it will be flushed and the actual
// size of the WAL file will be greater than or equal to the one we capture here.
const uint64_t current_log_aligned_len =
logs_.empty() ? 0
: logs_.back().writer->get_latest_complete_record_offset();
log_write_mutex_.Unlock();

mutex_.Unlock();

std::string manifest_fname = DescriptorFileName(manifest_number);
Expand Down Expand Up @@ -399,27 +412,42 @@ Status DBImpl::GetLiveFilesStorageInfo(
return s;
}

TEST_SYNC_POINT("DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles");

size_t wal_count = live_wal_files.size();
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
auto wal_dir = immutable_db_options_.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_count; ++i) {
const uint64_t log_num = live_wal_files[i]->LogNumber();
// Indicates whether this is a new WAL, created after we've captured current
// log number under the mutex.
const bool new_wal = current_log_num != 0 && log_num > current_log_num;
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
(!flush_memtable || log_num >= min_log_num) && !new_wal) {
results.emplace_back();
LiveFileStorageInfo& info = results.back();
auto f = live_wal_files[i]->PathName();
assert(!f.empty() && f[0] == '/');
info.relative_filename = f.substr(1);
info.directory = wal_dir;
info.file_number = live_wal_files[i]->LogNumber();
info.file_number = log_num;
info.file_type = kWalFile;
info.size = live_wal_files[i]->SizeFileBytes();
// Trim the log either if its the last one, or log file recycling is
// enabled. In the latter case, a hard link doesn't prevent the file
// from being renamed and recycled. So we need to copy it instead.
info.trim_to_size = (i + 1 == wal_count) ||
(immutable_db_options_.recycle_log_file_num > 0);

if (current_log_num == info.file_number) {
// Data can be written into the current log file while we're taking a
// checkpoint, so we need to copy it and trim its size to the consistent
// state, captured under the mutex.
info.size = current_log_aligned_len;
info.trim_to_size = true;
} else {
info.size = live_wal_files[i]->SizeFileBytes();
// Trim the log if log file recycling is enabled. In this case, a hard
// link doesn't prevent the file from being renamed and recycled.
// So we need to copy it instead.
info.trim_to_size = immutable_db_options_.recycle_log_file_num > 0;
}

if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
Expand Down
18 changes: 18 additions & 0 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,24 @@ TEST_P(LogTest, TimestampSizeRecordPadding) {
CheckRecordAndTimestampSize(second_str, ts_sz);
}

TEST_P(LogTest, GetLatestCompleteRecordOffset) {
  // Verifies that get_latest_complete_record_offset() tracks the end of the
  // most recently completed record for every record-emitting code path.
  // After the compression-type metadata record is written, the latest
  // complete record offset must match the writer's current block offset.
  ASSERT_OK(writer_->AddCompressionTypeRecord(WriteOptions()));
  ASSERT_EQ(writer_->get_latest_complete_record_offset(),
            writer_->TEST_block_offset());

  // Column family id 2 uses user-defined timestamps of uint64_t size.
  UnorderedMap<uint32_t, size_t> ts_sz = {
      {2, sizeof(uint64_t)},
  };
  // The timestamp-size metadata record should advance the offset as well.
  ASSERT_OK(
      writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz));
  ASSERT_EQ(writer_->get_latest_complete_record_offset(),
            writer_->TEST_block_offset());

  // A regular user record keeps the offset in sync, too.
  Write("foo");
  ASSERT_EQ(writer_->get_latest_complete_record_offset(),
            writer_->TEST_block_offset());
}

// Do NOT enable compression for this instantiation.
INSTANTIATE_TEST_CASE_P(
Log, LogTest,
Expand Down
22 changes: 20 additions & 2 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
CompressionType compression_type)
: dest_(std::move(dest)),
block_offset_(0),
latest_complete_record_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files),
// Header size varies depending on whether we are recycling or not.
Expand Down Expand Up @@ -155,6 +156,8 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
} else if (end) {
type = recycle_log_files_ ? kRecyclableLastType : kLastType;
TEST_SYNC_POINT("Writer::AddRecord:BeforeLastFragmentWrite1");
TEST_SYNC_POINT("Writer::AddRecord:BeforeLastFragmentWrite2");
} else {
type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
}
Expand All @@ -171,6 +174,11 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
}
}

if (s.ok()) {
latest_complete_record_offset_.store(dest_->GetFileSize(),
std::memory_order_release);
}

return s;
}

Expand Down Expand Up @@ -211,6 +219,9 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
compressed_buffer_ =
std::unique_ptr<char[]>(new char[max_output_buffer_len]);
assert(compressed_buffer_);

latest_complete_record_offset_.store(dest_->GetFileSize(),
std::memory_order_release);
} else {
// Disable compression if the record could not be added.
compression_type_ = kNoCompression;
Expand Down Expand Up @@ -260,8 +271,15 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
block_offset_ = 0;
}

return EmitPhysicalRecord(write_options, type, encoded.data(),
encoded.size());
IOStatus s =
EmitPhysicalRecord(write_options, type, encoded.data(), encoded.size());

if (s.ok()) {
latest_complete_record_offset_.store(dest_->GetFileSize(),
std::memory_order_release);
}

return s;
}

bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); }
Expand Down
11 changes: 11 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ class Writer {
WritableFileWriter* file() { return dest_.get(); }
const WritableFileWriter* file() const { return dest_.get(); }

  // Get current length of the log, excluding incomplete records at the
  // tail, in bytes.
  // Note: if `manual_wal_flush` is enabled, this value can include buffered
  // data, which has not been flushed yet.
  // The acquire load pairs with the release stores performed after each
  // complete record is emitted, so readers on other threads observe a
  // consistent end-of-record offset.
  uint64_t get_latest_complete_record_offset() const {
    return latest_complete_record_offset_.load(std::memory_order_acquire);
  }

uint64_t get_log_number() const { return log_number_; }

IOStatus WriteBuffer(const WriteOptions& write_options);
Expand All @@ -114,6 +122,9 @@ class Writer {
private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
// Offset in the file, which corresponds to the end offset of the latest added
// record.
std::atomic<uint64_t> latest_complete_record_offset_;
uint64_t log_number_;
bool recycle_log_files_;
int header_size_;
Expand Down
5 changes: 5 additions & 0 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
set_seen_error();
return s;
}

TEST_SYNC_POINT(
"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached1");
TEST_SYNC_POINT(
"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached2");
}
assert(buf_.CurrentSize() == 0);
}
Expand Down
122 changes: 122 additions & 0 deletions utilities/checkpoint/checkpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,128 @@ TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
Close();
}

// Parameterized checkpoint test.
// Parameter specifies whether manual WAL flush is enabled.
// No special members are declared: the implicitly generated default
// constructor is sufficient (Rule of Zero), so the previously hand-written
// `CheckpointTestWithParams() : CheckpointTest() {}` boilerplate is removed.
class CheckpointTestWithParams : public CheckpointTest,
                                 public ::testing::WithParamInterface<bool> {};

INSTANTIATE_TEST_CASE_P(Checkpoint, CheckpointTestWithParams,
::testing::Bool());

// Reproduces a race condition, where checkpoint operation gets size of the
// active WAL using stat, when WAL is in inconsistent state - when last write to
// the file appended a non-last fragment of a record. Such checkpoint cannot be
// recovered with WAL recovery mode `kAbsoluteConsistency`, if WAL is copied
// as-is. It fails with 'error reading trailing data due to encountering EOF'
TEST_P(CheckpointTestWithParams, CheckpointWithFragmentedRecordInWal) {
  Options options = CurrentOptions();
  options.manual_wal_flush = GetParam();
  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
  // For simplicity of this test, reduce buffer size.
  // Buffer size needs to be smaller than the size of data we put during
  // checkpoint, so that record fragments get flushed to the file once buffer
  // reaches its limit.
  // This race condition can be reproduced with default 1M buffer size, too.
  options.writable_file_max_buffer_size = log::kBlockSize / 2;

  Reopen(options);

  Random rnd(42);

  ASSERT_OK(Put("foo1", rnd.RandomString(1024)));

  // Order the threads so that the checkpoint captures the live WAL list while
  // the concurrent Put below has written some, but not the last, fragment of
  // its record.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
       "CheckpointTest::CheckpointWithFragmentedRecordInWal:BeforePut"},
      {"Writer::AddRecord:BeforeLastFragmentWrite1",
       "CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"},
      {"DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles",
       "Writer::AddRecord:BeforeLastFragmentWrite2"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Plain lambda instead of a std::function wrapper: the callable is consumed
  // locally, so type erasure (and its possible allocation) is unnecessary.
  auto create_checkpoint_func = [&]() {
    std::unique_ptr<Checkpoint> checkpoint;
    Checkpoint* checkpoint_ptr;
    ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
    checkpoint.reset(checkpoint_ptr);
    // Max value effectively disables the size-triggered memtable flush, so
    // the WAL files themselves are copied/linked into the checkpoint.
    const uint64_t wal_size_for_flush = std::numeric_limits<uint64_t>::max();
    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, wal_size_for_flush));
  };

  port::Thread create_checkpoint_thread(create_checkpoint_func);

  TEST_SYNC_POINT(
      "CheckpointTest::CheckpointWithFragmentedRecordInWal:BeforePut");
  // A value of one full block guarantees the record spans a block boundary
  // and is therefore written as multiple fragments.
  ASSERT_OK(Put("foo2", rnd.RandomString(log::kBlockSize)));

  create_checkpoint_thread.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  Close();

  // The checkpoint must open cleanly under `kAbsoluteConsistency`, proving
  // the copied WAL contains no dangling record fragment.
  DB* snapshot_db = nullptr;
  ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
  ASSERT_OK(snapshot_db->Close());
  delete snapshot_db;
}

// Reproduces a race condition, where checkpoint operation gets size of the
// active WAL using stat, when WAL is in inconsistent state - when last write to
// the file appended an incomplete fragment of a record. Such checkpoint cannot
// be recovered with WAL recovery mode `kAbsoluteConsistency`, if WAL is copied
// as-is. It fails with 'truncated record body' error.
TEST_P(CheckpointTestWithParams, CheckpointWithTruncatedRecordBodyInWal) {
  Options options = CurrentOptions();
  options.manual_wal_flush = GetParam();
  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;

  Reopen(options);

  Random rnd(42);

  ASSERT_OK(Put("foo1", rnd.RandomString(1024)));

  // Order the threads so that the checkpoint captures the live WAL list right
  // after the file writer flushed a partially filled buffer, i.e. while the
  // WAL tail holds a truncated record body.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
       "CheckpointTest::CheckpointWithTruncatedRecordBodyInWal:BeforePut"},
      {"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached1",
       "CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"},
      {"DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles",
       "WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached2"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Plain lambda instead of a std::function wrapper: the callable is consumed
  // locally, so type erasure (and its possible allocation) is unnecessary.
  auto create_checkpoint_func = [&]() {
    std::unique_ptr<Checkpoint> checkpoint;
    Checkpoint* checkpoint_ptr;
    ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
    checkpoint.reset(checkpoint_ptr);
    // Max value effectively disables the size-triggered memtable flush, so
    // the WAL files themselves are copied/linked into the checkpoint.
    const uint64_t wal_size_for_flush = std::numeric_limits<uint64_t>::max();
    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, wal_size_for_flush));
  };

  port::Thread create_checkpoint_thread(create_checkpoint_func);

  TEST_SYNC_POINT(
      "CheckpointTest::CheckpointWithTruncatedRecordBodyInWal:BeforePut");
  // Writing exactly the buffer capacity forces a flush mid-record.
  // static_cast replaces the prior C-style cast for intent clarity.
  ASSERT_OK(Put("foo2", rnd.RandomString(static_cast<int>(
                            options.writable_file_max_buffer_size))));

  create_checkpoint_thread.join();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  Close();

  // The checkpoint must open cleanly under `kAbsoluteConsistency`, proving
  // the copied WAL contains no truncated record body.
  DB* snapshot_db = nullptr;
  ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
  ASSERT_OK(snapshot_db->Close());
  delete snapshot_db;
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
Loading