Skip to content

Commit

Permalink
IGNITE-20467 Use replica message instead of directly using raft clien…
Browse files Browse the repository at this point in the history
…t when building indexes (#2630)
  • Loading branch information
tkalkirill committed Sep 26, 2023
1 parent e85139f commit d797019
Show file tree
Hide file tree
Showing 28 changed files with 252 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ public IgniteInternalException(int code, String messagePattern, Object... params
*
* @param code Full error code.
* @param messagePattern Error message pattern.
* @param cause Non-null throwable cause.
* @param cause Throwable cause.
* @param params Error message params.
* @see IgniteStringFormatter#format(String, Object...)
*/
public IgniteInternalException(int code, String messagePattern, Throwable cause, Object... params) {
public IgniteInternalException(int code, String messagePattern, @Nullable Throwable cause, Object... params) {
this(code, IgniteStringFormatter.format(messagePattern, params), cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ public static void awaitForWorkersStop(Collection<IgniteWorker> workers, boolean
* @param fn Function to run.
* @param <T> Type of returned value from {@code fn}.
* @return Result of the provided function.
* @throws IgniteInternalException with cause {@link NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
*/
public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> fn) {
if (!busyLock.enterBusy()) {
Expand All @@ -794,6 +795,7 @@ public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> fn) {
*
* @param busyLock Component's busy lock.
* @param fn Runnable to run.
* @throws IgniteInternalException with cause {@link NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
*/
public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) {
if (!busyLock.enterBusy()) {
Expand Down Expand Up @@ -829,6 +831,24 @@ public static <T> CompletableFuture<T> inBusyLockAsync(IgniteSpinBusyLock busyLo
}
}

/**
* Method that runs the provided {@code fn} in {@code busyLock} if {@link IgniteSpinBusyLock#enterBusy()} succeed. Otherwise it just
* silently returns.
*
* @param busyLock Component's busy lock.
* @param fn Runnable to run.
*/
public static void inBusyLockSafe(IgniteSpinBusyLock busyLock, Runnable fn) {
if (!busyLock.enterBusy()) {
return;
}
try {
fn.run();
} finally {
busyLock.leaveBusy();
}
}

/**
* Collects all the fields of given class which are defined as a public static within the specified class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@

package org.apache.ignite.internal.replicator.exception;

import java.util.UUID;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;

import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.jetbrains.annotations.Nullable;

/**
* Unchecked exception that is thrown when a replica is not the current primary replica.
*/
public class PrimaryReplicaMissException extends IgniteInternalException {
private static final long serialVersionUID = 8755220779942651494L;

/**
* The constructor.
*
* @param expectedPrimaryReplicaTerm Expected term from.
* @param currentPrimaryReplicaTerm Current raft term.
*/
public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm) {
public PrimaryReplicaMissException(long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm) {
this(expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm, null);
}

Expand All @@ -43,25 +45,27 @@ public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long current
* @param currentPrimaryReplicaTerm Current raft term.
* @param cause Cause exception.
*/
public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm, Throwable cause) {
super(Replicator.REPLICA_MISS_ERR,
IgniteStringFormatter.format(
"The primary replica has changed because the term has been changed "
+ "[expectedPrimaryReplicaTerm={}, currentPrimaryReplicaTerm={}]",
expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm
),
cause);
public PrimaryReplicaMissException(long expectedPrimaryReplicaTerm, long currentPrimaryReplicaTerm, @Nullable Throwable cause) {
super(
REPLICA_MISS_ERR,
"The primary replica has changed because the term has been changed "
+ "[expectedPrimaryReplicaTerm={}, currentPrimaryReplicaTerm={}]",
cause,
expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm
);
}

/**
* The constructor is used for creating an exception instance that is thrown from a remote server.
* The constructor.
*
* @param traceId Trace id.
* @param code Error code.
* @param message Error message.
* @param cause Cause exception.
* @param expectedLeaseholder Expected leaseholder.
* @param currentLeaseholder Current leaseholder.
*/
public PrimaryReplicaMissException(UUID traceId, int code, String message, Throwable cause) {
super(traceId, code, message, cause);
public PrimaryReplicaMissException(String expectedLeaseholder, String currentLeaseholder) {
super(
REPLICA_MISS_ERR,
"The primary replica has changed [expectedLeaseholder={}, currentLeaseholder={}]",
expectedLeaseholder, currentLeaseholder
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,13 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/**
* Integration test of index building.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20096")
/** Integration test of index building. */
public class ItBuildIndexTest extends ClusterPerClassIntegrationTest {
private static final String ZONE_NAME = "zone_table";
private static final String ZONE_NAME = "ZONE_TABLE";

private static final String TABLE_NAME = "test_table";
private static final String TABLE_NAME = "TEST_TABLE";

private static final String INDEX_NAME = "test_index";
private static final String INDEX_NAME = "TEST_INDEX";

@AfterEach
void tearDown() {
Expand All @@ -89,7 +86,7 @@ void testBuildIndexOnStableTopology(int replicas) throws Exception {
checkIndexBuild(partitions, replicas, INDEX_NAME);

assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 > 0", TABLE_NAME))
.matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(), INDEX_NAME.toUpperCase()))
.matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
.returns(1, 1)
.returns(2, 2)
.returns(3, 3)
Expand All @@ -99,6 +96,7 @@ void testBuildIndexOnStableTopology(int replicas) throws Exception {
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-20330")
void testChangePrimaryReplicaOnMiddleBuildIndex() throws Exception {
prepareBuildIndexToChangePrimaryReplica();

Expand Down Expand Up @@ -174,7 +172,7 @@ private static void createAndPopulateTable(int replicas, int partitions) {

sql(IgniteStringFormatter.format(
"CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) WITH PRIMARY_ZONE='{}'",
TABLE_NAME, ZONE_NAME.toUpperCase()
TABLE_NAME, ZONE_NAME
));

sql(IgniteStringFormatter.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,18 +410,22 @@ public RaftGroupListener createListener(ClusterService service, Path path, int i
RocksDbStorageEngine storageEngine = new RocksDbStorageEngine("test", engineConfig, path);
storageEngine.start();

int tableId = 1;

MvTableStorage mvTableStorage = storageEngine.createMvTable(
new StorageTableDescriptor(1, 1, DEFAULT_DATA_REGION_NAME),
new StorageTableDescriptor(tableId, 1, DEFAULT_DATA_REGION_NAME),
new StorageIndexDescriptorSupplier(mock(CatalogService.class))
);
mvTableStorage.start();

mvTableStorages.put(index, mvTableStorage);

MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(mvTableStorage, 0);
int partitionId = 0;

MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(mvTableStorage, partitionId);
mvPartitionStorages.put(index, mvPartitionStorage);

PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage);
PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, partitionId, mvPartitionStorage);

PendingComparableValuesTracker<HybridTimestamp, Void> safeTime = new PendingComparableValuesTracker<>(
new HybridTimestamp(1, 0)
Expand All @@ -432,7 +436,7 @@ public RaftGroupListener createListener(ClusterService service, Path path, int i
);

StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
0,
partitionId,
partitionDataStorage,
gcConfig,
mock(LowWatermark.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public TableManager(

lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc);

indexBuilder = new IndexBuilder(nodeName, cpus);
indexBuilder = new IndexBuilder(nodeName, cpus, replicaSvc);

raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
Expand Down Expand Up @@ -161,8 +162,13 @@ public interface TableMessageGroup {
*/
short RW_MULTI_ROW_PK_REPLICA_REQUEST = 20;

/** Message type for {@link BuildIndexReplicaRequest}. */
short BUILD_INDEX_REPLICA_REQUEST = 21;

/**
* Message types for Table module RAFT commands.
*
* <p>NOTE: Commands must be immutable because they will be stored in the replication log.</p>
*/
interface Commands {
/** Message type for {@link FinishTxCommand}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,15 @@
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.network.annotations.Transferable;

/**
* State machine command to build a table index.
*/
/** State machine command to build a table index. */
@Transferable(TableMessageGroup.Commands.BUILD_INDEX)
public interface BuildIndexCommand extends WriteCommand {
/**
* Returns ID of table partition.
*/
TablePartitionIdMessage tablePartitionId();

/**
* Returns index ID.
*/
/** Returns index ID. */
int indexId();

/**
* Returns row IDs for which to build indexes.
*/
/** Returns row IDs for which to build indexes. */
List<UUID> rowIds();

/**
* Returns {@code true} if this batch is the last one.
*/
/** Returns {@code true} if this batch is the last one. */
boolean finish();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;

/**
* Task of building a table index.
*/
/** Task of building a table index. */
class IndexBuildTask {
private static final IgniteLogger LOG = Loggers.forClass(IndexBuildTask.class);

Expand All @@ -53,14 +53,16 @@ class IndexBuildTask {

private final MvPartitionStorage partitionStorage;

private final RaftGroupService raftClient;
private final ReplicaService replicaService;

private final ExecutorService executor;

private final IgniteSpinBusyLock busyLock;

private final int batchSize;

private final ClusterNode node;

private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();

private final AtomicBoolean taskStopGuard = new AtomicBoolean();
Expand All @@ -71,23 +73,23 @@ class IndexBuildTask {
IndexBuildTaskId taskId,
IndexStorage indexStorage,
MvPartitionStorage partitionStorage,
RaftGroupService raftClient,
ReplicaService replicaService,
ExecutorService executor,
IgniteSpinBusyLock busyLock,
int batchSize
int batchSize,
ClusterNode node
) {
this.taskId = taskId;
this.indexStorage = indexStorage;
this.partitionStorage = partitionStorage;
this.raftClient = raftClient;
this.replicaService = replicaService;
this.executor = executor;
this.busyLock = busyLock;
this.batchSize = batchSize;
this.node = node;
}

/**
* Starts building the index.
*/
/** Starts building the index. */
void start() {
if (!enterBusy()) {
taskFuture.complete(null);
Expand Down Expand Up @@ -120,9 +122,7 @@ void start() {
}
}

/**
* Stops index building.
*/
/** Stops index building. */
void stop() {
if (!taskStopGuard.compareAndSet(false, true)) {
return;
Expand All @@ -131,9 +131,7 @@ void stop() {
taskBusyLock.block();
}

/**
* Returns the index build future.
*/
/** Returns the index build future. */
CompletableFuture<Void> getTaskFuture() {
return taskFuture;
}
Expand All @@ -146,7 +144,7 @@ private CompletableFuture<Void> handleNextBatch() {
try {
List<RowId> batchRowIds = createBatchRowIds();

return raftClient.run(createBuildIndexCommand(batchRowIds))
return replicaService.invoke(node, createBuildIndexReplicaRequest(batchRowIds))
.thenComposeAsync(unused -> {
if (indexStorage.getNextRowIdToBuild() == null) {
// Index has been built.
Expand Down Expand Up @@ -182,15 +180,11 @@ private List<RowId> createBatchRowIds() {
return batch;
}

private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds) {
private BuildIndexReplicaRequest createBuildIndexReplicaRequest(List<RowId> rowIds) {
boolean finish = rowIds.size() < batchSize;

return TABLE_MESSAGES_FACTORY.buildIndexCommand()
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
.tableId(taskId.getTableId())
.partitionId(taskId.getPartitionId())
.build()
)
return TABLE_MESSAGES_FACTORY.buildIndexReplicaRequest()
.groupId(new TablePartitionId(taskId.getTableId(), taskId.getPartitionId()))
.indexId(taskId.getIndexId())
.rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
.finish(finish)
Expand Down

0 comments on commit d797019

Please sign in to comment.