
Adds resiliency to read-only filesystems #45286 #52680

Merged — 50 commits merged on Jul 7, 2020
Changes from 7 commits
Commits (50)
569e8cc
Merge pull request #2 from elastic/master
Bukhtawar Jul 4, 2019
64815f1
Merge remote-tracking branch 'upstream/master'
Bukhtawar Feb 22, 2020
b598944
[Initial DRAFT] Adds a FsHealthService that periodically tries to wr…
Bukhtawar Feb 23, 2020
d4fb892
Test case addition and PR comments
Bukhtawar Mar 25, 2020
38f1a4e
Merge remote-tracking branch 'upstream/master'
Bukhtawar Mar 25, 2020
f3ac906
Merge branch 'master' into ro-fs-handling
Bukhtawar Mar 25, 2020
79948f3
Changes for FsHealthService and tests
Bukhtawar Mar 25, 2020
20d9ba2
Review comments for simplication and better tests
Bukhtawar May 3, 2020
fa3ed38
Merge remote-tracking branch 'upstream/master'
Bukhtawar May 3, 2020
1646319
Merge branch 'master' into ro-fs-handling
Bukhtawar May 3, 2020
5305ebb
Fixing tests and check styles
Bukhtawar May 3, 2020
26fbce7
FsHealthService comments on slow IO
Bukhtawar May 5, 2020
8a86051
Restricting FS health checks to IOExceptions
Bukhtawar May 11, 2020
c9dd1a7
Addressing comments on logging and tests
Bukhtawar May 20, 2020
c99a68e
Minor edits
Bukhtawar May 20, 2020
545eaf5
Merge branch 'master' into ro-fs-handling
Bukhtawar May 27, 2020
86fa7c9
Updated the exception id
Bukhtawar May 27, 2020
8102c81
Merge branch 'master' into ro-fs-handling
Bukhtawar Jun 4, 2020
043db93
Fix merge conflict
DaveCTurner Jun 16, 2020
bbf5517
Fix spacing in StatusInfo#toString
DaveCTurner Jun 18, 2020
1459937
Tidy 'skip prevoting' log message
DaveCTurner Jun 18, 2020
8eb5e20
Tidy response messages in FollowersChecker
DaveCTurner Jun 18, 2020
2095d82
Tidy log message in JoinHelper
DaveCTurner Jun 18, 2020
39a0565
Tidy message in PreVoteCollector
DaveCTurner Jun 18, 2020
136bc44
Tidy info messages
DaveCTurner Jun 18, 2020
1ab13b2
Tidy tracing messages
DaveCTurner Jun 18, 2020
4143f8f
Tidy warn/error messages
DaveCTurner Jun 18, 2020
1d9a7ab
Fix up tests
DaveCTurner Jun 18, 2020
f222529
Fix too-short delay
DaveCTurner Jun 18, 2020
befd822
Minor fixes to Follower and FsHealthService
Bukhtawar Jun 18, 2020
061dd33
Fix assertions
Bukhtawar Jun 18, 2020
cda2179
Leader checks
Bukhtawar Jun 18, 2020
4d83de0
Leader check tests
Bukhtawar Jun 19, 2020
e41392f
cluster reduce stabilization time after unhealthy node
Bukhtawar Jun 19, 2020
67d49bb
Minor fix up
Bukhtawar Jun 19, 2020
fa3cc69
ClusterFormationFailureHelper changes and more tests
Bukhtawar Jun 19, 2020
89035fb
Minor changes to LeaderChecker
Bukhtawar Jun 21, 2020
adbe670
Pass StatusInfo to ClusterFormationState and simplify message
DaveCTurner Jun 24, 2020
fdcdf45
Whitespace
DaveCTurner Jun 24, 2020
deafeca
Imports
DaveCTurner Jun 24, 2020
1120428
Fixing Random
Bukhtawar Jun 24, 2020
23bc4e5
Merge remote-tracking branch 'upstream/master'
Bukhtawar Jun 24, 2020
06b14b8
Merge branch 'master' into ro-fs-handling
Bukhtawar Jun 24, 2020
56fb9b3
ForbiddenApis for charset
Bukhtawar Jun 24, 2020
0d7b72f
Fix logger
Bukhtawar Jun 24, 2020
f390ed8
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jun 24, 2020
f44cf0d
NPE handling
Bukhtawar Jun 29, 2020
97a4c02
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jun 29, 2020
54d7c98
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jul 2, 2020
aae5142
Merge remote-tracking branch 'upstream/master' into ro-fs-handling
Bukhtawar Jul 3, 2020
Diff view
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@@ -67,6 +68,7 @@
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
@@ -149,6 +151,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final FsService fsService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -158,7 +161,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
RerouteService rerouteService, ElectionStrategy electionStrategy) {
RerouteService rerouteService, ElectionStrategy electionStrategy, FsService fsService, Client nodeClient) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
@@ -168,7 +171,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
rerouteService);
rerouteService, fsService);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@@ -178,14 +181,16 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy,
fsService);
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode,
nodeClient);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
@@ -196,6 +201,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
this.fsService = fsService;
}

private ClusterFormationState getClusterFormationState() {
@@ -1173,6 +1179,12 @@ public void run() {
return;
}

if(fsService.stats().getTotal().isWritable() == Boolean.FALSE){
Contributor Author

I left out the spaces assuming checkstyle would catch it, but unfortunately it didn't. I'll fix the whitespace.

logger.warn("skip prevoting as local node is not writable: {}",
Contributor

A warning here isn't helpful: we should be logging the failure elsewhere, so this will simply result in confusion.

Suggested change
logger.warn("skip prevoting as local node is not writable: {}",
logger.debug("skip prevoting as local node is not writable: {}",

Also, we have this generic NodeHealthService but the log message is very specific: local node is not writeable. Maybe the NodeHealthService should describe the problem rather than returning a simple boolean.

lastAcceptedState.coordinationMetaData());
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
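The reviewer's suggestion above — that the generic NodeHealthService should describe the problem rather than return a plain boolean — is roughly the direction later commits in this PR take (see "Pass StatusInfo to ClusterFormationState and simplify message" in the commit list). The following standalone sketch only illustrates that idea; the names echo the PR's later StatusInfo, but the shape shown here is an assumption, not the merged implementation.

// Hypothetical, self-contained sketch of a health service that reports a status
// plus a human-readable description instead of a bare boolean. Illustrative only.
public final class HealthStatusSketch {

    enum Status { HEALTHY, UNHEALTHY }

    // Immutable status-plus-reason pair, roughly what a StatusInfo-style type carries.
    static final class StatusInfo {
        private final Status status;
        private final String info;

        StatusInfo(Status status, String info) {
            this.status = status;
            this.info = info;
        }

        Status getStatus() { return status; }
        String getInfo() { return info; }

        @Override
        public String toString() {
            return "status[" + status + "], info[" + info + "]";
        }
    }

    interface NodeHealthService {
        StatusInfo getHealth();
    }

    public static void main(String[] args) {
        // A check that reports *why* the node is unhealthy, e.g. which path failed a write
        // (the path shown here is made up for the example).
        NodeHealthService healthService = () ->
            new StatusInfo(Status.UNHEALTHY, "health check failed on [/var/data/nodes/0]");

        StatusInfo health = healthService.getHealth();
        if (health.getStatus() == Status.UNHEALTHY) {
            // The caller can log the service's own description instead of a
            // hard-coded "local node is not writable" message.
            System.out.println("skipping pre-voting: " + health.getInfo());
        }
    }
}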
@@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -39,10 +40,10 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashSet;
@@ -52,6 +53,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

@@ -94,12 +96,13 @@ public class FollowersChecker {
private final Set<DiscoveryNode> faultyNodes = new HashSet<>();

private final TransportService transportService;
private final NodeFsHealthChecker nodeFsHealthChecker;

private volatile FastResponseState fastResponseState;

public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure) {
BiConsumer<DiscoveryNode, String> onNodeFailure, Client nodeClient) {
this.settings = settings;
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
@@ -118,6 +121,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
nodeFsHealthChecker = new NodeFsHealthChecker(settings, transportService, nodeClient, this::failFollower, this::followers);
}

/**
@@ -139,9 +143,37 @@ public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
followerChecker.start();
}
});
nodeFsHealthChecker.start();
}
}

private Set<DiscoveryNode> followers(){
return followerCheckers.keySet();
}

private void failFollower(DiscoveryNode discoveryNode, String reason, Supplier<Boolean> supplier) {
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (supplier.get() == false) {
logger.trace("{} no longer running, not marking faulty", discoveryNode);
return;
}
logger.debug("{} marking node as faulty", discoveryNode);
faultyNodes.add(discoveryNode);
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode, reason);
}

@Override
public String toString() {
return "detected failure of " + discoveryNode;
}
});
}

/**
* Clear the set of known nodes, stopping all checks.
*/
@@ -351,28 +383,10 @@ public String executor() {
}

void failNode(String reason) {
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
if (running() == false) {
logger.trace("{} no longer running, not marking faulty", FollowerChecker.this);
return;
}
logger.debug("{} marking node as faulty", FollowerChecker.this);
faultyNodes.add(discoveryNode);
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode, reason);
}

@Override
public String toString() {
return "detected failure of " + discoveryNode;
}
});
failFollower(discoveryNode, reason, () -> running());
}


private void scheduleNextWakeUp() {
transportService.getThreadPool().schedule(new Runnable() {
@Override
@@ -42,6 +42,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
@@ -88,6 +89,8 @@ public class JoinHelper {

@Nullable // if using single-node discovery
private final TimeValue joinTimeout;
private final FsService fsService;


private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

@@ -96,9 +99,10 @@ public class JoinHelper {
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService, FsService fsService) {
this.masterService = masterService;
this.transportService = transportService;
this.fsService = fsService;
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) {

@@ -232,6 +236,10 @@ void logLastFailedJoinAttempt() {

public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
if (fsService.stats().getTotal().isWritable() == Boolean.FALSE) {
logger.warn("All paths are not writable. Blocking join request");
Contributor

A warning here isn't helpful: we should be logging the failure elsewhere, so this will simply result in confusion.

Suggested change
logger.warn("All paths are not writable. Blocking join request");
logger.debug("All paths are not writable. Blocking join request");

Also, we have this generic NodeHealthService but the log message is very specific: all paths are not writable. Maybe the NodeHealthService should describe the problem rather than returning a simple boolean.

return;
}
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
@@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.fs.FsHealthService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.TransportService;

import java.util.Set;
import java.util.function.Supplier;

public class NodeFsHealthChecker {
Contributor

This doesn't seem necessary; it's enough for followers to reject today's health checks.

Contributor Author

Agree. That simplifies a great deal


private static final Logger logger = LogManager.getLogger(FollowersChecker.class);

public static final Setting<TimeValue> FS_HEALTH_CHECK_INTERVAL_SETTING =
Setting.timeSetting("cluster.fault_detection.fs_health_check.interval",
TimeValue.timeValueMillis(5000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope);

private final TimeValue fsHealthCheckInterval;
private final boolean fsHealthCheckEnabled;
private final TriConsumer<DiscoveryNode, String, Supplier<Boolean>> failFollower;
private final TransportService transportService;
private final Supplier<Set<DiscoveryNode>> followerNodesSupplier;
private final Client nodeClient;

public NodeFsHealthChecker(Settings settings, TransportService transportService, Client nodeClient, TriConsumer<DiscoveryNode,
String, Supplier<Boolean>> failFollower, Supplier<Set<DiscoveryNode>> followerNodesSupplier){
fsHealthCheckEnabled = FsHealthService.ENABLED_SETTING.get(settings);
fsHealthCheckInterval = FS_HEALTH_CHECK_INTERVAL_SETTING.get(settings);
this.followerNodesSupplier = followerNodesSupplier;
this.failFollower = failFollower;
this.transportService = transportService;
this.nodeClient = nodeClient;
}


void start() {
handleWakeUp();
}


private void scheduleNextWakeUp() {
if (followerNodesSupplier.get().isEmpty() == false) {
transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
handleWakeUp();
}

@Override
public String toString() {
return NodeFsHealthChecker.this + "::handleWakeUp";
}
}, fsHealthCheckInterval, ThreadPool.Names.SAME);
}
}


private void handleWakeUp() {
if (fsHealthCheckEnabled && followerNodesSupplier.get().isEmpty() == false) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest().clear().fs(true).timeout(fsHealthCheckInterval);
NodesStatsResponse nodesStatsResponse = fetchNodeStats(nodesStatsRequest);
if(nodesStatsResponse == null){
return;
}
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
if (nodeStats.getFs() == null) {
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().getName());
} else {
if (nodeStats.getFs().getTotal().isWritable() == Boolean.FALSE) {
failFollower.apply(nodeStats.getNode(), "read-only-file-system", () ->
followerNodesSupplier.get().contains(nodeStats.getNode()));
}
}
}
}
scheduleNextWakeUp();
}

private NodesStatsResponse fetchNodeStats(NodesStatsRequest nodeStatsRequest) {
NodesStatsResponse nodesStatsResponse = null;
try {
nodesStatsResponse = nodeClient.admin().cluster().nodesStats(nodeStatsRequest).actionGet();
} catch (Exception e){
if (e instanceof ReceiveTimeoutTransportException) {
logger.error("NodeStatsRequest timed out for FollowerChecker", e);
} else {
if (e instanceof ClusterBlockException) {
if (logger.isTraceEnabled()) {
logger.trace("Failed to execute NodeStatsRequest for FollowerChecker", e);
}
} else {
logger.warn("Failed to execute NodeStatsRequest for FollowerChecker", e);
}
}
}
return nodesStatsResponse;
}
}
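In the exchange on NodeFsHealthChecker above, the author agrees that the leader-side stats polling can go: it is enough for a follower to fail the leader's existing checks when its own health check reports a problem. The standalone sketch below illustrates that alternative under stated assumptions; the names (NodeHealthService, NodeHealthCheckFailureException, handleFollowerCheck) are chosen for the example and are not lifted from the merged diff.

// Hypothetical, self-contained sketch: a follower rejects the leader's check when
// its local health check reports the node as unhealthy. Illustrative only.
public final class FollowerCheckRejectionSketch {

    enum Status { HEALTHY, UNHEALTHY }

    // Thrown by a follower to fail the leader's check instead of answering it.
    static final class NodeHealthCheckFailureException extends RuntimeException {
        NodeHealthCheckFailureException(String message) { super(message); }
    }

    interface NodeHealthService {
        Status getStatus();
        String describe();
    }

    // Follower-side handling of a leader check: a healthy node responds normally,
    // an unhealthy node fails the check, so the leader removes it through its
    // usual failure path rather than via a separate NodesStats poll.
    static void handleFollowerCheck(NodeHealthService healthService) {
        if (healthService.getStatus() == Status.UNHEALTHY) {
            throw new NodeHealthCheckFailureException(
                "rejecting leader check, node is unhealthy [" + healthService.describe() + "]");
        }
        System.out.println("follower check handled normally");
    }

    public static void main(String[] args) {
        NodeHealthService unhealthy = new NodeHealthService() {
            @Override public Status getStatus() { return Status.UNHEALTHY; }
            @Override public String describe() { return "health check failed on [/var/data/nodes/0]"; }
        };
        try {
            handleFollowerCheck(unhealthy);
        } catch (NodeHealthCheckFailureException e) {
            System.out.println("leader sees: " + e.getMessage());
        }
    }
}

Failing the existing check keeps failure detection on a single code path and avoids an extra polling loop on the elected master, which is the simplification the comments above are pointing at.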