Skip to content

Commit

Permalink
test(fix): testing the serializability behaviour without relying on t…
Browse files Browse the repository at this point in the history
…he underlying concurrency (#1079)

fix #497 

Since we are testing the behaviour _(i.e. if two transactions conflict with each other, the database guarantees that only one can commit successfully at a time)_ now, both of the tests are now agnostic of underlying concurrency mechanism. This makes sure that both of the tests continue to pass in any gcloud project .
  • Loading branch information
jainsahab committed May 19, 2023
1 parent 2fca5c8 commit 6f03fab
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.junit.Assert.fail;

import com.google.cloud.Timestamp;
import com.google.cloud.Tuple;
import com.google.cloud.datastore.AggregationQuery;
import com.google.cloud.datastore.Batch;
import com.google.cloud.datastore.BooleanValue;
Expand Down Expand Up @@ -335,57 +336,82 @@ public void testNewTransactionCommit() {
}

@Test
public void testTransactionWithRead() {
Transaction transaction = DATASTORE.newTransaction();
assertNull(transaction.get(KEY3));
transaction.add(ENTITY3);
transaction.commit();
public void testTransactionWithRead() throws Exception {
StatementExecutor statementExecutor = new StatementExecutor();
Transaction baseTransaction = DATASTORE.newTransaction();
assertNull(baseTransaction.get(KEY3));
baseTransaction.add(ENTITY3);
baseTransaction.commit();
assertEquals(ENTITY3, DATASTORE.get(KEY3));

transaction = DATASTORE.newTransaction();
assertEquals(ENTITY3, transaction.get(KEY3));
// update entity3 during the transaction
DATASTORE.put(Entity.newBuilder(ENTITY2).clear().set("from", "datastore").build());
transaction.update(Entity.newBuilder(ENTITY2).clear().set("from", "transaction").build());
try {
transaction.commit();
fail("Expecting a failure");
} catch (DatastoreException expected) {
assertEquals("ABORTED", expected.getReason());
}
Transaction transaction = DATASTORE.newTransaction();
statementExecutor.execute(
Tuple.of("T1", () -> assertEquals(ENTITY3, transaction.get(KEY3))),
// update entity3 during the transaction, will be blocked in case of pessimistic concurrency
Tuple.of(
"T2",
() ->
DATASTORE.put(Entity.newBuilder(ENTITY3).clear().set("from", "datastore").build())),
Tuple.of(
"T1",
() ->
transaction.update(
Entity.newBuilder(ENTITY3).clear().set("from", "transaction").build())),
Tuple.of("T1", transaction::commit) // T1 will throw error in case of optimistic concurrency
);

boolean t1AllPassed = statementExecutor.didAllPass("T1");
boolean t2AllPassed = statementExecutor.didAllPass("T2");
// If two transactions conflict with each other, the database guarantees that only
// one can commit successfully at a time. Please refer to StatementExecutor class for more info.
// Using XOR to ensure that only one of transaction group is successful,
boolean onlyOneTransactionIsSuccessful = t1AllPassed ^ t2AllPassed;

assertThat(onlyOneTransactionIsSuccessful).isTrue();
}

@Test
public void testTransactionWithQuery() {
public void testTransactionWithQuery() throws Exception {
StatementExecutor statementExecutor = new StatementExecutor();
Query<Entity> query =
Query.newEntityQueryBuilder()
.setKind(KIND2)
.setFilter(PropertyFilter.hasAncestor(KEY2))
.setNamespace(NAMESPACE)
.build();
Transaction transaction = DATASTORE.newTransaction();
QueryResults<Entity> results = transaction.run(query);
assertTrue(results.hasNext());
assertEquals(ENTITY2, results.next());
assertFalse(results.hasNext());
transaction.add(ENTITY3);
transaction.commit();
Transaction baseTransaction = DATASTORE.newTransaction();
QueryResults<Entity> baseResults = baseTransaction.run(query);
assertTrue(baseResults.hasNext());
assertEquals(ENTITY2, baseResults.next());
assertFalse(baseResults.hasNext());
baseTransaction.add(ENTITY3);
baseTransaction.commit();
assertEquals(ENTITY3, DATASTORE.get(KEY3));

transaction = DATASTORE.newTransaction();
results = transaction.run(query);
assertTrue(results.hasNext());
assertEquals(ENTITY2, results.next());
assertFalse(results.hasNext());
transaction.delete(ENTITY3.getKey());
// update entity2 during the transaction
DATASTORE.put(Entity.newBuilder(ENTITY2).clear().build());
try {
transaction.commit();
fail("Expecting a failure");
} catch (DatastoreException expected) {
assertEquals("ABORTED", expected.getReason());
}
Transaction transaction = DATASTORE.newTransaction();
statementExecutor.execute(
Tuple.of(
"T1",
() -> {
QueryResults<Entity> results = transaction.run(query);
assertTrue(results.hasNext());
assertEquals(ENTITY2, results.next());
assertFalse(results.hasNext());
}),
Tuple.of("T1", () -> transaction.delete(ENTITY3.getKey())),
// update entity2 during the transaction, will be blocked in case of pessimistic concurrency
Tuple.of("T2", () -> DATASTORE.put(Entity.newBuilder(ENTITY2).clear().build())),
Tuple.of("T1", transaction::commit) // T1 will throw error in case of optimistic concurrency
);

boolean t1AllPassed = statementExecutor.didAllPass("T1");
boolean t2AllPassed = statementExecutor.didAllPass("T2");
// If two transactions conflict with each other, the database guarantees that only
// one can commit successfully at a time. Please refer to StatementExecutor class for more info.
// Using XOR to ensure that only one of transaction group is successful,
boolean onlyOneTransactionIsSuccessful = t1AllPassed ^ t2AllPassed;

assertThat(onlyOneTransactionIsSuccessful).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.datastore.it;

import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.cloud.Tuple;
import com.google.cloud.datastore.DatastoreException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

/**
* An executor class to handle interleaved transactions.
*
* <p>It executes statements (under multiple transactions) and record their failures under a groupId
* provided by users.
*/
class StatementExecutor {

private final Multimap<String, Exception> failures = ArrayListMultimap.create();

/**
* Executes a list of {@link Statement} one by one and record their failures under the groupId. In
* case of pessimistic concurrency, a statement will be blocked and cause delay until another
* transaction which was started earlier is committed. In case of optimistic concurrency, both
* transaction can perform their operation simultaneously, but the one which commits first will be
* a winner and other one will get an error on commit operation indicating a need for retry.
*
* @param tuples A {@link Statement(String, String) Tuple(&lt;String, Statement&gt;)} has a
* groupId of {@link String} type and a {@link Statement} to execute.
*/
@SafeVarargs
final void execute(Tuple<String, Statement>... tuples) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (Tuple<String, Statement> tuple : tuples) {
String groupId = tuple.x();
Statement statement = tuple.y();
Future<?> future = executorService.submit(statement::execute);
try {
// waiting for statement to execute
future.get(10, SECONDS);
} catch (Exception exception) {
future.cancel(true);
if (transactionConflict(exception)) {
failures.put(groupId, exception);
} else {
throw exception;
}
}
}
executorService.shutdown();
}

boolean didAllPass(String groupId) {
return failures.get(groupId).isEmpty();
}

private boolean transactionConflict(Exception exception) {
if (exception instanceof TimeoutException) { // timed out coz of pessimistic concurrency delay
return true;
}
return exception instanceof ExecutionException
&& exception.getCause().getClass() == DatastoreException.class
&& exception
.getMessage()
.contains("contention"); // exception raise coz of optimistic concurrency
}

interface Statement {
void execute();
}
}

0 comments on commit 6f03fab

Please sign in to comment.