From 6d370cb091d31caabc1a121701e8b4a744e1799a Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 21 Mar 2018 17:13:54 -0700 Subject: [PATCH 1/3] Fix concurrent publish seq bug --- db/db_impl.h | 2 + db/db_impl_write.cc | 10 +++-- db/pre_release_callback.h | 4 +- db/write_callback_test.cc | 3 +- utilities/transactions/write_prepared_txn.cc | 43 ++++++++++++++++--- .../transactions/write_prepared_txn_db.h | 14 ++++-- 6 files changed, 62 insertions(+), 14 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index c316b6ad474..c97d407bfbc 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -223,6 +223,8 @@ class DBImpl : public DB { virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; + // REQUIRES: joined the main write queue if two_write_queues is disabled, and + // the second write queue otherwise. virtual void SetLastPublishedSequence(SequenceNumber seq); // Returns LastSequence in last_seq_same_as_publish_seq_ // mode and LastAllocatedSequence otherwise. This is useful when visiblility diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 5c3eaf404e2..8b26ac51174 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -133,7 +133,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, for (auto* writer : *(w.write_group)) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence); + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); if (!ws.ok()) { status = ws; break; @@ -368,7 +369,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence); + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); if (!ws.ok()) { status = ws; break; @@ -629,7 +631,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - Status ws = writer->pre_release_callback->Callback(writer->sequence); + const bool DISABLE_MEMTABLE = true; + Status ws = writer->pre_release_callback->Callback(writer->sequence, + DISABLE_MEMTABLE); if (!ws.ok()) { status = ws; break; diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h index fdc4d50c53c..1e6efab53a5 100644 --- a/db/pre_release_callback.h +++ b/db/pre_release_callback.h @@ -24,7 +24,9 @@ class PreReleaseCallback { // propagated to all the writers in the write group. // seq is the sequence number that is used for this write and will be // released. - virtual Status Callback(SequenceNumber seq) = 0; + // is_mem_disabled is currently used for debugging purposes to assert that + // the callback is done from the right write queue. + virtual Status Callback(SequenceNumber seq, const bool is_mem_disabled) = 0; }; } // namespace rocksdb diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index c4f0e35dcd9..9a82eb8c149 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -294,7 +294,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { public: PublishSeqCallback(DBImpl* db_impl_in) : db_impl_(db_impl_in) {} - virtual Status Callback(SequenceNumber last_seq) { + virtual Status Callback(SequenceNumber last_seq, + const bool /*not used*/) { db_impl_->SetLastPublishedSequence(last_seq); return Status::OK(); } diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 591a5d3abe9..0221357a3c0 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -128,9 +128,14 @@ Status WritePreparedTxn::CommitInternal() { assert(s.ok()); commit_batch_cnt = counter.BatchCount(); } - WritePreparedCommitEntryPreReleaseCallback update_commit_map( - wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt); + const bool PREP_HEAP_SKIPPED = true; const bool disable_memtable = !includes_data; + const bool do_one_write = + !db_impl_->immutable_db_options().two_write_queues || disable_memtable; + const bool publish_seq = do_one_write; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt, + !PREP_HEAP_SKIPPED, publish_seq); uint64_t seq_used = kMaxSequenceNumber; // Since the prepared batch is directly written to memtable, there is already // a connection between the memtable and its WAL, so there is no need to @@ -141,6 +146,34 @@ Status WritePreparedTxn::CommitInternal() { zero_log_number, disable_memtable, &seq_used, batch_cnt, &update_commit_map); assert(!s.ok() || seq_used != kMaxSequenceNumber); + if (LIKELY(do_one_write || !s.ok())) { + return s; + } // else do the 2nd write to publish seq + // Note: the 2nd write comes with a performance penality. So if we have too many of commits accompanied with ComitTimeWriteBatch and yet we cannot enable use_only_the_last_commit_time_batch_for_recovery_ optimization, two_write_queues should be disabled to avoid many additional writes here. + class PublishSeqPreReleaseCallback : public PreReleaseCallback { + public: + PublishSeqPreReleaseCallback(DBImpl* db_impl) : db_impl_(db_impl) {} + virtual Status Callback(SequenceNumber seq, + bool is_mem_disabled) override { + assert(is_mem_disabled); + assert(db_impl_->immutable_db_options().two_write_queues); + db_impl_->SetLastPublishedSequence(seq); + return Status::OK(); + } + private: + DBImpl* db_impl_; + } publish_seq_callback(db_impl_); + WriteBatch empty_batch; + empty_batch.PutLogData(Slice()); + // In the absence of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&empty_batch); + const bool DISABLE_MEMTABLE = true; + const size_t ONE_BATCH = 1; + const uint64_t NO_REF_LOG = 0; + s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, + NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + &publish_seq_callback); + assert(!s.ok() || seq_used != kMaxSequenceNumber); return s; } @@ -240,14 +273,14 @@ Status WritePreparedTxn::RollbackInternal() { WriteBatchInternal::MarkRollback(&rollback_batch, name_); bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; const bool DISABLE_MEMTABLE = true; - const uint64_t no_log_ref = 0; + const uint64_t NO_REF_LOG = 0; uint64_t seq_used = kMaxSequenceNumber; const size_t ZERO_PREPARES = 0; const size_t ONE_BATCH = 1; WritePreparedCommitEntryPreReleaseCallback update_commit_map( wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH); s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, - no_log_ref, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, do_one_write ? &update_commit_map : nullptr); assert(!s.ok() || seq_used != kMaxSequenceNumber); if (!s.ok()) { @@ -275,7 +308,7 @@ Status WritePreparedTxn::RollbackInternal() { // In the absence of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(&empty_batch); s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, - no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, &update_commit_map_with_prepare); assert(!s.ok() || seq_used != kMaxSequenceNumber); // Mark the txn as rolled back diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index bb495b6c663..0ad25d2e0ed 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -459,19 +459,22 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { SequenceNumber prep_seq, size_t prep_batch_cnt, size_t data_batch_cnt = 0, - bool prep_heap_skipped = false) + bool prep_heap_skipped = false, + bool publish_seq = true) : db_(db), db_impl_(db_impl), prep_seq_(prep_seq), prep_batch_cnt_(prep_batch_cnt), data_batch_cnt_(data_batch_cnt), prep_heap_skipped_(prep_heap_skipped), - includes_data_(data_batch_cnt_ > 0) { + includes_data_(data_batch_cnt_ > 0), + publish_seq_(publish_seq) { assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); } - virtual Status Callback(SequenceNumber commit_seq) override { + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled) override { assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) ? commit_seq @@ -492,7 +495,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED); } } - if (db_impl_->immutable_db_options().two_write_queues) { + if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { + assert(is_mem_disabled); // implies the 2nd queue // Publish the sequence number. We can do that here assuming the callback // is invoked only from one write queue, which would guarantee that the // publish sequence numbers will be in order, i.e., once a seq is @@ -517,6 +521,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { // Either because it is commit without prepare or it has a // CommitTimeWriteBatch bool includes_data_; + // Should the callback also publishes the commit seq number + bool publish_seq_; }; // Count the number of sub-batches inside a batch. A sub-batch does not have From fd5beda719b8eea6e66f70ec3b286347fe7e3738 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 21 Mar 2018 17:16:37 -0700 Subject: [PATCH 2/3] make format --- db/pre_release_callback.h | 2 +- utilities/transactions/write_prepared_txn.cc | 27 ++++++++++--------- .../transactions/write_prepared_txn_db.h | 2 +- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h index 1e6efab53a5..b3e6585770b 100644 --- a/db/pre_release_callback.h +++ b/db/pre_release_callback.h @@ -24,7 +24,7 @@ class PreReleaseCallback { // propagated to all the writers in the write group. // seq is the sequence number that is used for this write and will be // released. - // is_mem_disabled is currently used for debugging purposes to assert that + // is_mem_disabled is currently used for debugging purposes to assert that // the callback is done from the right write queue. virtual Status Callback(SequenceNumber seq, const bool is_mem_disabled) = 0; }; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 0221357a3c0..22d8152a0ac 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -149,19 +149,22 @@ Status WritePreparedTxn::CommitInternal() { if (LIKELY(do_one_write || !s.ok())) { return s; } // else do the 2nd write to publish seq - // Note: the 2nd write comes with a performance penality. So if we have too many of commits accompanied with ComitTimeWriteBatch and yet we cannot enable use_only_the_last_commit_time_batch_for_recovery_ optimization, two_write_queues should be disabled to avoid many additional writes here. + // Note: the 2nd write comes with a performance penality. So if we have too + // many of commits accompanied with ComitTimeWriteBatch and yet we cannot + // enable use_only_the_last_commit_time_batch_for_recovery_ optimization, + // two_write_queues should be disabled to avoid many additional writes here. class PublishSeqPreReleaseCallback : public PreReleaseCallback { - public: - PublishSeqPreReleaseCallback(DBImpl* db_impl) : db_impl_(db_impl) {} - virtual Status Callback(SequenceNumber seq, - bool is_mem_disabled) override { - assert(is_mem_disabled); - assert(db_impl_->immutable_db_options().two_write_queues); - db_impl_->SetLastPublishedSequence(seq); - return Status::OK(); - } - private: - DBImpl* db_impl_; + public: + PublishSeqPreReleaseCallback(DBImpl* db_impl) : db_impl_(db_impl) {} + virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override { + assert(is_mem_disabled); + assert(db_impl_->immutable_db_options().two_write_queues); + db_impl_->SetLastPublishedSequence(seq); + return Status::OK(); + } + + private: + DBImpl* db_impl_; } publish_seq_callback(db_impl_); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 0ad25d2e0ed..a3d2ab8fc20 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -496,7 +496,7 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { } } if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { - assert(is_mem_disabled); // implies the 2nd queue + assert(is_mem_disabled); // implies the 2nd queue // Publish the sequence number. We can do that here assuming the callback // is invoked only from one write queue, which would guarantee that the // publish sequence numbers will be in order, i.e., once a seq is From 7657e905a04101fa8465883445ffda4027290361 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 22 Mar 2018 09:19:30 -0700 Subject: [PATCH 3/3] lint --- db/write_callback_test.cc | 2 +- utilities/transactions/write_prepared_txn.cc | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 9a82eb8c149..c91a4305cbd 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -295,7 +295,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { PublishSeqCallback(DBImpl* db_impl_in) : db_impl_(db_impl_in) {} virtual Status Callback(SequenceNumber last_seq, - const bool /*not used*/) { + const bool /*not used*/) override { db_impl_->SetLastPublishedSequence(last_seq); return Status::OK(); } diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 22d8152a0ac..f6aef7fe2db 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -155,7 +155,8 @@ Status WritePreparedTxn::CommitInternal() { // two_write_queues should be disabled to avoid many additional writes here. class PublishSeqPreReleaseCallback : public PreReleaseCallback { public: - PublishSeqPreReleaseCallback(DBImpl* db_impl) : db_impl_(db_impl) {} + explicit PublishSeqPreReleaseCallback(DBImpl* db_impl) + : db_impl_(db_impl) {} virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) override { assert(is_mem_disabled); assert(db_impl_->immutable_db_options().two_write_queues);