Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ENG-4079: #673: Add read intents for serializable isolation during write
Summary:
While running serializable transaction we could have read-modify-write operations.
Besides adding write intents, those operations should also add read intents to detect conflict between transactions correctly.

This revision implements adding these read intents and also contains the following fixes/updates required by serializable isolation:
1) Avoid sending read time in a transaction with serializable isolation. Serializable transactions effectively perform all reads at commit time, because read intents prevent values that a serializable transaction has read from changing until the transaction commits.
2) Acquire read+write locks while adding read+write intents in a transaction with serializable isolation.
?) Transaction with serializable isolation always conflict with committed transaction. [@sergei -- this needs to be removed, right?]
3) Resolve transaction conflicts before performing read operations for a read-modify-write operation, not the other way around. This ensures that no conflicting transactions whose provisional records were already present when our transaction started (or were added before our transaction acquired all its locks) can commit asynchronously and invalidate the results of our transaction's read operations.

Test Plan: ybd --cxx-test serializable-txn-test --gtest_filter SerializableTxnTest.Increment

Reviewers: timur, robert, mikhail

Reviewed By: mikhail

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D5930
  • Loading branch information
spolitov committed Jan 14, 2019
1 parent c4b06f0 commit 90bc062
Show file tree
Hide file tree
Showing 25 changed files with 561 additions and 179 deletions.
2 changes: 1 addition & 1 deletion java/yb-cql/src/test/java/org/yb/cql/TestIndex.java
Expand Up @@ -652,7 +652,7 @@ public void testRestarts() throws Exception {
int currentRestarts = getRestartsCount("test_restart");
int currentRetries = getRetriesCount();
LOG.info("Current restarts = {}, retries = {}", currentRestarts, currentRetries);
if (currentRestarts > initialRestarts && currentRetries > initialRetries)
if (currentRetries > initialRetries)
break;
}

Expand Down
17 changes: 10 additions & 7 deletions src/yb/client/async_rpc.cc
Expand Up @@ -471,8 +471,8 @@ void WriteRpc::SwapRequestsAndResponses(bool skip_responses = false) {
switch (yb_op->type()) {
case YBOperation::Type::REDIS_WRITE: {
if (redis_idx >= resp_.redis_response_batch().size()) {
batcher_->AddOpCountMismatchError();
return;
++redis_idx;
continue;
}
// Restore Redis write request PB and extract response.
auto* redis_op = down_cast<YBRedisWriteOp*>(yb_op);
Expand All @@ -482,8 +482,8 @@ void WriteRpc::SwapRequestsAndResponses(bool skip_responses = false) {
}
case YBOperation::Type::QL_WRITE: {
if (ql_idx >= resp_.ql_response_batch().size()) {
batcher_->AddOpCountMismatchError();
return;
++ql_idx;
continue;
}
// Restore QL write request PB and extract response.
auto* ql_op = down_cast<YBqlWriteOp*>(yb_op);
Expand All @@ -500,8 +500,8 @@ void WriteRpc::SwapRequestsAndResponses(bool skip_responses = false) {
}
case YBOperation::Type::PGSQL_WRITE: {
if (pgsql_idx >= resp_.pgsql_response_batch().size()) {
batcher_->AddOpCountMismatchError();
return;
++pgsql_idx;
continue;
}
// Restore QL write request PB and extract response.
auto* pgsql_op = down_cast<YBPgsqlWriteOp*>(yb_op);
Expand Down Expand Up @@ -535,8 +535,11 @@ void WriteRpc::SwapRequestsAndResponses(bool skip_responses = false) {
redis_idx, resp_.redis_response_batch().size(),
ql_idx, resp_.ql_response_batch().size(),
pgsql_idx, resp_.pgsql_response_batch().size());
auto status = STATUS(IllegalState, "Write response count mismatch");
LOG(ERROR) << status << ", request: " << req_.ShortDebugString()
<< ", response: " << resp_.ShortDebugString();
batcher_->AddOpCountMismatchError();
Failed(STATUS(IllegalState, "Write response count mismatch"));
Failed(status);
}
}

Expand Down
15 changes: 9 additions & 6 deletions src/yb/client/ql-transaction-test.cc
Expand Up @@ -507,6 +507,7 @@ TEST_F(QLTransactionTest, ConflictResolution) {
LOG(INFO) << "Committed, successes: " << successes.load() << ", failures: " << failures.load();

ASSERT_GE(successes.load(std::memory_order_acquire), 1);
ASSERT_GE(failures.load(std::memory_order_acquire), 1);

auto session = CreateSession();
std::vector<int32_t> values;
Expand Down Expand Up @@ -557,7 +558,7 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {

int value = 0;
size_t tries = 0;
size_t written = 0;
size_t committed = 0;
size_t flushed = 0;
for (;;) {
auto expired = std::chrono::steady_clock::now() >= stop;
Expand Down Expand Up @@ -605,7 +606,7 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {
LOG(INFO) << "Commit failed: " << commit_status;
continue;
}
++written;
++committed;
continue;
}

Expand All @@ -623,10 +624,12 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {
restart_thread.join();
}

ASSERT_GE(written, kTotalKeys);
ASSERT_GE(flushed, written);
ASSERT_GE(flushed, kActiveTransactions);
ASSERT_GE(tries, flushed);
LOG(INFO) << "Committed: " << committed << ", flushed: " << flushed << ", tries: " << tries;

ASSERT_GE(committed, kTotalKeys);
ASSERT_GT(flushed, committed);
ASSERT_GT(flushed, kActiveTransactions);
ASSERT_GT(tries, flushed);
}

class WriteConflictsTest : public QLTransactionTest {
Expand Down
131 changes: 131 additions & 0 deletions src/yb/client/serializable-txn-test.cc
Expand Up @@ -23,6 +23,8 @@ class SerializableTxnTest : public TransactionTestBase {
IsolationLevel GetIsolationLevel() override {
return IsolationLevel::SERIALIZABLE_ISOLATION;
}

void TestIncrement(int key);
};

TEST_F(SerializableTxnTest, NonConflictingWrites) {
Expand Down Expand Up @@ -117,5 +119,134 @@ TEST_F(SerializableTxnTest, ReadWriteConflict) {
ASSERT_GE(writes_won, kKeys / 4);
}

// Execute UPDATE table SET value = value + 1 WHERE key = kKey in parallel, using
// serializable isolation.
// With retries the resulting value should be equal to number of increments.
void SerializableTxnTest::TestIncrement(int key) {
const auto kIncrements = RegularBuildVsSanitizers(100, 20);

{
auto session = CreateSession();
auto op = ASSERT_RESULT(WriteRow(session, key, 0));
ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);
}

struct Entry {
YBqlWriteOpPtr op;
YBTransactionPtr txn;
YBSessionPtr session;
std::shared_future<Status> write_future;
std::shared_future<Status> commit_future;
};

std::vector<Entry> entries;

auto value_column_id = table_.ColumnId(kValueColumn);
for (int i = 0; i != kIncrements; ++i) {
Entry entry;
entry.txn = CreateTransaction();
entry.session = CreateSession(entry.txn);
entries.push_back(entry);
}

// For each of entries we do the following:
// 1) Write increment operation.
// 2) Wait until write complete and commit transaction of this entry.
// 3) Wait until commit complete.
// When failure happens on any step - retry from step 1.
// Exit from loop when all entries successfully committed their transactions.
// We do all actions in busy loop to get most possible concurrency for operations.
for (;;) {
bool incomplete = false;
for (auto& entry : entries) {
bool entry_complete = false;
if (!entry.op) {
// Execute UPDATE table SET value = value + 1 WHERE key = kKey
entry.op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE);
auto* const req = entry.op->mutable_request();
QLAddInt32HashValue(req, key);
req->mutable_column_refs()->add_ids(value_column_id);
auto* column_value = req->add_column_values();
column_value->set_column_id(value_column_id);
auto* bfcall = column_value->mutable_expr()->mutable_bfcall();
bfcall->set_opcode(to_underlying(bfql::BFOpcode::OPCODE_ConvertI64ToI32_18));
bfcall = bfcall->add_operands()->mutable_bfcall();

bfcall->set_opcode(to_underlying(bfql::BFOpcode::OPCODE_AddI64I64_80));
auto column_op = bfcall->add_operands()->mutable_bfcall();
column_op->set_opcode(to_underlying(bfql::BFOpcode::OPCODE_ConvertI32ToI64_13));
column_op->add_operands()->set_column_id(value_column_id);
bfcall->add_operands()->mutable_value()->set_int64_value(1);

entry.session->SetTransaction(entry.txn);
ASSERT_OK(entry.session->Apply(entry.op));
entry.write_future = entry.session->FlushFuture();
} else if (entry.write_future.valid()) {
if (entry.write_future.wait_for(0s) == std::future_status::ready) {
auto write_status = entry.write_future.get();
entry.write_future = std::shared_future<Status>();
if (!write_status.ok()) {
ASSERT_TRUE(write_status.IsIOError()) << write_status;
auto errors = entry.session->GetPendingErrors();
ASSERT_EQ(errors.size(), 1);
auto status = errors.front()->status();
ASSERT_TRUE(status.IsTryAgain() || status.IsTimedOut()) << status;
entry.txn = CreateTransaction();
entry.op = nullptr;
} else {
if (entry.op->response().status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
auto old_txn = entry.txn;
entry.txn = entry.txn->CreateRestartedTransaction();
entry.op = nullptr;
} else {
ASSERT_EQ(entry.op->response().status(), QLResponsePB::YQL_STATUS_OK);
entry.commit_future = entry.txn->CommitFuture();
}
}
}
} else if (entry.commit_future.valid()) {
if (entry.commit_future.wait_for(0s) == std::future_status::ready) {
auto status = entry.commit_future.get();
if (status.IsExpired() || status.IsTimedOut()) {
entry.txn = CreateTransaction();
entry.op = nullptr;
} else {
ASSERT_OK(status);
entry.commit_future = std::shared_future<Status>();
}
}
} else {
entry_complete = true;
}
incomplete = incomplete || !entry_complete;
}
if (!incomplete) {
break;
}
}

auto value = ASSERT_RESULT(SelectRow(CreateSession(), key));
ASSERT_EQ(value, kIncrements);
}

// Execute UPDATE table SET value = value + 1 WHERE key = kKey in parallel, using
// serializable isolation.
// With retries the resulting value should be equal to number of increments.
TEST_F(SerializableTxnTest, Increment) {
const auto kThreads = 3;

std::vector<std::thread> threads;
while (threads.size() != kThreads) {
int key = threads.size();
threads.emplace_back([this, key] {
TestIncrement(key);
});
}

for (auto& thread : threads) {
thread.join();
}
}

} // namespace client
} // namespace yb
13 changes: 11 additions & 2 deletions src/yb/client/transaction.cc
Expand Up @@ -88,7 +88,13 @@ class YBTransaction::Impl final {
transaction_(transaction),
read_point_(manager->clock()),
child_(Child::kTrue) {
read_point_.SetReadTime(std::move(data.read_time), std::move(data.local_limits));
// For serializable isolation we use read intents, so could always read most recent
// version of DB.
// Otherwise there is possible case when we miss value change that happened after transaction
// start.
if (data.metadata.isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
read_point_.SetReadTime(std::move(data.read_time), std::move(data.local_limits));
}
metadata_ = std::move(data.metadata);
CompleteConstruction();
VLOG_WITH_PREFIX(2) << "Started child, metadata: " << metadata_;
Expand All @@ -112,11 +118,14 @@ class YBTransaction::Impl final {

if (read_time.read.is_valid()) {
read_point_.SetReadTime(read_time, ConsistentReadPoint::HybridTimeMap());
} else {
} else if (isolation == IsolationLevel::SNAPSHOT_ISOLATION) {
read_point_.SetCurrentReadTime();
}
metadata_.isolation = isolation;
metadata_.start_time = read_point_.GetReadTime().read;
if (!metadata_.start_time.is_valid()) {
metadata_.start_time = read_point_.Now();
}

return Status::OK();
}
Expand Down
6 changes: 6 additions & 0 deletions src/yb/common/transaction.cc
Expand Up @@ -42,6 +42,12 @@ Result<TransactionId> DoDecodeTransactionId(const Slice &slice, const bool check

} // namespace

TransactionStatusResult::TransactionStatusResult(TransactionStatus status_, HybridTime status_time_)
: status(status_), status_time(status_time_) {
DCHECK(status == TransactionStatus::ABORTED || status_time.is_valid())
<< "Status: " << status << ", status_time: " << status_time;
}

Result<TransactionId> FullyDecodeTransactionId(const Slice& slice) {
return DoDecodeTransactionId(slice, true);
}
Expand Down
6 changes: 6 additions & 0 deletions src/yb/common/transaction.h
Expand Up @@ -63,6 +63,12 @@ struct TransactionStatusResult {
// COMMITTED - status_time is a commit time.
// ABORTED - not used.
HybridTime status_time;

TransactionStatusResult(TransactionStatus status_, HybridTime status_time_);

static TransactionStatusResult Aborted() {
return TransactionStatusResult(TransactionStatus::ABORTED, HybridTime());
}
};

inline std::ostream& operator<<(std::ostream& out, const TransactionStatusResult& result) {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/common/wire_protocol-test-util.h
Expand Up @@ -137,7 +137,7 @@ inline void AddKVToPB(int32_t key_val,

auto add_kv_pair =
[&](const SubDocKey &subdoc_key, const PrimitiveValue &primitive_value) {
KeyValuePairPB *const kv = write_batch->add_kv_pairs();
KeyValuePairPB *const kv = write_batch->add_write_pairs();
kv->set_key(subdoc_key.Encode().AsStringRef());
kv->set_value(primitive_value.ToValue());
};
Expand Down

0 comments on commit 90bc062

Please sign in to comment.