-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WritePrepared: reduce prepared_mutex_ overhead #5420
Changes from all commits
1a4a243
bfd44d8
f6949a8
2813eb7
b5c8fed
b111a1a
24d6458
6e6d68b
2fe7955
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,9 @@ | |
|
||
#include "utilities/transactions/transaction_test.h" | ||
|
||
#include <cinttypes> | ||
#include <algorithm> | ||
#include <atomic> | ||
#include <cinttypes> | ||
#include <functional> | ||
#include <string> | ||
#include <thread> | ||
|
@@ -55,25 +55,17 @@ TEST(PreparedHeap, BasicsTest) { | |
heap.push(34l); | ||
// Test that old min is still on top | ||
ASSERT_EQ(14l, heap.top()); | ||
heap.push(13l); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With ::AddPrepared being called from the main write queue, the push is now called in order. So we should remove tests with unordered calls to push. This will later help to simplify the implementation of PreparedHeap as well. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this only true in the case of two write queues, though? If so, maybe we should just decide to only support two write queues. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, it is true regardless. I did not mention one write queue because it has always been the case for one write queue to insert into PreparedHeap in order. |
||
// Test that the new min will be on top | ||
ASSERT_EQ(13l, heap.top()); | ||
// Test that it is persistent | ||
ASSERT_EQ(13l, heap.top()); | ||
heap.push(44l); | ||
heap.push(54l); | ||
heap.push(64l); | ||
heap.push(74l); | ||
heap.push(84l); | ||
// Test that old min is still on top | ||
ASSERT_EQ(13l, heap.top()); | ||
ASSERT_EQ(14l, heap.top()); | ||
heap.erase(24l); | ||
// Test that old min is still on top | ||
ASSERT_EQ(13l, heap.top()); | ||
ASSERT_EQ(14l, heap.top()); | ||
heap.erase(14l); | ||
// Test that old min is still on top | ||
ASSERT_EQ(13l, heap.top()); | ||
heap.erase(13l); | ||
// Test that the new comes to the top after multiple erase | ||
ASSERT_EQ(34l, heap.top()); | ||
heap.erase(34l); | ||
|
@@ -3001,13 +2993,16 @@ TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) { | |
ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value)); | ||
port::Mutex txn_mutex_; | ||
|
||
// t1) Insert prepared entry, t2) commit other entires to advance max | ||
// evicted sec and finish checking the existing prepared entires, t1) | ||
// t1) Insert prepared entry, t2) commit other entries to advance max | ||
// evicted sec and finish checking the existing prepared entries, t1) | ||
// AddPrepared, t2) update max_evicted_seq_ | ||
rocksdb::SyncPoint::GetInstance()->LoadDependency({ | ||
{"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"}, | ||
{"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"}, | ||
{"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"}, | ||
{"AddPreparedCallback::AddPrepared::begin:pause", | ||
"AddPreparedBeforeMax::read_thread:start"}, | ||
{"AdvanceMaxEvictedSeq::update_max:pause", | ||
"AddPreparedCallback::AddPrepared::begin:resume"}, | ||
{"AddPreparedCallback::AddPrepared::end", | ||
"AdvanceMaxEvictedSeq::update_max:resume"}, | ||
}); | ||
SyncPoint::GetInstance()->EnableProcessing(); | ||
|
||
|
@@ -3061,20 +3056,36 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { | |
ReOpen(); | ||
std::atomic<const Snapshot*> snap = {nullptr}; | ||
std::atomic<SequenceNumber> exp_prepare = {0}; | ||
std::atomic<bool> snapshot_taken = {false}; | ||
// Value is synchronized via snap | ||
PinnableSlice value; | ||
// Take a snapshot after publish and before RemovePrepared:Start | ||
auto snap_callback = [&]() { | ||
ASSERT_EQ(nullptr, snap.load()); | ||
snap.store(db->GetSnapshot()); | ||
ReadOptions roptions; | ||
roptions.snapshot = snap.load(); | ||
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value); | ||
ASSERT_OK(s); | ||
snapshot_taken.store(true); | ||
}; | ||
auto callback = [&](void* param) { | ||
SequenceNumber prep_seq = *((SequenceNumber*)param); | ||
if (prep_seq == exp_prepare.load()) { // only for write_thread | ||
ASSERT_EQ(nullptr, snap.load()); | ||
snap.store(db->GetSnapshot()); | ||
ReadOptions roptions; | ||
roptions.snapshot = snap.load(); | ||
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value); | ||
ASSERT_OK(s); | ||
// We need to spawn a thread to avoid deadlock since getting a | ||
// snapshot might end up calling AdvanceSeqByOne which needs joining | ||
// the write queue. | ||
auto t = rocksdb::port::Thread(snap_callback); | ||
t.detach(); | ||
TEST_SYNC_POINT("callback:end"); | ||
} | ||
}; | ||
// Wait for the first snapshot to be taken in GetSnapshotInternal. Although | ||
// it might be updated before GetSnapshotInternal finishes, this should | ||
// cover most of the cases. | ||
rocksdb::SyncPoint::GetInstance()->LoadDependency({ | ||
{"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"}, | ||
}); | ||
SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback); | ||
SyncPoint::GetInstance()->EnableProcessing(); | ||
// Thread to cause frequent evictions | ||
|
@@ -3098,9 +3109,15 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) { | |
// Let an eviction kick in | ||
std::this_thread::yield(); | ||
|
||
snapshot_taken.store(false); | ||
exp_prepare.store(txn->GetId()); | ||
ASSERT_OK(txn->Commit()); | ||
delete txn; | ||
// Wait for the snapshot taking that is triggered by | ||
// RemovePrepared:Start callback | ||
while (!snapshot_taken) { | ||
std::this_thread::yield(); | ||
} | ||
|
||
// Read with the snapshot taken before delayed_prepared_ cleanup | ||
ReadOptions roptions; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes to db_impl_write.cc are separately approved in #5381