Remove nodes with read-only filesystems (#52680) (#59138)
Today we do not allow a node to start if its filesystem is read-only, but
it is possible for a filesystem to become read-only while the node is
running. We currently have no infrastructure in place to ensure that
Elasticsearch behaves well if this happens. A node that cannot write to
disk may be poisonous to the rest of the cluster.

With this commit we periodically verify that nodes' filesystems are
writable. If a node fails these writability checks then it is removed
from the cluster and prevented from re-joining until the checks start
passing again.
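
To make the mechanism concrete, a writability check of this kind can be as
small as creating, fsyncing, and deleting a probe file under each data path.
The sketch below is illustrative only: the class and file names are invented
for this example and are not the ones added by this commit, but the I/O
pattern is what turns a read-only remount into a detectable failure.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical, self-contained sketch of a filesystem writability probe.
final class WritabilityProbe {

    /**
     * Returns true if a small probe file can be created, written, and fsynced
     * under the given data path. On a filesystem that has been remounted
     * read-only, one of these operations throws an IOException.
     */
    static boolean isWritable(Path dataPath) {
        final Path probeFile = dataPath.resolve(".writability_probe.tmp");
        try (FileChannel channel = FileChannel.open(probeFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(new byte[] { 0x55 }));
            channel.force(true); // fsync so delayed write failures surface here
            return true;
        } catch (IOException e) {
            return false; // any I/O failure marks this path as unhealthy
        } finally {
            try {
                Files.deleteIfExists(probeFile);
            } catch (IOException ignored) {
                // best-effort cleanup; the probe result is unaffected
            }
        }
    }
}

In the commit itself, a periodic monitor surfaces the result of such a check
as a StatusInfo, and the coordination components in the diffs below consult
it via NodeHealthService#getHealth before pre-voting, joining, and responding
to leader and follower checks.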

Closes #45286

Co-authored-by: Bukhtawar Khan <bukhtawar7152@gmail.com>
DaveCTurner and Bukhtawar committed Jul 7, 2020
1 parent a8220ad commit 46c8d00
Showing 26 changed files with 1,299 additions and 101 deletions.

ElasticsearchException.java
@@ -1046,6 +1046,11 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.indices.recovery.PeerRecoveryNotFound.class,
org.elasticsearch.indices.recovery.PeerRecoveryNotFound::new,
158,
Version.V_7_9_0),
NODE_HEALTH_CHECK_FAILURE_EXCEPTION(
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException.class,
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException::new,
159,
Version.V_7_9_0);

final Class<? extends ElasticsearchException> exceptionClass;

ClusterFormationFailureHelper.java
@@ -32,6 +32,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;

@@ -44,6 +45,7 @@
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

public class ClusterFormationFailureHelper {
private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class);
@@ -124,18 +126,24 @@ static class ClusterFormationState {
private final List<DiscoveryNode> foundPeers;
private final long currentTerm;
private final ElectionStrategy electionStrategy;
private final StatusInfo statusInfo;

ClusterFormationState(Settings settings, ClusterState clusterState, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy) {
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy,
StatusInfo statusInfo) {
this.settings = settings;
this.clusterState = clusterState;
this.resolvedAddresses = resolvedAddresses;
this.foundPeers = foundPeers;
this.currentTerm = currentTerm;
this.electionStrategy = electionStrategy;
this.statusInfo = statusInfo;
}

String getDescription() {
if (statusInfo.getStatus() == UNHEALTHY) {
return String.format(Locale.ROOT, "this node is unhealthy: %s", statusInfo.getInfo());
}
final List<String> clusterStateNodes = StreamSupport.stream(clusterState.nodes().getMasterNodes().values().spliterator(), false)
.map(n -> n.value.toString()).collect(Collectors.toList());

Coordinator.java
@@ -70,6 +70,8 @@
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
@@ -94,6 +96,7 @@
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

public class Coordinator extends AbstractLifecycleComponent implements Discovery {

@@ -153,6 +156,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -162,7 +166,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
RerouteService rerouteService, ElectionStrategy electionStrategy) {
RerouteService rerouteService, ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
@@ -172,7 +176,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
rerouteService);
rerouteService, nodeHealthService);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@@ -182,14 +186,16 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy,
nodeHealthService);
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode,
nodeHealthService);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
@@ -202,12 +208,13 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
this.nodeHealthService = nodeHealthService;
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false))
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy);
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy, nodeHealthService.getHealth());
}

private void onLeaderFailure(Exception e) {
@@ -1230,6 +1237,12 @@ public void run() {
return;
}

final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}

FollowersChecker.java
@@ -34,6 +34,8 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
@@ -57,6 +59,7 @@
import java.util.function.Predicate;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

/**
* The FollowersChecker is responsible for allowing a leader to check that its followers are still connected and healthy. On deciding that a
@@ -97,16 +100,17 @@ public class FollowersChecker {
private final Set<DiscoveryNode> faultyNodes = new HashSet<>();

private final TransportService transportService;

private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;

public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure) {
BiConsumer<DiscoveryNode, String> onNodeFailure, NodeHealthService nodeHealthService) {
this.settings = settings;
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
this.onNodeFailure = onNodeFailure;
this.nodeHealthService = nodeHealthService;

followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
@@ -167,8 +171,15 @@ public void updateFastResponseState(final long term, final Mode mode) {
}

private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel transportChannel) throws IOException {
FastResponseState responder = this.fastResponseState;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message = "handleFollowerCheck: node is unhealthy [" + statusInfo.getInfo() + "], rejecting " + request;
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
}

final FastResponseState responder = this.fastResponseState;
if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
@@ -340,6 +351,9 @@ public void handleException(TransportException exp) {
|| exp.getCause() instanceof ConnectTransportException) {
logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp);
reason = "health check failed";
} else {
logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();

JoinHelper.java
@@ -44,6 +44,8 @@
import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
@@ -71,6 +73,8 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

public class JoinHelper {

private static final Logger logger = LogManager.getLogger(JoinHelper.class);
@@ -90,6 +94,7 @@ public class JoinHelper {

@Nullable // if using single-node discovery
private final TimeValue joinTimeout;
private final NodeHealthService nodeHealthService;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

@@ -98,9 +103,11 @@ public class JoinHelper {
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService,
NodeHealthService nodeHealthService) {
this.masterService = masterService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {

@@ -268,6 +275,11 @@ void logLastFailedJoinAttempt() {

public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin, Runnable onCompletion) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
return;
}
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {

LeaderChecker.java
@@ -34,6 +34,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
@@ -55,6 +57,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

/**
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
* fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to
@@ -88,18 +92,21 @@ public class LeaderChecker {
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;

LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure) {
LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;

transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new,
(request, channel, task) -> {
@@ -169,8 +176,13 @@ boolean currentNodeIsMaster() {
private void handleLeaderCheck(LeaderCheckRequest request) {
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;

if (discoveryNodes.isLocalNodeElectedMaster() == false) {
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message = "rejecting leader check from [" + request.getSender() + "] " +
"since node is unhealthy [" + statusInfo.getInfo() + "]";
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
} else if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("rejecting leader check on non-master {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the master");
@@ -266,8 +278,12 @@ public void handleException(TransportException exp) {
"leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(new ParameterizedMessage(
"leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
return;
}

long failureCount = failureCountSinceLastSuccess.incrementAndGet();
if (failureCount >= leaderCheckRetryCount) {
logger.debug(new ParameterizedMessage(