WritePrepared Txn: fix smallest_prep atomicity issue
Summary:
We introduced the smallest_prep optimization in commit b225de7, which stores the smallest uncommitted sequence number along with the snapshot. This allows readers that read from the snapshot to skip further checks and safely assume the data is committed if its sequence number is less than the smallest uncommitted seq at the time the snapshot was taken. The problem was that the smallest uncommitted seq and the snapshot must be obtained atomically; the lack of atomicity could pair a snapshot with a smallest uncommitted value obtained after the snapshot was taken, leading readers to mistakenly skip checks for some uncommitted data.
This patch fixes the problem by i) separating the removal of prepare entries from the AddCommitted function, ii) removing the prepare entries AFTER the committed sequence number is published, and iii) obtaining the smallest uncommitted seq (from the prepare list) BEFORE taking a snapshot. This guarantees that the smallest uncommitted value paired with a snapshot is less than or equal to the value that would have been obtained atomically.
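
To make the corrected ordering concrete, here is a minimal, self-contained C++ sketch of the two invariants. This is a toy model only; the class and member names are illustrative and are not the real WritePreparedTxnDB API. The two invariants: the smallest uncommitted seq is read BEFORE the snapshot sequence, and a prepare entry is removed only AFTER its commit sequence is published.

#include <atomic>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <set>
#include <utility>

class ToyWritePreparedDB {
 public:
  void AddPrepared(uint64_t seq) {
    std::lock_guard<std::mutex> lk(mu_);
    prepared_.insert(seq);
  }

  // Commit path: publish the commit seq FIRST, then remove the prepare
  // entry. Removing it before publishing is what allowed a reader to
  // observe too large a smallest-uncommitted value.
  void Commit(uint64_t prep_seq, uint64_t commit_seq) {
    last_published_.store(commit_seq, std::memory_order_release);
    std::lock_guard<std::mutex> lk(mu_);
    prepared_.erase(prep_seq);
  }

  uint64_t SmallestUnCommittedSeq() {
    std::lock_guard<std::mutex> lk(mu_);
    // Toy fallback when nothing is prepared; the real logic differs.
    return prepared_.empty() ? last_published_.load() + 1
                             : *prepared_.begin();
  }

  // Snapshot path: read min_uncommitted BEFORE the snapshot seq. With this
  // order, concurrent commits can only make the paired value smaller than
  // or equal to the true smallest uncommitted at snapshot time, which is
  // still safe for the read-path shortcut.
  std::pair<uint64_t, uint64_t> TakeSnapshot() {
    uint64_t min_uncommitted = SmallestUnCommittedSeq();
    uint64_t snapshot_seq = last_published_.load(std::memory_order_acquire);
    return {snapshot_seq, min_uncommitted};
  }

 private:
  std::mutex mu_;
  std::set<uint64_t> prepared_;
  std::atomic<uint64_t> last_published_{1};
};

int main() {
  ToyWritePreparedDB db;
  db.AddPrepared(5);              // txn prepared at seq 5
  auto snap = db.TakeSnapshot();  // min_uncommitted == 5
  // A reader may treat any seq < 5 as committed without further checks.
  std::cout << "snapshot_seq=" << snap.first
            << " min_uncommitted=" << snap.second << '\n';
  db.Commit(5, 6);                // publish 6, then remove prepare entry
  return 0;
}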

Tested by running MySQLStyleTransactionTest/MySQLStyleTransactionTest.TransactionStressTest, which was failing sporadically before this fix.
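
For reference, a typical way to reproduce that run with gtest filtering (the build target and filter pattern below are assumed from the usual RocksDB workflow, not stated in this commit):

make transaction_test
./transaction_test --gtest_filter='MySQLStyleTransactionTest/MySQLStyleTransactionTest.TransactionStressTest/*'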
Closes #3703

Differential Revision: D7581934

Pulled By: maysamyabandeh

fbshipit-source-id: dc9d6f4fb477eba75d4d5927326905b548a96a32
Maysam Yabandeh authored and facebook-github-bot committed Apr 12, 2018
1 parent d42bd04 commit 6f5e644
Showing 4 changed files with 67 additions and 47 deletions.
7 changes: 7 additions & 0 deletions utilities/transactions/write_prepared_transaction_test.cc
@@ -698,8 +698,10 @@ TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
wp_db->TakeSnapshot(snap_seq1);
auto commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
auto commit_seq2 = ++seq;
wp_db->AddCommitted(prep_seq2, commit_seq2);
wp_db->RemovePrepared(prep_seq2);
// Take the 2nd and 3rd snapshots that overlap with the same txn
prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
@@ -711,12 +713,14 @@
seq++;
commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);
// Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
// only item in the commit_cache_ via another commit.
prep_seq = ++seq;
wp_db->AddPrepared(prep_seq);
commit_seq = ++seq;
wp_db->AddCommitted(prep_seq, commit_seq);
wp_db->RemovePrepared(prep_seq);

// Verify that the evicted commit entries for all snapshots are in the
// old_commit_map_
@@ -1367,6 +1371,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
} else { // else commit it
seq++;
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
commit_seqs.insert(seq);
if (!snapshot) {
committed_before.insert(cur_txn);
@@ -1416,9 +1421,11 @@
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
}
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
wp_db->RemovePrepared(p);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
27 changes: 21 additions & 6 deletions utilities/transactions/write_prepared_txn.cc
@@ -138,7 +138,6 @@ Status WritePreparedTxn::CommitInternal() {
assert(s.ok());
commit_batch_cnt = counter.BatchCount();
}
const bool PREP_HEAP_SKIPPED = true;
const bool disable_memtable = !includes_data;
const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable;
@@ -149,7 +148,7 @@ Status WritePreparedTxn::CommitInternal() {
// CommitTimeWriteBatch commits with PreReleaseCallback.
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
!PREP_HEAP_SKIPPED, publish_seq);
publish_seq);
uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already
// a connection between the memtable and its WAL, so there is no need to
@@ -161,6 +160,11 @@ Status WritePreparedTxn::CommitInternal() {
batch_cnt, &update_commit_map);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note: RemovePrepared should be called after the WriteImpl that published
// the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
return s;
} // else do the 2nd write to publish seq
// Note: the 2nd write comes with a performance penalty. So if we have too
@@ -192,6 +196,9 @@ Status WritePreparedTxn::CommitInternal() {
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&publish_seq_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note: RemovePrepared should be called after the WriteImpl that published
// the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
return s;
}

@@ -324,10 +331,8 @@ Status WritePreparedTxn::RollbackInternal() {
// Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers.
const size_t ZERO_COMMITS = 0;
const bool PREP_HEAP_SKIPPED = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS,
PREP_HEAP_SKIPPED);
wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
@@ -379,10 +384,20 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
}

void WritePreparedTxn::SetSnapshot() {
// Note: for this optimization, setting the last sequence number and obtaining
// the smallest uncommitted seq should be done atomically. However, to avoid
// the mutex overhead, we call SmallestUnCommittedSeq BEFORE taking the
// snapshot. Since we always update the list of prepared seqs (via
// AddPrepared) AFTER the last sequence is updated, this guarantees that the
// smallest uncommitted seq that we pair with the snapshot is smaller than or
// equal to the value that would otherwise be obtained atomically. That is ok
// since this optimization works as long as min_uncommitted is less than or
// equal to the smallest uncommitted seq when the snapshot was taken.
auto min_uncommitted = wpt_db_->SmallestUnCommittedSeq();
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
assert(snapshot);
wpt_db_->EnhanceSnapshot(snapshot);
wpt_db_->EnhanceSnapshot(snapshot, min_uncommitted);
SetSnapshotInternal(snapshot);
}

53 changes: 25 additions & 28 deletions utilities/transactions/write_prepared_txn_db.cc
@@ -55,8 +55,7 @@ Status WritePreparedTxnDB::Initialize(
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
assert(!is_mem_disabled);
const bool PREPARE_SKIPPED = true;
db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();
}

Expand Down Expand Up @@ -167,7 +166,7 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used;
uint64_t prepare_seq = seq_used;
if (txn != nullptr) {
txn->SetId(prepare_seq);
}
@@ -185,9 +184,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
const size_t ZERO_COMMITS = 0;
const bool PREP_HEAP_SKIPPED = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, !PREP_HEAP_SKIPPED);
this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
const size_t ONE_BATCH = 1;
@@ -197,6 +195,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note: RemovePrepared should be called after the WriteImpl that published
// the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
RemovePrepared(prepare_seq, batch_cnt);
return s;
}

@@ -392,24 +393,13 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
throw std::runtime_error(
"Rollback request while there are live snapshots.");
}
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prep_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prep_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
RemovePrepared(prep_seq);
}

void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped, uint8_t loop_cnt) {
ROCKS_LOG_DETAILS(info_log_,
"Txn %" PRIu64 " Committing with %" PRIu64
"(prepare_skipped=%d)",
prepare_seq, commit_seq, prepare_skipped);
uint8_t loop_cnt) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
@@ -443,23 +433,27 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
if (loop_cnt > 100) {
throw std::runtime_error("Infinite loop in AddCommitted!");
}
AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt);
AddCommitted(prepare_seq, commit_seq, ++loop_cnt);
return;
}
if (!prepare_skipped) {
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq,
const size_t batch_cnt) {
WriteLock wl(&prepared_mutex_);
for (size_t i = 0; i < batch_cnt; i++) {
prepared_txns_.erase(prepare_seq + i);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
delayed_prepared_.erase(prepare_seq + i);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
}

bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
@@ -542,10 +536,13 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
}

const Snapshot* WritePreparedTxnDB::GetSnapshot() {
// Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
// to WritePreparedTxn::SetSnapshot for more explanation.
auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
assert(snap_impl);
EnhanceSnapshot(snap_impl);
EnhanceSnapshot(snap_impl, min_uncommitted);
return snap_impl;
}

27 changes: 14 additions & 13 deletions utilities/transactions/write_prepared_txn_db.h
@@ -161,6 +161,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// should check delayed_prepared_ first before applying this optimization.
// TODO(myabandeh): include delayed_prepared_ in min_uncommitted
if (prep_seq < min_uncommitted) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32
" because of min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
@@ -242,15 +247,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {

// Add the transaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
// with which the additional data is written to cancel the txn effect. It can
// be used to identify the snapshots that overlap with the rolled back txn.
void RollbackPrepared(uint64_t prep_seq, uint64_t rollback_seq);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map. prepare_skipped is set if the prepare phase
// is skipped for this commit. loop_cnt is to detect infinite loops.
// commit_seq to the commit map. loop_cnt is to detect infinite loops.
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
bool prepare_skipped = false, uint8_t loop_cnt = 0);
uint8_t loop_cnt = 0);

struct CommitEntry {
uint64_t prep_seq;
@@ -492,9 +498,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
}
}
// Enhance the snapshot object by recording in it the smallest uncommitted seq
inline void EnhanceSnapshot(SnapshotImpl* snapshot) {
inline void EnhanceSnapshot(SnapshotImpl* snapshot,
SequenceNumber min_uncommitted) {
assert(snapshot);
snapshot->min_uncommitted_ = WritePreparedTxnDB::SmallestUnCommittedSeq();
snapshot->min_uncommitted_ = min_uncommitted;
}

virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
@@ -648,14 +655,12 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
SequenceNumber prep_seq,
size_t prep_batch_cnt,
size_t data_batch_cnt = 0,
bool prep_heap_skipped = false,
bool publish_seq = true)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
prep_batch_cnt_(prep_batch_cnt),
data_batch_cnt_(data_batch_cnt),
prep_heap_skipped_(prep_heap_skipped),
includes_data_(data_batch_cnt_ > 0),
publish_seq_(publish_seq) {
assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor
@@ -670,18 +675,17 @@
: commit_seq + data_batch_cnt_ - 1;
if (prep_seq_ != kMaxSequenceNumber) {
for (size_t i = 0; i < prep_batch_cnt_; i++) {
db_->AddCommitted(prep_seq_ + i, last_commit_seq, prep_heap_skipped_);
db_->AddCommitted(prep_seq_ + i, last_commit_seq);
}
} // else there was no prepare phase
if (includes_data_) {
assert(data_batch_cnt_);
// Commit the data that is accompanied with the commit request
const bool PREPARE_SKIPPED = true;
for (size_t i = 0; i < data_batch_cnt_; i++) {
// For commit seq of each batch use the commit seq of the last batch.
// This would make debugging easier by having all the batches having
// the same sequence number.
db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED);
db_->AddCommitted(commit_seq + i, last_commit_seq);
}
}
if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) {
Expand All @@ -704,9 +708,6 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
SequenceNumber prep_seq_;
size_t prep_batch_cnt_;
size_t data_batch_cnt_;
// An optimization that indicates that there is no need to update the prepare
// heap since the prepare sequence number was not added to it.
bool prep_heap_skipped_;
// Either because it is commit without prepare or it has a
// CommitTimeWriteBatch
bool includes_data_;
