Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WritePrepared Txn: fix smallest_prep atomicity issue #3703

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions utilities/transactions/write_prepared_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,10 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
wp_db->TakeSnapshot(snap_seq1);
auto commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
auto commit_seq2 = ++seq;
wp_db->AddCommitted(prep_seq2, commit_seq2);
wp_db->RemovePrepared(prep_seq2);
// Take the 2nd and 3rd snapshot that overlap with the same txn
prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
Expand All @@ -711,12 +713,14 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
seq++;
commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
// Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
// only item in the commit_cache_ via another commit.
prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);

// Verify that the evicted commit entries for all snapshots are in the
// old_commit_map_
Expand Down Expand Up @@ -1367,6 +1371,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
} else { // else commit it
seq++;
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
commit_seqs.insert(seq);
if (!snapshot) {
committed_before.insert(cur_txn);
Expand Down Expand Up @@ -1416,9 +1421,11 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
}
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
wp_db->RemovePrepared(p);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
Expand Down
27 changes: 21 additions & 6 deletions utilities/transactions/write_prepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ Status WritePreparedTxn::CommitInternal() {
assert(s.ok());
commit_batch_cnt = counter.BatchCount();
}
const bool PREP_HEAP_SKIPPED = true;
const bool disable_memtable = !includes_data;
const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable;
Expand All @@ -149,7 +148,7 @@ Status WritePreparedTxn::CommitInternal() {
// CommitTimeWriteBatch commits with PreReleaseCallback.
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
!PREP_HEAP_SKIPPED, publish_seq);
publish_seq);
uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already
// a connection between the memtable and its WAL, so there is no need to
Expand All @@ -161,6 +160,11 @@ Status WritePreparedTxn::CommitInternal() {
batch_cnt, &update_commit_map);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after WriteImpl that publishsed
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
return s;
} // else do the 2nd write to publish seq
// Note: the 2nd write comes with a performance penality. So if we have too
Expand Down Expand Up @@ -192,6 +196,9 @@ Status WritePreparedTxn::CommitInternal() {
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&publish_seq_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
return s;
}

Expand Down Expand Up @@ -324,10 +331,8 @@ Status WritePreparedTxn::RollbackInternal() {
// Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers.
const size_t ZERO_COMMITS = 0;
const bool PREP_HEAP_SKIPPED = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS,
PREP_HEAP_SKIPPED);
wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
Expand Down Expand Up @@ -379,10 +384,20 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
}

void WritePreparedTxn::SetSnapshot() {
// Note: for this optimization setting the last sequence number and obtaining
// the smallest uncommitted seq should be done atomically. However to avoid
// the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
// snapshot. Since we always updated the list of unprepared seq (via
// AddPrepared) AFTER the last sequence is updated, this guarantees that the
// smallest uncommited seq that we pair with the snapshot is smaller or equal
// the value that would be obtained otherwise atomically. That is ok since
// this optimization works as long as min_uncommitted is less than or equal
// than the smallest uncommitted seq when the snapshot was taken.
auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
assert(snapshot);
wpt_db_->EnhanceSnapshot(snapshot);
wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
SetSnapshotInternal(snapshot);
}

Expand Down
53 changes: 25 additions & 28 deletions utilities/transactions/write_prepared_txn_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ Status WritePreparedTxnDB::Initialize(
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
assert(!is_mem_disabled);
const bool PREPARE_SKIPPED = true;
db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();
}

Expand Down Expand Up @@ -167,7 +166,7 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used;
uint64_t prepare_seq = seq_used;
if (txn != nullptr) {
txn->SetId(prepare_seq);
}
Expand All @@ -185,9 +184,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
const size_t ZERO_COMMITS = 0;
const bool PREP_HEAP_SKIPPED = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, !PREP_HEAP_SKIPPED);
this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
const size_t ONE_BATCH = 1;
Expand All @@ -197,6 +195,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
RemovePrepared(prepare_seq, batch_cnt);
return s;
}

Expand Down Expand Up @@ -392,24 +393,13 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
throw std::runtime_error(
"Rollback reqeust while there are live snapshots.");
}
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prep_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prep_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
RemovePrepared(prep_seq);
}

void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped, uint8_t loop_cnt) {
ROCKS_LOG_DETAILS(info_log_,
"Txn %" PRIu64 " Committing with %" PRIu64
"(prepare_skipped=%d)",
prepare_seq, commit_seq, prepare_skipped);
uint8_t loop_cnt) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
Expand Down Expand Up @@ -443,23 +433,27 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
if (loop_cnt > 100) {
throw std::runtime_error("Infinite loop in AddCommitted!");
}
AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt);
AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
return;
}
if (!prepare_skipped) {
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
const size_t batch_cnt) {
WriteLock wl(&prepared_mutex_);
for (size_t i = 0; i < batch_cnt; i++) {
prepared_txns_.erase(prepare_seq + i);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
delayed_prepared_.erase(prepare_seq + i);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
Expand Down Expand Up @@ -542,10 +536,13 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
// Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
// to WritePreparedTxn::SetSnapshot for more explanation.
auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
assert(snap_impl);
EnhanceSnapshot(snap_impl);
EnhanceSnapshot(snap_impl, min_uncommitted);
return snap_impl;
}

Expand Down
26 changes: 13 additions & 13 deletions utilities/transactions/write_prepared_txn_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// should check delayed_prepared_ first before applying this optimization.
// TODO(myabandeh): include delayed_prepared_ in min_uncommitted
if (prep_seq < min_uncommitted) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32
" because of min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
Expand Down Expand Up @@ -242,15 +247,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {

// Add the transaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
// with which the additional data is written to cancel the txn effect. It can
// be used to identify the snapshots that overlap with the rolled back txn.
void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map. prepare_skipped is set if the prepare phase
// is skipped for this commit. loop_cnt is to detect infinite loops.
// commit_seq to the commit map. loop_cnt is to detect infinite loops.
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped = false, uint8_t loop_cnt = 0);
uint8_t loop_cnt = 0);

struct CommitEntry {
uint64_t prep_seq;
Expand Down Expand Up @@ -492,9 +498,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
}
}
// Enhance the snapshot object by recording in it the smallest uncommitted seq
inline void EnhanceSnapshot(SnapshotImpl* snapshot) {
inline void EnhanceSnapshot(SnapshotImpl* snapshot, SequenceNumber min_uncommitted) {
assert(snapshot);
snapshot->min_uncommitted_ = WritePreparedTxnDB::SmallestUnCommittedSeq();
snapshot->min_uncommitted_ = min_uncommitted;
}

virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
Expand Down Expand Up @@ -648,14 +654,12 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
SequenceNumber prep_seq,
size_t prep_batch_cnt,
size_t data_batch_cnt = 0,
bool prep_heap_skipped = false,
bool publish_seq = true)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
prep_batch_cnt_(prep_batch_cnt),
data_batch_cnt_(data_batch_cnt),
prep_heap_skipped_(prep_heap_skipped),
includes_data_(data_batch_cnt_ > 0),
publish_seq_(publish_seq) {
assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor
Expand All @@ -670,18 +674,17 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
: commit_seq + data_batch_cnt_ - 1;
if (prep_seq_ != kMaxSequenceNumber) {
for (size_t i = 0; i < prep_batch_cnt_; i++) {
db_->AddCommitted(prep_seq_ + i, last_commit_seq, prep_heap_skipped_);
db_->AddCommitted(prep_seq_ + i, last_commit_seq);
}
} // else there was no prepare phase
if (includes_data_) {
assert(data_batch_cnt_);
// Commit the data that is accompanied with the commit request
const bool PREPARE_SKIPPED = true;
for (size_t i = 0; i < data_batch_cnt_; i++) {
// For commit seq of each batch use the commit seq of the last batch.
// This would make debugging easier by having all the batches having
// the same sequence number.
db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED);
db_->AddCommitted(commit_seq + i, last_commit_seq);
}
}
if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
Expand All @@ -704,9 +707,6 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
SequenceNumber prep_seq_;
size_t prep_batch_cnt_;
size_t data_batch_cnt_;
// An optimization that indicates that there is no need to update the prepare
// heap since the prepare sequence number was not added to it.
bool prep_heap_skipped_;
// Either because it is commit without prepare or it has a
// CommitTimeWriteBatch
bool includes_data_;
Expand Down