diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index b7bfe8fb861..23d413eab70 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -27,6 +27,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; @@ -49,7 +50,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,15 +114,10 @@ import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan; import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan; import org.apache.ignite.internal.sql.engine.prepare.Fragment; -import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle; import org.apache.ignite.internal.sql.engine.prepare.KillPlan; import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan; import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; -import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; -import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; -import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; -import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel; import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; @@ -1292,65 +1287,39 @@ private CompletableFuture> sendFragments( } private void enlistPartitions(MappedFragment mappedFragment, InternalTransaction tx) { - // no need to traverse the tree if fragment has no tables - if (mappedFragment.fragment().tables().isEmpty()) { - return; - } - - new IgniteRelShuttle() { - @Override - public IgniteRel visit(IgniteIndexScan rel) { - enlist(rel); - - return super.visit(rel); - } - - @Override - public IgniteRel visit(IgniteTableScan rel) { - enlist(rel); - - return super.visit(rel); - } + boolean shouldAssignCommitPartition = tx.commitPartition() == null; + for (Long2ObjectMap.Entry entry : mappedFragment.fragment().tables().long2ObjectEntrySet()) { + long sourceId = entry.getLongKey(); + IgniteTable table = entry.getValue(); - @Override - public IgniteRel visit(IgniteTableModify rel) { - enlist(rel); + ColocationGroup colocationGroup = mappedFragment.groupsBySourceId().get(sourceId); + Int2ObjectMap assignments = colocationGroup.assignments(); - return super.visit(rel); + if (assignments.isEmpty()) { + continue; } - private void enlist(int tableId, int zoneId, Int2ObjectMap assignments) { - if (assignments.isEmpty()) { - return; - } - - int partsCnt = assignments.size(); - - tx.assignCommitPartition(new ZonePartitionId(zoneId, ThreadLocalRandom.current().nextInt(partsCnt))); - - for (Int2ObjectMap.Entry partWithToken : assignments.int2ObjectEntrySet()) { - ZonePartitionId replicationGroupId = new ZonePartitionId(zoneId, partWithToken.getIntKey()); + int tableId = table.id(); + int zoneId = table.zoneId(); - NodeWithConsistencyToken assignment = partWithToken.getValue(); - - tx.enlist( - replicationGroupId, - tableId, - assignment.name(), - assignment.enlistmentConsistencyToken() - ); - } + if (shouldAssignCommitPartition) { + tx.assignCommitPartition(new ZonePartitionId(zoneId, assignments.keySet().iterator().nextInt())); + shouldAssignCommitPartition = false; } - private void enlist(SourceAwareIgniteRel rel) { - IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class); + for (Int2ObjectMap.Entry partWithToken : assignments.int2ObjectEntrySet()) { + ZonePartitionId partitionId = new ZonePartitionId(zoneId, partWithToken.getIntKey()); - ColocationGroup colocationGroup = mappedFragment.groupsBySourceId().get(rel.sourceId()); - Int2ObjectMap assignments = colocationGroup.assignments(); + NodeWithConsistencyToken assignment = partWithToken.getValue(); - enlist(igniteTable.id(), igniteTable.zoneId(), assignments); + tx.enlist( + partitionId, + tableId, + assignment.name(), + assignment.enlistmentConsistencyToken() + ); } - }.visit(mappedFragment.fragment().root()); + } } private CompletableFuture close(CancellationReason reason) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java index ba8861c9366..88a4e5848ea 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java @@ -21,6 +21,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import java.util.List; @@ -29,10 +31,12 @@ import java.util.stream.IntStream; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; import org.apache.ignite.internal.sql.engine.InternalSqlRow; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.SqlProperties; +import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup; import org.apache.ignite.internal.sql.engine.framework.DataProvider; import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction; import org.apache.ignite.internal.sql.engine.framework.TestBuilders; @@ -40,23 +44,31 @@ import org.apache.ignite.internal.sql.engine.framework.TestNode; import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata; import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; +import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator; +import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory; import org.apache.ignite.internal.sql.engine.util.QueryChecker; import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension; import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.type.NativeType; +import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.lang.CancellationToken; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentMatcher; import org.mockito.Mockito; /** Transactions enlist count test. */ @ExtendWith(QueryCheckerExtension.class) public class TransactionEnlistTest extends BaseIgniteAbstractTest { + private static final int PARTITIONS_COUNT = 3; private static final String NODE_NAME1 = "N1"; @InjectQueryCheckerFactory @@ -72,13 +84,45 @@ static void startCluster() { //noinspection ConcatenationWithEmptyString CLUSTER.node("N1").initSchema("" - + "CREATE ZONE test_zone (partitions 3) storage profiles ['Default'];" - + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE test_zone"); + + "CREATE ZONE test_zone (partitions " + PARTITIONS_COUNT + ") storage profiles ['Default'];" + + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE test_zone;"); CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) -> IntStream.range(0, partitionCount) .mapToObj(i -> List.of("N1")) .collect(Collectors.toList())); CLUSTER.setDataProvider("T1", TestBuilders.tableScan(DataProvider.fromCollection(List.of()))); + CLUSTER.setUpdatableTable("T1", new UpdatableTable() { + @Override + public TableDescriptor descriptor() { + return null; + } + + @Override + public CompletableFuture insertAll(ExecutionContext ectx, List rows, ColocationGroup colocationGroup) { + return nullCompletedFuture(); + } + + @Override + public CompletableFuture insert(@Nullable InternalTransaction explicitTx, ExecutionContext ectx, RowT row) { + return nullCompletedFuture(); + } + + @Override + public CompletableFuture upsertAll(ExecutionContext ectx, List rows, ColocationGroup colocationGroup) { + return nullCompletedFuture(); + } + + @Override + public CompletableFuture delete(@Nullable InternalTransaction explicitTx, ExecutionContext ectx, + RowT key) { + return nullCompletedFuture(); + } + + @Override + public CompletableFuture deleteAll(ExecutionContext ectx, List rows, ColocationGroup colocationGroup) { + return nullCompletedFuture(); + } + }); } @AfterAll @@ -104,6 +148,55 @@ void testEnlistCall() { Mockito.verify(spiedTx, times(2)).enlist(any(), anyInt(), any(), anyLong()); } + @ParameterizedTest + @ValueSource(ints = {1, 2, 3}) + void testCommitPartitionChoice(int id) { + NoOpTransaction tx = NoOpTransaction.readWrite("t1", false); + + NoOpTransaction spiedTx = Mockito.spy(tx); + + assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", spiedTx) + .withParam(id) + .check(); + + int expectedPartition = expectedPartition(id); + { + ArgumentMatcher partitionIdMatch = zonePartitionId -> zonePartitionId.partitionId() == expectedPartition; + // We expect commit partitions to be assigned once for given transaction. + Mockito.verify(spiedTx, times(1)) + .assignCommitPartition(argThat(partitionIdMatch)); + // Individual partition on the other hand will be enlisted for every source. + // In this particular case -- first time for scan and second for Modify node. + Mockito.verify(spiedTx, times(2)) + .enlist(argThat(partitionIdMatch), anyInt(), any(), anyLong()); + } + + { + // Due to partition pruning we don't expect any more enlistment. + // We should not try to assign other partition as commit partition as well. + ArgumentMatcher partitionIdMismatch = zonePartitionId -> zonePartitionId.partitionId() != expectedPartition; + Mockito.verify(spiedTx, never()) + .assignCommitPartition(argThat(partitionIdMismatch)); + Mockito.verify(spiedTx, never()) + .enlist(argThat(partitionIdMismatch), anyInt(), any(), anyLong()); + } + } + + @Test + void testNoCommitPartitionAssignment() { + NoOpTransaction tx = NoOpTransaction.readWrite("t1", false); + tx.assignCommitPartition(new ZonePartitionId(1, 1)); + + NoOpTransaction spiedTx = Mockito.spy(tx); + + assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", spiedTx) + .withParam(1) + .check(); + + // Transaction already has a commit partition, so no assignment is expected during query processing. + Mockito.verify(spiedTx, never()).assignCommitPartition(any()); + } + private static QueryChecker assertQuery(String qry, InternalTransaction tx) { TestNode testNode = CLUSTER.node(NODE_NAME1); @@ -149,10 +242,9 @@ public CompletableFuture> queryAsync( String qry, Object... params ) { - assert params == null || params.length == 0 : "params are not supported"; assert !prepareOnly : "Expected that the query will only be prepared, but not executed"; - AsyncSqlCursor sqlCursor = node.executeQuery(transaction, qry); + AsyncSqlCursor sqlCursor = node.executeQuery(transaction, qry, params); return CompletableFuture.completedFuture(sqlCursor); } @@ -169,4 +261,10 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { return nullCompletedFuture(); } } + + private static int expectedPartition(int key) { + var calculator = new PartitionCalculator(PARTITIONS_COUNT, new NativeType[] {NativeTypes.INT32}); + calculator.append(key); + return calculator.partition(); + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java index 420d3aad04e..33815f21301 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTuple; import org.apache.ignite.internal.binarytuple.BinaryTuplePrefix; import org.apache.ignite.internal.network.InternalClusterNode; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory; @@ -80,6 +81,7 @@ import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.type.StructNativeType; +import org.apache.ignite.tx.Transaction; import org.hamcrest.Matchers; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Named; @@ -220,7 +222,7 @@ private static Stream indexScanParameters() { for (Bound leftBound : Bound.values()) { for (Bound rightBound : Bound.values()) { params.add(Arguments.of(NoOpTransaction.readOnly("RO", false), leftBound, rightBound)); - params.add(Arguments.of(NoOpTransaction.readWrite("RW", false), leftBound, rightBound)); + params.add(Arguments.of(rwWithCommitPartition(), leftBound, rightBound)); } } @@ -483,10 +485,16 @@ public void testIndexLookupError(NoOpTransaction tx) { private static Stream transactions() { return Stream.of( Arguments.of(Named.of("Read-only transaction", NoOpTransaction.readOnly("RO", false))), - Arguments.of(Named.of("Read-write transaction", NoOpTransaction.readWrite("RW", false))) + Arguments.of(Named.of("Read-write transaction", rwWithCommitPartition())) ); } + private static Transaction rwWithCommitPartition() { + NoOpTransaction tx = NoOpTransaction.readWrite("RW", false); + tx.assignCommitPartition(new ZonePartitionId(1, 1)); + return tx; + } + private class Tester { final ScannableTable scannableTable; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 22af25100ab..aa7c27e4d97 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.ClusterNodeImpl; import org.apache.ignite.internal.network.InternalClusterNode; @@ -42,12 +43,8 @@ * Dummy transaction that should be used as mock transaction for execution tests. */ public final class NoOpTransaction implements InternalTransaction { - private static final int ZONE_ID = 1; - private static final int TABLE_ID = 2; - private static final int PARTITION_ID = 2; - private final UUID id; private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1) @@ -57,7 +54,7 @@ public final class NoOpTransaction implements InternalTransaction { private final PendingTxPartitionEnlistment enlistment; - private final ZonePartitionId groupId = new ZonePartitionId(ZONE_ID, PARTITION_ID); + private final AtomicReference commitPartition = new AtomicReference<>(); private final boolean implicit; @@ -178,12 +175,12 @@ public TxState state() { @Override public boolean assignCommitPartition(ZonePartitionId replicationGroupId) { - return true; + return commitPartition.compareAndSet(null, replicationGroupId); } @Override public ZonePartitionId commitPartition() { - return groupId; + return commitPartition.get(); } @Override