Skip to content

Commit

Permalink
Remove node from cluster when node locks broken (#61400)
Browse files Browse the repository at this point in the history
In #52680 we introduced a mechanism that will allow nodes to remove
themselves from the cluster if they locally determine themselves to be
unhealthy. The only check today is that their data paths are all
empirically writeable. This commit extends this check to consider a
failure of `NodeEnvironment#assertEnvIsLocked()` to be an indication of
unhealthiness.

Closes #58373
  • Loading branch information
amoghRZP committed Sep 22, 2020
1 parent e02555c commit 71d0958
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
Expand Up @@ -58,6 +58,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private volatile boolean enabled;
private volatile boolean brokenLock;
private final TimeValue refreshInterval;
private volatile TimeValue slowPathLoggingThreshold;
private final NodeEnvironment nodeEnv;
Expand Down Expand Up @@ -117,6 +118,8 @@ public StatusInfo getHealth() {
Set<Path> unhealthyPaths = this.unhealthyPaths;
if (enabled == false) {
statusInfo = new StatusInfo(HEALTHY, "health check disabled");
} else if (brokenLock) {
statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
} else if (unhealthyPaths == null) {
statusInfo = new StatusInfo(HEALTHY, "health check passed");
} else {
Expand Down Expand Up @@ -150,7 +153,16 @@ public void run() {

private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
for (Path path : nodeEnv.nodeDataPaths()) {
Path[] paths = null;
try {
paths = nodeEnv.nodeDataPaths();
} catch (IllegalStateException e) {
logger.error("health check failed", e);
brokenLock = true;
return;
}

for (Path path : paths) {
long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
Expand All @@ -176,6 +188,7 @@ private void monitorFSHealth() {
}
}
unhealthyPaths = currentUnhealthyPaths;
brokenLock = false;
}
}
}
Expand Down
Expand Up @@ -42,8 +42,8 @@
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -231,6 +231,36 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
}
}

public void testFailsHealthOnUnexpectedLockFileSize() throws IOException {
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
final Settings settings = Settings.EMPTY;
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
FileSystemUnexpectedLockFileSizeProvider unexpectedLockFileSizeFileSystemProvider = new FileSystemUnexpectedLockFileSizeProvider(
fileSystem, 1, testThreadPool);
fileSystem = unexpectedLockFileSizeFileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());

// enabling unexpected file size injection
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true);

fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed due to broken node lock"));
assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount());
} finally {
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(false);
PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}

private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider {

AtomicBoolean injectIOException = new AtomicBoolean();
Expand All @@ -254,7 +284,8 @@ public int getInjectedPathCount(){
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
if (injectIOException.get()){
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
if (path.toString().startsWith(pathPrefix) && path.toString().
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
Expand Down Expand Up @@ -289,7 +320,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
public void force(boolean metaData) throws IOException {
if (injectIOException.get()) {
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
if (path.toString().startsWith(pathPrefix) && path.toString().
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
Expand Down Expand Up @@ -341,4 +373,39 @@ public void force(boolean metaData) throws IOException {
};
}
}

private static class FileSystemUnexpectedLockFileSizeProvider extends FilterFileSystemProvider {

AtomicBoolean injectUnexpectedFileSize = new AtomicBoolean();
AtomicInteger injectedPaths = new AtomicInteger();

private final long size;
private final ThreadPool threadPool;

FileSystemUnexpectedLockFileSizeProvider(FileSystem inner, long size, ThreadPool threadPool) {
super("disrupt_fs_health://", inner);
this.size = size;
this.threadPool = threadPool;
}

public int getInjectedPathCount(){
return injectedPaths.get();
}

@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
@Override
public long size() throws IOException {
if (injectUnexpectedFileSize.get()) {
if (path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME)) {
injectedPaths.incrementAndGet();
return size;
}
}
return super.size();
}
};
}
}
}

0 comments on commit 71d0958

Please sign in to comment.