Avoid copying active WAL in inconsistent state during checkpoint

andlr committed May 19, 2024
1 parent 3ed46e0 commit 65d8634
Showing 6 changed files with 211 additions and 11 deletions.
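
For context, the user-facing flow these changes protect is taking a checkpoint while writes are in flight and later opening it with WAL recovery mode `kAbsoluteConsistency`. Below is a minimal sketch of that flow using the public Checkpoint API (illustrative only: the paths and the use of assert for status checks are placeholders, not part of this commit):

#include <cassert>
#include <cstdint>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/checkpoint.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.wal_recovery_mode = rocksdb::WALRecoveryMode::kAbsoluteConsistency;

  rocksdb::DB* db = nullptr;
  assert(rocksdb::DB::Open(options, "/tmp/db", &db).ok());
  assert(db->Put(rocksdb::WriteOptions(), "foo", "bar").ok());

  // Take a checkpoint without forcing a memtable flush, so the live WAL is
  // copied (log_size_for_flush = max means "never flush because of WAL size").
  rocksdb::Checkpoint* checkpoint = nullptr;
  assert(rocksdb::Checkpoint::Create(db, &checkpoint).ok());
  assert(checkpoint
             ->CreateCheckpoint("/tmp/db_checkpoint",
                                /*log_size_for_flush=*/UINT64_MAX)
             .ok());
  delete checkpoint;
  delete db;

  // The checkpoint must open cleanly even under kAbsoluteConsistency; before
  // this fix, a concurrent write could leave its copied WAL ending mid-record.
  rocksdb::DB* snapshot_db = nullptr;
  assert(rocksdb::DB::Open(options, "/tmp/db_checkpoint", &snapshot_db).ok());
  delete snapshot_db;
  return 0;
}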
44 changes: 35 additions & 9 deletions db/db_filesnapshot.cc
@@ -4,7 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).
//


#include <algorithm>
#include <cstdint>
#include <memory>
@@ -311,6 +310,20 @@ Status DBImpl::GetLiveFilesStorageInfo(
const uint64_t options_size = versions_->options_file_size_;
const uint64_t min_log_num = MinLogNumberToKeep();

// If there is an active log writer, capture the current log number and the
// log's current size (excluding any incomplete record at the log tail), so
// that we can return the size of the current WAL file in a consistent state.
log_write_mutex_.Lock();
const uint64_t current_log_num = logfile_number_;
// With `manual_wal_flush` enabled, this function can return a size that
// includes data which has not been flushed to the file yet.
// But we call `FlushWAL()` below, so that data will be flushed and the actual
// size of the WAL file will be greater than or equal to the value captured
// here.
const uint64_t current_log_aligned_len =
logs_.empty() ? 0
: logs_.back().writer->get_latest_complete_record_offset();
log_write_mutex_.Unlock();

mutex_.Unlock();

std::string manifest_fname = DescriptorFileName(manifest_number);
@@ -393,27 +406,40 @@ Status DBImpl::GetLiveFilesStorageInfo(
return s;
}

TEST_SYNC_POINT("DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles");

size_t wal_size = live_wal_files.size();
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
auto wal_dir = immutable_db_options_.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
const uint64_t log_num = live_wal_files[i]->LogNumber();
// Indicates whether this is a new WAL, created after we captured the current
// log number under the mutex.
const bool new_wal = current_log_num != 0 && log_num > current_log_num;
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
(!flush_memtable || log_num >= min_log_num) && !new_wal) {
results.emplace_back();
LiveFileStorageInfo& info = results.back();
auto f = live_wal_files[i]->PathName();
assert(!f.empty() && f[0] == '/');
info.relative_filename = f.substr(1);
info.directory = wal_dir;
info.file_number = live_wal_files[i]->LogNumber();
info.file_number = log_num;
info.file_type = kWalFile;
info.size = live_wal_files[i]->SizeFileBytes();
// Trim the log either if its the last one, or log file recycling is
// enabled. In the latter case, a hard link doesn't prevent the file
// from being renamed and recycled. So we need to copy it instead.
info.trim_to_size = (i + 1 == wal_size) ||
(immutable_db_options_.recycle_log_file_num > 0);
if (current_log_num == info.file_number) {
// Data can be written into the current log file while we're taking a
// checkpoint, so we need to copy it and trim it to the consistent size
// captured under the mutex.
info.size = current_log_aligned_len;
info.trim_to_size = true;
} else {
info.size = live_wal_files[i]->SizeFileBytes();
// Trim the log if log file recycling is enabled. In this case, a hard
// link doesn't prevent the file from being renamed and recycled.
// So we need to copy it instead.
info.trim_to_size = immutable_db_options_.recycle_log_file_num > 0;
}
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
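
In short, the approach above is: capture the active WAL's log number and the length of its complete prefix while holding `log_write_mutex_`, then copy the live WAL trimmed to that captured length instead of its stat size. A simplified, hypothetical sketch of that pattern (illustrative only, not RocksDB code):

#include <cstdint>
#include <fstream>
#include <mutex>
#include <string>
#include <vector>

struct WalSnapshot {
  uint64_t log_number;
  uint64_t complete_len;  // bytes up to the end of the last complete record
};

std::mutex log_write_mutex;         // stands in for DBImpl::log_write_mutex_
uint64_t current_log_number = 0;    // protected by log_write_mutex
uint64_t current_complete_len = 0;  // protected by log_write_mutex

// Snapshot the log number and consistent length under the writer's mutex.
WalSnapshot CaptureWalState() {
  std::lock_guard<std::mutex> lock(log_write_mutex);
  return {current_log_number, current_complete_len};
}

// Copy only the complete prefix, so the copy never ends mid-record even if
// the file keeps growing while we read it.
std::vector<char> CopyTrimmed(const std::string& path, const WalSnapshot& snap) {
  std::ifstream in(path, std::ios::binary);
  std::vector<char> buf(snap.complete_len);
  in.read(buf.data(), static_cast<std::streamsize>(snap.complete_len));
  return buf;
}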
18 changes: 18 additions & 0 deletions db/log_test.cc
@@ -800,6 +800,24 @@ TEST_P(LogTest, TimestampSizeRecordPadding) {
CheckRecordAndTimestampSize(second_str, ts_sz);
}

TEST_P(LogTest, GetLatestCompleteRecordOffset) {
ASSERT_OK(writer_->AddCompressionTypeRecord(WriteOptions()));
ASSERT_EQ(writer_->get_latest_complete_record_offset(),
writer_->TEST_block_offset());

UnorderedMap<uint32_t, size_t> ts_sz = {
{2, sizeof(uint64_t)},
};
ASSERT_OK(
writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz));
ASSERT_EQ(writer_->get_latest_complete_record_offset(),
writer_->TEST_block_offset());

Write("foo");
ASSERT_EQ(writer_->get_latest_complete_record_offset(),
writer_->TEST_block_offset());
}

// Do NOT enable compression for this instantiation.
INSTANTIATE_TEST_CASE_P(
Log, LogTest,
22 changes: 20 additions & 2 deletions db/log_writer.cc
@@ -25,6 +25,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
CompressionType compression_type)
: dest_(std::move(dest)),
block_offset_(0),
latest_complete_record_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files),
// Header size varies depending on whether we are recycling or not.
@@ -155,6 +156,8 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
} else if (end) {
type = recycle_log_files_ ? kRecyclableLastType : kLastType;
TEST_SYNC_POINT("Writer::AddRecord:BeforeLastFragmentWrite1");
TEST_SYNC_POINT("Writer::AddRecord:BeforeLastFragmentWrite2");
} else {
type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
}
@@ -171,6 +174,11 @@
}
}

if (s.ok()) {
latest_complete_record_offset_.store(dest_->GetFileSize(),
std::memory_order_release);
}

return s;
}

@@ -211,6 +219,9 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
compressed_buffer_ =
std::unique_ptr<char[]>(new char[max_output_buffer_len]);
assert(compressed_buffer_);

latest_complete_record_offset_.store(dest_->GetFileSize(),
std::memory_order_release);
} else {
// Disable compression if the record could not be added.
compression_type_ = kNoCompression;
@@ -260,8 +271,15 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
block_offset_ = 0;
}

return EmitPhysicalRecord(write_options, type, encoded.data(),
encoded.size());
IOStatus s =
EmitPhysicalRecord(write_options, type, encoded.data(), encoded.size());

if (s.ok()) {
latest_complete_record_offset_.store(dest_->GetFileSize(),
std::memory_order_release);
}

return s;
}

bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); }
11 changes: 11 additions & 0 deletions db/log_writer.h
@@ -101,6 +101,14 @@ class Writer {
WritableFileWriter* file() { return dest_.get(); }
const WritableFileWriter* file() const { return dest_.get(); }

// Get the current length of the log in bytes, excluding any incomplete
// record at the tail.
// Note: if `manual_wal_flush` is enabled, this value can include buffered
// data that has not been flushed yet.
uint64_t get_latest_complete_record_offset() const {
return latest_complete_record_offset_.load(std::memory_order_acquire);
}

uint64_t get_log_number() const { return log_number_; }

IOStatus WriteBuffer(const WriteOptions& write_options);
@@ -114,6 +122,9 @@
private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
// Offset in the file corresponding to the end of the most recently added
// complete record.
std::atomic<uint64_t> latest_complete_record_offset_;
uint64_t log_number_;
bool recycle_log_files_;
int header_size_;
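
The new `latest_complete_record_offset_` field is stored with release ordering after a record is fully emitted and loaded with acquire ordering by readers, so an observed offset always points at a record boundary whose bytes have already been written. A minimal, self-contained sketch of that publish/consume pattern (hypothetical `MiniLog`, illustrative only, not RocksDB code):

#include <atomic>
#include <cstdint>
#include <cstring>
#include <string>

// Single-writer log: records are appended into a preallocated buffer and the
// end offset of the last complete record is published with a release store.
// A concurrent reader that loads the offset with acquire may safely read
// bytes [0, offset), which always end on a record boundary.
struct MiniLog {
  char data[4096];                           // bounds checks omitted for brevity
  uint64_t write_pos = 0;                    // touched only by the writer thread
  std::atomic<uint64_t> complete_offset{0};  // analogue of latest_complete_record_offset_

  void Append(const std::string& record) {
    std::memcpy(data + write_pos, record.data(), record.size());
    write_pos += record.size();
    complete_offset.store(write_pos, std::memory_order_release);  // publish
  }

  // Reader side: copy only the complete prefix.
  std::string CompletePrefix() const {
    uint64_t n = complete_offset.load(std::memory_order_acquire);
    return std::string(data, n);
  }
};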
5 changes: 5 additions & 0 deletions file/writable_file_writer.cc
@@ -112,6 +112,11 @@ IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
set_seen_error();
return s;
}

TEST_SYNC_POINT(
"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached1");
TEST_SYNC_POINT(
"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached2");
}
assert(buf_.CurrentSize() == 0);
}
122 changes: 122 additions & 0 deletions utilities/checkpoint/checkpoint_test.cc
@@ -975,6 +975,128 @@ TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
Close();
}

// Parameterized checkpoint test.
// Parameter specifies whether manual WAL flush is enabled.
class CheckpointTestWithParams : public CheckpointTest,
public ::testing::WithParamInterface<bool> {
public:
CheckpointTestWithParams() : CheckpointTest() {}
};

INSTANTIATE_TEST_CASE_P(Checkpoint, CheckpointTestWithParams,
::testing::Bool());

// Reproduces a race condition where the checkpoint operation gets the size of
// the active WAL via stat while the WAL is in an inconsistent state, i.e. when
// the last write to the file appended a non-last fragment of a record. Such a
// checkpoint cannot be recovered with WAL recovery mode `kAbsoluteConsistency`
// if the WAL is copied as-is; recovery fails with 'error reading trailing data
// due to encountering EOF'.
TEST_P(CheckpointTestWithParams, CheckpointWithFragmentedRecordInWal) {
Options options = CurrentOptions();
options.manual_wal_flush = GetParam();
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
// For simplicity of this test, reduce the buffer size.
// The buffer size needs to be smaller than the size of the data we put during
// the checkpoint, so that record fragments get flushed to the file once the
// buffer reaches its limit.
// This race condition can also be reproduced with the default 1M buffer size.
options.writable_file_max_buffer_size = log::kBlockSize / 2;

Reopen(options);

Random rnd(42);

ASSERT_OK(Put("foo1", rnd.RandomString(1024)));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
"CheckpointTest::CheckpointWithFragmentedRecordInWal:BeforePut"},
{"Writer::AddRecord:BeforeLastFragmentWrite1",
"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"},
{"DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles",
"Writer::AddRecord:BeforeLastFragmentWrite2"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

std::function<void()> create_checkpoint_func = [&]() {
std::unique_ptr<Checkpoint> checkpoint;
Checkpoint* checkpoint_ptr;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
checkpoint.reset(checkpoint_ptr);
const uint64_t wal_size_for_flush = std::numeric_limits<uint64_t>::max();
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, wal_size_for_flush));
};

port::Thread create_checkpoint_thread(create_checkpoint_func);

TEST_SYNC_POINT(
"CheckpointTest::CheckpointWithFragmentedRecordInWal:BeforePut");
ASSERT_OK(Put("foo2", rnd.RandomString(log::kBlockSize)));

create_checkpoint_thread.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

Close();

DB* snapshot_db = nullptr;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
ASSERT_OK(snapshot_db->Close());
delete snapshot_db;
}

// Reproduces a race condition where the checkpoint operation gets the size of
// the active WAL via stat while the WAL is in an inconsistent state, i.e. when
// the last write to the file appended an incomplete fragment of a record. Such
// a checkpoint cannot be recovered with WAL recovery mode
// `kAbsoluteConsistency` if the WAL is copied as-is; recovery fails with a
// 'truncated record body' error.
TEST_P(CheckpointTestWithParams, CheckpointWithTruncatedRecordBodyInWal) {
Options options = CurrentOptions();
options.manual_wal_flush = GetParam();
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;

Reopen(options);

Random rnd(42);

ASSERT_OK(Put("foo1", rnd.RandomString(1024)));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
"CheckpointTest::CheckpointWithTruncatedRecordBodyInWal:BeforePut"},
{"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached1",
"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"},
{"DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles",
"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached2"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

std::function<void()> create_checkpoint_func = [&]() {
std::unique_ptr<Checkpoint> checkpoint;
Checkpoint* checkpoint_ptr;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
checkpoint.reset(checkpoint_ptr);
const uint64_t wal_size_for_flush = std::numeric_limits<uint64_t>::max();
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, wal_size_for_flush));
};

port::Thread create_checkpoint_thread(create_checkpoint_func);

TEST_SYNC_POINT(
"CheckpointTest::CheckpointWithTruncatedRecordBodyInWal:BeforePut");
ASSERT_OK(Put("foo2",
rnd.RandomString((int)options.writable_file_max_buffer_size)));

create_checkpoint_thread.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

Close();

DB* snapshot_db = nullptr;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
ASSERT_OK(snapshot_db->Close());
delete snapshot_db;
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
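
A note on the test machinery used above: RocksDB's sync-point facility takes {predecessor, successor} pairs via LoadDependency, and a successor sync point blocks until its predecessor has fired; that is how these tests force the checkpoint thread and the writing thread to interleave at the exact moment the WAL holds an incomplete record. A minimal usage sketch with hypothetical point names:

#include "test_util/sync_point.h"

// "Example::Step2" will not proceed until "Example::Step1" has fired.
void SyncPointExample() {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"Example::Step1", "Example::Step2"},
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Thread A eventually executes: TEST_SYNC_POINT("Example::Step1");
  // Thread B blocks at:           TEST_SYNC_POINT("Example::Step2");
  // so code after Step2 in thread B runs only after thread A reached Step1.

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}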
