Skip to content

Commit

Permalink
Support MultiGetEntity in optimistic and WriteCommitted pessimistic t…
Browse files Browse the repository at this point in the history
…ransactions (#12634)

Summary:
Pull Request resolved: #12634

The patch implements support for the `MultiGetEntity` API in optimistic transactions and pessimistic transactions with the WriteCommitted policy. Similarly to the other wide-column transaction APIs, the implementation leverages the `WriteBatchWithIndex` layer.

Reviewed By: jaykorean

Differential Revision: D57177638

fbshipit-source-id: 2d9f9f287fc97e7c126830b48d21457c7c35db3f
  • Loading branch information
ltamasi authored and facebook-github-bot committed May 9, 2024
1 parent 1f2715d commit b92d874
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 0 deletions.
6 changes: 6 additions & 0 deletions include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ class Transaction {
}
}

virtual void MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family,
size_t num_keys, const Slice* keys,
PinnableWideColumns* results, Status* statuses,
bool sorted_input = false) = 0;

// Read this key and ensure that this transaction will only
// be able to be committed if this key is not written outside this
// transaction after it has first been read (or after the snapshot if a
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/new_features/multi_get_entity_txn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimistic transactions and WriteCommitted pessimistic transactions now support the `MultiGetEntity` API.
164 changes: 164 additions & 0 deletions utilities/transactions/optimistic_transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,23 @@ TEST_P(OptimisticTransactionTest, PutEntityWriteConflict) {
ASSERT_EQ(columns.columns(), baz_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{foo, baz}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;

txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
keys.data(), results.data(), statuses.data());

ASSERT_OK(statuses[0]);
ASSERT_OK(statuses[1]);

ASSERT_EQ(results[0].columns(), foo_columns);
ASSERT_EQ(results[1].columns(), baz_columns);
}

const WideColumns foo_new_columns{{kDefaultWideColumnName, "FOO"},
{"hello", "world"}};
const WideColumns baz_new_columns{{kDefaultWideColumnName, "BAZ"},
Expand All @@ -1807,6 +1824,23 @@ TEST_P(OptimisticTransactionTest, PutEntityWriteConflict) {
ASSERT_EQ(columns.columns(), baz_new_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{foo, baz}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;

txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
keys.data(), results.data(), statuses.data());

ASSERT_OK(statuses[0]);
ASSERT_OK(statuses[1]);

ASSERT_EQ(results[0].columns(), foo_new_columns);
ASSERT_EQ(results[1].columns(), baz_new_columns);
}

// This PutEntity outside of a transaction will conflict with the previous
// write
const WideColumns foo_conflict_columns{{kDefaultWideColumnName, "X"},
Expand Down Expand Up @@ -1837,6 +1871,25 @@ TEST_P(OptimisticTransactionTest, PutEntityWriteConflict) {
baz, &columns));
ASSERT_EQ(columns.columns(), baz_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{foo, baz}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
constexpr bool sorted_input = false;

txn_db->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
num_keys, keys.data(), results.data(),
statuses.data(), sorted_input);

ASSERT_OK(statuses[0]);
ASSERT_OK(statuses[1]);

ASSERT_EQ(results[0].columns(), foo_conflict_columns);
ASSERT_EQ(results[1].columns(), baz_columns);
}
}

TEST_P(OptimisticTransactionTest, PutEntityWriteConflictTxnTxn) {
Expand Down Expand Up @@ -1869,6 +1922,23 @@ TEST_P(OptimisticTransactionTest, PutEntityWriteConflictTxnTxn) {
ASSERT_EQ(columns.columns(), baz_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{foo, baz}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;

txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
keys.data(), results.data(), statuses.data());

ASSERT_OK(statuses[0]);
ASSERT_OK(statuses[1]);

ASSERT_EQ(results[0].columns(), foo_columns);
ASSERT_EQ(results[1].columns(), baz_columns);
}

const WideColumns foo_new_columns{{kDefaultWideColumnName, "FOO"},
{"hello", "world"}};
const WideColumns baz_new_columns{{kDefaultWideColumnName, "BAZ"},
Expand All @@ -1893,6 +1963,23 @@ TEST_P(OptimisticTransactionTest, PutEntityWriteConflictTxnTxn) {
ASSERT_EQ(columns.columns(), baz_new_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{foo, baz}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;

txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
keys.data(), results.data(), statuses.data());

ASSERT_OK(statuses[0]);
ASSERT_OK(statuses[1]);

ASSERT_EQ(results[0].columns(), foo_new_columns);
ASSERT_EQ(results[1].columns(), baz_new_columns);
}

std::unique_ptr<Transaction> conflicting_txn(
txn_db->BeginTransaction(WriteOptions()));
ASSERT_NE(conflicting_txn, nullptr);
Expand Down Expand Up @@ -1926,10 +2013,31 @@ TEST_P(OptimisticTransactionTest, PutEntityWriteConflictTxnTxn) {
baz, &columns));
ASSERT_EQ(columns.columns(), baz_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{foo, baz}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
constexpr bool sorted_input = false;

txn_db->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(),
num_keys, keys.data(), results.data(),
statuses.data(), sorted_input);

ASSERT_OK(statuses[0]);
ASSERT_OK(statuses[1]);

ASSERT_EQ(results[0].columns(), foo_conflict_columns);
ASSERT_EQ(results[1].columns(), baz_columns);
}
}

TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) {
constexpr char foo[] = "foo";
constexpr char bar[] = "bar";
constexpr size_t num_keys = 2;

std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
ASSERT_NE(txn, nullptr);
Expand Down Expand Up @@ -1957,6 +2065,62 @@ TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) {
&columns)
.IsInvalidArgument());
}

{
constexpr ColumnFamilyHandle* column_family = nullptr;
std::array<Slice, num_keys> keys{{foo, bar}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
constexpr bool sorted_input = false;

txn->MultiGetEntity(ReadOptions(), column_family, num_keys, keys.data(),
results.data(), statuses.data(), sorted_input);

ASSERT_TRUE(statuses[0].IsInvalidArgument());
ASSERT_TRUE(statuses[1].IsInvalidArgument());
}

{
constexpr Slice* keys = nullptr;
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
constexpr bool sorted_input = false;

txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
keys, results.data(), statuses.data(), sorted_input);

ASSERT_TRUE(statuses[0].IsInvalidArgument());
ASSERT_TRUE(statuses[1].IsInvalidArgument());
}

{
std::array<Slice, num_keys> keys{{foo, bar}};
constexpr PinnableWideColumns* results = nullptr;
std::array<Status, num_keys> statuses;
constexpr bool sorted_input = false;

txn->MultiGetEntity(ReadOptions(), txn_db->DefaultColumnFamily(), num_keys,
keys.data(), results, statuses.data(), sorted_input);

ASSERT_TRUE(statuses[0].IsInvalidArgument());
ASSERT_TRUE(statuses[1].IsInvalidArgument());
}

{
ReadOptions read_options;
read_options.io_activity = Env::IOActivity::kMultiGet;

std::array<Slice, num_keys> keys{{foo, bar}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;
constexpr bool sorted_input = false;

txn->MultiGetEntity(read_options, txn_db->DefaultColumnFamily(), num_keys,
keys.data(), results.data(), statuses.data(),
sorted_input);
ASSERT_TRUE(statuses[0].IsInvalidArgument());
ASSERT_TRUE(statuses[1].IsInvalidArgument());
}
}

INSTANTIATE_TEST_CASE_P(
Expand Down
9 changes: 9 additions & 0 deletions utilities/transactions/transaction_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,15 @@ void TransactionBaseImpl::MultiGet(const ReadOptions& _read_options,
sorted_input);
}

void TransactionBaseImpl::MultiGetEntity(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
size_t num_keys, const Slice* keys,
PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
MultiGetEntityImpl(read_options, column_family, num_keys, keys, results,
statuses, sorted_input);
}

std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
Expand Down
14 changes: 14 additions & 0 deletions utilities/transactions/transaction_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ class TransactionBaseImpl : public Transaction {
const Slice* keys, PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;

void MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input = false) override;

using Transaction::MultiGetForUpdate;
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
Expand Down Expand Up @@ -307,6 +312,15 @@ class TransactionBaseImpl : public Transaction {
key, columns);
}

void MultiGetEntityImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
write_batch_.MultiGetEntityFromBatchAndDB(db_, options, column_family,
num_keys, keys, results, statuses,
sorted_input);
}

Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns, bool do_validate,
bool assume_tracked);
Expand Down
Loading

0 comments on commit b92d874

Please sign in to comment.