WritePrepared: reduce prepared_mutex_ overhead (#5420)
Summary:
The patch reduces contention over prepared_mutex_ using these techniques:
1) Move ::RemovePrepared() to be called from the commit callback when we have two write queues.
2) Use two separate mutexes for PreparedHeap: prepared_mutex_, needed for ::RemovePrepared, and ::push_pop_mutex(), needed for ::AddPrepared(). Since ::AddPrepared is called only from the first write queue and ::RemovePrepared mostly from the second, the two write queues no longer compete with each other over a single mutex (a minimal sketch follows after this list). ::RemovePrepared might occasionally still need to acquire ::push_pop_mutex() if ::erase() ends up calling ::pop().
3) Acquire ::push_pop_mutex() on the first callback of the write queue and release it on the last.
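
A minimal sketch of the two-mutex idea from technique 2, assuming a simplified min-heap of prepared sequence numbers; this is only an illustration, not the actual PreparedHeap in the RocksDB sources, and the member names are invented for clarity:

#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

class TwoMutexPreparedHeapSketch {
 public:
  // Called only from the first write queue (e.g. from the AddPrepared callback).
  void AddPrepared(uint64_t seq) {
    std::lock_guard<std::mutex> push_lock(push_pop_mutex_);
    heap_.push(seq);
  }

  // Called mostly from the second write queue (commit-time callback).
  void RemovePrepared(uint64_t seq) {
    std::lock_guard<std::mutex> erase_lock(prepared_mutex_);
    if (!heap_.empty() && heap_.top() == seq) {
      // Only when the erased entry is at the top do we also need the
      // push/pop mutex, so the two queues rarely contend on it.
      std::lock_guard<std::mutex> pop_lock(push_pop_mutex_);
      heap_.pop();
    } else {
      // Otherwise record it and drop it lazily once it reaches the top.
      erased_.push_back(seq);
    }
  }

 private:
  std::mutex push_pop_mutex_;  // serializes push/pop on heap_
  std::mutex prepared_mutex_;  // serializes erase bookkeeping
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
      heap_;                      // min-heap of prepared sequence numbers
  std::vector<uint64_t> erased_;  // entries erased before reaching the top
};

With AddPrepared issued only from the first queue and RemovePrepared mostly from the second, the common case is that each queue takes only its own mutex.
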
Pull Request resolved: #5420

Differential Revision: D15741985

Pulled By: maysamyabandeh

fbshipit-source-id: 84ce8016007e88bb6e10da5760ba1f0d26347735
Maysam Yabandeh authored and facebook-github-bot committed Jun 10, 2019
1 parent a16d0cc commit c292dc8
Showing 11 changed files with 218 additions and 94 deletions.
19 changes: 15 additions & 4 deletions db/db_impl/db_impl_write.cc
@@ -263,16 +263,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
size_t total_count = 0;
size_t valid_batches = 0;
size_t total_byte_size = 0;
size_t pre_release_callback_cnt = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}

total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}
// Note about seq_per_batch_: either disableWAL is set for the entire write
@@ -336,6 +339,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// PreReleaseCallback is called after WAL write and before memtable write
if (status.ok()) {
SequenceNumber next_sequence = current_sequence;
size_t index = 0;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
@@ -347,7 +351,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
writer->sequence = next_sequence;
if (writer->pre_release_callback) {
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
@@ -675,11 +680,15 @@ Status DBImpl::WriteImplWALOnly(
// Note: no need to update last_batch_group_size_ here since the batch writes
// to WAL only

size_t pre_release_callback_cnt = 0;
size_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}

@@ -758,11 +767,13 @@ Status DBImpl::WriteImplWALOnly(
WriteStatusCheck(status);
}
if (status.ok()) {
size_t index = 0;
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
@@ -1121,7 +1132,7 @@ Status DBImpl::WriteRecoverableState() {
// AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
mutex_.Unlock();
status = recoverable_state_pre_release_callback_->Callback(
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
mutex_.Lock();
}
}
6 changes: 5 additions & 1 deletion db/pre_release_callback.h
@@ -27,8 +27,12 @@ class PreReleaseCallback {
// is_mem_disabled is currently used for debugging purposes to assert that
// the callback is done from the right write queue.
// If non-zero, log_number indicates the WAL log to which we wrote.
// index >= 0 specifies the order of callback in the same write thread.
// total > index specifies the total number of callbacks in the same write
// thread. Together with index, could be used to reduce the redundant
// operations among the callbacks.
virtual Status Callback(SequenceNumber seq, bool is_mem_disabled,
uint64_t log_number) = 0;
uint64_t log_number, size_t index, size_t total) = 0;
};

} // namespace rocksdb
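
As an illustration of how a callback might use the new index/total arguments (technique 3 above), the hypothetical sketch below acquires a shared mutex on the first callback of a write group and releases it on the last; it is not the actual AddPreparedCallback from RocksDB:

#include <cassert>
#include <mutex>

#include "db/pre_release_callback.h"

namespace rocksdb {

class GroupScopedLockCallback : public PreReleaseCallback {
 public:
  explicit GroupScopedLockCallback(std::mutex* mutex) : mutex_(mutex) {}

  Status Callback(SequenceNumber seq, bool /*is_mem_disabled*/,
                  uint64_t /*log_number*/, size_t index,
                  size_t total) override {
    assert(index < total);
    if (index == 0) {
      mutex_->lock();  // first callback in the write group acquires
    }
    // ... per-batch work that needs the lock, using seq ...
    (void)seq;
    if (index + 1 == total) {
      mutex_->unlock();  // last callback in the write group releases
    }
    return Status::OK();
  }

 private:
  std::mutex* mutex_;
};

}  // namespace rocksdb
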
3 changes: 2 additions & 1 deletion db/write_callback_test.cc
@@ -304,7 +304,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
PublishSeqCallback(DBImpl* db_impl_in)
: db_impl_(db_impl_in) {}
Status Callback(SequenceNumber last_seq, bool /*not used*/,
uint64_t) override {
uint64_t, size_t /*index*/,
size_t /*total*/) override {
db_impl_->SetLastPublishedSequence(last_seq);
return Status::OK();
}
3 changes: 2 additions & 1 deletion utilities/transactions/pessimistic_transaction.cc
@@ -231,7 +231,8 @@ Status WriteCommittedTxn::PrepareInternal() {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber, bool is_mem_disabled,
uint64_t log_number) override {
uint64_t log_number, size_t /*index*/,
size_t /*total*/) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
61 changes: 39 additions & 22 deletions utilities/transactions/write_prepared_transaction_test.cc
@@ -7,9 +7,9 @@

#include "utilities/transactions/transaction_test.h"

#include <cinttypes>
#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <functional>
#include <string>
#include <thread>
@@ -55,25 +55,17 @@ TEST(PreparedHeap, BasicsTest) {
heap.push(34l);
// Test that old min is still on top
ASSERT_EQ(14l, heap.top());
heap.push(13l);
// Test that the new min will be on top
ASSERT_EQ(13l, heap.top());
// Test that it is persistent
ASSERT_EQ(13l, heap.top());
heap.push(44l);
heap.push(54l);
heap.push(64l);
heap.push(74l);
heap.push(84l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
ASSERT_EQ(14l, heap.top());
heap.erase(24l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
ASSERT_EQ(14l, heap.top());
heap.erase(14l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(13l);
// Test that the new comes to the top after multiple erase
ASSERT_EQ(34l, heap.top());
heap.erase(34l);
@@ -3001,13 +2993,16 @@ TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
port::Mutex txn_mutex_;

// t1) Insert prepared entry, t2) commit other entires to advance max
// evicted sec and finish checking the existing prepared entires, t1)
// t1) Insert prepared entry, t2) commit other entries to advance max
// evicted sec and finish checking the existing prepared entries, t1)
// AddPrepared, t2) update max_evicted_seq_
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"},
{"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"},
{"AddPreparedCallback::AddPrepared::begin:pause",
"AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause",
"AddPreparedCallback::AddPrepared::begin:resume"},
{"AddPreparedCallback::AddPrepared::end",
"AdvanceMaxEvictedSeq::update_max:resume"},
});
SyncPoint::GetInstance()->EnableProcessing();

@@ -3061,20 +3056,36 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
ReOpen();
std::atomic<const Snapshot*> snap = {nullptr};
std::atomic<SequenceNumber> exp_prepare = {0};
std::atomic<bool> snapshot_taken = {false};
// Value is synchronized via snap
PinnableSlice value;
// Take a snapshot after publish and before RemovePrepared:Start
auto snap_callback = [&]() {
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
snapshot_taken.store(true);
};
auto callback = [&](void* param) {
SequenceNumber prep_seq = *((SequenceNumber*)param);
if (prep_seq == exp_prepare.load()) { // only for write_thread
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
// We need to spawn a thread to avoid deadlock since getting a
// snpashot might end up calling AdvanceSeqByOne which needs joining
// the write queue.
auto t = rocksdb::port::Thread(snap_callback);
t.detach();
TEST_SYNC_POINT("callback:end");
}
};
// Wait for the first snapshot be taken in GetSnapshotInternal. Although
// it might be updated before GetSnapshotInternal finishes but this should
// cover most of the cases.
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
});
SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
SyncPoint::GetInstance()->EnableProcessing();
// Thread to cause frequent evictions
@@ -3098,9 +3109,15 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
// Let an eviction to kick in
std::this_thread::yield();

snapshot_taken.store(false);
exp_prepare.store(txn->GetId());
ASSERT_OK(txn->Commit());
delete txn;
// Wait for the snapshot taking that is triggered by
// RemovePrepared:Start callback
while (!snapshot_taken) {
std::this_thread::yield();
}

// Read with the snapshot taken before delayed_prepared_ cleanup
ReadOptions roptions;
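The test change above notes that taking a snapshot directly inside the RemovePrepared:Start sync-point callback could deadlock, since getting a snapshot may call AdvanceSeqByOne, which needs to join the write queue. The standalone sketch below shows the general shape of that workaround, with nothing RocksDB-specific assumed: do the blocking work in a detached thread and let the test spin on an atomic flag until it finishes.

#include <atomic>
#include <thread>

std::atomic<bool> snapshot_taken{false};

// Invoked from a context that must not block on the write queue.
void OnRemovePreparedStart() {
  std::thread worker([] {
    // ... take the snapshot / do the blocking work here ...
    snapshot_taken.store(true);
  });
  worker.detach();  // do not join inside the callback
}

void WaitForSnapshot() {
  while (!snapshot_taken.load()) {
    std::this_thread::yield();  // the caller spins until the worker finishes
  }
}
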
26 changes: 19 additions & 7 deletions utilities/transactions/write_prepared_txn.cc
@@ -169,12 +169,15 @@ Status WritePreparedTxn::CommitInternal() {
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after WriteImpl that publishsed
if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
s.ok())) {
// Note: RemovePrepared should be called after WriteImpl that publishsed
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
} // else RemovePrepared is called from within PreReleaseCallback
if (UNLIKELY(!do_one_write)) {
assert(!s.ok());
// Cleanup the prepared entry we added with add_prepared_callback
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
}
return s;
@@ -199,10 +202,14 @@ Status WritePreparedTxn::CommitInternal() {
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_aux_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
if (s.ok()) {
// Note: RemovePrepared should be called after WriteImpl that publishsed
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
} // else RemovePrepared is called from within PreReleaseCallback
return s;
}

@@ -348,6 +355,7 @@ Status WritePreparedTxn::RollbackInternal() {
return s;
}
if (do_one_write) {
assert(!db_impl_->immutable_db_options().two_write_queues);
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
return s;
} // else do the 2nd write for commit
@@ -370,9 +378,13 @@ Status WritePreparedTxn::RollbackInternal() {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal (status=%s) commit: %" PRIu64,
s.ToString().c_str(), GetId());
// TODO(lth): For WriteUnPrepared that rollback is called frequently,
// RemovePrepared could be moved to the callback to reduce lock contention.
if (s.ok()) {
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
}
// Note: RemovePrepared for prepared batch is called from within
// PreReleaseCallback
wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

return s;