Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
Expand All @@ -49,7 +50,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -114,15 +114,10 @@
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import org.apache.ignite.internal.sql.engine.prepare.KillPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
Expand Down Expand Up @@ -1292,65 +1287,39 @@ private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(
}

private void enlistPartitions(MappedFragment mappedFragment, InternalTransaction tx) {
// no need to traverse the tree if fragment has no tables
if (mappedFragment.fragment().tables().isEmpty()) {
return;
}

new IgniteRelShuttle() {
@Override
public IgniteRel visit(IgniteIndexScan rel) {
enlist(rel);

return super.visit(rel);
}

@Override
public IgniteRel visit(IgniteTableScan rel) {
enlist(rel);

return super.visit(rel);
}
boolean shouldAssignCommitPartition = tx.commitPartition() == null;
for (Long2ObjectMap.Entry<IgniteTable> entry : mappedFragment.fragment().tables().long2ObjectEntrySet()) {
long sourceId = entry.getLongKey();
IgniteTable table = entry.getValue();

@Override
public IgniteRel visit(IgniteTableModify rel) {
enlist(rel);
ColocationGroup colocationGroup = mappedFragment.groupsBySourceId().get(sourceId);
Int2ObjectMap<NodeWithConsistencyToken> assignments = colocationGroup.assignments();

return super.visit(rel);
if (assignments.isEmpty()) {
continue;
}

private void enlist(int tableId, int zoneId, Int2ObjectMap<NodeWithConsistencyToken> assignments) {
if (assignments.isEmpty()) {
return;
}

int partsCnt = assignments.size();

tx.assignCommitPartition(new ZonePartitionId(zoneId, ThreadLocalRandom.current().nextInt(partsCnt)));

for (Int2ObjectMap.Entry<NodeWithConsistencyToken> partWithToken : assignments.int2ObjectEntrySet()) {
ZonePartitionId replicationGroupId = new ZonePartitionId(zoneId, partWithToken.getIntKey());
int tableId = table.id();
int zoneId = table.zoneId();

NodeWithConsistencyToken assignment = partWithToken.getValue();

tx.enlist(
replicationGroupId,
tableId,
assignment.name(),
assignment.enlistmentConsistencyToken()
);
}
if (shouldAssignCommitPartition) {
tx.assignCommitPartition(new ZonePartitionId(zoneId, assignments.keySet().iterator().nextInt()));
shouldAssignCommitPartition = false;
}

private void enlist(SourceAwareIgniteRel rel) {
IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
for (Int2ObjectMap.Entry<NodeWithConsistencyToken> partWithToken : assignments.int2ObjectEntrySet()) {
ZonePartitionId partitionId = new ZonePartitionId(zoneId, partWithToken.getIntKey());

ColocationGroup colocationGroup = mappedFragment.groupsBySourceId().get(rel.sourceId());
Int2ObjectMap<NodeWithConsistencyToken> assignments = colocationGroup.assignments();
NodeWithConsistencyToken assignment = partWithToken.getValue();

enlist(igniteTable.id(), igniteTable.zoneId(), assignments);
tx.enlist(
partitionId,
tableId,
assignment.name(),
assignment.enlistmentConsistencyToken()
);
}
}.visit(mappedFragment.fragment().root());
}
}

private CompletableFuture<Void> close(CancellationReason reason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import java.util.List;
Expand All @@ -29,34 +31,44 @@
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlProperties;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

/** Transactions enlist count test. */
@ExtendWith(QueryCheckerExtension.class)
public class TransactionEnlistTest extends BaseIgniteAbstractTest {
private static final int PARTITIONS_COUNT = 3;
private static final String NODE_NAME1 = "N1";

@InjectQueryCheckerFactory
Expand All @@ -72,13 +84,45 @@ static void startCluster() {

//noinspection ConcatenationWithEmptyString
CLUSTER.node("N1").initSchema(""
+ "CREATE ZONE test_zone (partitions 3) storage profiles ['Default'];"
+ "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE test_zone");
+ "CREATE ZONE test_zone (partitions " + PARTITIONS_COUNT + ") storage profiles ['Default'];"
+ "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE test_zone;");

CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) -> IntStream.range(0, partitionCount)
.mapToObj(i -> List.of("N1"))
.collect(Collectors.toList()));
CLUSTER.setDataProvider("T1", TestBuilders.tableScan(DataProvider.fromCollection(List.of())));
CLUSTER.setUpdatableTable("T1", new UpdatableTable() {
@Override
public TableDescriptor descriptor() {
return null;
}

@Override
public <RowT> CompletableFuture<?> insertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup colocationGroup) {
return nullCompletedFuture();
}

@Override
public <RowT> CompletableFuture<Void> insert(@Nullable InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT row) {
return nullCompletedFuture();
}

@Override
public <RowT> CompletableFuture<?> upsertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup colocationGroup) {
return nullCompletedFuture();
}

@Override
public <RowT> CompletableFuture<Boolean> delete(@Nullable InternalTransaction explicitTx, ExecutionContext<RowT> ectx,
RowT key) {
return nullCompletedFuture();
}

@Override
public <RowT> CompletableFuture<?> deleteAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup colocationGroup) {
return nullCompletedFuture();
}
});
}

@AfterAll
Expand All @@ -104,6 +148,55 @@ void testEnlistCall() {
Mockito.verify(spiedTx, times(2)).enlist(any(), anyInt(), any(), anyLong());
}

@ParameterizedTest
@ValueSource(ints = {1, 2, 3})
void testCommitPartitionChoice(int id) {
NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);

NoOpTransaction spiedTx = Mockito.spy(tx);

assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", spiedTx)
.withParam(id)
.check();

int expectedPartition = expectedPartition(id);
{
ArgumentMatcher<ZonePartitionId> partitionIdMatch = zonePartitionId -> zonePartitionId.partitionId() == expectedPartition;
// We expect commit partitions to be assigned once for given transaction.
Mockito.verify(spiedTx, times(1))
.assignCommitPartition(argThat(partitionIdMatch));
// Individual partition on the other hand will be enlisted for every source.
// In this particular case -- first time for scan and second for Modify node.
Mockito.verify(spiedTx, times(2))
.enlist(argThat(partitionIdMatch), anyInt(), any(), anyLong());
}

{
// Due to partition pruning we don't expect any more enlistment.
// We should not try to assign other partition as commit partition as well.
ArgumentMatcher<ZonePartitionId> partitionIdMismatch = zonePartitionId -> zonePartitionId.partitionId() != expectedPartition;
Mockito.verify(spiedTx, never())
.assignCommitPartition(argThat(partitionIdMismatch));
Mockito.verify(spiedTx, never())
.enlist(argThat(partitionIdMismatch), anyInt(), any(), anyLong());
}
}

@Test
void testNoCommitPartitionAssignment() {
NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
tx.assignCommitPartition(new ZonePartitionId(1, 1));

NoOpTransaction spiedTx = Mockito.spy(tx);

assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", spiedTx)
.withParam(1)
.check();

// Transaction already has a commit partition, so no assignment is expected during query processing.
Mockito.verify(spiedTx, never()).assignCommitPartition(any());
}

private static QueryChecker assertQuery(String qry, InternalTransaction tx) {
TestNode testNode = CLUSTER.node(NODE_NAME1);

Expand Down Expand Up @@ -149,10 +242,9 @@ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> queryAsync(
String qry,
Object... params
) {
assert params == null || params.length == 0 : "params are not supported";
assert !prepareOnly : "Expected that the query will only be prepared, but not executed";

AsyncSqlCursor<InternalSqlRow> sqlCursor = node.executeQuery(transaction, qry);
AsyncSqlCursor<InternalSqlRow> sqlCursor = node.executeQuery(transaction, qry, params);

return CompletableFuture.completedFuture(sqlCursor);
}
Expand All @@ -169,4 +261,10 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
return nullCompletedFuture();
}
}

private static int expectedPartition(int key) {
var calculator = new PartitionCalculator(PARTITIONS_COUNT, new NativeType[] {NativeTypes.INT32});
calculator.append(key);
return calculator.partition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.ignite.internal.binarytuple.BinaryTuple;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefix;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
Expand All @@ -80,6 +81,7 @@
import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.StructNativeType;
import org.apache.ignite.tx.Transaction;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Named;
Expand Down Expand Up @@ -220,7 +222,7 @@ private static Stream<Arguments> indexScanParameters() {
for (Bound leftBound : Bound.values()) {
for (Bound rightBound : Bound.values()) {
params.add(Arguments.of(NoOpTransaction.readOnly("RO", false), leftBound, rightBound));
params.add(Arguments.of(NoOpTransaction.readWrite("RW", false), leftBound, rightBound));
params.add(Arguments.of(rwWithCommitPartition(), leftBound, rightBound));
}
}

Expand Down Expand Up @@ -483,10 +485,16 @@ public void testIndexLookupError(NoOpTransaction tx) {
private static Stream<Arguments> transactions() {
return Stream.of(
Arguments.of(Named.of("Read-only transaction", NoOpTransaction.readOnly("RO", false))),
Arguments.of(Named.of("Read-write transaction", NoOpTransaction.readWrite("RW", false)))
Arguments.of(Named.of("Read-write transaction", rwWithCommitPartition()))
);
}

private static Transaction rwWithCommitPartition() {
NoOpTransaction tx = NoOpTransaction.readWrite("RW", false);
tx.assignCommitPartition(new ZonePartitionId(1, 1));
return tx;
}

private class Tester {

final ScannableTable scannableTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
Expand All @@ -42,12 +43,8 @@
* Dummy transaction that should be used as mock transaction for execution tests.
*/
public final class NoOpTransaction implements InternalTransaction {
private static final int ZONE_ID = 1;

private static final int TABLE_ID = 2;

private static final int PARTITION_ID = 2;

private final UUID id;

private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
Expand All @@ -57,7 +54,7 @@ public final class NoOpTransaction implements InternalTransaction {

private final PendingTxPartitionEnlistment enlistment;

private final ZonePartitionId groupId = new ZonePartitionId(ZONE_ID, PARTITION_ID);
private final AtomicReference<ZonePartitionId> commitPartition = new AtomicReference<>();

private final boolean implicit;

Expand Down Expand Up @@ -178,12 +175,12 @@ public TxState state() {

@Override
public boolean assignCommitPartition(ZonePartitionId replicationGroupId) {
return true;
return commitPartition.compareAndSet(null, replicationGroupId);
}

@Override
public ZonePartitionId commitPartition() {
return groupId;
return commitPartition.get();
}

@Override
Expand Down