Adds resiliency to read-only filesystems #45286 #52680

Merged 50 commits on Jul 7, 2020

Commits (50)
569e8cc
Merge pull request #2 from elastic/master
Bukhtawar Jul 4, 2019
64815f1
Merge remote-tracking branch 'upstream/master'
Bukhtawar Feb 22, 2020
b598944
[Initial DRAFT] Adds a FsHealthService that periodically tries to wr…
Bukhtawar Feb 23, 2020
d4fb892
Test case addition and PR comments
Bukhtawar Mar 25, 2020
38f1a4e
Merge remote-tracking branch 'upstream/master'
Bukhtawar Mar 25, 2020
f3ac906
Merge branch 'master' into ro-fs-handling
Bukhtawar Mar 25, 2020
79948f3
Changes for FsHealthService and tests
Bukhtawar Mar 25, 2020
20d9ba2
Review comments for simplification and better tests
Bukhtawar May 3, 2020
fa3ed38
Merge remote-tracking branch 'upstream/master'
Bukhtawar May 3, 2020
1646319
Merge branch 'master' into ro-fs-handling
Bukhtawar May 3, 2020
5305ebb
Fixing tests and check styles
Bukhtawar May 3, 2020
26fbce7
FsHealthService comments on slow IO
Bukhtawar May 5, 2020
8a86051
Restricting FS health checks to IOExceptions
Bukhtawar May 11, 2020
c9dd1a7
Addressing comments on logging and tests
Bukhtawar May 20, 2020
c99a68e
Minor edits
Bukhtawar May 20, 2020
545eaf5
Merge branch 'master' into ro-fs-handling
Bukhtawar May 27, 2020
86fa7c9
Updated the exception id
Bukhtawar May 27, 2020
8102c81
Merge branch 'master' into ro-fs-handling
Bukhtawar Jun 4, 2020
043db93
Fix merge conflict
DaveCTurner Jun 16, 2020
bbf5517
Fix spacing in StatusInfo#toString
DaveCTurner Jun 18, 2020
1459937
Tidy 'skip prevoting' log message
DaveCTurner Jun 18, 2020
8eb5e20
Tidy response messages in FollowersChecker
DaveCTurner Jun 18, 2020
2095d82
Tidy log message in JoinHelper
DaveCTurner Jun 18, 2020
39a0565
Tidy message in PreVoteCollector
DaveCTurner Jun 18, 2020
136bc44
Tidy info messages
DaveCTurner Jun 18, 2020
1ab13b2
Tidy tracing messages
DaveCTurner Jun 18, 2020
4143f8f
Tidy warn/error messages
DaveCTurner Jun 18, 2020
1d9a7ab
Fix up tests
DaveCTurner Jun 18, 2020
f222529
Fix too-short delay
DaveCTurner Jun 18, 2020
befd822
Minor fixes to Follower and FsHealthService
Bukhtawar Jun 18, 2020
061dd33
Fix assertions
Bukhtawar Jun 18, 2020
cda2179
Leader checks
Bukhtawar Jun 18, 2020
4d83de0
Leader check tests
Bukhtawar Jun 19, 2020
e41392f
Reduce cluster stabilization time after unhealthy node
Bukhtawar Jun 19, 2020
67d49bb
Minor fix up
Bukhtawar Jun 19, 2020
fa3cc69
ClusterFormationFailureHelper changes and more tests
Bukhtawar Jun 19, 2020
89035fb
Minor changes to LeaderChecker
Bukhtawar Jun 21, 2020
adbe670
Pass StatusInfo to ClusterFormationState and simplify message
DaveCTurner Jun 24, 2020
fdcdf45
Whitespace
DaveCTurner Jun 24, 2020
deafeca
Imports
DaveCTurner Jun 24, 2020
1120428
Fixing Random
Bukhtawar Jun 24, 2020
23bc4e5
Merge remote-tracking branch 'upstream/master'
Bukhtawar Jun 24, 2020
06b14b8
Merge branch 'master' into ro-fs-handling
Bukhtawar Jun 24, 2020
56fb9b3
ForbiddenApis for charset
Bukhtawar Jun 24, 2020
0d7b72f
Fix logger
Bukhtawar Jun 24, 2020
f390ed8
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jun 24, 2020
f44cf0d
NPE handling
Bukhtawar Jun 29, 2020
97a4c02
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jun 29, 2020
54d7c98
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jul 2, 2020
aae5142
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jul 3, 2020
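The commit messages above describe the core of the change: an FsHealthService that periodically attempts a small write on each data path and reports the node as unhealthy when that write fails with an IOException, so that a node stuck on a read-only filesystem stops participating in elections and cluster coordination. A minimal, self-contained sketch of that idea follows; the class, method, and file names here (PeriodicFsHealthCheck, StatusInfo, getHealth, the marker file name) are illustrative stand-ins inferred from the commits and the diff below, not the PR's actual implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative periodic filesystem health check: writes and syncs a small marker
// file on each data path and reports UNHEALTHY if any write fails with an IOException.
// Names and details are assumptions, not the PR's actual FsHealthService.
public final class PeriodicFsHealthCheck {

    public enum Status { HEALTHY, UNHEALTHY }

    public static final class StatusInfo {
        private final Status status;
        private final String info;

        StatusInfo(Status status, String info) {
            this.status = status;
            this.info = info;
        }

        public Status getStatus() { return status; }
        public String getInfo() { return info; }
    }

    private final List<Path> dataPaths;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<StatusInfo> current =
        new AtomicReference<>(new StatusInfo(Status.HEALTHY, "health check has not run yet"));

    public PeriodicFsHealthCheck(List<Path> dataPaths) {
        this.dataPaths = new ArrayList<>(dataPaths);
    }

    public void start(long intervalSeconds) {
        scheduler.scheduleWithFixedDelay(this::runCheck, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }

    public StatusInfo getHealth() {
        return current.get();
    }

    private void runCheck() {
        for (Path path : dataPaths) {
            try {
                Path tempFile = path.resolve(".health_check_temp_file"); // illustrative file name
                // A read-only filesystem fails this write with an IOException.
                Files.write(tempFile, "ok".getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
                Files.deleteIfExists(tempFile);
            } catch (IOException e) {
                current.set(new StatusInfo(Status.UNHEALTHY,
                    "health check failed on [" + path + "]: " + e.getMessage()));
                return;
            }
        }
        current.set(new StatusInfo(Status.HEALTHY, "health check passed"));
    }
}
```

The coordination-layer changes in the diff below consult exactly this kind of getHealth() result through a NodeHealthService before pre-voting, joining, and answering leader and follower checks.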
Files changed
@@ -1046,7 +1046,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.indices.recovery.PeerRecoveryNotFound.class,
org.elasticsearch.indices.recovery.PeerRecoveryNotFound::new,
158,
Version.V_7_9_0);
Version.V_7_9_0),
NODE_HEALTH_CHECK_FAILURE_EXCEPTION(
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException.class,
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException::new,
159,
Version.V_8_0_0);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
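This first hunk registers the new NodeHealthCheckFailureException in ElasticsearchExceptionHandle under the next unused id (159) with a minimum version (V_8_0_0 here), so the exception can be serialized between nodes. The sketch below shows the general shape of such an id-to-reader registry; the Reader interface and the DataInputStream stand-in are illustrative, not Elasticsearch's actual StreamInput/CheckedFunction serialization API.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Generic sketch of an id -> reader registry for wire-serializable exceptions.
// DataInputStream stands in for a stream decoder such as Elasticsearch's StreamInput.
final class ExceptionRegistry {

    @FunctionalInterface
    interface Reader {
        RuntimeException read(DataInputStream in) throws IOException;
    }

    private final Map<Integer, Reader> readersById = new HashMap<>();

    // Every exception type gets a stable numeric id; all nodes must agree on the mapping,
    // which is why new entries take the next unused id rather than reusing old ones.
    void register(int id, Reader reader) {
        if (readersById.putIfAbsent(id, reader) != null) {
            throw new IllegalArgumentException("duplicate exception id [" + id + "]");
        }
    }

    RuntimeException deserialize(int id, DataInputStream in) throws IOException {
        Reader reader = readersById.get(id);
        if (reader == null) {
            throw new IllegalArgumentException("unknown exception id [" + id + "]");
        }
        return reader.read(in);
    }
}
```

In this sketch, a caller would add an entry such as registry.register(159, in -> new RuntimeException(in.readUTF())), mirroring the (class, constructor reference, id, version) tuple added in the hunk above.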
@@ -31,6 +31,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;

@@ -43,6 +44,7 @@
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

public class ClusterFormationFailureHelper {
private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class);
@@ -123,18 +125,24 @@ static class ClusterFormationState {
private final List<DiscoveryNode> foundPeers;
private final long currentTerm;
private final ElectionStrategy electionStrategy;
private final StatusInfo statusInfo;

ClusterFormationState(Settings settings, ClusterState clusterState, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy) {
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy,
StatusInfo statusInfo) {
this.settings = settings;
this.clusterState = clusterState;
this.resolvedAddresses = resolvedAddresses;
this.foundPeers = foundPeers;
this.currentTerm = currentTerm;
this.electionStrategy = electionStrategy;
this.statusInfo = statusInfo;
}

String getDescription() {
if (statusInfo.getStatus() == UNHEALTHY) {
return String.format(Locale.ROOT, "this node is unhealthy: %s", statusInfo.getInfo());
}
final List<String> clusterStateNodes = StreamSupport.stream(clusterState.nodes().getMasterNodes().values().spliterator(), false)
.map(n -> n.value.toString()).collect(Collectors.toList());

@@ -67,6 +67,8 @@
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
@@ -91,6 +93,7 @@
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

public class Coordinator extends AbstractLifecycleComponent implements Discovery {

@@ -147,6 +150,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -156,7 +160,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
RerouteService rerouteService, ElectionStrategy electionStrategy) {
RerouteService rerouteService, ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
@@ -166,7 +170,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
rerouteService);
rerouteService, nodeHealthService);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@@ -176,14 +180,16 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy,
nodeHealthService);
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode,
nodeHealthService);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
@@ -194,12 +200,13 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
this.nodeHealthService = nodeHealthService;
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false))
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy);
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy, nodeHealthService.getHealth());
}

private void onLeaderFailure(Exception e) {
@@ -1197,6 +1204,12 @@ public void run() {
return;
}

final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
@@ -31,6 +31,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
@@ -54,6 +56,7 @@
import java.util.function.Predicate;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

/**
* The FollowersChecker is responsible for allowing a leader to check that its followers are still connected and healthy. On deciding that a
@@ -94,16 +97,17 @@ public class FollowersChecker {
private final Set<DiscoveryNode> faultyNodes = new HashSet<>();

private final TransportService transportService;

private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;

public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure) {
BiConsumer<DiscoveryNode, String> onNodeFailure, NodeHealthService nodeHealthService) {
this.settings = settings;
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
this.onNodeFailure = onNodeFailure;
this.nodeHealthService = nodeHealthService;

followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
@@ -160,8 +164,15 @@ public void updateFastResponseState(final long term, final Mode mode) {
}

private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel transportChannel) throws IOException {
FastResponseState responder = this.fastResponseState;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message
= "handleFollowerCheck: node is unhealthy [" + statusInfo.getInfo() + "], rejecting " + request;
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
}

final FastResponseState responder = this.fastResponseState;
if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
@@ -323,6 +334,9 @@ public void handleException(TransportException exp) {
|| exp.getCause() instanceof ConnectTransportException) {
logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp);
reason = "health check failed";
} else {
logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();
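As the class comment above says, the FollowersChecker lets the leader verify that its followers are still connected and healthy. This change adds both halves of the health handling: the follower rejects the check with a NodeHealthCheckFailureException when its local health is UNHEALTHY, and the leader treats that exception cause as an immediate "health check failed" rather than a retryable failure. A condensed, self-contained sketch of the pattern follows; the types below are stand-ins inferred from how the diff uses NodeHealthService and StatusInfo, not the actual Elasticsearch classes.

```java
// Condensed sketch of the follower-check health handling shown in the diff above.
// All types here are illustrative stand-ins, not the real Elasticsearch classes.
final class StatusInfo {
    enum Status { HEALTHY, UNHEALTHY }

    private final Status status;
    private final String info;

    StatusInfo(Status status, String info) {
        this.status = status;
        this.info = info;
    }

    Status getStatus() { return status; }
    String getInfo() { return info; }
}

interface NodeHealthService {
    StatusInfo getHealth();
}

class NodeHealthCheckFailureException extends RuntimeException {
    NodeHealthCheckFailureException(String message) {
        super(message);
    }
}

final class FollowerCheckSketch {
    private final NodeHealthService nodeHealthService;

    FollowerCheckSketch(NodeHealthService nodeHealthService) {
        this.nodeHealthService = nodeHealthService;
    }

    // Follower side: reject the incoming check outright when the local node is unhealthy,
    // mirroring handleFollowerCheck in the hunk above.
    void handleFollowerCheck() {
        StatusInfo statusInfo = nodeHealthService.getHealth();
        if (statusInfo.getStatus() == StatusInfo.Status.UNHEALTHY) {
            throw new NodeHealthCheckFailureException(
                "handleFollowerCheck: node is unhealthy [" + statusInfo.getInfo() + "], rejecting check");
        }
        // ...otherwise respond on the fast path as before...
    }

    // Leader side: classify the failure, mirroring the new branch in handleException above.
    // A health-check failure removes the follower immediately; most other failures retry.
    static String classifyFailure(Throwable cause) {
        if (cause instanceof NodeHealthCheckFailureException) {
            return "health check failed";
        }
        return "retry";
    }
}
```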
@@ -42,6 +42,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
@@ -69,6 +71,8 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

public class JoinHelper {

private static final Logger logger = LogManager.getLogger(JoinHelper.class);
@@ -88,6 +92,7 @@ public class JoinHelper {

@Nullable // if using single-node discovery
private final TimeValue joinTimeout;
private final NodeHealthService nodeHealthService;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

@@ -96,9 +101,11 @@ public class JoinHelper {
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService,
NodeHealthService nodeHealthService) {
this.masterService = masterService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) {

@@ -232,6 +239,11 @@ void logLastFailedJoinAttempt() {

public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
return;
}
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
@@ -32,6 +32,8 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
@@ -53,6 +55,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

/**
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
* fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to
@@ -84,17 +88,20 @@ public class LeaderChecker {
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;

LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure) {
LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService) {
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;

transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new,
(request, channel, task) -> {
@@ -154,8 +161,13 @@ boolean currentNodeIsMaster() {
private void handleLeaderCheck(LeaderCheckRequest request) {
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;

if (discoveryNodes.isLocalNodeElectedMaster() == false) {
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message = "rejecting leader check from [" + request.getSender() + "] " +
"since node is unhealthy [" + statusInfo.getInfo() + "]";
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
} else if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("rejecting leader check on non-master {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the master");
@@ -237,8 +249,12 @@ public void handleException(TransportException exp) {
"leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(new ParameterizedMessage(
"leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
return;
}

long failureCount = failureCountSinceLastSuccess.incrementAndGet();
if (failureCount >= leaderCheckRetryCount) {
logger.debug(new ParameterizedMessage(
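The LeaderChecker comment above notes that leader checks are deliberately lenient, tolerating several consecutive failures before declaring the leader faulty; this change carves out health-check failures so they bypass that retry budget and fail the leader immediately. A minimal sketch of that decision logic follows; the field and method names are illustrative, not the actual LeaderChecker internals.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for the exception the PR registers under id 159 above.
class NodeHealthCheckFailureException extends RuntimeException {
    NodeHealthCheckFailureException(String message) {
        super(message);
    }
}

// Minimal sketch of the leader-check failure handling: a health-check failure fails
// the leader at once, while other failures only count against the retry budget.
// (The real code also short-circuits on disconnects; that branch is omitted here.)
final class LeaderCheckFailureSketch {

    private final int leaderCheckRetryCount;
    private final AtomicLong failureCountSinceLastSuccess = new AtomicLong();

    LeaderCheckFailureSketch(int leaderCheckRetryCount) {
        this.leaderCheckRetryCount = leaderCheckRetryCount;
    }

    /** Returns true if the leader should be treated as failed for this exception. */
    boolean onCheckFailure(Throwable cause) {
        if (cause instanceof NodeHealthCheckFailureException) {
            return true; // the leader reported itself unhealthy: fail it without retrying
        }
        return failureCountSinceLastSuccess.incrementAndGet() >= leaderCheckRetryCount;
    }

    void onCheckSuccess() {
        failureCountSinceLastSuccess.set(0);
    }
}
```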