diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index f1db841f2d35..53c0394a4c4c 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; @@ -58,6 +59,7 @@ import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.raft.IndexWithTerm; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -140,6 +142,9 @@ public class MetaStorageManagerImpl implements MetaStorageManager { private final RaftGroupOptionsConfigurer raftGroupOptionsConfigurer; + /** Gets completed when raft service is started. */ + private final CompletableFuture<Void> raftNodeStarted = new CompletableFuture<>(); + /** * The constructor. 
* @@ -294,13 +299,16 @@ private CompletableFuture initializeMetaStorage(Set new MetaStorageServiceImpl( - thisNodeName, - raftService, - busyLock, - clusterTime, - () -> clusterService.topologyService().localMember().id()) - ); + raftNodeStarted.complete(null); + + return raftServiceFuture + .thenApply(raftService -> new MetaStorageServiceImpl( + thisNodeName, + raftService, + busyLock, + clusterTime, + () -> clusterService.topologyService().localMember().id()) + ); } catch (NodeStoppingException e) { return failedFuture(e); } @@ -325,7 +333,7 @@ private CompletableFuture startFollowerNode( followerListener = new MetaStorageListener(storage, clusterTime); CompletableFuture raftServiceFuture = raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( - new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), + raftNodeId(localPeer), configuration, followerListener, RaftGroupEventsListener.noopLsnr, @@ -370,7 +378,7 @@ private CompletableFuture startLearnerNode( learnerListener = new MetaStorageListener(storage, clusterTime); return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture( - new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer), + raftNodeId(localPeer), configuration, learnerListener, RaftGroupEventsListener.noopLsnr, @@ -379,6 +387,10 @@ private CompletableFuture startLearnerNode( ); } + private static RaftNodeId raftNodeId(Peer localPeer) { + return new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer); + } + /** * Sets the Meta Storage configuration. * @@ -837,6 +849,29 @@ public CompletableFuture recoveryFinishedFuture() { return recoveryFinishedFuture; } + /** + * Returns a future that will be completed with information about index and term of the Metastorage Raft group. + * + *

<p>This method is special in the following regard: it can be called before the component gets started. The returned + * future will be completed after the component starts. + */ + public CompletableFuture<IndexWithTerm> raftNodeIndex() { + return raftNodeStarted.thenApply(unused -> inBusyLock(busyLock, () -> { + RaftNodeId nodeId = raftNodeId(new Peer(clusterService.nodeName())); + + IndexWithTerm indexWithTerm; + try { + indexWithTerm = raftMgr.raftNodeIndex(nodeId); + } catch (NodeStoppingException e) { + throw new CompletionException(e); + } + + assert indexWithTerm != null : "Attempt to get index and term when Raft node is not started yet or already stopped"; + + return indexWithTerm; + })); + } + @TestOnly public CompletableFuture metaStorageService() { return metaStorageSvcFut; diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java new file mode 100644 index 000000000000..b95be4417bdc --- /dev/null +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.raft; + +/** + * Raft index with the corresponding term. + */ +public class IndexWithTerm { + private final long index; + private final long term; + + /** Constructor. */ + public IndexWithTerm(long index, long term) { + this.index = index; + this.term = term; + } + + /** Returns the index. */ + public long index() { + return index; + } + + /** Returns the term corresponding to the index. */ + public long term() { + return term; + } +} diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java index f990cc0804a1..16eb9c7d198d 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java @@ -189,4 +189,12 @@ CompletableFuture startRaftGroupService( */ void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptionsConfigurer raftGroupOptionsConfigurer) throws NodeStoppingException; + + /** + * Returns information about index and term of the given node, or {@code null} if the group is not started. + * + * @param nodeId ID of the Raft node. + * @throws NodeStoppingException If the node is already being stopped. 
+ */ + @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) throws NodeStoppingException; } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 24d3a52ed8f6..6cf2142c2627 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -414,6 +414,19 @@ public void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptionsConfigure } } + @Override + public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) throws NodeStoppingException { + if (!busyLock.enterBusy()) { + throw new NodeStoppingException(); + } + + try { + return raftServer.raftNodeIndex(nodeId); + } finally { + busyLock.leaveBusy(); + } + } + private CompletableFuture startRaftGroupNodeInternal( RaftNodeId nodeId, PeersAndLearners configuration, diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java index 17004e53d9a9..34684dfead5b 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.raft.IndexWithTerm; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -28,6 +29,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.raft.jraft.option.NodeOptions; +import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; 
/** @@ -108,6 +110,13 @@ boolean startRaftNode( */ void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions groupOptions); + /** + * Returns information about index and term of the given node, or {@code null} if the group is not started. + * + * @param nodeId ID of the Raft node. + */ + @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId); + /** * Returns local nodes running the given Raft group. * diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index ce499affebea..b1de0c22cf2f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.metrics.sources.RaftMetricSource; import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.raft.IndexWithTerm; import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; @@ -85,6 +86,7 @@ import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl.ReadIndexEvent; import org.apache.ignite.raft.jraft.core.StateMachineAdapter; import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor; +import org.apache.ignite.raft.jraft.entity.LogId; import org.apache.ignite.raft.jraft.entity.PeerId; import org.apache.ignite.raft.jraft.error.RaftError; import org.apache.ignite.raft.jraft.option.NodeOptions; @@ -577,6 +579,19 @@ public void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions groupOpt } } + @Override + public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId) { + RaftGroupService service = nodes.get(nodeId); + + if (service == null) { + return null; + } + + LogId logId = 
service.getRaftNode().lastLogIndexAndTerm(); + + return new IndexWithTerm(logId.getIndex(), logId.getTerm()); + } + /** * Performs a {@code resetPeers} operation on raft node. * diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java index 3cc5e80dc450..7137292aaf9a 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java @@ -22,7 +22,8 @@ import org.apache.ignite.raft.jraft.core.NodeMetrics; import org.apache.ignite.raft.jraft.core.Replicator; import org.apache.ignite.raft.jraft.core.State; -import org.apache.ignite.raft.jraft.entity.NodeId; +import org.apache.ignite.raft.jraft.entity.LogId; +import org.apache.ignite.raft.jraft.entity.NodeId; import org.apache.ignite.raft.jraft.entity.PeerId; import org.apache.ignite.raft.jraft.entity.Task; import org.apache.ignite.raft.jraft.entity.UserLog; @@ -328,4 +329,10 @@ public interface Node extends Lifecycle, Describer { * Returns the value of last replicated log index. Corresponding log entry might not yet be written to the log storage (no flush). */ long lastLogIndex(); + + /** + * Returns the value of last replicated log index (with its term). Corresponding log entry might not yet be written to the log storage + * (no flush). 
+ */ + LogId lastLogIndexAndTerm(); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 0fcf832981ac..3bfbae526b82 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -2787,6 +2787,17 @@ public long lastLogIndex() { } } + @Override + public LogId lastLogIndexAndTerm() { + this.readLock.lock(); + try { + return logManager.getLastLogId(false).copy(); + } + finally { + this.readLock.unlock(); + } + } + @OnlyForTest ConfigurationEntry getConf() { this.readLock.lock(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index d117637b69c3..5545170441c2 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -551,14 +551,6 @@ public class IgniteImpl implements Ignite { failureProcessor ); - systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl( - name, - clusterSvc.topologyService(), - clusterSvc.messagingService(), - vaultMgr, - restarter - ); - clock = new HybridClockImpl(); clockWaiter = new ClockWaiter(name, clock); @@ -701,6 +693,15 @@ public class IgniteImpl implements Ignite { metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageExtensionConfiguration.KEY).metaStorage()); + systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl( + name, + clusterSvc.topologyService(), + clusterSvc.messagingService(), + vaultMgr, + restarter, + metaStorageMgr::raftNodeIndex + ); + SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry .getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync(); @@ -1228,11 +1229,11 @@ 
CompletableFuture startAsync() { failureProcessor, clusterStateStorage, clusterIdService, + systemDisasterRecoveryManager, criticalWorkerRegistry, nettyBootstrapFactory, nettyWorkersRegistrar, clusterSvc, - systemDisasterRecoveryManager, restComponent, partitionsLogStorageFactory, msLogStorageFactory, diff --git a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermRequestMessage.java b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermRequestMessage.java new file mode 100644 index 000000000000..8035aa68a6bd --- /dev/null +++ b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermRequestMessage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.disaster.system.message; + +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; + +/** + * Message for retrieving index+term of the Metastorage Raft group on the recipient node. 
+ */ +@Transferable(SystemDisasterRecoveryMessageGroup.METASTORAGE_INDEX_TERM_REQUEST) +public interface MetastorageIndexTermRequestMessage extends NetworkMessage { +} diff --git a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermResponseMessage.java b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermResponseMessage.java new file mode 100644 index 000000000000..85eecbfbb6de --- /dev/null +++ b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermResponseMessage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.disaster.system.message; + +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; + +/** + * Response to {@link MetastorageIndexTermRequestMessage}. 
+ */ +@Transferable(SystemDisasterRecoveryMessageGroup.METASTORAGE_INDEX_TERM_RESPONSE) +public interface MetastorageIndexTermResponseMessage extends NetworkMessage { + /** + * Returns Raft index of the Metastorage group on the sender of this message. + */ + long raftIndex(); + + /** + * Returns term corresponding to the Raft index of the Metastorage group on the sender of this message. + */ + long raftTerm(); +} diff --git a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java index c6f0b0fa0700..e94ea8bd9c7d 100644 --- a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java +++ b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java @@ -28,4 +28,14 @@ public class SystemDisasterRecoveryMessageGroup { * Message type for {@link ResetClusterMessage}. */ public static final short RESET_CLUSTER = 1; + + /** + * Message type for {@link MetastorageIndexTermRequestMessage}. + */ + public static final short METASTORAGE_INDEX_TERM_REQUEST = 2; + + /** + * Message type for {@link MetastorageIndexTermResponseMessage}. 
+ */ + public static final short METASTORAGE_INDEX_TERM_RESPONSE = 3; } diff --git a/modules/system-disaster-recovery/build.gradle b/modules/system-disaster-recovery/build.gradle index e4b9ec8d056a..6989f69f5b3c 100644 --- a/modules/system-disaster-recovery/build.gradle +++ b/modules/system-disaster-recovery/build.gradle @@ -28,6 +28,7 @@ dependencies { implementation project(':ignite-network-api') implementation project(':ignite-vault') implementation project(':ignite-cluster-management') + implementation project(':ignite-raft-api') implementation libs.jetbrains.annotations testImplementation libs.hamcrest.core diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageIndexGettingTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageIndexGettingTest.java new file mode 100644 index 000000000000..00b9e0173b8f --- /dev/null +++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageIndexGettingTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.disaster.system; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage; +import org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory; +import org.apache.ignite.internal.network.NetworkMessage; +import org.junit.jupiter.api.Test; + +class ItMetastorageIndexGettingTest extends ClusterPerTestIntegrationTest { + private final SystemDisasterRecoveryMessagesFactory messagesFactory = new SystemDisasterRecoveryMessagesFactory(); + + @Override + protected int initialNodes() { + return 0; + } + + @Test + void indexAndTermAreReturnedFromRunningNode() throws Exception { + cluster.startAndInit(2, parametersBuilder -> { + parametersBuilder.cmgNodeNames(cluster.nodeName(0)); + parametersBuilder.metaStorageNodeNames(cluster.nodeName(1)); + }); + + MetastorageIndexTermResponseMessage response = getMetastorageIndexAndTermFrom(1); + + assertThat(response.raftIndex(), is(greaterThanOrEqualTo(1L))); + assertThat(response.raftTerm(), is(greaterThanOrEqualTo(1L))); + } + + private MetastorageIndexTermResponseMessage getMetastorageIndexAndTermFrom(int targetNodeIndex) + throws InterruptedException, ExecutionException, TimeoutException { + IgniteImpl ignite0 = igniteImpl(0); + + CompletableFuture<NetworkMessage> future = ignite0.clusterService().messagingService().invoke( + cluster.nodeName(targetNodeIndex), + messagesFactory.metastorageIndexTermRequestMessage().build(), + 
SECONDS.toMillis(10) + ); + return (MetastorageIndexTermResponseMessage) future.get(10, SECONDS); + } + + @Test + void indexAndTermAreReturnedFromNodeThatHangsOnStartingMetastorage() throws Exception { + cluster.startAndInit(3, parametersBuilder -> { + parametersBuilder.cmgNodeNames(cluster.nodeName(0)); + parametersBuilder.metaStorageNodeNames(cluster.nodeName(1)); + }); + + // Stopping node 1 will make Metastorage lose majority. + IntStream.of(1, 2).parallel().forEach(cluster::stopNode); + + // This will not be able to finish its startup. + cluster.startEmbeddedNode(2); + + MetastorageIndexTermResponseMessage response = getMetastorageIndexAndTermFrom(2); + + assertThat(response.raftIndex(), is(greaterThanOrEqualTo(1L))); + assertThat(response.raftTerm(), is(greaterThanOrEqualTo(1L))); + } +} diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java index d8138db464cd..0887aa3764d7 100644 --- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java +++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java @@ -32,6 +32,8 @@ /** * Used to handle volatile information about cluster ID used to restrict which nodes can connect this one and vice versa. + * + *

<p>This MUST be started after the Vault, but before networking. */ public class ClusterIdService implements ClusterIdSupplier, ClusterIdStore, IgniteComponent { private final SystemDisasterRecoveryStorage storage; diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java index a9c815c92add..bd638f715a57 100644 --- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java +++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java @@ -36,9 +36,12 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.function.Supplier; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; import org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage; +import org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage; +import org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage; import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage; import org.apache.ignite.internal.disaster.system.message.ResetClusterMessageBuilder; import org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup; @@ -48,6 +51,7 @@ import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.raft.IndexWithTerm; import org.apache.ignite.internal.util.CompletableFutures; import 
org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.ClusterNode; @@ -61,6 +65,7 @@ public class SystemDisasterRecoveryManagerImpl implements SystemDisasterRecovery private final TopologyService topologyService; private final MessagingService messagingService; private final ServerRestarter restarter; + private final Supplier<CompletableFuture<IndexWithTerm>> metastorageIndexWithTerm; private final SystemDisasterRecoveryMessagesFactory messagesFactory = new SystemDisasterRecoveryMessagesFactory(); private static final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory(); @@ -76,12 +81,14 @@ public SystemDisasterRecoveryManagerImpl( TopologyService topologyService, MessagingService messagingService, VaultManager vaultManager, - ServerRestarter restarter + ServerRestarter restarter, + Supplier<CompletableFuture<IndexWithTerm>> metastorageIndexWithTerm ) { this.thisNodeName = thisNodeName; this.topologyService = topologyService; this.messagingService = messagingService; this.restarter = restarter; + this.metastorageIndexWithTerm = metastorageIndexWithTerm; storage = new SystemDisasterRecoveryStorage(vaultManager); restartExecutor = new ThreadPerTaskExecutor(thisNodeName + "-restart-"); @@ -93,6 +100,9 @@ public CompletableFuture startAsync(ComponentContext componentContext) { if (message instanceof ResetClusterMessage) { assert correlationId != null; handleResetClusterMessage((ResetClusterMessage) message, sender, correlationId); + } else if (message instanceof MetastorageIndexTermRequestMessage) { + assert correlationId != null; + handleMetastorageIndexTermRequest(sender, correlationId); } }); @@ -112,6 +122,18 @@ private void handleResetClusterMessage(ResetClusterMessage message, ClusterNode }); } + private void handleMetastorageIndexTermRequest(ClusterNode sender, long correlationId) { + metastorageIndexWithTerm.get() + .thenAccept(indexWithTerm -> { + MetastorageIndexTermResponseMessage response = messagesFactory.metastorageIndexTermResponseMessage() + .raftIndex(indexWithTerm.index()) + 
.raftTerm(indexWithTerm.term()) + .build(); + + messagingService.respond(sender, response, correlationId); + }); + } + private static SuccessResponseMessage successResponseMessage() { return cmgMessagesFactory.successResponseMessage().build(); } diff --git a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java index bfb06116e3f0..d00564ae6df6 100644 --- a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java +++ b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java @@ -56,9 +56,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; import org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage; +import org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage; import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage; import org.apache.ignite.internal.disaster.system.message.ResetClusterMessageBuilder; import org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup; @@ -71,6 +73,7 @@ import org.apache.ignite.internal.network.NetworkMessageHandler; import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.properties.IgniteProductVersion; +import org.apache.ignite.internal.raft.IndexWithTerm; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import 
org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -122,6 +125,9 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest { @Mock private ServerRestarter restarter; + @Mock + private Supplier<CompletableFuture<IndexWithTerm>> metastorageIndexWithTerm; + private SystemDisasterRecoveryManagerImpl manager; private final ComponentContext componentContext = new ComponentContext(); @@ -159,7 +165,8 @@ void init() { topologyService, messagingService, vaultManager, - restarter + restarter, + metastorageIndexWithTerm ); assertThat(manager.startAsync(componentContext), willCompleteSuccessfully()); } @@ -587,6 +594,21 @@ void resetClusterWithMgRequiresCurrentTopologyBeEnoughForMgReplicationFactor() { assertThat(ex.getMessage(), is("Metastorage replication factor cannot exceed size of current physical topology (1).")); } + @Test + void returnsIndexAndTerm() { + when(metastorageIndexWithTerm.get()).thenReturn(completedFuture(new IndexWithTerm(234, 2))); + + NetworkMessageHandler handler = extractMessageHandler(); + handler.onReceived(messagesFactory.metastorageIndexTermRequestMessage().build(), thisNode, 123L); + + ArgumentCaptor<MetastorageIndexTermResponseMessage> captor = ArgumentCaptor.forClass(MetastorageIndexTermResponseMessage.class); + verify(messagingService).respond(eq(thisNode), captor.capture(), eq(123L)); + + MetastorageIndexTermResponseMessage response = captor.getValue(); + assertThat(response.raftIndex(), is(234L)); + assertThat(response.raftTerm(), is(2L)); + } + private enum ResetCluster { CMG_ONLY { @Override