diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java index aa252d828a2f..e295b7af8c19 100644 --- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java +++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterNode; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; @@ -139,6 +140,7 @@ void testInitCancel(TestInfo testInfo) throws Exception { /** * Tests a scenario when a node is restarted. */ + @Disabled("https://issues.apache.org/jira/browse/IGNITE-18795") @Test void testNodeRestart(TestInfo testInfo) throws Exception { startCluster(2, testInfo); diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index 32b827434da4..d3f58bc4e85f 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -58,10 +58,8 @@ import org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; -import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.Status; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -522,7 +520,7 @@ private CompletableFuture startCmgRaftService(Set nodeNa new RaftNodeId(CmgGroupId.INSTANCE, serverPeer), raftConfiguration, new CmgRaftGroupListener(clusterStateStorage, logicalTopology, this::onLogicalTopologyChanged), - createCmgRaftGroupEventsListener() + this::onElectedAsLeader ) .thenApply(service -> new CmgRaftService(service, clusterService, logicalTopology)); } catch (Exception e) { @@ -549,25 +547,6 @@ private void onLogicalTopologyChanged(long term) { } } - private RaftGroupEventsListener createCmgRaftGroupEventsListener() { - return new RaftGroupEventsListener() { - @Override - public void onLeaderElected(long term) { - ClusterManagementGroupManager.this.onElectedAsLeader(term); - } - - @Override - public void onNewPeersConfigurationApplied(PeersAndLearners configuration) { - // No-op. - } - - @Override - public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) { - // No-op. - } - }; - } - /** * Starts the CMG Raft service using the given {@code state} and persists it to the local storage. */ diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java index 3500713e1571..945625b555e0 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java @@ -185,11 +185,13 @@ private Serializable completeValidation(JoinReadyCommand command) { ClusterNode node = command.node().asClusterNode(); if (validationManager.isNodeValidated(node)) { - logicalTopology.putNode(node); + validationManager.completeValidation(node); - LOG.info("Node added to the logical topology [node={}]", node.name()); + logicalTopology.putNode(node); - validationManager.completeValidation(node); + if (LOG.isInfoEnabled()) { + LOG.info("Node added to the logical topology [node={}]", node.name()); + } return null; } else { diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java index 3fbc42e1a3f7..8b822c437e01 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java @@ -17,10 +17,11 @@ package org.apache.ignite.internal.cluster.management.raft; +import static java.util.stream.Collectors.toSet; + import java.util.Collection; +import java.util.Comparator; import java.util.Set; -import java.util.TreeSet; -import java.util.stream.Collectors; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.ClusterTag; import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand; @@ -90,7 +91,7 @@ static ValidationResult validateState(@Nullable ClusterState state, ClusterNode } /** - * Validates a given node and issues a validation token. + * Validates a given node and saves it in the set of validated nodes. * * @return {@code null} in case of successful validation or a {@link ValidationErrorResponse} otherwise. */ @@ -134,16 +135,17 @@ void putValidatedNode(ClusterNode node) { void removeValidatedNodes(Collection nodes) { Set validatedNodeIds = storage.getValidatedNodes().stream() .map(ClusterNode::id) - // Using a sorted collection to have a stable notification order. - .collect(Collectors.toCollection(TreeSet::new)); + .collect(toSet()); - nodes.forEach(node -> { - if (validatedNodeIds.contains(node.id())) { - storage.removeValidatedNode(node); + // Using a sorted stream to have a stable notification order. + nodes.stream() + .filter(node -> validatedNodeIds.contains(node.id())) + .sorted(Comparator.comparing(ClusterNode::id)) + .forEach(node -> { + storage.removeValidatedNode(node); - logicalTopology.onNodeInvalidated(node); - } - }); + logicalTopology.onNodeInvalidated(node); + }); } /** @@ -152,6 +154,16 @@ void removeValidatedNodes(Collection nodes) { * @param node Node that wishes to join the logical topology. */ void completeValidation(ClusterNode node) { + // Remove all other versions of this node, if they were validated at some point, but not removed from the physical topology. + storage.getValidatedNodes().stream() + .filter(n -> n.name().equals(node.name()) && !n.id().equals(node.id())) + .sorted(Comparator.comparing(ClusterNode::id)) + .forEach(nodeVersion -> { + storage.removeValidatedNode(nodeVersion); + + logicalTopology.onNodeInvalidated(nodeVersion); + }); + storage.removeValidatedNode(node); } } diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle index 7202db5c398e..197562dae083 100644 --- a/modules/metastorage/build.gradle +++ b/modules/metastorage/build.gradle @@ -52,6 +52,8 @@ dependencies { integrationTestImplementation testFixtures(project(':ignite-raft')) integrationTestImplementation testFixtures(project(':ignite-configuration')) integrationTestImplementation testFixtures(project(':ignite-vault')) + integrationTestImplementation testFixtures(project(':ignite-metastorage')) + integrationTestImplementation testFixtures(project(':ignite-cluster-management')) testFixturesImplementation project(':ignite-core') testFixturesImplementation project(':ignite-rocksdb-common') diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index 9941d8b5cbdf..c2411ee654a8 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -33,12 +33,12 @@ import static org.mockito.Mockito.when; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClockImpl; @@ -52,8 +52,7 @@ import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.testframework.WorkDirectory; -import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; @@ -72,9 +71,8 @@ /** * Integration tests for {@link MetaStorageManagerImpl}. */ -@ExtendWith(WorkDirectoryExtension.class) @ExtendWith(ConfigurationExtension.class) -public class ItMetaStorageManagerImplTest { +public class ItMetaStorageManagerImplTest extends IgniteAbstractTest { private VaultManager vaultManager; private ClusterService clusterService; @@ -86,8 +84,7 @@ public class ItMetaStorageManagerImplTest { private MetaStorageManagerImpl metaStorageManager; @BeforeEach - void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration RaftConfiguration raftConfiguration) - throws NodeStoppingException { + void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) throws NodeStoppingException { var addr = new NetworkAddress("localhost", 10_000); clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr))); @@ -107,6 +104,7 @@ void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration vaultManager, clusterService, cmgManager, + mock(LogicalTopologyService.class), raftManager, storage ); @@ -266,6 +264,7 @@ void testMetaStorageStopBeforeRaftServiceStarted() throws Exception { vaultManager, clusterService, cmgManager, + mock(LogicalTopologyService.class), raftManager, storage ); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java new file mode 100644 index 000000000000..f7e1787f0cbc --- /dev/null +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.impl; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; +import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; +import static org.apache.ignite.internal.metastorage.dsl.Operations.put; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; +import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; +import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; +import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.EntryEvent; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.network.message.ScaleCubeMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.vault.VaultManager; +import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; +import org.apache.ignite.lang.ByteArray; +import org.apache.ignite.lang.NodeStoppingException; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.DefaultMessagingService; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.StaticNodeFinder; +import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for scenarios when Meta Storage nodes join and leave a cluster. + */ +@ExtendWith(ConfigurationExtension.class) +public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest { + @InjectConfiguration + private static RaftConfiguration raftConfiguration; + + @InjectConfiguration("mock.failoverTimeout=0") + private static ClusterManagementConfiguration cmgConfiguration; + + private static class Node { + private final VaultManager vaultManager; + + private final ClusterService clusterService; + + private final RaftManager raftManager; + + private final ClusterStateStorage clusterStateStorage = new TestClusterStateStorage(); + + private final ClusterManagementGroupManager cmgManager; + + private final MetaStorageManagerImpl metaStorageManager; + + Node(ClusterService clusterService, Path dataPath) { + this.clusterService = clusterService; + + this.vaultManager = new VaultManager(new InMemoryVaultService()); + + Path basePath = dataPath.resolve(name()); + + this.raftManager = new Loza( + clusterService, + raftConfiguration, + basePath.resolve("raft"), + new HybridClockImpl() + ); + + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); + + this.cmgManager = new ClusterManagementGroupManager( + vaultManager, + clusterService, + raftManager, + clusterStateStorage, + logicalTopology, + cmgConfiguration + ); + + this.metaStorageManager = new MetaStorageManagerImpl( + vaultManager, + clusterService, + cmgManager, + new LogicalTopologyServiceImpl(logicalTopology, cmgManager), + raftManager, + new SimpleInMemoryKeyValueStorage(name()) + ); + } + + void start() throws NodeStoppingException { + List components = + List.of(vaultManager, clusterService, raftManager, clusterStateStorage, cmgManager, metaStorageManager); + + components.forEach(IgniteComponent::start); + + metaStorageManager.deployWatches(); + } + + String name() { + return clusterService.localConfiguration().getName(); + } + + void stop() throws Exception { + List components = + List.of(metaStorageManager, cmgManager, raftManager, clusterStateStorage, clusterService, vaultManager); + + Stream beforeNodeStop = components.stream().map(c -> c::beforeNodeStop); + + Stream nodeStop = components.stream().map(c -> c::stop); + + IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop)); + } + + CompletableFuture> getMetaStorageLearners() { + return metaStorageManager + .metaStorageServiceFuture() + .thenApply(MetaStorageServiceImpl::raftGroupService) + .thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners())) + .thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet())); + } + + void startDroppingMessagesTo(Node recipient, Class msgType) { + ((DefaultMessagingService) clusterService.messagingService()) + .dropMessages((recipientConsistentId, message) -> + recipient.name().equals(recipientConsistentId) && msgType.isInstance(message)); + } + + void stopDroppingMessages() { + ((DefaultMessagingService) clusterService.messagingService()).stopDroppingMessages(); + } + } + + private final List nodes = new ArrayList<>(); + + private Node startNode(TestInfo testInfo) throws NodeStoppingException { + var nodeFinder = new StaticNodeFinder(List.of(new NetworkAddress("localhost", 10_000))); + + ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, 10_000 + nodes.size(), nodeFinder); + + var node = new Node(clusterService, workDir); + + node.start(); + + nodes.add(node); + + return node; + } + + @AfterEach + void tearDown() throws Exception { + IgniteUtils.closeAll(nodes.stream().map(node -> node::stop)); + } + + /** + * Tests that an incoming node gets registered as a Learner and receives Meta Storage updates. + */ + @Test + void testLearnerJoin(TestInfo testInfo) throws NodeStoppingException { + Node firstNode = startNode(testInfo); + + firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + + var key = new ByteArray("foo"); + byte[] value = "bar".getBytes(StandardCharsets.UTF_8); + + CompletableFuture invokeFuture = firstNode.metaStorageManager.invoke(notExists(key), put(key, value), noop()); + + assertThat(invokeFuture, willBe(true)); + + Node secondNode = startNode(testInfo); + + // Check that reading remote data works correctly. + assertThat(secondNode.metaStorageManager.get(key).thenApply(Entry::value), willBe(value)); + + // Check that the new node will receive events. + var awaitFuture = new CompletableFuture(); + + secondNode.metaStorageManager.registerExactWatch(key, new WatchListener() { + @Override + public void onUpdate(WatchEvent event) { + // Skip the first update event, because it's not guaranteed to arrive here (insert may have happened before the watch was + // registered). + if (event.revision() != 1) { + awaitFuture.complete(event.entryEvent()); + } + } + + @Override + public void onError(Throwable e) { + awaitFuture.completeExceptionally(e); + } + }); + + byte[] newValue = "baz".getBytes(StandardCharsets.UTF_8); + + invokeFuture = firstNode.metaStorageManager.invoke(revision(key).eq(1), put(key, newValue), noop()); + + assertThat(invokeFuture, willBe(true)); + + var expectedEntryEvent = new EntryEvent( + new EntryImpl(key.bytes(), value, 1, 1), + new EntryImpl(key.bytes(), newValue, 2, 2) + ); + + assertThat(awaitFuture, willBe(expectedEntryEvent)); + + // Check that the second node has been registered as a learner. + assertThat(firstNode.getMetaStorageLearners(), willBe(Set.of(secondNode.name()))); + } + + /** + * Tests a case when a node leaves the physical topology without entering the logical topology. + */ + @Test + void testLearnerLeavePhysicalTopology(TestInfo testInfo) throws Exception { + Node firstNode = startNode(testInfo); + Node secondNode = startNode(testInfo); + + firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + + // Try reading some data to make sure that Raft has been configured correctly. + assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue())); + + // Check that the second node has been registered as a learner. + waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000); + + // Stop the second node. + secondNode.stop(); + + nodes.remove(1); + + assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000)); + } + + /** + * Tests a case when a node leaves the physical topology without entering the logical topology. + */ + @Test + void testLearnerLeaveLogicalTopology(TestInfo testInfo) throws Exception { + Node firstNode = startNode(testInfo); + Node secondNode = startNode(testInfo); + + firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + + assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully()); + + CompletableFuture> logicalTopologyNodes = firstNode.cmgManager + .logicalTopology() + .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet())); + + assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), secondNode.name()))); + + // Try reading some data to make sure that Raft has been configured correctly. + assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue())); + + // Check that the second node has been registered as a learner. + waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000); + + // Stop the second node. + secondNode.stop(); + + nodes.remove(1); + + assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000)); + } + + /** + * Tests a scenario when a node gets kicked out of the Logical Topology due to a network partition. It should then be able to join + * the Meta Storage Raft group successfully. + */ + @Test + void testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) throws Exception { + Node firstNode = startNode(testInfo); + Node secondNode = startNode(testInfo); + + firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test"); + + assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully()); + + CompletableFuture> logicalTopologyNodes = firstNode.cmgManager + .logicalTopology() + .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet())); + + assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), secondNode.name()))); + + waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000); + + // Make first node lose the second node from the Physical and Logical topologies. + firstNode.startDroppingMessagesTo(secondNode, ScaleCubeMessage.class); + + assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000)); + + // Make the first node discover the second node again. The second node should be added as a Meta Storage Learner again. + firstNode.stopDroppingMessages(); + + assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000)); + } +} diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java index ca9631a1100b..2b25b4f2de0f 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java @@ -60,8 +60,10 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -854,11 +856,9 @@ public void testGetThatThrowsOperationTimeoutException() { /** * Tests {@link MetaStorageService#closeCursors(String)}. - * - * @throws Exception If failed. */ @Test - public void testCursorsCleanup() throws Exception { + public void testCursorsCleanup() throws InterruptedException { startNodes(2); Node leader = nodes.get(0); @@ -873,14 +873,23 @@ public void testCursorsCleanup() throws Exception { return cursor; }); - class MockSubscriber implements Subscriber { - private Subscription subscription; + var subscriptionLatch = new CountDownLatch(3); + var closeCursorLatch = new CountDownLatch(1); + class MockSubscriber implements Subscriber { private final CompletableFuture result = new CompletableFuture<>(); @Override public void onSubscribe(Subscription subscription) { - this.subscription = subscription; + subscriptionLatch.countDown(); + + try { + assertTrue(closeCursorLatch.await(10, TimeUnit.SECONDS)); + } catch (Throwable e) { + onError(e); + } + + subscription.request(1); } @Override @@ -895,6 +904,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { + result.completeExceptionally(new AssertionError("No items produced")); } } @@ -908,11 +918,16 @@ public void onComplete() { learner.metaStorageService.range(new ByteArray(EXPECTED_RESULT_ENTRY.key()), null).subscribe(node1Subscriber0); - leader.metaStorageService.closeCursors(leader.clusterService.topologyService().localMember().id()).get(); + // Wait for all cursors to be registered on the server side. + assertTrue(subscriptionLatch.await(10, TimeUnit.SECONDS)); + + String leaderId = leader.clusterService.topologyService().localMember().id(); + + CompletableFuture closeCursorsFuture = leader.metaStorageService.closeCursors(leaderId); + + assertThat(closeCursorsFuture, willCompleteSuccessfully()); - node0Subscriber0.subscription.request(1); - node0Subscriber1.subscription.request(1); - node1Subscriber0.subscription.request(1); + closeCursorLatch.countDown(); assertThat(node0Subscriber0.result, willFailFast(NoSuchElementException.class)); assertThat(node0Subscriber1.result, willFailFast(NoSuchElementException.class)); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index b8d88f453f1f..635175d0ede2 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -24,15 +24,12 @@ import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -40,9 +37,14 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; +import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; @@ -50,12 +52,11 @@ import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.testframework.WorkDirectory; -import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultManager; +import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterService; @@ -71,55 +72,68 @@ /** * Tests for Meta Storage Watches. */ -@ExtendWith(WorkDirectoryExtension.class) @ExtendWith(ConfigurationExtension.class) -public class ItMetaStorageWatchTest { +public class ItMetaStorageWatchTest extends IgniteAbstractTest { private static class Node { - private final ClusterService clusterService; + private final List components = new ArrayList<>(); - private final RaftManager raftManager; + private final ClusterService clusterService; private final MetaStorageManager metaStorageManager; - private final CompletableFuture> metaStorageNodesFuture = new CompletableFuture<>(); + private final ClusterManagementGroupManager cmgManager; + + Node(ClusterService clusterService, Path dataPath) { + var vaultManager = new VaultManager(new InMemoryVaultService()); + + components.add(vaultManager); - Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) { this.clusterService = clusterService; + components.add(clusterService); + Path basePath = dataPath.resolve(name()); - this.raftManager = new Loza( + var raftManager = new Loza( clusterService, raftConfiguration, basePath.resolve("raft"), new HybridClockImpl() ); - var vaultManager = mock(VaultManager.class); + components.add(raftManager); - when(vaultManager.get(any())).thenReturn(CompletableFuture.completedFuture(null)); - when(vaultManager.put(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); - when(vaultManager.putAll(any())).thenReturn(CompletableFuture.completedFuture(null)); + var clusterStateStorage = new TestClusterStateStorage(); - var cmgManager = mock(ClusterManagementGroupManager.class); + components.add(clusterStateStorage); - when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFuture); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); + + this.cmgManager = new ClusterManagementGroupManager( + vaultManager, + clusterService, + raftManager, + clusterStateStorage, + logicalTopology, + cmgConfiguration + ); + + components.add(cmgManager); this.metaStorageManager = new MetaStorageManagerImpl( vaultManager, clusterService, cmgManager, + new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, new RocksDbKeyValueStorage(name(), basePath.resolve("storage")) ); - } - void start(Set metaStorageNodes) { - clusterService.start(); - raftManager.start(); - metaStorageManager.start(); + components.add(metaStorageManager); + } - metaStorageNodesFuture.complete(metaStorageNodes); + void start() { + components.forEach(IgniteComponent::start); } String name() { @@ -127,9 +141,11 @@ String name() { } void stop() throws Exception { - Stream beforeNodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::beforeNodeStop); + Collections.reverse(components); - Stream nodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::stop); + Stream beforeNodeStop = components.stream().map(c -> c::beforeNodeStop); + + Stream nodeStop = components.stream().map(c -> c::stop); IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop)); } @@ -137,11 +153,11 @@ void stop() throws Exception { private TestInfo testInfo; - @WorkDirectory - private Path workDir; + @InjectConfiguration + private static RaftConfiguration raftConfiguration; @InjectConfiguration - private RaftConfiguration raftConfiguration; + private static ClusterManagementConfiguration cmgConfiguration; private final List nodes = new ArrayList<>(); @@ -155,16 +171,20 @@ void tearDown() throws Exception { IgniteUtils.closeAll(nodes.stream().map(node -> node::stop)); } - private void startNodes(int amount) { - List localAddresses = findLocalAddresses(10_000, 10_000 + nodes.size() + amount); + private void startCluster(int size) throws NodeStoppingException { + List localAddresses = findLocalAddresses(10_000, 10_000 + nodes.size() + size); var nodeFinder = new StaticNodeFinder(localAddresses); localAddresses.stream() .map(addr -> ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder)) - .forEach(clusterService -> nodes.add(new Node(clusterService, raftConfiguration, workDir))); + .forEach(clusterService -> nodes.add(new Node(clusterService, workDir))); + + nodes.parallelStream().forEach(Node::start); + + String name = nodes.get(0).name(); - nodes.parallelStream().forEach(node -> node.start(Set.of(nodes.get(0).name()))); + nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name), "test"); } @Test @@ -229,7 +249,7 @@ public void onError(Throwable e) { private void testWatches(BiConsumer registerWatchAction) throws Exception { int numNodes = 3; - startNodes(numNodes); + startCluster(numNodes); var latch = new CountDownLatch(numNodes); @@ -256,10 +276,10 @@ private void testWatches(BiConsumer registerWatchAction) t * Tests that metastorage missed metastorage events are replayed after deploying watches. */ @Test - void testReplayUpdates() throws InterruptedException { + void testReplayUpdates() throws Exception { int numNodes = 3; - startNodes(numNodes); + startCluster(numNodes); var exactLatch = new CountDownLatch(numNodes); var prefixLatch = new CountDownLatch(numNodes); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java index 123b02277909..130c38c58dcd 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java @@ -19,6 +19,7 @@ import java.util.Arrays; import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; import org.jetbrains.annotations.Nullable; @@ -46,9 +47,11 @@ public final class EntryImpl implements Entry { private static final long serialVersionUID = 3636551347117181271L; /** Key. */ + @IgniteToStringInclude private final byte[] key; /** Value. */ + @IgniteToStringInclude private final byte @Nullable [] val; /** Revision. */ diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 8f0bea21d66c..7a0921e64b13 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -17,14 +17,11 @@ package org.apache.ignite.internal.metastorage.impl; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.util.ByteUtils.bytesToLong; import static org.apache.ignite.internal.util.ByteUtils.longToBytes; import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -32,6 +29,7 @@ import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -52,7 +50,6 @@ import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.Status; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -61,8 +58,6 @@ import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.TopologyEventHandler; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -94,6 +89,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { private final ClusterManagementGroupManager cmgMgr; + private final LogicalTopologyService logicalTopologyService; + /** Meta storage service. */ private final CompletableFuture metaStorageSvcFut = new CompletableFuture<>(); @@ -123,6 +120,7 @@ public MetaStorageManagerImpl( VaultManager vaultMgr, ClusterService clusterService, ClusterManagementGroupManager cmgMgr, + LogicalTopologyService logicalTopologyService, RaftManager raftMgr, KeyValueStorage storage ) { @@ -130,6 +128,7 @@ public MetaStorageManagerImpl( this.clusterService = clusterService; this.raftMgr = raftMgr; this.cmgMgr = cmgMgr; + this.logicalTopologyService = logicalTopologyService; this.storage = storage; } @@ -153,26 +152,7 @@ private CompletableFuture initializeMetaStorage(Set new MetaStorageServiceImpl(raftService, busyLock, thisNode)); } - private void registerTopologyEventListener() { - clusterService.topologyService().addEventHandler(new TopologyEventHandler() { - @Override - public void onAppeared(ClusterNode member) { - addLearners(List.of(member)); - } - - @Override - public void onDisappeared(ClusterNode member) { - metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService()) - .thenAccept(isLeader -> { - if (isLeader) { - service.closeCursors(member.id()); - } - })); - } - }); - } - - private void addLearners(Collection nodes) { - if (!busyLock.enterBusy()) { - LOG.info("Skipping Meta Storage configuration update because the node is stopping"); - - return; - } - - try { - metaStorageSvcFut - .thenApply(MetaStorageServiceImpl::raftGroupService) - .thenCompose(raftService -> isCurrentNodeLeader(raftService) - .thenCompose(isLeader -> isLeader ? addLearners(raftService, nodes) : completedFuture(null))) - .whenComplete((v, e) -> { - if (e != null) { - LOG.error("Unable to change peers on topology update", e); - } - }); - } finally { - busyLock.leaveBusy(); - } - } - - private CompletableFuture addLearners(RaftGroupService raftService, Collection nodes) { - if (!busyLock.enterBusy()) { - LOG.info("Skipping Meta Storage configuration update because the node is stopping"); - - return completedFuture(null); - } - - try { - Set peers = raftService.peers().stream() - .map(Peer::consistentId) - .collect(toSet()); - - Set learners = nodes.stream() - .map(ClusterNode::name) - .filter(name -> !peers.contains(name)) - .collect(toSet()); - - if (learners.isEmpty()) { - return completedFuture(null); - } - - if (LOG.isInfoEnabled()) { - LOG.info("New Meta Storage learners detected: " + learners); - } - - PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(peers, learners); - - return raftService.addLearners(newConfiguration.learners()); - } finally { - busyLock.leaveBusy(); - } - } - - private CompletableFuture isCurrentNodeLeader(RaftGroupService raftService) { - String name = clusterService.topologyService().localMember().name(); - - return raftService.refreshLeader() - .thenApply(v -> raftService.leader().consistentId().equals(name)); - } - - /** {@inheritDoc} */ @Override public void start() { this.appliedRevision = readAppliedRevision().join(); @@ -304,7 +202,6 @@ public void start() { }); } - /** {@inheritDoc} */ @Override public void stop() throws Exception { if (!isStopped.compareAndSet(false, true)) { @@ -404,7 +301,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#getAll(Set, long) */ - public @NotNull CompletableFuture> getAll(Set keys, long revUpperBound) { + public CompletableFuture> getAll(Set keys, long revUpperBound) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -421,7 +318,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#put(ByteArray, byte[]) */ - public @NotNull CompletableFuture put(@NotNull ByteArray key, byte[] val) { + public CompletableFuture put(ByteArray key, byte[] val) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -438,7 +335,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#getAndPut(ByteArray, byte[]) */ - public @NotNull CompletableFuture getAndPut(@NotNull ByteArray key, byte[] val) { + public CompletableFuture getAndPut(ByteArray key, byte[] val) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -455,7 +352,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#putAll(Map) */ - public @NotNull CompletableFuture putAll(@NotNull Map vals) { + public CompletableFuture putAll(Map vals) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -472,7 +369,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#getAndPutAll(Map) */ - public @NotNull CompletableFuture> getAndPutAll(@NotNull Map vals) { + public CompletableFuture> getAndPutAll(Map vals) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -489,7 +386,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#remove(ByteArray) */ - public @NotNull CompletableFuture remove(@NotNull ByteArray key) { + public CompletableFuture remove(ByteArray key) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -506,7 +403,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#getAndRemove(ByteArray) */ - public @NotNull CompletableFuture getAndRemove(@NotNull ByteArray key) { + public CompletableFuture getAndRemove(ByteArray key) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -523,7 +420,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#removeAll(Set) */ - public @NotNull CompletableFuture removeAll(@NotNull Set keys) { + public CompletableFuture removeAll(Set keys) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } @@ -540,7 +437,7 @@ public CompletableFuture> getAll(Set keys) { * * @see MetaStorageService#getAndRemoveAll(Set) */ - public @NotNull CompletableFuture> getAndRemoveAll(@NotNull Set keys) { + public CompletableFuture> getAndRemoveAll(Set keys) { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java new file mode 100644 index 000000000000..9fe455fff4c3 --- /dev/null +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.impl; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toSet; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; + +/** + * Raft Group Events listener that registers Logical Topology listener for updating the list of Meta Storage Raft group listeners. + */ +public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListener { + private static final IgniteLogger LOG = Loggers.forClass(MetaStorageManagerImpl.class); + + private final IgniteSpinBusyLock busyLock; + + private final String nodeName; + + private final LogicalTopologyService logicalTopologyService; + + private final CompletableFuture metaStorageSvcFut; + + /** + * Future required to enable serialized processing of topology events. + * + *

While Logical Topology events are linearized, we usually start a bunch of async operations inside the event listener and need + * them to finish in the same order. + * + *

Multi-threaded access is guarded by {@code serializationFutureMux}. + */ + private CompletableFuture serializationFuture = null; + + private final Object serializationFutureMux = new Object(); + + MetaStorageRaftGroupEventsListener( + IgniteSpinBusyLock busyLock, + ClusterService clusterService, + LogicalTopologyService logicalTopologyService, + CompletableFuture metaStorageSvcFut + ) { + this.busyLock = busyLock; + this.nodeName = clusterService.localConfiguration().getName(); + this.logicalTopologyService = logicalTopologyService; + this.metaStorageSvcFut = metaStorageSvcFut; + } + + @Override + public void onLeaderElected(long term) { + synchronized (serializationFutureMux) { + registerTopologyEventListeners(); + + // Update learner configuration (in case we missed some topology updates) and initialize the serialization future. + serializationFuture = executeIfLeaderImpl(this::resetLearners); + } + } + + private void registerTopologyEventListeners() { + logicalTopologyService.addEventListener(new LogicalTopologyEventListener() { + @Override + public void onNodeValidated(ClusterNode validatedNode) { + executeIfLeader((service, term) -> addLearner(service.raftGroupService(), validatedNode)); + } + + @Override + public void onNodeInvalidated(ClusterNode invalidatedNode) { + executeIfLeader((service, term) -> { + CompletableFuture closeCursorsFuture = service.closeCursors(invalidatedNode.id()) + .exceptionally(e -> { + LOG.error("Unable to close cursor for " + invalidatedNode, e); + + return null; + }); + + CompletableFuture removeLearnersFuture = removeLearner(service.raftGroupService(), invalidatedNode); + + return allOf(closeCursorsFuture, removeLearnersFuture); + }); + } + + @Override + public void onNodeLeft(ClusterNode leftNode, LogicalTopologySnapshot newTopology) { + onNodeInvalidated(leftNode); + } + + @Override + public void onTopologyLeap(LogicalTopologySnapshot newTopology) { + executeIfLeader(MetaStorageRaftGroupEventsListener.this::resetLearners); + } + }); + } + + @FunctionalInterface + private interface OnLeaderAction { + CompletableFuture apply(MetaStorageServiceImpl service, long term); + } + + /** + * Executes the given action if the current node is the Meta Storage leader. + */ + private void executeIfLeader(OnLeaderAction action) { + if (!busyLock.enterBusy()) { + LOG.info("Skipping Meta Storage configuration update because the node is stopping"); + + return; + } + + try { + synchronized (serializationFutureMux) { + // We are definitely not a leader if the serialization future has not been initialized. + if (serializationFuture == null) { + return; + } + + serializationFuture = serializationFuture + // we don't care about exceptions here, they should be logged independently + .handle((v, e) -> executeIfLeaderImpl(action)) + .thenCompose(Function.identity()); + } + } finally { + busyLock.leaveBusy(); + } + } + + private CompletableFuture executeIfLeaderImpl(OnLeaderAction action) { + return metaStorageSvcFut.thenCompose(service -> service.raftGroupService().refreshAndGetLeaderWithTerm() + .thenCompose(leaderWithTerm -> { + String leaderName = leaderWithTerm.leader().consistentId(); + + return leaderName.equals(nodeName) ? action.apply(service, leaderWithTerm.term()) : completedFuture(null); + })); + } + + private CompletableFuture addLearner(RaftGroupService raftService, ClusterNode learner) { + return updateConfigUnderLock(() -> isPeer(raftService, learner) + ? completedFuture(null) + : raftService.addLearners(List.of(new Peer(learner.name())))); + } + + private static boolean isPeer(RaftGroupService raftService, ClusterNode node) { + return raftService.peers().stream().anyMatch(peer -> peer.consistentId().equals(node.name())); + } + + private CompletableFuture removeLearner(RaftGroupService raftService, ClusterNode learner) { + return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader() + .thenCompose(validatedNodes -> updateConfigUnderLock(() -> { + if (isPeer(raftService, learner)) { + return completedFuture(null); + } + + // Due to possible races, we can have multiple versions of the same node in the validated set. We only remove + // a learner if there are no such versions left. + if (validatedNodes.stream().anyMatch(n -> n.name().equals(learner.name()))) { + return completedFuture(null); + } + + return raftService.removeLearners(List.of(new Peer(learner.name()))); + }))); + } + + private CompletableFuture resetLearners(MetaStorageServiceImpl service, long term) { + return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader() + .thenCompose(validatedNodes -> updateConfigUnderLock(() -> { + RaftGroupService raftService = service.raftGroupService(); + + Set peers = raftService.peers().stream().map(Peer::consistentId).collect(toSet()); + + Set learners = validatedNodes.stream() + .map(ClusterNode::name) + .filter(name -> !peers.contains(name)) + .collect(toSet()); + + PeersAndLearners newPeerConfiguration = PeersAndLearners.fromConsistentIds(peers, learners); + + // We can't use 'resetLearners' call here because it does not support empty lists of learners. + return raftService.changePeersAsync(newPeerConfiguration, term); + }))); + } + + private CompletableFuture updateConfigUnderLock(Supplier> action) { + if (!busyLock.enterBusy()) { + LOG.info("Skipping Meta Storage configuration update because the node is stopping"); + + return completedFuture(null); + } + + try { + return action.get() + .whenComplete((v, e) -> { + if (e != null) { + LOG.error("Unable to change peers on topology update", e); + } + }); + } finally { + busyLock.leaveBusy(); + } + } +} diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java index d37dd7e70f3f..47d16c65cf68 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java @@ -367,12 +367,12 @@ public CompletableFuture addLearners(Collection learners) { } @Override - public CompletableFuture removeLearners(List learners) { + public CompletableFuture removeLearners(Collection learners) { return raftClient.removeLearners(learners); } @Override - public CompletableFuture resetLearners(List learners) { + public CompletableFuture resetLearners(Collection learners) { return raftClient.resetLearners(learners); } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java index 6647bfeb5ca8..fe9be4c453db 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java @@ -33,7 +33,7 @@ public interface RaftGroupEventsListener { * * @param configuration New Raft group configuration. */ - void onNewPeersConfigurationApplied(PeersAndLearners configuration); + default void onNewPeersConfigurationApplied(PeersAndLearners configuration) {} /** * Invoked on the leader if membership reconfiguration failed, because of {@link Status}. @@ -42,22 +42,10 @@ public interface RaftGroupEventsListener { * @param configuration Configuration that failed to be applied. * @param term Raft term of the current leader. */ - void onReconfigurationError(Status status, PeersAndLearners configuration, long term); + default void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {} /** * No-op raft group events listener. */ - RaftGroupEventsListener noopLsnr = new RaftGroupEventsListener() { - /** {@inheritDoc} */ - @Override - public void onLeaderElected(long term) { } - - /** {@inheritDoc} */ - @Override - public void onNewPeersConfigurationApplied(PeersAndLearners configuration) { } - - /** {@inheritDoc} */ - @Override - public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {} - }; + RaftGroupEventsListener noopLsnr = term -> {}; } diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java index 72eeaa453d9a..e86f83e994c6 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java @@ -181,10 +181,10 @@ public interface RaftGroupService { * *

This operation is executed on a group leader. * - * @param learners List of learners. + * @param learners Collection of learners. * @return A future. */ - CompletableFuture removeLearners(List learners); + CompletableFuture removeLearners(Collection learners); /** * Set learners of the raft group to needed list of learners. @@ -194,10 +194,10 @@ public interface RaftGroupService { * *

This operation is executed on a group leader. * - * @param learners List of learners. + * @param learners Collection of learners. * @return A future. */ - CompletableFuture resetLearners(List learners); + CompletableFuture resetLearners(Collection learners); /** * Takes a state machine snapshot on a given group peer. diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java index b806120105f2..b5156316fd49 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java @@ -360,7 +360,7 @@ public CompletableFuture addLearners(Collection learners) { } @Override - public CompletableFuture removeLearners(List learners) { + public CompletableFuture removeLearners(Collection learners) { Peer leader = this.leader; if (leader == null) { @@ -378,7 +378,7 @@ public CompletableFuture removeLearners(List learners) { } @Override - public CompletableFuture resetLearners(List learners) { + public CompletableFuture resetLearners(Collection learners) { Peer leader = this.leader; if (leader == null) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index fa44708e9900..a107f23c40bd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; import org.apache.ignite.internal.configuration.storage.ConfigurationStorageListener; import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; @@ -127,14 +128,14 @@ private static class Node { raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl()); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, raftManager, clusterStateStorage, - logicalTopologyService, + logicalTopology, clusterManagementConfiguration ); @@ -142,6 +143,7 @@ private static class Node { vaultManager, clusterService, cmgManager, + new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, new SimpleInMemoryKeyValueStorage(name()) ); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index 791df8a8cb0b..7e14edb25676 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClockImpl; @@ -101,14 +102,14 @@ private static class Node { raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl()); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, raftManager, clusterStateStorage, - logicalTopologyService, + logicalTopology, clusterManagementConfiguration ); @@ -116,6 +117,7 @@ private static class Node { vaultManager, clusterService, cmgManager, + new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, new SimpleInMemoryKeyValueStorage(name()) ); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index 13dd06aa18d2..a809d22747b4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; import org.apache.ignite.internal.configuration.ConfigurationManager; import org.apache.ignite.internal.configuration.NodeBootstrapConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; @@ -642,14 +643,14 @@ private class Node { txManager = new TxManagerImpl(replicaSvc, lockManager, hybridClock); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); cmgManager = new ClusterManagementGroupManager( vaultManager, clusterService, raftManager, clusterStateStorage, - logicalTopologyService, + logicalTopology, clusterManagementConfiguration ); @@ -659,6 +660,7 @@ private class Node { vaultManager, clusterService, cmgManager, + new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftManager, testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class) ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, "metaStorage")) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 6b5675902ef5..71e53c799e07 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; +import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; import org.apache.ignite.internal.configuration.ConfigurationManager; import org.apache.ignite.internal.configuration.ConfigurationModule; import org.apache.ignite.internal.configuration.ConfigurationModules; @@ -269,14 +270,14 @@ private List startPartialNode( var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg")); - var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var cmgManager = new ClusterManagementGroupManager( vault, clusterSvc, raftMgr, clusterStateStorage, - logicalTopologyService, + logicalTopology, clusterManagementConfiguration ); @@ -284,6 +285,7 @@ private List startPartialNode( vault, clusterSvc, cmgManager, + new LogicalTopologyServiceImpl(logicalTopology, cmgManager), raftMgr, new RocksDbKeyValueStorage(name, dir.resolve("metastorage")) ); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index c7f1b0247d43..afe6311409d8 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -349,6 +349,7 @@ public class IgniteImpl implements Ignite { vaultMgr, clusterSvc, cmgMgr, + logicalTopologyService, raftMgr, new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH)) );