Skip to content

Commit

Permalink
WritePrepared Txn: smallest_prepare optimization
Browse files Browse the repository at this point in the history
Summary:
The is an optimization to reduce lookup in the CommitCache when querying IsInSnapshot. The optimization takes the smallest uncommitted data at the time that the snapshot was taken and if the sequence number of the read data is lower than that number it assumes the data as committed.
To implement this optimization two changes are required: i) The AddPrepared function must be called sequentially to avoid out of order insertion in the PrepareHeap (otherwise the top of the heap does not indicate the smallest prepare in future too), ii) non-2PC transactions also call AddPrepared if they do not commit in one step.
Closes #3649

Differential Revision: D7388630

Pulled By: maysamyabandeh

fbshipit-source-id: b79506238c17467d590763582960d4d90181c600
  • Loading branch information
Maysam Yabandeh authored and facebook-github-bot committed Apr 3, 2018
1 parent 1579626 commit b225de7
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 155 deletions.
2 changes: 1 addition & 1 deletion db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,7 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
}
#endif // ROCKSDB_LITE

const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
int64_t unix_time = 0;
env_->GetCurrentTime(&unix_time); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ class DBImpl : public DB {
friend class DB;
friend class InternalStats;
friend class PessimisticTransaction;
friend class TransactionBaseImpl;
friend class WriteCommittedTxn;
friend class WritePreparedTxn;
friend class WritePreparedTxnDB;
Expand Down Expand Up @@ -955,7 +956,7 @@ class DBImpl : public DB {
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);

const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary);

uint64_t GetMaxTotalWalSize() const;

Expand Down
8 changes: 6 additions & 2 deletions db/snapshot_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class SnapshotList;
class SnapshotImpl : public Snapshot {
public:
SequenceNumber number_; // const after creation
// It indicates the smallest uncommitted data at the time the snapshot was
// taken. This is currently used by WritePrepared transactions to limit the
// scope of queries to IsInSnpashot.
SequenceNumber min_uncommitted_ = 0;

virtual SequenceNumber GetSequenceNumber() const override { return number_; }

Expand Down Expand Up @@ -56,8 +60,8 @@ class SnapshotList {
SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; }
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }

const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq,
uint64_t unix_time, bool is_write_conflict_boundary) {
SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time,
bool is_write_conflict_boundary) {
s->number_ = seq;
s->unix_time_ = unix_time;
s->is_write_conflict_boundary_ = is_write_conflict_boundary;
Expand Down
2 changes: 2 additions & 0 deletions utilities/transactions/pessimistic_transaction_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class PessimisticTransactionDB : public TransactionDB {

virtual ~PessimisticTransactionDB();

virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }

virtual Status Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles);
Expand Down
4 changes: 2 additions & 2 deletions utilities/transactions/transaction_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class TransactionBaseImpl : public Transaction {
return snapshot_ ? snapshot_.get() : nullptr;
}

void SetSnapshot() override;
virtual void SetSnapshot() override;
void SetSnapshotOnNextOperation(
std::shared_ptr<TransactionNotifier> notifier = nullptr) override;

Expand Down Expand Up @@ -303,6 +303,7 @@ class TransactionBaseImpl : public Transaction {
WriteBatchWithIndex write_batch_;

private:
friend class WritePreparedTxn;
// Extra data to be persisted with the commit. Note this is only used when
// prepare phase is not skipped.
WriteBatch commit_time_batch_;
Expand Down Expand Up @@ -335,7 +336,6 @@ class TransactionBaseImpl : public Transaction {
bool read_only, bool exclusive, bool skip_validate = false);

WriteBatchBase* GetBatchForWrite();

void SetSnapshotInternal(const Snapshot* snapshot);
};

Expand Down
63 changes: 46 additions & 17 deletions utilities/transactions/write_prepared_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"

Expand All @@ -39,8 +40,14 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options,
auto snapshot = read_options.snapshot;
auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0; // by default disable the optimization
if (snapshot != nullptr) {
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
}

WritePreparedTxnReadCallback callback(wpt_db_, snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
pinnable_val, &callback);
}
Expand Down Expand Up @@ -68,25 +75,26 @@ Status WritePreparedTxn::PrepareInternal() {
const bool WRITE_AFTER_COMMIT = true;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
// For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_);
// AddPrepared better to be called in the pre-release callback otherwise there
// is a non-zero chance of max advancing prepare_seq and readers assume the
// data as committed.
// Also having it in the PreReleaseCallback allows in-order addition of
// prepared entries to PrepareHeap and hence enables an optimization. Refer to
// SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
wpt_db_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
Status s = db_impl_->WriteImpl(
write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
&seq_used, prepare_batch_cnt_, &add_prepared_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used;
SetId(prepare_seq);
// TODO(myabandeh): AddPrepared better to be called in the pre-release
// callback otherwise there is a non-zero chance of max dvancing prepare_seq
// and readers assume the data as committed.
if (s.ok()) {
for (size_t i = 0; i < prepare_batch_cnt_; i++) {
wpt_db_->AddPrepared(prepare_seq + i);
}
}
return s;
}

Expand Down Expand Up @@ -135,6 +143,10 @@ Status WritePreparedTxn::CommitInternal() {
const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable;
const bool publish_seq = do_one_write;
// Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while
// CommitTimeWriteBatch commits with PreReleaseCallback.
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
!PREP_HEAP_SKIPPED, publish_seq);
Expand Down Expand Up @@ -204,7 +216,8 @@ Status WritePreparedTxn::RollbackInternal() {
WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators)
: db_(db),
callback(wpt_db, snap_seq),
callback(wpt_db, snap_seq,
0), // 0 disables min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators) {}

Expand Down Expand Up @@ -285,6 +298,10 @@ Status WritePreparedTxn::RollbackInternal() {
const size_t ONE_BATCH = 1;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH);
// Note: the rollback batch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while
// the roolback batch commits with PreReleaseCallback.
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
do_one_write ? &update_commit_map : nullptr);
Expand Down Expand Up @@ -335,6 +352,10 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
SequenceNumber* tracked_at_seq) {
assert(snapshot_);

SequenceNumber min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(
snapshot_.get())
->min_uncommitted_;
SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
// tracked_at_seq is either max or the last snapshot with which this key was
// trackeed so there is no need to apply the IsInSnapshot to this comparison
Expand All @@ -351,12 +372,20 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();

WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq);
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker);
}

void WritePreparedTxn::SetSnapshot() {
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
assert(snapshot);
wpt_db_->EnhanceSnapshot(snapshot);
SetSnapshotInternal(snapshot);
}

Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
Expand Down
2 changes: 2 additions & 0 deletions utilities/transactions/write_prepared_txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class WritePreparedTxn : public PessimisticTransaction {
virtual Iterator* GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;

virtual void SetSnapshot() override;

protected:
// Override the protected SetId to make it visible to the friend class
// WritePreparedTxnDB
Expand Down
Loading

0 comments on commit b225de7

Please sign in to comment.