WritePrepared: reduce prepared_mutex_ overhead #5420

Closed
19 changes: 15 additions & 4 deletions db/db_impl/db_impl_write.cc
@@ -263,16 +263,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
size_t total_count = 0;
size_t valid_batches = 0;
size_t total_byte_size = 0;
size_t pre_release_callback_cnt = 0;
Review comment from the PR author (Contributor):

The changes to db_impl_write.cc are separately approved in #5381

for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}

total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}
// Note about seq_per_batch_: either disableWAL is set for the entire write
@@ -336,6 +339,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// PreReleaseCallback is called after WAL write and before memtable write
if (status.ok()) {
SequenceNumber next_sequence = current_sequence;
size_t index = 0;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
@@ -347,7 +351,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
writer->sequence = next_sequence;
if (writer->pre_release_callback) {
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
@@ -675,11 +680,15 @@ Status DBImpl::WriteImplWALOnly(
// Note: no need to update last_batch_group_size_ here since the batch writes
// to WAL only

size_t pre_release_callback_cnt = 0;
size_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}

@@ -758,11 +767,13 @@ Status DBImpl::WriteImplWALOnly(
WriteStatusCheck(status);
}
if (status.ok()) {
size_t index = 0;
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
@@ -1121,7 +1132,7 @@ Status DBImpl::WriteRecoverableState() {
// AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
mutex_.Unlock();
status = recoverable_state_pre_release_callback_->Callback(
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
mutex_.Lock();
}
}
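For reference, the calling convention the hunks above introduce can be condensed as follows: one pass over the write group counts the writers that carry a pre-release callback, and a second pass invokes each callback with its position and that count. The Writer struct and InvokeCallbacks helper below are simplified stand-ins for illustration, not RocksDB types.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical, simplified mirror of the loop structure in the diff above:
// a first pass counts the writers that carry a callback, a second pass hands
// each callback its position (index) and the group-wide count (total).
struct Writer {
  bool has_callback = false;
};

template <typename Fn>
void InvokeCallbacks(const std::vector<Writer>& write_group, Fn callback) {
  size_t total = 0;
  for (const auto& w : write_group) {
    if (w.has_callback) {
      ++total;  // mirrors pre_release_callback_cnt in the diff
    }
  }
  size_t index = 0;
  for (const auto& w : write_group) {
    if (w.has_callback) {
      callback(index++, total);  // each callback learns its order and the total
    }
  }
}
```

The doc comment added to pre_release_callback.h (next file) is what lets a callback use this pair to skip work that would otherwise be repeated for every writer in the group.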
6 changes: 5 additions & 1 deletion db/pre_release_callback.h
@@ -27,8 +27,12 @@ class PreReleaseCallback {
// is_mem_disabled is currently used for debugging purposes to assert that
// the callback is done from the right write queue.
// If non-zero, log_number indicates the WAL log to which we wrote.
// index >= 0 specifies the order of the callback in the same write thread.
// total > index specifies the total number of callbacks in the same write
// thread. Together with index, it can be used to reduce redundant
// operations among the callbacks.
virtual Status Callback(SequenceNumber seq, bool is_mem_disabled,
uint64_t log_number) = 0;
uint64_t log_number, size_t index, size_t total) = 0;
};

} // namespace rocksdb
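To make the new contract concrete, here is a minimal sketch of a callback that uses index/total to run a group-wide step only once per write group. The OncePerGroupCallback class and its publish_fn member are hypothetical illustrations, not part of this PR.

```cpp
#include <cstddef>
#include <functional>
#include <utility>

#include "db/pre_release_callback.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"

namespace rocksdb {

// Hypothetical callback (not part of this PR): uses index/total so that the
// group-wide step runs once per write group instead of once per writer.
class OncePerGroupCallback : public PreReleaseCallback {
 public:
  explicit OncePerGroupCallback(std::function<void(SequenceNumber)> publish_fn)
      : publish_fn_(std::move(publish_fn)) {}

  Status Callback(SequenceNumber seq, bool /*is_mem_disabled*/,
                  uint64_t /*log_number*/, size_t index,
                  size_t total) override {
    // Per-writer work would go here.
    if (index + 1 == total) {
      // Last callback in the write group: do the step that would otherwise be
      // redundantly repeated, e.g. publishing the latest sequence number.
      publish_fn_(seq);
    }
    return Status::OK();
  }

 private:
  std::function<void(SequenceNumber)> publish_fn_;
};

}  // namespace rocksdb
```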
3 changes: 2 additions & 1 deletion db/write_callback_test.cc
@@ -304,7 +304,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
PublishSeqCallback(DBImpl* db_impl_in)
: db_impl_(db_impl_in) {}
Status Callback(SequenceNumber last_seq, bool /*not used*/,
uint64_t) override {
uint64_t, size_t /*index*/,
size_t /*total*/) override {
db_impl_->SetLastPublishedSequence(last_seq);
return Status::OK();
}
3 changes: 2 additions & 1 deletion utilities/transactions/pessimistic_transaction.cc
@@ -231,7 +231,8 @@ Status WriteCommittedTxn::PrepareInternal() {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber, bool is_mem_disabled,
uint64_t log_number) override {
uint64_t log_number, size_t /*index*/,
size_t /*total*/) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
61 changes: 39 additions & 22 deletions utilities/transactions/write_prepared_transaction_test.cc
@@ -7,9 +7,9 @@

#include "utilities/transactions/transaction_test.h"

#include <cinttypes>
#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <functional>
#include <string>
#include <thread>
@@ -55,25 +55,17 @@ TEST(PreparedHeap, BasicsTest) {
heap.push(34l);
// Test that old min is still on top
ASSERT_EQ(14l, heap.top());
heap.push(13l);
Review comment from the PR author (Contributor):

With ::AddPrepared being called from the main write queue, the push is now called in order. So we should remove tests with unordered calls to push. This will later help to simplify the implementation of PreparedHeap as well.

Reply from a reviewer (Contributor):

Is this only true in the case of two write queues, though? If so, maybe we should just decide to only support two write queues.

Reply from the PR author (Contributor):

No, it is true regardless. I did not mention one write queue because it has always been the case for one write queue to insert into PreparedHeap in order. But I am in favor of deprecating the one-write-queue case, which would make the implementation simpler. We should keep that direction in mind.

// Test that the new min will be on top
ASSERT_EQ(13l, heap.top());
// Test that it is persistent
ASSERT_EQ(13l, heap.top());
heap.push(44l);
heap.push(54l);
heap.push(64l);
heap.push(74l);
heap.push(84l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
ASSERT_EQ(14l, heap.top());
heap.erase(24l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
ASSERT_EQ(14l, heap.top());
heap.erase(14l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(13l);
// Test that the new comes to the top after multiple erase
ASSERT_EQ(34l, heap.top());
heap.erase(34l);
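The review discussion above notes that, with ::AddPrepared now invoked from the main write queue, pushes into PreparedHeap arrive in increasing order, which should eventually allow a simpler container than a binary heap. A minimal sketch of that simplification, assuming strictly increasing pushes; the OrderedPreparedQueue name and deque-plus-set layout are illustrative, not the actual PreparedHeap implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <set>

// Hypothetical simplified prepared-sequence container, assuming push() is
// always called with increasing sequence numbers. A deque then suffices
// instead of a binary heap: the front is always the minimum. Erased entries
// that are not at the front are parked in a side set and dropped lazily when
// they reach the front.
class OrderedPreparedQueue {
 public:
  void push(uint64_t v) {
    assert(heap_.empty() || heap_.back() < v);  // relies on ordered pushes
    heap_.push_back(v);
  }
  bool empty() const { return heap_.empty(); }
  uint64_t top() const { return heap_.front(); }
  void erase(uint64_t v) {
    if (!heap_.empty() && heap_.front() == v) {
      heap_.pop_front();
      // Drop any previously erased entries that have reached the front.
      while (!heap_.empty() && erased_.count(heap_.front())) {
        erased_.erase(heap_.front());
        heap_.pop_front();
      }
    } else {
      erased_.insert(v);  // defer the erase until it reaches the front
    }
  }

 private:
  std::deque<uint64_t> heap_;
  std::set<uint64_t> erased_;
};
```

With ordered pushes, top() is simply the front of the deque, and out-of-order erases are deferred until the erased entries surface at the front.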
@@ -3001,13 +2993,16 @@ TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
port::Mutex txn_mutex_;

// t1) Insert prepared entry, t2) commit other entires to advance max
// evicted sec and finish checking the existing prepared entires, t1)
// t1) Insert prepared entry, t2) commit other entries to advance max
// evicted seq and finish checking the existing prepared entries, t1)
// AddPrepared, t2) update max_evicted_seq_
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"},
{"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"},
{"AddPreparedCallback::AddPrepared::begin:pause",
"AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause",
"AddPreparedCallback::AddPrepared::begin:resume"},
{"AddPreparedCallback::AddPrepared::end",
"AdvanceMaxEvictedSeq::update_max:resume"},
});
SyncPoint::GetInstance()->EnableProcessing();

@@ -3061,20 +3056,36 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
ReOpen();
std::atomic<const Snapshot*> snap = {nullptr};
std::atomic<SequenceNumber> exp_prepare = {0};
std::atomic<bool> snapshot_taken = {false};
// Value is synchronized via snap
PinnableSlice value;
// Take a snapshot after publish and before RemovePrepared:Start
auto snap_callback = [&]() {
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
snapshot_taken.store(true);
};
auto callback = [&](void* param) {
SequenceNumber prep_seq = *((SequenceNumber*)param);
if (prep_seq == exp_prepare.load()) { // only for write_thread
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
// We need to spawn a thread to avoid deadlock since getting a
// snapshot might end up calling AdvanceSeqByOne which needs joining
// the write queue.
auto t = rocksdb::port::Thread(snap_callback);
t.detach();
TEST_SYNC_POINT("callback:end");
}
};
// Wait for the first snapshot to be taken in GetSnapshotInternal. Although
// it might be updated before GetSnapshotInternal finishes, this should
// cover most of the cases.
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
});
SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
SyncPoint::GetInstance()->EnableProcessing();
// Thread to cause frequent evictions
Expand All @@ -3098,9 +3109,15 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
// Let an eviction to kick in
std::this_thread::yield();

snapshot_taken.store(false);
exp_prepare.store(txn->GetId());
ASSERT_OK(txn->Commit());
delete txn;
// Wait for the snapshot taking that is triggered by
// RemovePrepared:Start callback
while (!snapshot_taken) {
std::this_thread::yield();
}

// Read with the snapshot taken before delayed_prepared_ cleanup
ReadOptions roptions;
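The updated CommitOfDelayedPrepared test above moves the snapshot-taking into a detached thread, since GetSnapshot may end up calling AdvanceSeqByOne and thus joining the write queue, and then busy-waits on an atomic flag before reading. A self-contained sketch of that synchronization pattern, with hypothetical names:

```cpp
#include <atomic>
#include <thread>

// Sketch of the pattern used in CommitOfDelayedPrepared: work that could
// deadlock if run inline in a callback is handed to a detached thread, and
// the driving thread busy-waits on an atomic flag until it has finished.
std::atomic<bool> work_done{false};

// Called from inside the callback; returns immediately.
void RunFromCallback(void (*work)()) {
  std::thread t([work]() {
    work();  // e.g. take a snapshot and read under it
    work_done.store(true);
  });
  t.detach();  // no join inside the callback, so the write queue is not blocked
}

// Called by the driving thread after the commit that triggers the callback.
void WaitForWork() {
  while (!work_done.load()) {
    std::this_thread::yield();  // mirrors the test's snapshot_taken busy-wait
  }
}

int main() {
  RunFromCallback([] { /* placeholder for snapshot-taking work */ });
  WaitForWork();
  return 0;
}
```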
26 changes: 19 additions & 7 deletions utilities/transactions/write_prepared_txn.cc
@@ -169,12 +169,15 @@ Status WritePreparedTxn::CommitInternal() {
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after WriteImpl that publishsed
if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
s.ok())) {
// Note: RemovePrepared should be called after WriteImpl that published
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
} // else RemovePrepared is called from within PreReleaseCallback
if (UNLIKELY(!do_one_write)) {
assert(!s.ok());
// Cleanup the prepared entry we added with add_prepared_callback
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
}
return s;
Expand All @@ -199,10 +202,14 @@ Status WritePreparedTxn::CommitInternal() {
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_aux_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
if (s.ok()) {
// Note: RemovePrepared should be called after WriteImpl that published
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
} // else RemovePrepared is called from within PreReleaseCallback
return s;
}

@@ -348,6 +355,7 @@ Status WritePreparedTxn::RollbackInternal() {
return s;
}
if (do_one_write) {
assert(!db_impl_->immutable_db_options().two_write_queues);
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
return s;
} // else do the 2nd write for commit
@@ -370,9 +378,13 @@
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal (status=%s) commit: %" PRIu64,
s.ToString().c_str(), GetId());
// TODO(lth): For WriteUnPrepared that rollback is called frequently,
// RemovePrepared could be moved to the callback to reduce lock contention.
if (s.ok()) {
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
}
// Note: RemovePrepared for prepared batch is called from within
// PreReleaseCallback
wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

return s;