Skip to content

Commit

Permalink
IGNITE-20621 Add a replication group index build completion listener …
Browse files Browse the repository at this point in the history
…to IndexBuilder (#2682)
  • Loading branch information
tkalkirill committed Oct 12, 2023
1 parent 28dfaab commit 5ba1995
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.table.distributed.index;

/** Index build completion listener, , will be called when a distributed build of an index for a specific partition completes. */
@FunctionalInterface
public interface IndexBuildCompletionListener {
/** Handles the index build completion event. */
void onBuildCompletion(int indexId, int tableId, int partitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class IndexBuildTask {

private final ClusterNode node;

private final List<IndexBuildCompletionListener> listeners;

private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();

private final AtomicBoolean taskStopGuard = new AtomicBoolean();
Expand All @@ -77,7 +79,8 @@ class IndexBuildTask {
ExecutorService executor,
IgniteSpinBusyLock busyLock,
int batchSize,
ClusterNode node
ClusterNode node,
List<IndexBuildCompletionListener> listeners
) {
this.taskId = taskId;
this.indexStorage = indexStorage;
Expand All @@ -87,6 +90,8 @@ class IndexBuildTask {
this.busyLock = busyLock;
this.batchSize = batchSize;
this.node = node;
// We do not intentionally make a copy of the list, we want to see changes in the passed list.
this.listeners = listeners;
}

/** Starts building the index. */
Expand All @@ -97,9 +102,7 @@ void start() {
return;
}

if (LOG.isInfoEnabled()) {
LOG.info("Start building the index: [{}]", createCommonIndexInfo());
}
LOG.info("Start building the index: [{}]", createCommonIndexInfo());

try {
supplyAsync(this::handleNextBatch, executor)
Expand Down Expand Up @@ -148,6 +151,12 @@ private CompletableFuture<Void> handleNextBatch() {
.thenComposeAsync(unused -> {
if (indexStorage.getNextRowIdToBuild() == null) {
// Index has been built.
LOG.info("Index build completed: [{}]", createCommonIndexInfo());

for (IndexBuildCompletionListener listener : listeners) {
listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
}

return completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -34,6 +36,7 @@
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
Expand All @@ -43,12 +46,26 @@
import org.apache.ignite.network.ClusterNode;

/**
* Class for managing the building of table indexes.
* Component that is responsible for building an index for a specific partition.
*
* <p>Approximate index building algorithm:</p>
* <ul>
* <li>If the index has not yet been built ({@link IndexStorage#getNextRowIdToBuild()} {@code != null}) or is not in the process of
* being built, then an asynchronous task is added to build it.</li>
* <li>Index building task generates batches of {@link RowId} (by using {@link IndexStorage#getNextRowIdToBuild()}) and sends these
* batch to the primary replica (only the primary replica is expected to start building the index) so that the corresponding replication
* group builds indexes for the transferred batch.</li>
* <li>Subsequent batches will be sent only after the current batch has been processed and until
* {@link IndexStorage#getNextRowIdToBuild()} {@code != null}.</li>
* </ul>
*
* <p>Notes: It is expected that only the primary replica will run tasks to build the index, and if the replica loses primacy, it will stop
* the task to build the index, and this will be done by an external component.</p>
*/
public class IndexBuilder implements ManuallyCloseable {
private static final IgniteLogger LOG = Loggers.forClass(IndexBuilder.class);

private static final int BATCH_SIZE = 100;
static final int BATCH_SIZE = 100;

private final ExecutorService executor;

Expand All @@ -60,6 +77,8 @@ public class IndexBuilder implements ManuallyCloseable {

private final AtomicBoolean closeGuard = new AtomicBoolean();

private final List<IndexBuildCompletionListener> listeners = new CopyOnWriteArrayList<>();

/**
* Constructor.
*
Expand Down Expand Up @@ -119,7 +138,8 @@ public void scheduleBuildIndex(
executor,
busyLock,
BATCH_SIZE,
node
node,
listeners
);

IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask);
Expand Down Expand Up @@ -195,4 +215,14 @@ public void close() {

IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
}

/** Adds a listener. */
public void listen(IndexBuildCompletionListener listener) {
listeners.add(listener);
}

/** Removes a listener. */
public void stopListen(IndexBuildCompletionListener listener) {
listeners.remove(listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.table.distributed.index;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/** For {@link IndexBuilder} testing. */
public class IndexBuilderTest extends BaseIgniteAbstractTest {
private static final int TABLE_ID = 1;

private static final int INDEX_ID = 2;

private static final int PARTITION_ID = 3;

private final ReplicaService replicaService = mock(ReplicaService.class, invocation -> completedFuture(null));

private final IndexBuilder indexBuilder = new IndexBuilder("test", 1, replicaService);

@AfterEach
void tearDown() {
indexBuilder.close();
}

@Test
void testIndexBuildCompletionListener() {
CompletableFuture<Void> listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID);

scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID)));

assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully());
}

@Test
void testStopListenIndexBuildCompletion() {
CompletableFuture<Void> invokeListenerFuture = new CompletableFuture<>();

IndexBuildCompletionListener listener = (indexId, tableId, partitionId) -> invokeListenerFuture.complete(null);

indexBuilder.listen(listener);
indexBuilder.stopListen(listener);

scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID)));

assertThat(invokeListenerFuture, willTimeoutFast());
}

@Test
void testIndexBuildCompletionListenerTwoBatches() {
CompletableFuture<Void> listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID);

List<RowId> nextRowIdsToBuild = IntStream.range(0, 2 * IndexBuilder.BATCH_SIZE)
.mapToObj(i -> rowId(PARTITION_ID))
.collect(toList());

CompletableFuture<Void> secondInvokeReplicaServiceFuture = new CompletableFuture<>();

CompletableFuture<Void> awaitSecondInvokeForReplicaService = awaitSecondInvokeForReplicaService(secondInvokeReplicaServiceFuture);

scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, nextRowIdsToBuild);

assertThat(awaitSecondInvokeForReplicaService, willCompleteSuccessfully());

assertFalse(listenCompletionIndexBuildingFuture.isDone());

secondInvokeReplicaServiceFuture.complete(null);

assertThat(listenCompletionIndexBuildingFuture, willCompleteSuccessfully());
}

@Test
void testIndexBuildCompletionListenerForAlreadyBuiltIndex() {
CompletableFuture<Void> listenCompletionIndexBuildingFuture = listenCompletionIndexBuilding(INDEX_ID, TABLE_ID, PARTITION_ID);

scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of());

assertThat(listenCompletionIndexBuildingFuture, willTimeoutFast());
}

private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
indexBuilder.scheduleBuildIndex(
tableId,
partitionId,
indexId,
indexStorage(nextRowIdsToBuild),
mock(MvPartitionStorage.class),
mock(ClusterNode.class)
);
}

private CompletableFuture<Void> listenCompletionIndexBuilding(int indexId, int tableId, int partitionId) {
CompletableFuture<Void> future = new CompletableFuture<>();

indexBuilder.listen((indexId1, tableId1, partitionId1) -> {
if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
future.complete(null);
}
});

return future;
}

private CompletableFuture<Void> awaitSecondInvokeForReplicaService(CompletableFuture<Void> secondInvokeFuture) {
CompletableFuture<Void> future = new CompletableFuture<>();

when(replicaService.invoke(any(ClusterNode.class), any(ReplicaRequest.class)))
.thenReturn(completedFuture(null))
.thenAnswer(invocation -> {
future.complete(null);

return secondInvokeFuture;
});

return future;
}

private static IndexStorage indexStorage(Collection<RowId> nextRowIdsToBuild) {
Iterator<RowId> it = nextRowIdsToBuild.iterator();

IndexStorage indexStorage = mock(IndexStorage.class);

when(indexStorage.getNextRowIdToBuild()).then(invocation -> it.hasNext() ? it.next() : null);

return indexStorage;
}

private static RowId rowId(int partitionId) {
return new RowId(partitionId);
}
}

0 comments on commit 5ba1995

Please sign in to comment.