diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc
index e306bf62b53..156baaf1eea 100644
--- a/db/db_filesnapshot.cc
+++ b/db/db_filesnapshot.cc
@@ -4,7 +4,6 @@
 // (found in the LICENSE.Apache file in the root directory).
 //
-
 #include <algorithm>
 #include <cstdint>
 #include <string>
 
@@ -311,6 +310,20 @@ Status DBImpl::GetLiveFilesStorageInfo(
   const uint64_t options_size = versions_->options_file_size_;
   const uint64_t min_log_num = MinLogNumberToKeep();
 
+  // If there is an active log writer, capture the current log number and its
+  // current size (excluding incomplete records at the log tail), in order to
+  // return the size of the current WAL file in a consistent state.
+  log_write_mutex_.Lock();
+  const uint64_t current_log_num = logfile_number_;
+  // With `manual_wal_flush` enabled, this function can return the size of the
+  // file including data that has not been flushed yet.
+  // But we're calling `FlushWAL()` below, so it will be flushed and the actual
+  // size of the WAL file will be greater than or equal to the one we capture
+  // here.
+  const uint64_t current_log_aligned_len =
+      logs_.empty() ? 0
+                    : logs_.back().writer->get_latest_complete_record_offset();
+  log_write_mutex_.Unlock();
+
   mutex_.Unlock();
 
   std::string manifest_fname = DescriptorFileName(manifest_number);
@@ -393,27 +406,40 @@ Status DBImpl::GetLiveFilesStorageInfo(
     return s;
   }
 
+  TEST_SYNC_POINT("DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles");
+
   size_t wal_size = live_wal_files.size();
 
   // Link WAL files. Copy exact size of last one because it is the only one
   // that has changes after the last flush.
   auto wal_dir = immutable_db_options_.GetWalDir();
   for (size_t i = 0; s.ok() && i < wal_size; ++i) {
+    const uint64_t log_num = live_wal_files[i]->LogNumber();
+    // Indicates whether this is a new WAL, created after we've captured the
+    // current log number under the mutex.
+    const bool new_wal = current_log_num != 0 && log_num > current_log_num;
     if ((live_wal_files[i]->Type() == kAliveLogFile) &&
-        (!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
+        (!flush_memtable || log_num >= min_log_num) && !new_wal) {
       results.emplace_back();
       LiveFileStorageInfo& info = results.back();
       auto f = live_wal_files[i]->PathName();
       assert(!f.empty() && f[0] == '/');
       info.relative_filename = f.substr(1);
       info.directory = wal_dir;
-      info.file_number = live_wal_files[i]->LogNumber();
+      info.file_number = log_num;
       info.file_type = kWalFile;
-      info.size = live_wal_files[i]->SizeFileBytes();
-      // Trim the log either if its the last one, or log file recycling is
-      // enabled. In the latter case, a hard link doesn't prevent the file
-      // from being renamed and recycled. So we need to copy it instead.
-      info.trim_to_size = (i + 1 == wal_size) ||
-                          (immutable_db_options_.recycle_log_file_num > 0);
+      if (current_log_num == info.file_number) {
+        // Data can be written into the current log file while we're taking a
+        // checkpoint, so we need to copy it and trim its size to the
+        // consistent state captured under the mutex.
+        info.size = current_log_aligned_len;
+        info.trim_to_size = true;
+      } else {
+        info.size = live_wal_files[i]->SizeFileBytes();
+        // Trim the log if log file recycling is enabled. In this case, a hard
+        // link doesn't prevent the file from being renamed and recycled.
+        // So we need to copy it instead.
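+        // Setting `trim_to_size` makes the checkpoint copy the file up to
+        // `info.size` bytes instead of hard-linking it.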
+        info.trim_to_size = immutable_db_options_.recycle_log_file_num > 0;
+      }
 
       if (opts.include_checksum_info) {
         info.file_checksum_func_name = kUnknownFileChecksumFuncName;
         info.file_checksum = kUnknownFileChecksum;
diff --git a/db/log_test.cc b/db/log_test.cc
index 57b6f64faa3..6db31a88c79 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -800,6 +800,24 @@ TEST_P(LogTest, TimestampSizeRecordPadding) {
   CheckRecordAndTimestampSize(second_str, ts_sz);
 }
 
+TEST_P(LogTest, GetLatestCompleteRecordOffset) {
+  ASSERT_OK(writer_->AddCompressionTypeRecord(WriteOptions()));
+  ASSERT_EQ(writer_->get_latest_complete_record_offset(),
+            writer_->TEST_block_offset());
+
+  UnorderedMap<uint32_t, size_t> ts_sz = {
+      {2, sizeof(uint64_t)},
+  };
+  ASSERT_OK(
+      writer_->MaybeAddUserDefinedTimestampSizeRecord(WriteOptions(), ts_sz));
+  ASSERT_EQ(writer_->get_latest_complete_record_offset(),
+            writer_->TEST_block_offset());
+
+  Write("foo");
+  ASSERT_EQ(writer_->get_latest_complete_record_offset(),
+            writer_->TEST_block_offset());
+}
+
 // Do NOT enable compression for this instantiation.
 INSTANTIATE_TEST_CASE_P(
     Log, LogTest,
diff --git a/db/log_writer.cc b/db/log_writer.cc
index 2cd6bbd788c..5d2aa571b5c 100644
--- a/db/log_writer.cc
+++ b/db/log_writer.cc
@@ -25,6 +25,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
                CompressionType compression_type)
     : dest_(std::move(dest)),
       block_offset_(0),
+      latest_complete_record_offset_(0),
       log_number_(log_number),
       recycle_log_files_(recycle_log_files),
       // Header size varies depending on whether we are recycling or not.
@@ -155,6 +156,8 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
       type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
     } else if (end) {
       type = recycle_log_files_ ? kRecyclableLastType : kLastType;
+      TEST_SYNC_POINT("Writer::AddRecord:BeforeLastFragmentWrite1");
+      TEST_SYNC_POINT("Writer::AddRecord:BeforeLastFragmentWrite2");
     } else {
       type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
     }
@@ -171,6 +174,11 @@ IOStatus Writer::AddRecord(const WriteOptions& write_options,
     }
   }
 
+  if (s.ok()) {
+    latest_complete_record_offset_.store(dest_->GetFileSize(),
+                                         std::memory_order_release);
+  }
+
   return s;
 }
 
@@ -211,6 +219,9 @@ IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
     compressed_buffer_ =
        std::unique_ptr<char[]>(new char[max_output_buffer_len]);
     assert(compressed_buffer_);
+
+    latest_complete_record_offset_.store(dest_->GetFileSize(),
+                                         std::memory_order_release);
   } else {
     // Disable compression if the record could not be added.
     compression_type_ = kNoCompression;
@@ -260,8 +271,15 @@ IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
     block_offset_ = 0;
   }
 
-  return EmitPhysicalRecord(write_options, type, encoded.data(),
-                            encoded.size());
+  IOStatus s =
+      EmitPhysicalRecord(write_options, type, encoded.data(), encoded.size());
+
+  if (s.ok()) {
+    latest_complete_record_offset_.store(dest_->GetFileSize(),
+                                         std::memory_order_release);
+  }
+
+  return s;
 }
 
 bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); }
diff --git a/db/log_writer.h b/db/log_writer.h
index 48fd3db7c28..d16348b8ba3 100644
--- a/db/log_writer.h
+++ b/db/log_writer.h
@@ -101,6 +101,14 @@ class Writer {
   WritableFileWriter* file() { return dest_.get(); }
   const WritableFileWriter* file() const { return dest_.get(); }
 
+  // Get the current length of the log, in bytes, excluding incomplete records
+  // at the tail.
+  // Note: if `manual_wal_flush` is enabled, this value can include buffered
+  // data that has not been flushed yet.
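+  // The writer publishes this value with release semantics after each
+  // complete record, and it is read here with acquire semantics, so the
+  // accessor is safe to call concurrently with `AddRecord()`.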
+  uint64_t get_latest_complete_record_offset() const {
+    return latest_complete_record_offset_.load(std::memory_order_acquire);
+  }
+
   uint64_t get_log_number() const { return log_number_; }
 
   IOStatus WriteBuffer(const WriteOptions& write_options);
@@ -114,6 +122,9 @@ class Writer {
  private:
   std::unique_ptr<WritableFileWriter> dest_;
   size_t block_offset_;  // Current offset in block
+  // Offset in the file corresponding to the end offset of the latest
+  // complete record added.
+  std::atomic<uint64_t> latest_complete_record_offset_;
   uint64_t log_number_;
   bool recycle_log_files_;
   int header_size_;
diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc
index d36023c0e00..47839fa8d33 100644
--- a/file/writable_file_writer.cc
+++ b/file/writable_file_writer.cc
@@ -112,6 +112,11 @@ IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
         set_seen_error();
         return s;
       }
+
+      TEST_SYNC_POINT(
+          "WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached1");
+      TEST_SYNC_POINT(
+          "WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached2");
     }
     assert(buf_.CurrentSize() == 0);
   }
diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc
index cdda1c05959..e5cdc1f8602 100644
--- a/utilities/checkpoint/checkpoint_test.cc
+++ b/utilities/checkpoint/checkpoint_test.cc
@@ -975,6 +975,128 @@ TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
   Close();
 }
 
+// Parameterized checkpoint test.
+// The parameter specifies whether manual WAL flush is enabled.
+class CheckpointTestWithParams : public CheckpointTest,
+                                 public ::testing::WithParamInterface<bool> {
+ public:
+  CheckpointTestWithParams() : CheckpointTest() {}
+};
+
+INSTANTIATE_TEST_CASE_P(Checkpoint, CheckpointTestWithParams,
+                        ::testing::Bool());
+
+// Reproduces a race condition where the checkpoint operation gets the size of
+// the active WAL using stat while the WAL is in an inconsistent state: the
+// last write to the file appended a non-last fragment of a record. Such a
+// checkpoint cannot be recovered with WAL recovery mode
+// `kAbsoluteConsistency` if the WAL is copied as-is. Recovery fails with
+// 'error reading trailing data due to encountering EOF'.
+TEST_P(CheckpointTestWithParams, CheckpointWithFragmentedRecordInWal) {
+  Options options = CurrentOptions();
+  options.manual_wal_flush = GetParam();
+  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+  // For simplicity of this test, reduce the buffer size.
+  // The buffer size needs to be smaller than the size of the data we put
+  // while taking the checkpoint, so that record fragments get flushed to the
+  // file once the buffer reaches its limit.
+  // This race condition can be reproduced with the default 1M buffer size,
+  // too.
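+  // The value put below ("foo2") is one block long, so it cannot fit in the
+  // remainder of the current block and gets split into multiple fragments.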
+  options.writable_file_max_buffer_size = log::kBlockSize / 2;
+
+  Reopen(options);
+
+  Random rnd(42);
+
+  ASSERT_OK(Put("foo1", rnd.RandomString(1024)));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      {"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
+       "CheckpointTest::CheckpointWithFragmentedRecordInWal:BeforePut"},
+      {"Writer::AddRecord:BeforeLastFragmentWrite1",
+       "CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"},
+      {"DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles",
+       "Writer::AddRecord:BeforeLastFragmentWrite2"},
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::function<void()> create_checkpoint_func = [&]() {
+    std::unique_ptr<Checkpoint> checkpoint;
+    Checkpoint* checkpoint_ptr;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+    checkpoint.reset(checkpoint_ptr);
+    const uint64_t wal_size_for_flush = std::numeric_limits<uint64_t>::max();
+    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, wal_size_for_flush));
+  };
+
+  port::Thread create_checkpoint_thread(create_checkpoint_func);
+
+  TEST_SYNC_POINT(
+      "CheckpointTest::CheckpointWithFragmentedRecordInWal:BeforePut");
+  ASSERT_OK(Put("foo2", rnd.RandomString(log::kBlockSize)));
+
+  create_checkpoint_thread.join();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  Close();
+
+  DB* snapshot_db = nullptr;
+  ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
+  ASSERT_OK(snapshot_db->Close());
+  delete snapshot_db;
+}
+
+// Reproduces a race condition where the checkpoint operation gets the size of
+// the active WAL using stat while the WAL is in an inconsistent state: the
+// last write to the file appended an incomplete fragment of a record. Such a
+// checkpoint cannot be recovered with WAL recovery mode
+// `kAbsoluteConsistency` if the WAL is copied as-is. Recovery fails with a
+// 'truncated record body' error.
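+// Unlike CheckpointWithFragmentedRecordInWal above, here the partial data
+// reaches the file through a mid-record buffer flush in WritableFileWriter,
+// leaving a truncated record body at the WAL tail rather than a missing
+// last fragment.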
+TEST_P(CheckpointTestWithParams, CheckpointWithTruncatedRecordBodyInWal) {
+  Options options = CurrentOptions();
+  options.manual_wal_flush = GetParam();
+  options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
+
+  Reopen(options);
+
+  Random rnd(42);
+
+  ASSERT_OK(Put("foo1", rnd.RandomString(1024)));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      {"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
+       "CheckpointTest::CheckpointWithTruncatedRecordBodyInWal:BeforePut"},
+      {"WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached1",
+       "CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"},
+      {"DBImpl::GetLiveFilesStorageInfo:AfterGettingLiveWalFiles",
+       "WritableFileWriter::Append:FlushAfterBufferCapacityLimitReached2"},
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::function<void()> create_checkpoint_func = [&]() {
+    std::unique_ptr<Checkpoint> checkpoint;
+    Checkpoint* checkpoint_ptr;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+    checkpoint.reset(checkpoint_ptr);
+    const uint64_t wal_size_for_flush = std::numeric_limits<uint64_t>::max();
+    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, wal_size_for_flush));
+  };
+
+  port::Thread create_checkpoint_thread(create_checkpoint_func);
+
+  TEST_SYNC_POINT(
+      "CheckpointTest::CheckpointWithTruncatedRecordBodyInWal:BeforePut");
+  ASSERT_OK(Put("foo2",
+                rnd.RandomString(
+                    static_cast<int>(options.writable_file_max_buffer_size))));
+
+  create_checkpoint_thread.join();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  Close();
+
+  DB* snapshot_db = nullptr;
+  ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
+  ASSERT_OK(snapshot_db->Close());
+  delete snapshot_db;
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {