Skip to content

Commit

Permalink
Block joins while applier is busy (#84919)
Browse files Browse the repository at this point in the history
Certain cluster appliers perform I/O or other heavy computations and in
extreme circumstances may take a significant amount of time to apply a
cluster state. The master considers a node to be unhealthy if it takes
too long to apply a cluster state. Nodes that time out like this are
removed from the cluster, but they will carry on with the slow cluster
state application regardless. If the application takes long enough then
there may be time for the node to rejoin the cluster, time out applying
the state resulting from the rejoin, and be removed again. This can
happen repeatedly and is disruptive to the rest of the cluster.

There is no point in trying to rejoin the cluster while the applier is
busy. With this commit we send the join request from the applier thread,
ensuring that it is not occupied with other work and will therefore be
available to apply the joining state. The commit also adds periodic
logging of the status of ongoing joins to help clarify why it is taking
longer than expected to rejoin the cluster.
  • Loading branch information
DaveCTurner committed May 6, 2022
1 parent dc6f1fd commit c88dd10
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 17 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84919.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84919
summary: Block joins while applier is busy
area: Cluster Coordination
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.discovery;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.JoinHelper;
Expand All @@ -27,6 +28,8 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
Expand Down Expand Up @@ -205,4 +208,47 @@ public void testNodeNotReachableFromMaster() throws Exception {
ensureStableCluster(3);
}

/**
 * Drops a non-master node out of a three-node cluster, parks that node's cluster applier thread inside a blocking task, and checks
 * that the node sends no join request for as long as the applier is parked. Once the applier is released the cluster is expected to
 * stabilise at three nodes again.
 */
public void testJoinWaitsForClusterApplier() throws Exception {
    startCluster(3);

    final String masterName = internalCluster().getMasterName();
    final String victimName = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));

    // Cut the master off from the victim so that the victim is removed from the cluster.
    final MockTransportService masterTransport = (MockTransportService) internalCluster().getInstance(
        TransportService.class,
        masterName
    );
    final TransportService victimTransportInstance = internalCluster().getInstance(TransportService.class, victimName);
    masterTransport.addFailToSendNoConnectRule(victimTransportInstance);
    ensureStableCluster(2, masterName);

    // Occupy the victim's applier thread. The two-party barrier is tripped twice: the first trip tells this test that the applier
    // task is running, the second trip (further down) lets the applier task finish.
    final CyclicBarrier applierBarrier = new CyclicBarrier(2);
    final ClusterService victimClusterService = internalCluster().getInstance(ClusterService.class, victimName);
    victimClusterService.getClusterApplierService().onNewClusterState("block", () -> {
        try {
            applierBarrier.await(10, TimeUnit.SECONDS);
            applierBarrier.await(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
        return null;
    }, ActionListener.wrap(() -> {}));
    applierBarrier.await(10, TimeUnit.SECONDS);

    // While the applier is parked, any outgoing join request from the victim is a test failure; everything else passes through.
    final MockTransportService victimTransport = (MockTransportService) internalCluster().getInstance(
        TransportService.class,
        victimName
    );
    victimTransport.addSendBehavior((conn, reqId, actionName, req, opts) -> {
        assertNotEquals(actionName, JoinHelper.JOIN_ACTION_NAME);
        conn.sendRequest(reqId, actionName, req, opts);
    });

    // Heal the network; the cluster must still consist of only two nodes because the victim is not joining.
    masterTransport.clearAllRules();
    ensureStableCluster(2, masterName);

    // Remove the join-rejecting send behaviour.
    victimTransport.addSendBehavior(null);

    // Second barrier trip: let the applier task complete.
    applierBarrier.await(10, TimeUnit.SECONDS);

    ensureStableCluster(3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public Coordinator(
this.joinHelper = new JoinHelper(
allocationService,
masterService,
clusterApplier,
transportService,
this::getCurrentTerm,
this::handleJoinRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -59,12 +61,11 @@ public class JoinHelper {
public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
public static final String JOIN_PING_ACTION_NAME = "internal:cluster/coordination/join/ping";

private final AllocationService allocationService;
private final MasterService masterService;
private final ClusterApplier clusterApplier;
private final TransportService transportService;
private final JoinTaskExecutor joinTaskExecutor;
private final LongSupplier currentTermSupplier;
private final RerouteService rerouteService;
private final NodeHealthService nodeHealthService;
private final JoinReasonService joinReasonService;

Expand All @@ -75,6 +76,7 @@ public class JoinHelper {
JoinHelper(
AllocationService allocationService,
MasterService masterService,
ClusterApplier clusterApplier,
TransportService transportService,
LongSupplier currentTermSupplier,
BiConsumer<JoinRequest, ActionListener<Void>> joinHandler,
Expand All @@ -83,12 +85,11 @@ public class JoinHelper {
NodeHealthService nodeHealthService,
JoinReasonService joinReasonService
) {
this.allocationService = allocationService;
this.masterService = masterService;
this.clusterApplier = clusterApplier;
this.transportService = transportService;
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
this.currentTermSupplier = currentTermSupplier;
this.rerouteService = rerouteService;
this.nodeHealthService = nodeHealthService;
this.joinReasonService = joinReasonService;

Expand Down Expand Up @@ -239,23 +240,52 @@ public void onResponse(Releasable connectionReference) {
// which point the NodeConnectionsService will have taken ownership of it.
registerConnection(destination, connectionReference);

pendingJoinInfo.message = PENDING_JOIN_WAITING_RESPONSE;
transportService.sendRequest(
destination,
JOIN_ACTION_NAME,
joinRequest,
TransportRequestOptions.of(null, TransportRequestOptions.Type.PING),
new TransportResponseHandler.Empty() {
// It's possible that our cluster applier is still applying an earlier cluster state (maybe stuck waiting on IO), in
// which case the master will accept our join and add us to the cluster but we won't be able to apply the joining state
// fast enough and will be kicked out of the cluster for lagging, which can happen repeatedly and be a little
// disruptive. To avoid this we send the join from the applier thread which ensures that it's not busy doing something
// else.
pendingJoinInfo.message = PENDING_JOIN_WAITING_APPLIER;
clusterApplier.onNewClusterState(
"joining " + destination.descriptionWithoutAttributes(),
() -> null,
new ActionListener<>() {
@Override
public void handleResponse(TransportResponse.Empty response) {
pendingJoinInfo.message = PENDING_JOIN_WAITING_STATE; // only logged if state delayed
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
public void onResponse(Void unused) {
assert Thread.currentThread()
.getName()
.contains('[' + ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME + ']')
|| Thread.currentThread().getName().startsWith("TEST-") : Thread.currentThread().getName();
pendingJoinInfo.message = PENDING_JOIN_WAITING_RESPONSE;
transportService.sendRequest(
destination,
JOIN_ACTION_NAME,
joinRequest,
TransportRequestOptions.of(null, TransportRequestOptions.Type.PING),
new TransportResponseHandler.Empty() {
@Override
public void handleResponse(TransportResponse.Empty response) {
pendingJoinInfo.message = PENDING_JOIN_WAITING_STATE; // only logged if state delayed
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
}

@Override
public void handleException(TransportException exp) {
cleanUpOnFailure(exp);
}
}
);
}

@Override
public void handleException(TransportException exp) {
public void onFailure(Exception e) {
assert false : e; // no-op cluster state update cannot fail
cleanUpOnFailure(new TransportException(e));
}

private void cleanUpOnFailure(TransportException exp) {
pendingJoinInfo.message = PENDING_JOIN_FAILED;
pendingOutgoingJoins.remove(dedupKey);
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
Expand Down Expand Up @@ -439,6 +469,7 @@ private static class PendingJoinInfo {

static final String PENDING_JOIN_INITIALIZING = "initializing";
static final String PENDING_JOIN_CONNECTING = "waiting to connect";
static final String PENDING_JOIN_WAITING_APPLIER = "waiting for local cluster applier";
static final String PENDING_JOIN_WAITING_RESPONSE = "waiting for response";
static final String PENDING_JOIN_WAITING_STATE = "waiting to receive cluster state";
static final String PENDING_JOIN_CONNECT_FAILED = "failed to connect";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testJoinDeduplication() {
JoinHelper joinHelper = new JoinHelper(
null,
null,
new NoOpClusterApplier(),
transportService,
() -> 0L,
(joinRequest, joinCallback) -> { throw new AssertionError(); },
Expand Down Expand Up @@ -217,6 +218,7 @@ public void testJoinFailureOnUnhealthyNodes() {
JoinHelper joinHelper = new JoinHelper(
null,
null,
new NoOpClusterApplier(),
transportService,
() -> 0L,
(joinRequest, joinCallback) -> { throw new AssertionError(); },
Expand Down

0 comments on commit c88dd10

Please sign in to comment.