Skip to content

Commit

Permalink
Fix race conditions in master stability polling (#88874)
Browse files Browse the repository at this point in the history
This fixes some possible race conditions in the cluster formation polling of the stable master code.
It also prevents the list of tasks from growing indefinitely.
  • Loading branch information
masseyke committed Aug 1, 2022
1 parent 83136ef commit 579692d
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.threadpool.Scheduler;
import org.junit.Before;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class CoordinationDiagnosticsServiceIT extends ESIntegTestCase {
@Before
private void setBootstrapMasterNodeIndex() {
internalCluster().setBootstrapMasterNodeIndex(0);
}

public void testBlockClusterStateProcessingOnOneNode() throws Exception {
/*
* This test picks a node that is not elected master, and then blocks cluster state processing on it. The reason is so that we
* can call CoordinationDiagnosticsService#beginPollingClusterFormationInfo without a cluster changed event resulting in the
* values we pass in being overwritten.
*/
final List<String> nodeNames = internalCluster().startNodes(3);

final String master = internalCluster().getMasterName();
assertThat(nodeNames, hasItem(master));
String blockedNode = nodeNames.stream().filter(n -> n.equals(master) == false).findAny().get();
assertNotNull(blockedNode);

DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, master).state().nodes();
Set<DiscoveryNode> nodesWithoutBlockedNode = discoveryNodes.getNodes()
.values()
.stream()
.filter(n -> n.getName().equals(blockedNode) == false)
.collect(Collectors.toSet());

BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(blockedNode, random());
internalCluster().setDisruptionScheme(disruption);
// stop processing cluster state changes
disruption.startDisrupting();

CoordinationDiagnosticsService diagnosticsOnBlockedNode = internalCluster().getInstance(
CoordinationDiagnosticsService.class,
blockedNode
);
ConcurrentMap<DiscoveryNode, CoordinationDiagnosticsService.ClusterFormationStateOrException> nodeToClusterFormationStateMap =
new ConcurrentHashMap<>();
ConcurrentHashMap<DiscoveryNode, Scheduler.Cancellable> cancellables = new ConcurrentHashMap<>();
diagnosticsOnBlockedNode.clusterFormationResponses = nodeToClusterFormationStateMap;
diagnosticsOnBlockedNode.clusterFormationInfoTasks = cancellables;

diagnosticsOnBlockedNode.beginPollingClusterFormationInfo(
nodesWithoutBlockedNode,
nodeToClusterFormationStateMap::put,
cancellables
);

// while the node is blocked from processing cluster state changes it should reach out to the other 2
// master eligible nodes and get a successful response
assertBusy(() -> {
assertThat(cancellables.size(), is(2));
assertThat(nodeToClusterFormationStateMap.size(), is(2));
nodesWithoutBlockedNode.forEach(node -> {
CoordinationDiagnosticsService.ClusterFormationStateOrException result = nodeToClusterFormationStateMap.get(node);
assertNotNull(result);
assertNotNull(result.clusterFormationState());
assertNull(result.exception());
ClusterFormationFailureHelper.ClusterFormationState clusterFormationState = result.clusterFormationState();
assertThat(clusterFormationState.getDescription(), not(emptyOrNullString()));
});
});

disruption.stopDisrupting();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -86,12 +85,13 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
private final int unacceptableIdentityChanges;

/*
* This is a list of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for
* diagnosis.
* This is a Map of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for
* diagnosis. The key is the DisoveryNode for the master eligible node being polled, and the value is a Cancellable.
* The field is accessed (reads/writes) from multiple threads, but the reference itself is only ever changed on the cluster change
* event thread.
*/
private volatile List<Scheduler.Cancellable> clusterFormationInfoTasks = null;
// Non-private for testing
volatile Map<DiscoveryNode, Scheduler.Cancellable> clusterFormationInfoTasks = null;
/*
* This field holds the results of the tasks in the clusterFormationInfoTasks field above. The field is accessed (reads/writes) from
* multiple threads, but the reference itself is only ever changed on the cluster change event thread.
Expand Down Expand Up @@ -612,9 +612,9 @@ public void clusterChanged(ClusterChangedEvent event) {
}
if (currentMaster == null && clusterService.localNode().isMasterNode()) {
/*
* This begins polling all master-eligible nodes for cluster formation information. However there's a 10-second delay before it
* starts, so in the normal situation where during a master transition it flips from master1 -> null -> master2, it the
* polling tasks will be canceled before any requests are actually made.
* This begins polling all master-eligible nodes for cluster formation information. However there's a 10-second delay
* before it starts, so in the normal situation where during a master transition it flips from master1 -> null ->
* master2 the polling tasks will be canceled before any requests are actually made.
*/
beginPollingClusterFormationInfo();
} else {
Expand All @@ -626,36 +626,41 @@ public void clusterChanged(ClusterChangedEvent event) {
* This method begins polling all known master-eligible nodes for cluster formation information. After a 10-second initial delay, it
* polls each node every 10 seconds until cancelPollingClusterFormationInfo() is called.
*/
private void beginPollingClusterFormationInfo() {
void beginPollingClusterFormationInfo() {
assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
cancelPollingClusterFormationInfo();
ConcurrentMap<DiscoveryNode, ClusterFormationStateOrException> responses = new ConcurrentHashMap<>();
List<Scheduler.Cancellable> cancellables = new CopyOnWriteArrayList<>();
beginPollingClusterFormationInfo(getMasterEligibleNodes(), responses::put, cancellables::add);
clusterFormationResponses = responses;
Map<DiscoveryNode, Scheduler.Cancellable> cancellables = new ConcurrentHashMap<>();
/*
* Assignment of clusterFormationInfoTasks must be done before the call to beginPollingClusterFormationInfo because it is used
* asynchronously by rescheduleFetchConsumer, called from beginPollingClusterFormationInfo.
*/
clusterFormationInfoTasks = cancellables;
clusterFormationResponses = responses;
beginPollingClusterFormationInfo(getMasterEligibleNodes(), responses::put, cancellables);
}

/**
* This method returns quickly, but in the background schedules to query the remote node's cluster formation state in 10 seconds, and
* repeats doing that until cancel() is called on all of the Cancellable that this method inserts into cancellables. This method
* exists (rather than being just part of the beginPollingClusterFormationInfo() above) in order to facilitate unit testing.
* @param nodeResponseConsumer A consumer for any results produced for a node by this method
* @param cancellableConsumer A consumer for any Cancellable tasks produced by this method
* @param cancellables The Map of Cancellables, one for each node being polled
*/
// Non-private for testing
void beginPollingClusterFormationInfo(
Collection<DiscoveryNode> masterEligibleNodes,
BiConsumer<DiscoveryNode, ClusterFormationStateOrException> nodeResponseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer
Map<DiscoveryNode, Scheduler.Cancellable> cancellables
) {
masterEligibleNodes.forEach(masterEligibleNode -> {
Consumer<ClusterFormationStateOrException> responseConsumer = result -> nodeResponseConsumer.accept(masterEligibleNode, result);
try {
cancellableConsumer.accept(
cancellables.put(
masterEligibleNode,
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellables))
)
);
} catch (EsRejectedExecutionException e) {
Expand All @@ -673,38 +678,69 @@ void beginPollingClusterFormationInfo(
* completed, adding the resulting Cancellable to cancellableConsumer.
* @param masterEligibleNode The node being polled
* @param responseConsumer The response consumer to be wrapped
* @param cancellableConsumer The list of Cancellables
* @param cancellables The Map of Cancellables, one for each node being polled
* @return
*/
private Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> rescheduleFetchConsumer(
DiscoveryNode masterEligibleNode,
Consumer<CoordinationDiagnosticsService.ClusterFormationStateOrException> responseConsumer,
Consumer<Scheduler.Cancellable> cancellableConsumer
Map<DiscoveryNode, Scheduler.Cancellable> cancellables
) {
return response -> {
try {
cancellableConsumer.accept(
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
)
);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.trace("Not rescheduling request for cluster coordination info because this node is being shutdown", e);
/*
* If clusterFormationInfoTasks is null, that means that cancelPollingClusterFormationInfo() has been called, so we don't
* want to run anything new, and we want to cancel anything that might still be running in our cancellables just to be safe.
*/
if (clusterFormationInfoTasks != null) {
/*
* If cancellables is not the same as clusterFormationInfoTasks, that means that the current polling track has been
* cancelled and a new polling track has been started. So we don't want to run anything new, and we want to cancel
* anything that might still be running in our cancellables just to be safe. Note that it is possible for
* clusterFormationInfoTasks to be null at this point (since it is assigned in a different thread), so it is important
* that we don't call equals on it.
*/
if (cancellables.equals(clusterFormationInfoTasks)) {
/*
* As mentioned in the comment in cancelPollingClusterFormationInfo(), there is a slim possibility here that we will
* add a task here for a poll that has already been cancelled. But when it completes and runs rescheduleFetchConsumer()
* we will then see that clusterFormationInfoTasks does not equal cancellables, so it will not be run again.
*/
try {
cancellables.put(
masterEligibleNode,
fetchClusterFormationInfo(
masterEligibleNode,
responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellables))
)
);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.trace("Not rescheduling request for cluster coordination info because this node is being shutdown", e);
} else {
throw e;
}
}
} else {
throw e;
cancellables.values().forEach(Scheduler.Cancellable::cancel);
}
} else {
cancellables.values().forEach(Scheduler.Cancellable::cancel);
}
};
}

private void cancelPollingClusterFormationInfo() {
void cancelPollingClusterFormationInfo() {
assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
if (clusterFormationResponses != null) {
clusterFormationInfoTasks.forEach(Scheduler.Cancellable::cancel);
clusterFormationResponses = null;
if (clusterFormationInfoTasks != null) {
/*
* There is a slight risk here that a new Cancellable is added to clusterFormationInfoTasks after we begin iterating in the next
* line. We are calling this an acceptable risk because it will result in an un-cancelled un-cancellable task, but it will not
* reschedule itself so it will not be around long. It is possible that cancel() will be called on a Cancellable concurrently
* by multiple threads, but that will not cause any problems.
*/
clusterFormationInfoTasks.values().forEach(Scheduler.Cancellable::cancel);
clusterFormationInfoTasks = null;
clusterFormationResponses = null;
}
}

Expand Down

0 comments on commit 579692d

Please sign in to comment.