Skip to content

Commit

Permalink
IGNITE-18554 Remove MetaStorage learners on topology events (apache#1542
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sashapolo authored and lowka committed Mar 18, 2023
1 parent 8aae53d commit 798ee95
Show file tree
Hide file tree
Showing 21 changed files with 743 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -139,6 +140,7 @@ void testInitCancel(TestInfo testInfo) throws Exception {
/**
* Tests a scenario when a node is restarted.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18795")
@Test
void testNodeRestart(TestInfo testInfo) throws Exception {
startCluster(2, testInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.Status;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
Expand Down Expand Up @@ -522,7 +520,7 @@ private CompletableFuture<CmgRaftService> startCmgRaftService(Set<String> nodeNa
new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
raftConfiguration,
new CmgRaftGroupListener(clusterStateStorage, logicalTopology, this::onLogicalTopologyChanged),
createCmgRaftGroupEventsListener()
this::onElectedAsLeader
)
.thenApply(service -> new CmgRaftService(service, clusterService, logicalTopology));
} catch (Exception e) {
Expand All @@ -549,25 +547,6 @@ private void onLogicalTopologyChanged(long term) {
}
}

private RaftGroupEventsListener createCmgRaftGroupEventsListener() {
return new RaftGroupEventsListener() {
@Override
public void onLeaderElected(long term) {
ClusterManagementGroupManager.this.onElectedAsLeader(term);
}

@Override
public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
// No-op.
}

@Override
public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
// No-op.
}
};
}

/**
* Starts the CMG Raft service using the given {@code state} and persists it to the local storage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ private Serializable completeValidation(JoinReadyCommand command) {
ClusterNode node = command.node().asClusterNode();

if (validationManager.isNodeValidated(node)) {
logicalTopology.putNode(node);
validationManager.completeValidation(node);

LOG.info("Node added to the logical topology [node={}]", node.name());
logicalTopology.putNode(node);

validationManager.completeValidation(node);
if (LOG.isInfoEnabled()) {
LOG.info("Node added to the logical topology [node={}]", node.name());
}

return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.ignite.internal.cluster.management.raft;

import static java.util.stream.Collectors.toSet;

import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
Expand Down Expand Up @@ -90,7 +91,7 @@ static ValidationResult validateState(@Nullable ClusterState state, ClusterNode
}

/**
* Validates a given node and issues a validation token.
* Validates a given node and saves it in the set of validated nodes.
*
* @return {@code null} in case of successful validation or a {@link ValidationErrorResponse} otherwise.
*/
Expand Down Expand Up @@ -134,16 +135,17 @@ void putValidatedNode(ClusterNode node) {
void removeValidatedNodes(Collection<ClusterNode> nodes) {
Set<String> validatedNodeIds = storage.getValidatedNodes().stream()
.map(ClusterNode::id)
// Using a sorted collection to have a stable notification order.
.collect(Collectors.toCollection(TreeSet::new));
.collect(toSet());

nodes.forEach(node -> {
if (validatedNodeIds.contains(node.id())) {
storage.removeValidatedNode(node);
// Using a sorted stream to have a stable notification order.
nodes.stream()
.filter(node -> validatedNodeIds.contains(node.id()))
.sorted(Comparator.comparing(ClusterNode::id))
.forEach(node -> {
storage.removeValidatedNode(node);

logicalTopology.onNodeInvalidated(node);
}
});
logicalTopology.onNodeInvalidated(node);
});
}

/**
Expand All @@ -152,6 +154,16 @@ void removeValidatedNodes(Collection<ClusterNode> nodes) {
* @param node Node that wishes to join the logical topology.
*/
void completeValidation(ClusterNode node) {
// Remove all other versions of this node, if they were validated at some point, but not removed from the physical topology.
storage.getValidatedNodes().stream()
.filter(n -> n.name().equals(node.name()) && !n.id().equals(node.id()))
.sorted(Comparator.comparing(ClusterNode::id))
.forEach(nodeVersion -> {
storage.removeValidatedNode(nodeVersion);

logicalTopology.onNodeInvalidated(nodeVersion);
});

storage.removeValidatedNode(node);
}
}
2 changes: 2 additions & 0 deletions modules/metastorage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-raft'))
integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-vault'))
integrationTestImplementation testFixtures(project(':ignite-metastorage'))
integrationTestImplementation testFixtures(project(':ignite-cluster-management'))

testFixturesImplementation project(':ignite-core')
testFixturesImplementation project(':ignite-rocksdb-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
import static org.mockito.Mockito.when;

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
Expand All @@ -52,8 +52,7 @@
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
Expand All @@ -72,9 +71,8 @@
/**
* Integration tests for {@link MetaStorageManagerImpl}.
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class ItMetaStorageManagerImplTest {
public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
private VaultManager vaultManager;

private ClusterService clusterService;
Expand All @@ -86,8 +84,7 @@ public class ItMetaStorageManagerImplTest {
private MetaStorageManagerImpl metaStorageManager;

@BeforeEach
void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration RaftConfiguration raftConfiguration)
throws NodeStoppingException {
void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) throws NodeStoppingException {
var addr = new NetworkAddress("localhost", 10_000);

clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr)));
Expand All @@ -107,6 +104,7 @@ void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration
vaultManager,
clusterService,
cmgManager,
mock(LogicalTopologyService.class),
raftManager,
storage
);
Expand Down Expand Up @@ -266,6 +264,7 @@ void testMetaStorageStopBeforeRaftServiceStarted() throws Exception {
vaultManager,
clusterService,
cmgManager,
mock(LogicalTopologyService.class),
raftManager,
storage
);
Expand Down

0 comments on commit 798ee95

Please sign in to comment.