From 31ae956fdd83a2f65bf22c3ad601d4d65ad61439 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Fri, 27 Jan 2017 19:47:12 +0100 Subject: [PATCH 1/2] [FLINK-5663] Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal --- .../org/apache/flink/core/fs/FileSystem.java | 21 ++++++++++--------- .../flink/runtime/taskmanager/Task.java | 12 ++++++++--- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 33addbb14d842..d8efcbc03146e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -32,7 +32,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.util.IOUtils; import org.apache.flink.util.OperatingSystem; - +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +78,7 @@ public enum WriteMode { // ------------------------------------------------------------------------ - private static final InheritableThreadLocal REGISTRIES = new InheritableThreadLocal<>(); + private static final ThreadLocal REGISTRIES = new ThreadLocal<>(); private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem"; @@ -99,14 +99,15 @@ public enum WriteMode { * main thread. */ @Internal - public static void createFileSystemCloseableRegistryForTask() { + public static void createAndSetFileSystemCloseableRegistryForThread() { SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get(); - if (null != oldRegistry) { - IOUtils.closeQuietly(oldRegistry); - LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it."); - } + Preconditions.checkState(null == oldRegistry, + "Found old CloseableRegistry " + oldRegistry + + ". This indicates a leak of the InheritableThreadLocal through a ThreadPool!"); + SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); REGISTRIES.set(newRegistry); + LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName()); } /** @@ -114,7 +115,7 @@ public static void createFileSystemCloseableRegistryForTask() { * main thread or when the task should be canceled. */ @Internal - public static void disposeFileSystemCloseableRegistryForTask() { + public static void closeAndDisposeFileSystemCloseableRegistryForThread() { SafetyNetCloseableRegistry registry = REGISTRIES.get(); if (null != registry) { LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName()); @@ -123,7 +124,7 @@ public static void disposeFileSystemCloseableRegistryForTask() { } } - private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) { + private static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) { SafetyNetCloseableRegistry reg = REGISTRIES.get(); return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs; } @@ -306,7 +307,7 @@ public static FileSystem getUnguardedFileSystem(URI uri) throws IOException { * thrown if a reference to the file system instance could not be obtained */ public static FileSystem get(URI uri) throws IOException { - return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri)); + return wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 6925d0fe77dbe..c2e6d09037c91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -550,8 +550,8 @@ else if (current == ExecutionState.CANCELING) { // check for canceling as a shortcut // ---------------------------- - // init closeable registry for this task - FileSystem.createFileSystemCloseableRegistryForTask(); + // activate safety net for task thread + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes @@ -775,7 +775,8 @@ else if (transitionState(current, ExecutionState.FAILED, t)) { // remove all files in the distributed cache removeCachedFiles(distributedCacheEntries, fileCache); - FileSystem.disposeFileSystemCloseableRegistryForTask(); + // close and de-activate safety net for task thread + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); notifyFinalState(); } @@ -1115,6 +1116,8 @@ public void triggerCheckpointBarrier(final long checkpointID, long checkpointTim Runnable runnable = new Runnable() { @Override public void run() { + // activate safety net for checkpointing thread + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData); if (!success) { @@ -1133,6 +1136,9 @@ public void run() { "{} ({}) while being not in state running.", checkpointID, taskNameWithSubtask, executionId, t); } + } finally { + // close and de-activate safety net for checkpointing thread + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); } } }; From 355eb650bdf3859889cfe2c9c35b56708b692519 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Fri, 27 Jan 2017 21:00:55 +0100 Subject: [PATCH 2/2] Unit test --- .../fs/SafetyNetCloseableRegistryTest.java | 71 ++++++++++++++++++- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java index 6628407841fc0..40856b42ee7e1 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java @@ -19,11 +19,11 @@ package org.apache.flink.core.fs; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import java.io.Closeable; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; public class SafetyNetCloseableRegistryTest { @@ -32,7 +32,6 @@ public class SafetyNetCloseableRegistryTest { private SafetyNetCloseableRegistry closeableRegistry; private AtomicInteger unclosedCounter; - @Before public void setup() { this.closeableRegistry = new SafetyNetCloseableRegistry(); this.unclosedCounter = new AtomicInteger(0); @@ -55,9 +54,75 @@ private void joinThreads() throws InterruptedException { } } + @Test + public void testCorrectScopesForSafetyNet() throws Exception { + Thread t1 = new Thread() { + @Override + public void run() { + try { + FileSystem fs1 = FileSystem.getLocalFileSystem(); + // ensure no safety net in place + Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem); + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + fs1 = FileSystem.getLocalFileSystem(); + // ensure safety net is in place now + Assert.assertTrue(fs1 instanceof SafetyNetWrapperFileSystem); + Path tmp = new Path(fs1.getWorkingDirectory(), UUID.randomUUID().toString()); + try (FSDataOutputStream stream = fs1.create(tmp, false)) { + Thread t2 = new Thread() { + @Override + public void run() { + FileSystem fs2 = FileSystem.getLocalFileSystem(); + // ensure the safety net does not leak here + Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem); + FileSystem.createAndSetFileSystemCloseableRegistryForThread(); + fs2 = FileSystem.getLocalFileSystem(); + // ensure we can bring another safety net in place + Assert.assertTrue(fs2 instanceof SafetyNetWrapperFileSystem); + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + fs2 = FileSystem.getLocalFileSystem(); + // and that we can remove it again + Assert.assertFalse(fs2 instanceof SafetyNetWrapperFileSystem); + } + }; + t2.start(); + try { + t2.join(); + } catch (InterruptedException e) { + Assert.fail(); + } + + //ensure stream is still open and was never closed by any interferences + stream.write(42); + FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread(); + + // ensure leaking stream was closed + try { + stream.write(43); + Assert.fail(); + } catch (IOException ignore) { + + } + fs1 = FileSystem.getLocalFileSystem(); + // ensure safety net was removed + Assert.assertFalse(fs1 instanceof SafetyNetWrapperFileSystem); + } finally { + fs1.delete(tmp, false); + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + }; + t1.start(); + t1.join(); + } + @Test public void testClose() throws Exception { + setup(); startThreads(Integer.MAX_VALUE); for (int i = 0; i < 5; ++i) { @@ -98,7 +163,7 @@ public void close() throws IOException { @Test public void testSafetyNetClose() throws Exception { - + setup(); startThreads(20); joinThreads();