diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java new file mode 100644 index 00000000000..e2c98a3df54 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.index; + +/** Index build completion listener that will be called when a distributed build of an index for a specific partition completes. */ +@FunctionalInterface +public interface IndexBuildCompletionListener { + /** Handles the index build completion event. 
*/ + void onBuildCompletion(int indexId, int tableId, int partitionId); +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java index a8a1c8fb8cb..a19932813fd 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java @@ -63,6 +63,8 @@ class IndexBuildTask { private final ClusterNode node; + private final List listeners; + private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock(); private final AtomicBoolean taskStopGuard = new AtomicBoolean(); @@ -77,7 +79,8 @@ class IndexBuildTask { ExecutorService executor, IgniteSpinBusyLock busyLock, int batchSize, - ClusterNode node + ClusterNode node, + List listeners ) { this.taskId = taskId; this.indexStorage = indexStorage; @@ -87,6 +90,8 @@ class IndexBuildTask { this.busyLock = busyLock; this.batchSize = batchSize; this.node = node; + // We do not intentionally make a copy of the list, we want to see changes in the passed list. + this.listeners = listeners; } /** Starts building the index. */ @@ -97,9 +102,7 @@ void start() { return; } - if (LOG.isInfoEnabled()) { - LOG.info("Start building the index: [{}]", createCommonIndexInfo()); - } + LOG.info("Start building the index: [{}]", createCommonIndexInfo()); try { supplyAsync(this::handleNextBatch, executor) @@ -148,6 +151,12 @@ private CompletableFuture handleNextBatch() { .thenComposeAsync(unused -> { if (indexStorage.getNextRowIdToBuild() == null) { // Index has been built. 
+ LOG.info("Index build completed: [{}]", createCommonIndexInfo()); + + for (IndexBuildCompletionListener listener : listeners) { + listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId()); + } + return completedFuture(null); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java index b50f51e9911..1c502530d74 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java @@ -20,9 +20,11 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -34,6 +36,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand; import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; @@ -43,12 +46,26 @@ import org.apache.ignite.network.ClusterNode; /** - * Class for managing the building of table indexes. + * Component that is responsible for building an index for a specific partition. + * + *

<p>Approximate index building algorithm:</p>
+ * <ul>
+ *     <li>If the index has not yet been built ({@link IndexStorage#getNextRowIdToBuild()} {@code != null}) or is not in the process of
+ *     being built, then an asynchronous task is added to build it.</li>
+ *     <li>Index building task generates batches of {@link RowId} (by using {@link IndexStorage#getNextRowIdToBuild()}) and sends each
+ *     batch to the primary replica (only the primary replica is expected to start building the index) so that the corresponding replication
+ *     group builds indexes for the transferred batch.</li>
+ *     <li>Subsequent batches will be sent only after the current batch has been processed and until
+ *     {@link IndexStorage#getNextRowIdToBuild()} {@code != null}.</li>
+ * </ul>
+ *
+ * <p>Notes: It is expected that only the primary replica will run tasks to build the index, and if the replica loses primacy, it will stop
+ * the task to build the index, and this will be done by an external component.</p>

*/ public class IndexBuilder implements ManuallyCloseable { private static final IgniteLogger LOG = Loggers.forClass(IndexBuilder.class); - private static final int BATCH_SIZE = 100; + static final int BATCH_SIZE = 100; private final ExecutorService executor; @@ -60,6 +77,8 @@ public class IndexBuilder implements ManuallyCloseable { private final AtomicBoolean closeGuard = new AtomicBoolean(); + private final List listeners = new CopyOnWriteArrayList<>(); + /** * Constructor. * @@ -119,7 +138,8 @@ public void scheduleBuildIndex( executor, busyLock, BATCH_SIZE, - node + node, + listeners ); IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask); @@ -195,4 +215,14 @@ public void close() { IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); } + + /** Adds a listener. */ + public void listen(IndexBuildCompletionListener listener) { + listeners.add(listener); + } + + /** Removes a listener. */ + public void stopListen(IndexBuildCompletionListener listener) { + listeners.remove(listener); + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java new file mode 100644 index 00000000000..0eda973d4a8 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.index; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.network.ClusterNode; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** For {@link IndexBuilder} testing. 
*/ +public class IndexBuilderTest extends BaseIgniteAbstractTest { + private static final int TABLE_ID = 1; + + private static final int INDEX_ID = 2; + + private static final int PARTITION_ID = 3; + + private final ReplicaService replicaService = mock(ReplicaService.class, invocation -> completedFuture(null)); + + private final IndexBuilder indexBuilder = new IndexBuilder("test", 1, replicaService); + + @AfterEach + void tearDown() { + indexBuilder.close(); + } + + @Test + void testIndexBuildCompletionListener() { + CompletableFuture listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID); + + scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID))); + + assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully()); + } + + @Test + void testStopListenIndexBuildCompletion() { + CompletableFuture invokeListenerFuture = new CompletableFuture<>(); + + IndexBuildCompletionListener listener = (indexId, tableId, partitionId) -> invokeListenerFuture.complete(null); + + indexBuilder.listen(listener); + indexBuilder.stopListen(listener); + + scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID))); + + assertThat(invokeListenerFuture, willTimeoutFast()); + } + + @Test + void testIndexBuildCompletionListenerTwoBatches() { + CompletableFuture listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID); + + List nextRowIdsToBuild = IntStream.range(0, 2 * IndexBuilder.BATCH_SIZE) + .mapToObj(i -> rowId(PARTITION_ID)) + .collect(toList()); + + CompletableFuture secondInvokeReplicaServiceFuture = new CompletableFuture<>(); + + CompletableFuture awaitSecondInvokeForReplicaService = awaitSecondInvokeForReplicaService(secondInvokeReplicaServiceFuture); + + scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, nextRowIdsToBuild); + + assertThat(awaitSecondInvokeForReplicaService, willCompleteSuccessfully()); + + 
assertFalse(listenCompletionIndexBuildingFuture.isDone()); + + secondInvokeReplicaServiceFuture.complete(null); + + assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully()); + } + + @Test + void testIndexBuildCompletionListenerForAlreadyBuiltIndex() { + CompletableFuture listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID); + + scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of()); + + assertThat(listenCompletionIndexBuildingFuture, willTimeoutFast()); + } + + private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection nextRowIdsToBuild) { + indexBuilder.scheduleBuildIndex( + tableId, + partitionId, + indexId, + indexStorage(nextRowIdsToBuild), + mock(MvPartitionStorage.class), + mock(ClusterNode.class) + ); + } + + private CompletableFuture listenCompletionIndexBuilding(int indexId, int tableId, int partitionId) { + CompletableFuture future = new CompletableFuture<>(); + + indexBuilder.listen((indexId1, tableId1, partitionId1) -> { + if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) { + future.complete(null); + } + }); + + return future; + } + + private CompletableFuture awaitSecondInvokeForReplicaService(CompletableFuture secondInvokeFuture) { + CompletableFuture future = new CompletableFuture<>(); + + when(replicaService.invoke(any(ClusterNode.class), any(ReplicaRequest.class))) + .thenReturn(completedFuture(null)) + .thenAnswer(invocation -> { + future.complete(null); + + return secondInvokeFuture; + }); + + return future; + } + + private static IndexStorage indexStorage(Collection nextRowIdsToBuild) { + Iterator it = nextRowIdsToBuild.iterator(); + + IndexStorage indexStorage = mock(IndexStorage.class); + + when(indexStorage.getNextRowIdToBuild()).then(invocation -> it.hasNext() ? it.next() : null); + + return indexStorage; + } + + private static RowId rowId(int partitionId) { + return new RowId(partitionId); + } +}