From 438aa221892ea0d50ca129aadbc4f1e90b2c0299 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 24 Oct 2016 17:49:54 +0200 Subject: [PATCH] [FLINK-4910] Introduce safety net for closing file system streams --- .../common/operators/CollectionExecutor.java | 31 +-- .../flink/core/fs/CloseableRegistry.java | 52 +++++ .../core/fs/ClosingFSDataInputStream.java | 97 +++++++++ .../core/fs/ClosingFSDataOutputStream.java | 102 +++++++++ .../core/fs/FSDataInputStreamWrapper.java | 96 +++++++++ .../core/fs/FSDataOutputStreamWrapper.java | 76 +++++++ .../org/apache/flink/core/fs/FileSystem.java | 88 +++++--- .../core/fs/SafetyNetCloseableRegistry.java | 181 ++++++++++++++++ .../core/fs/SafetyNetWrapperFileSystem.java | 150 ++++++++++++++ .../flink/core/fs/WrappingProxyCloseable.java | 30 +++ .../flink/util/AbstractCloseableRegistry.java | 52 ++--- .../java/org/apache/flink/util/IOUtils.java | 15 +- .../org/apache/flink/util/WrappingProxy.java | 25 +++ .../apache/flink/util/WrappingProxyUtil.java | 33 +++ .../apache/flink/core/fs/FileSystemTest.java | 29 +-- .../fs/SafetyNetCloseableRegistryTest.java | 193 ++++++++++++++++++ .../flink/runtime/filecache/FileCache.java | 42 ++-- .../state/AbstractKeyedStateBackend.java | 5 +- .../state/DefaultOperatorStateBackend.java | 5 +- .../state/StateInitializationContextImpl.java | 15 +- .../StateSnapshotContextSynchronousImpl.java | 5 +- .../state/filesystem/FileStateHandle.java | 18 +- .../flink/runtime/taskmanager/Task.java | 5 + .../streaming/runtime/tasks/StreamTask.java | 6 +- .../StateInitializationContextImplTest.java | 6 +- ...ateSnapshotContextSynchronousImplTest.java | 4 +- .../AbstractStreamOperatorTestHarness.java | 10 +- .../test/checkpointing/RescalingITCase.java | 1 + 28 files changed, 1239 insertions(+), 133 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java rename flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java => flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java (59%) create mode 100644 flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java create mode 100644 flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java create mode 100644 flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index a6fc17ee65d41..07f48fc1675fe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -18,20 +18,6 @@ package org.apache.flink.api.common.operators; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; @@ -58,6 +44,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.metrics.MetricGroup; @@ -65,6 +52,20 @@ import org.apache.flink.types.Value; import org.apache.flink.util.Visitor; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + /** * Execution utility for serial, local, collection-based executions of Flink programs. */ @@ -571,7 +572,7 @@ private static final class CompletedFuture implements Future{ public CompletedFuture(Path entry) { try{ - LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem(); + LocalFileSystem fs = (LocalFileSystem) FileSystem.getUnguardedFileSystem(entry.toUri()); result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry); } catch (Exception e){ throw new RuntimeException("DistributedCache supports only local files for Collection Environments"); diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java new file mode 100644 index 0000000000000..81ba7abca8409 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.AbstractCloseableRegistry; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed. + *

+ * Registering to an already closed registry will throw an exception and close the provided {@link Closeable} + *

+ * All methods in this class are thread-safe. + */ +public class CloseableRegistry extends AbstractCloseableRegistry { + + private static final Object DUMMY = new Object(); + + public CloseableRegistry() { + super(new HashMap()); + } + + @Override + protected void doRegister(Closeable closeable, Map closeableMap) throws IOException { + closeableMap.put(closeable, DUMMY); + } + + @Override + protected void doUnRegister(Closeable closeable, Map closeableMap) { + closeableMap.remove(closeable); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java new file mode 100644 index 0000000000000..23ac4f2b8bbff --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataInputStream} that is used to + * implement a safety net against unclosed streams. + *

+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized. + */ +public class ClosingFSDataInputStream + extends FSDataInputStreamWrapper + implements WrappingProxyCloseable { + + private final SafetyNetCloseableRegistry registry; + private final String debugInfo; + + private volatile boolean closed; + + private ClosingFSDataInputStream( + FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException { + super(delegate); + this.registry = Preconditions.checkNotNull(registry); + this.debugInfo = Preconditions.checkNotNull(debugInfo); + this.closed = false; + } + + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + registry.unregisterClosable(this); + inputStream.close(); + } + } + + @Override + public int hashCode() { + return inputStream.hashCode(); + } + + @Override + public boolean equals(Object obj) { + + if (this == obj) { + return true; + } + + if (obj instanceof ClosingFSDataInputStream) { + return inputStream.equals(((ClosingFSDataInputStream) obj).inputStream); + } + + return false; + } + + @Override + public String toString() { + return "ClosingFSDataInputStream(" + inputStream.toString() + ") : " + debugInfo; + } + + public static ClosingFSDataInputStream wrapSafe( + FSDataInputStream delegate, SafetyNetCloseableRegistry registry) throws IOException{ + return wrapSafe(delegate, registry, ""); + } + + public static ClosingFSDataInputStream wrapSafe( + FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{ + + ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo); + registry.registerClosable(inputStream); + return inputStream; + } +} \ No newline at end of file diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java new file mode 100644 index 0000000000000..120ca67c29a8c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link FSDataOutputStream} that is used to + * implement a safety net against unclosed streams. + *

+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized. + */ +public class ClosingFSDataOutputStream + extends FSDataOutputStreamWrapper + implements WrappingProxyCloseable { + + private final SafetyNetCloseableRegistry registry; + private final String debugString; + + private volatile boolean closed; + + public ClosingFSDataOutputStream( + FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException { + this(delegate, registry, ""); + } + + private ClosingFSDataOutputStream( + FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException { + super(delegate); + this.registry = Preconditions.checkNotNull(registry); + this.debugString = Preconditions.checkNotNull(debugString); + this.closed = false; + } + + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + registry.unregisterClosable(this); + outputStream.close(); + } + } + + @Override + public int hashCode() { + return outputStream.hashCode(); + } + + @Override + public boolean equals(Object obj) { + + if (this == obj) { + return true; + } + + if (obj instanceof ClosingFSDataOutputStream) { + return outputStream.equals(((ClosingFSDataOutputStream) obj).outputStream); + } + + return false; + } + + @Override + public String toString() { + return "ClosingFSDataOutputStream(" + outputStream.toString() + ") : " + debugString; + } + + public static ClosingFSDataOutputStream wrapSafe( + FSDataOutputStream delegate, SafetyNetCloseableRegistry registry) throws IOException { + return wrapSafe(delegate, registry, ""); + } + + public static ClosingFSDataOutputStream wrapSafe( + FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException { + + ClosingFSDataOutputStream inputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo); + registry.registerClosable(inputStream); + return inputStream; + } +} \ No newline at end of file diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java new file mode 100644 index 0000000000000..507b7569a1b22 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStreamWrapper.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingProxy; + +import java.io.IOException; + +/** + * Simple forwarding wrapper around {@link FSDataInputStream} + */ +public class FSDataInputStreamWrapper extends FSDataInputStream implements WrappingProxy { + + protected final FSDataInputStream inputStream; + + public FSDataInputStreamWrapper(FSDataInputStream inputStream) { + this.inputStream = Preconditions.checkNotNull(inputStream); + } + + @Override + public void seek(long desired) throws IOException { + inputStream.seek(desired); + } + + @Override + public long getPos() throws IOException { + return inputStream.getPos(); + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return inputStream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return inputStream.skip(n); + } + + @Override + public int available() throws IOException { + return inputStream.available(); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public void mark(int readlimit) { + inputStream.mark(readlimit); + } + + @Override + public void reset() throws IOException { + inputStream.reset(); + } + + @Override + public boolean markSupported() { + return inputStream.markSupported(); + } + + @Override + public FSDataInputStream getWrappedDelegate() { + return inputStream; + } +} \ No newline at end of file diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java new file mode 100644 index 0000000000000..36ebe10e4f577 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStreamWrapper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingProxy; + +import java.io.IOException; + +/** + * Simple forwarding wrapper around {@link FSDataInputStream} + */ +public class FSDataOutputStreamWrapper extends FSDataOutputStream implements WrappingProxy { + + protected final FSDataOutputStream outputStream; + + public FSDataOutputStreamWrapper(FSDataOutputStream outputStream) { + this.outputStream = Preconditions.checkNotNull(outputStream); + } + + @Override + public long getPos() throws IOException { + return outputStream.getPos(); + } + + @Override + public void flush() throws IOException { + outputStream.flush(); + } + + @Override + public void sync() throws IOException { + outputStream.sync(); + } + + @Override + public void write(int b) throws IOException { + outputStream.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + outputStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputStream.write(b, off, len); + } + + @Override + public void close() throws IOException { + outputStream.close(); + } + + @Override + public FSDataOutputStream getWrappedDelegate() { + return outputStream; + } +} \ No newline at end of file diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 1844d6487429e..5a608b5ef1162 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -25,6 +25,14 @@ package org.apache.flink.core.fs; +import org.apache.flink.annotation.Public; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.OperatingSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -34,11 +42,6 @@ import java.util.HashMap; import java.util.Map; -import org.apache.flink.annotation.Public; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.OperatingSystem; - /** * An abstract base class for a fairly generic file system. It * may be implemented as a distributed file system, or as a local @@ -47,6 +50,8 @@ @Public public abstract class FileSystem { + private static final InheritableThreadLocal REGISTRIES = new InheritableThreadLocal<>(); + private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem"; private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem"; @@ -55,6 +60,39 @@ public abstract class FileSystem { private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper"; + private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class); + + /** + * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's + * main thread. + */ + public static void createFileSystemCloseableRegistryForTask() { + SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get(); + if (null != oldRegistry) { + IOUtils.closeQuietly(oldRegistry); + LOG.warn("Found existing SafetyNetCloseableRegistry. Closed and replaced it."); + } + SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry(); + REGISTRIES.set(newRegistry); + } + + /** + * Create a SafetyNetCloseableRegistry for a Task. This method should be called at the end of the task's + * main thread or when the task should be canceled. + */ + public static void disposeFileSystemCloseableRegistryForTask() { + SafetyNetCloseableRegistry registry = REGISTRIES.get(); + if (null != registry) { + LOG.info("Ensuring all FileSystem streams are closed"); + REGISTRIES.remove(); + IOUtils.closeQuietly(registry); + } + } + + private static FileSystem wrapWithSafetyNetWhenInTask(FileSystem fs) { + SafetyNetCloseableRegistry reg = REGISTRIES.get(); + return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs; + } /** Object used to protect calls to specific methods.*/ private static final Object SYNCHRONIZATION_OBJECT = new Object(); @@ -63,7 +101,7 @@ public abstract class FileSystem { * Enumeration for write modes. * */ - public static enum WriteMode { + public enum WriteMode { /** Creates write path if it does not exist. Does not overwrite existing files and directories. */ NO_OVERWRITE, @@ -214,18 +252,7 @@ public static void setDefaultScheme(Configuration config) throws IOException { } } - /** - * Returns a reference to the {@link FileSystem} instance for accessing the - * file system identified by the given {@link URI}. - * - * @param uri - * the {@link URI} identifying the file system - * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given - * {@link URI}. - * @throws IOException - * thrown if a reference to the file system instance could not be obtained - */ - public static FileSystem get(URI uri) throws IOException { + public static FileSystem getUnguardedFileSystem(URI uri) throws IOException { FileSystem fs; URI asked = uri; @@ -238,13 +265,13 @@ public static FileSystem get(URI uri) throws IOException { } uri = new URI(defaultScheme.getScheme(), null, defaultScheme.getHost(), - defaultScheme.getPort(), uri.getPath(), null, null); + defaultScheme.getPort(), uri.getPath(), null, null); } catch (URISyntaxException e) { try { if (defaultScheme.getScheme().equals("file")) { uri = new URI("file", null, - new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null); + new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null); } } catch (URISyntaxException ex) { // we tried to repair it, but could not. report the scheme error @@ -255,8 +282,8 @@ public static FileSystem get(URI uri) throws IOException { if(uri.getScheme() == null) { throw new IOException("The URI '" + uri + "' is invalid.\n" + - "The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked + - ", and the final URI = " + uri + "."); + "The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked + + ", and the final URI = " + uri + "."); } if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) { @@ -294,7 +321,7 @@ public static FileSystem get(URI uri) throws IOException { } else { // we can not read from this file system. throw new IOException("No file system found with scheme " + uri.getScheme() - + ", referenced in file URI '" + uri.toString() + "'."); + + ", referenced in file URI '" + uri.toString() + "'."); } } else { // we end up here if we have a file system with build-in flink support. @@ -315,6 +342,21 @@ public static FileSystem get(URI uri) throws IOException { return fs; } + /** + * Returns a reference to the {@link FileSystem} instance for accessing the + * file system identified by the given {@link URI}. + * + * @param uri + * the {@link URI} identifying the file system + * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given + * {@link URI}. + * @throws IOException + * thrown if a reference to the file system instance could not be obtained + */ + public static FileSystem get(URI uri) throws IOException { + return wrapWithSafetyNetWhenInTask(getUnguardedFileSystem(uri)); + } + /** * Returns a boolean indicating whether a scheme has built-in Flink support. * diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java new file mode 100644 index 0000000000000..de4fb3096e1f7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.AbstractCloseableRegistry; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingProxyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; +import java.util.IdentityHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * This implementation of an {@link AbstractCloseableRegistry} registers {@link WrappingProxyCloseable}. When + * the proxy becomes subject to GC, this registry takes care of closing unclosed {@link Closeable}s. + *

+ * Phantom references are used to track when {@link org.apache.flink.util.WrappingProxy}s of {@link Closeable} got + * GC'ed. We ensure that the wrapped {@link Closeable} is properly closed to avoid resource leaks. + *

+ * Other than that, it works like a normal {@link CloseableRegistry}. + *

+ * All methods in this class are thread-safe. + */ +public class SafetyNetCloseableRegistry extends + AbstractCloseableRegistry, + SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> { + + private static final Logger LOG = LoggerFactory.getLogger(SafetyNetCloseableRegistry.class); + private final ReferenceQueue> referenceQueue; + private final Thread reaperThread; + + public SafetyNetCloseableRegistry() { + super(new IdentityHashMap()); + this.referenceQueue = new ReferenceQueue<>(); + this.reaperThread = new CloseableReaperThread(); + reaperThread.start(); + } + + @Override + protected void doRegister( + WrappingProxyCloseable wrappingProxyCloseable, + Map closeableMap) throws IOException { + + Closeable innerCloseable = WrappingProxyUtil.stripProxy(wrappingProxyCloseable.getWrappedDelegate()); + + if (null == innerCloseable) { + return; + } + + PhantomDelegatingCloseableRef phantomRef = + new PhantomDelegatingCloseableRef(wrappingProxyCloseable, referenceQueue); + + closeableMap.put(innerCloseable, phantomRef); + } + + @Override + protected void doUnRegister( + WrappingProxyCloseable closeable, + Map closeableMap) { + + Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate()); + + if (null == innerCloseable) { + return; + } + + closeableMap.remove(innerCloseable); + } + + /** + * Phantom reference to {@link WrappingProxyCloseable}. + */ + static final class PhantomDelegatingCloseableRef + extends PhantomReference> + implements Closeable { + + private final Closeable innerCloseable; + private final String debugString; + + public PhantomDelegatingCloseableRef( + WrappingProxyCloseable referent, + ReferenceQueue> q) { + + super(referent, q); + this.innerCloseable = Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent)); + this.debugString = referent.toString(); + } + + public Closeable getInnerCloseable() { + return innerCloseable; + } + + public String getDebugString() { + return debugString; + } + + @Override + public void close() throws IOException { + innerCloseable.close(); + } + } + + /** + * Reaper runnable collects and closes leaking resources + */ + final class CloseableReaperThread extends Thread { + + public CloseableReaperThread() { + super("CloseableReaperThread"); + this.running = false; + } + + private volatile boolean running; + + @Override + public void run() { + this.running = true; + try { + List closeableList = new LinkedList<>(); + while (running) { + PhantomDelegatingCloseableRef oldRef = (PhantomDelegatingCloseableRef) referenceQueue.remove(); + synchronized (getSynchronizationLock()) { + do { + closeableList.add(oldRef); + closeableToRef.remove(oldRef.getInnerCloseable()); + } + while ((oldRef = (PhantomDelegatingCloseableRef) referenceQueue.poll()) != null); + } + + // close outside the synchronized block in case this is blocking + for (PhantomDelegatingCloseableRef closeableRef : closeableList) { + IOUtils.closeQuietly(closeableRef); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing unclosed resource: " + closeableRef.getDebugString()); + } + } + + closeableList.clear(); + } + } catch (InterruptedException e) { + // done + } + } + + @Override + public void interrupt() { + this.running = false; + super.interrupt(); + } + } + + @Override + public void close() throws IOException { + super.close(); + reaperThread.interrupt(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java new file mode 100644 index 0000000000000..bf30b4f49a6d0 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingProxy; + +import java.io.IOException; +import java.net.URI; + +/** + * This is a {@link WrappingProxy} around {@link FileSystem} which (i) wraps all opened streams as + * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and (ii) registers them to + * a {@link SafetyNetCloseableRegistry}. + * + * Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to prevent resource leaks + * from unclosed streams. + */ +public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy { + + private final SafetyNetCloseableRegistry registry; + private final FileSystem unsafeFileSystem; + + public SafetyNetWrapperFileSystem(FileSystem unsafeFileSystem, SafetyNetCloseableRegistry registry) { + this.registry = Preconditions.checkNotNull(registry); + this.unsafeFileSystem = Preconditions.checkNotNull(unsafeFileSystem); + } + + @Override + public Path getWorkingDirectory() { + return unsafeFileSystem.getWorkingDirectory(); + } + + @Override + public Path getHomeDirectory() { + return unsafeFileSystem.getHomeDirectory(); + } + + @Override + public URI getUri() { + return unsafeFileSystem.getUri(); + } + + @Override + public void initialize(URI name) throws IOException { + unsafeFileSystem.initialize(name); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return unsafeFileSystem.getFileStatus(f); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + return unsafeFileSystem.getFileBlockLocations(file, start, len); + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + FSDataInputStream innerStream = unsafeFileSystem.open(f, bufferSize); + return ClosingFSDataInputStream.wrapSafe(innerStream, registry, String.valueOf(f)); + } + + @Override + public FSDataInputStream open(Path f) throws IOException { + FSDataInputStream innerStream = unsafeFileSystem.open(f); + return ClosingFSDataInputStream.wrapSafe(innerStream, registry, String.valueOf(f)); + } + + @Override + public long getDefaultBlockSize() { + return unsafeFileSystem.getDefaultBlockSize(); + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return unsafeFileSystem.listStatus(f); + } + + @Override + public boolean exists(Path f) throws IOException { + return unsafeFileSystem.exists(f); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return unsafeFileSystem.delete(f, recursive); + } + + @Override + public boolean mkdirs(Path f) throws IOException { + return unsafeFileSystem.mkdirs(f); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) + throws IOException { + + FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite, bufferSize, replication, blockSize); + return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f)); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { + FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite); + return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f)); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return unsafeFileSystem.rename(src, dst); + } + + @Override + public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { + return unsafeFileSystem.initOutPathLocalFS(outPath, writeMode, createDirectory); + } + + @Override + public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { + return unsafeFileSystem.initOutPathDistFS(outPath, writeMode, createDirectory); + } + + @Override + public boolean isDistributedFS() { + return unsafeFileSystem.isDistributedFS(); + } + + @Override + public FileSystem getWrappedDelegate() { + return unsafeFileSystem; + } +} \ No newline at end of file diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java new file mode 100644 index 0000000000000..b74fc78a5426d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/WrappingProxyCloseable.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.WrappingProxy; + +import java.io.Closeable; + +/** + * {@link WrappingProxy} for {@link Closeable} that is also closeable. + */ +public interface WrappingProxyCloseable extends Closeable, WrappingProxy { + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java similarity index 59% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java rename to flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index b5f7dade2d168..7c0291c0fb75a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -16,29 +16,30 @@ * limitations under the License. */ -package org.apache.flink.runtime.state; - -import org.apache.commons.io.IOUtils; +package org.apache.flink.util; import java.io.Closeable; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Map; /** - * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed. + * This is the abstract base class for registries that allow to register instances of {@link Closeable}, which are all + * closed if this registry is closed. *

* Registering to an already closed registry will throw an exception and close the provided {@link Closeable} *

* All methods in this class are thread-safe. + * + * @param Type of the closeable this registers + * @param Type for potential meta data associated with the registering closeables */ -public class ClosableRegistry implements Closeable { +public abstract class AbstractCloseableRegistry implements Closeable { - private final Set registeredCloseables; + protected final Map closeableToRef; private boolean closed; - public ClosableRegistry() { - this.registeredCloseables = new HashSet<>(); + public AbstractCloseableRegistry(Map closeableToRef) { + this.closeableToRef = closeableToRef; this.closed = false; } @@ -46,23 +47,23 @@ public ClosableRegistry() { * Registers a {@link Closeable} with the registry. In case the registry is already closed, this method throws an * {@link IllegalStateException} and closes the passed {@link Closeable}. * - * @param closeable Closable tor register - * @return true if the the Closable was newly added to the registry + * @param closeable Closeable tor register + * @return true if the the Closeable was newly added to the registry * @throws IOException exception when the registry was closed before */ - public boolean registerClosable(Closeable closeable) throws IOException { + public final void registerClosable(C closeable) throws IOException { if (null == closeable) { - return false; + return; } synchronized (getSynchronizationLock()) { if (closed) { IOUtils.closeQuietly(closeable); - throw new IOException("Cannot register Closable, registry is already closed. Closed passed closable."); + throw new IOException("Cannot register Closeable, registry is already closed. Closing argument."); } - return registeredCloseables.add(closeable); + doRegister(closeable, closeableToRef); } } @@ -72,14 +73,14 @@ public boolean registerClosable(Closeable closeable) throws IOException { * @param closeable instance to remove from the registry. * @return true, if the instance was actually registered and now removed */ - public boolean unregisterClosable(Closeable closeable) { + public final void unregisterClosable(C closeable) { if (null == closeable) { - return false; + return; } synchronized (getSynchronizationLock()) { - return registeredCloseables.remove(closeable); + doUnRegister(closeable, closeableToRef); } } @@ -87,11 +88,12 @@ public boolean unregisterClosable(Closeable closeable) { public void close() throws IOException { synchronized (getSynchronizationLock()) { - for (Closeable closeable : registeredCloseables) { + for (Closeable closeable : closeableToRef.keySet()) { IOUtils.closeQuietly(closeable); } - registeredCloseables.clear(); + closeableToRef.clear(); + closed = true; } } @@ -102,7 +104,11 @@ public boolean isClosed() { } } - private Object getSynchronizationLock() { - return registeredCloseables; + protected final Object getSynchronizationLock() { + return closeableToRef; } + + protected abstract void doUnRegister(C closeable, Map closeableMap); + + protected abstract void doRegister(C closeable, Map closeableMap) throws IOException; } diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java index 12d70ce6a76c4..9810271505915 100644 --- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java @@ -18,14 +18,15 @@ package org.apache.flink.util; +import org.slf4j.Logger; + +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; import java.net.Socket; -import org.slf4j.Logger; - /** * An utility class for I/O related functionality. * @@ -213,6 +214,16 @@ public static void closeSocket(final Socket sock) { } } } + + public static void closeQuietly(Closeable closeable) { + try { + if (closeable != null) { + closeable.close(); + } + } catch (IOException ignored) { + + } + } // ------------------------------------------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java new file mode 100644 index 0000000000000..82fcf04353c9a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +public interface WrappingProxy { + + T getWrappedDelegate(); + +} diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java new file mode 100644 index 0000000000000..0f62abd607ff4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +public final class WrappingProxyUtil { + + private WrappingProxyUtil() { + throw new AssertionError(); + } + + public static T stripProxy(T object) { + while (object instanceof WrappingProxy) { + object = ((WrappingProxy) object).getWrappedDelegate(); + } + return object; + } +} \ No newline at end of file diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java index 04ebc0e64392d..1bde2fb5d06db 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java @@ -17,32 +17,37 @@ */ package org.apache.flink.core.fs; +import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.util.WrappingProxyUtil; +import org.junit.Test; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import org.apache.flink.core.fs.local.LocalFileSystem; -import org.junit.Test; -import static org.junit.Assert.*; + +import static org.junit.Assert.assertTrue; public class FileSystemTest { + @Test public void testGet() throws URISyntaxException, IOException { String scheme = "file"; - - assertTrue(FileSystem.get(new URI(scheme + ":///test/test")) instanceof LocalFileSystem); - + + assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":///test/test"))) instanceof LocalFileSystem); + try { FileSystem.get(new URI(scheme + "://test/test")); } catch (IOException ioe) { assertTrue(ioe.getMessage().startsWith("Found local file path with authority '")); } - assertTrue(FileSystem.get(new URI(scheme + ":/test/test")) instanceof LocalFileSystem); - - assertTrue(FileSystem.get(new URI(scheme + ":test/test")) instanceof LocalFileSystem); + assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":/test/test"))) instanceof LocalFileSystem); - assertTrue(FileSystem.get(new URI("/test/test")) instanceof LocalFileSystem); - - assertTrue(FileSystem.get(new URI("test/test")) instanceof LocalFileSystem); + assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI(scheme + ":test/test"))) instanceof LocalFileSystem); + + assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("/test/test"))) instanceof LocalFileSystem); + + assertTrue(WrappingProxyUtil.stripProxy(FileSystem.get(new URI("test/test"))) instanceof LocalFileSystem); } + } diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java new file mode 100644 index 0000000000000..6628407841fc0 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class SafetyNetCloseableRegistryTest { + + private ProducerThread[] streamOpenThreads; + private SafetyNetCloseableRegistry closeableRegistry; + private AtomicInteger unclosedCounter; + + @Before + public void setup() { + this.closeableRegistry = new SafetyNetCloseableRegistry(); + this.unclosedCounter = new AtomicInteger(0); + this.streamOpenThreads = new ProducerThread[10]; + for (int i = 0; i < streamOpenThreads.length; ++i) { + streamOpenThreads[i] = new ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE); + } + } + + private void startThreads(int maxStreams) { + for (ProducerThread t : streamOpenThreads) { + t.setMaxStreams(maxStreams); + t.start(); + } + } + + private void joinThreads() throws InterruptedException { + for (Thread t : streamOpenThreads) { + t.join(); + } + } + + @Test + public void testClose() throws Exception { + + startThreads(Integer.MAX_VALUE); + + for (int i = 0; i < 5; ++i) { + System.gc(); + Thread.sleep(40); + } + + closeableRegistry.close(); + + joinThreads(); + + Assert.assertEquals(0, unclosedCounter.get()); + + try { + + WrappingProxyCloseable testCloseable = new WrappingProxyCloseable() { + @Override + public Closeable getWrappedDelegate() { + return this; + } + + @Override + public void close() throws IOException { + unclosedCounter.incrementAndGet(); + } + }; + + closeableRegistry.registerClosable(testCloseable); + + Assert.fail("Closed registry should not accept closeables!"); + + } catch (IOException expected) { + //expected + } + + Assert.assertEquals(1, unclosedCounter.get()); + } + + @Test + public void testSafetyNetClose() throws Exception { + + startThreads(20); + + joinThreads(); + + for (int i = 0; i < 5 && unclosedCounter.get() > 0; ++i) { + System.gc(); + Thread.sleep(50); + } + + Assert.assertEquals(0, unclosedCounter.get()); + closeableRegistry.close(); + } + + private static final class ProducerThread extends Thread { + + private final SafetyNetCloseableRegistry registry; + private final AtomicInteger refCount; + private int maxStreams; + + public ProducerThread(SafetyNetCloseableRegistry registry, AtomicInteger refCount, int maxStreams) { + this.registry = registry; + this.refCount = refCount; + this.maxStreams = maxStreams; + } + + public int getMaxStreams() { + return maxStreams; + } + + public void setMaxStreams(int maxStreams) { + this.maxStreams = maxStreams; + } + + @Override + public void run() { + try { + int count = 0; + while (maxStreams > 0) { + String debug = Thread.currentThread().getName() + " " + count; + TestStream testStream = new TestStream(refCount); + refCount.incrementAndGet(); + ClosingFSDataInputStream pis = ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference dies here + + try { + Thread.sleep(2); + } catch (InterruptedException e) { + + } + + if (maxStreams != Integer.MAX_VALUE) { + --maxStreams; + } + ++count; + } + } catch (Exception ex) { + + } + } + } + + private static final class TestStream extends FSDataInputStream { + + private AtomicInteger refCount; + + public TestStream(AtomicInteger refCount) { + this.refCount = refCount; + } + + @Override + public void seek(long desired) throws IOException { + + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public int read() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + if (refCount != null) { + refCount.decrementAndGet(); + refCount = null; + } + } + } +} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index b5bdcaf59e23b..d79be05d32054 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -18,19 +18,8 @@ package org.apache.flink.runtime.filecache; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; @@ -40,13 +29,23 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.IOUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * The FileCache is used to create the local files for the registered cache files when a task is deployed. * The files will be removed when the task is unregistered after a 5 second delay. @@ -236,8 +235,10 @@ boolean holdsStillReference(String name, JobID jobId) { // ------------------------------------------------------------------------ public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException { - FileSystem sFS = sourcePath.getFileSystem(); - FileSystem tFS = targetPath.getFileSystem(); + // TODO rewrite this to make it participate in the closable registry and the lifecycle of a task. + // we unwrap the file system to get raw streams without safety net + FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri()); + FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri()); if (!tFS.exists(targetPath)) { if (sFS.getFileStatus(sourcePath).isDir()) { tFS.mkdirs(targetPath); @@ -253,16 +254,11 @@ public static void copy(Path sourcePath, Path targetPath, boolean executable) th copy(content.getPath(), new Path(localPath), executable); } } else { - try { - FSDataOutputStream lfsOutput = tFS.create(targetPath, false); - FSDataInputStream fsInput = sFS.open(sourcePath); + try (FSDataOutputStream lfsOutput = tFS.create(targetPath, false); FSDataInputStream fsInput = sFS.open(sourcePath)) { IOUtils.copyBytes(fsInput, lfsOutput); //noinspection ResultOfMethodCallIgnored new File(targetPath.toString()).setExecutable(executable); - // closing the FSDataOutputStream - lfsOutput.close(); - } - catch (IOException ioe) { + } catch (IOException ioe) { LOG.error("could not copy file to local file cache.", ioe); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index e5d9b2bf57a9f..ae71c7feba34d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.util.Preconditions; @@ -80,7 +81,7 @@ public abstract class AbstractKeyedStateBackend protected final TaskKvStateRegistry kvStateRegistry; /** Registry for all opened streams, so they can be closed if the task using this backend is closed */ - protected ClosableRegistry cancelStreamRegistry; + protected CloseableRegistry cancelStreamRegistry; protected final ClassLoader userCodeClassLoader; @@ -96,7 +97,7 @@ public AbstractKeyedStateBackend( this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups); this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); - this.cancelStreamRegistry = new ClosableRegistry(); + this.cancelStreamRegistry = new CloseableRegistry(); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 2f5d3cb13bbbb..5b473628e73e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.memory.DataInputView; @@ -49,7 +50,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { private final Map> registeredStates; private final Collection restoreSnapshots; - private final ClosableRegistry closeStreamOnCancelRegistry; + private final CloseableRegistry closeStreamOnCancelRegistry; private final JavaSerializer javaSerializer; /** @@ -65,7 +66,7 @@ public DefaultOperatorStateBackend( this.javaSerializer = new JavaSerializer<>(userClassLoader); this.restoreSnapshots = restoreSnapshots; this.registeredStates = new HashMap<>(); - this.closeStreamOnCancelRegistry = new ClosableRegistry(); + this.closeStreamOnCancelRegistry = new CloseableRegistry(); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index 8fbde051dd6be..b131d14a33d6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.util.Preconditions; @@ -36,7 +37,7 @@ public class StateInitializationContextImpl implements StateInitializationContext { /** Closable registry to participate in the operator's cancel/close methods */ - private final ClosableRegistry closableRegistry; + private final CloseableRegistry closableRegistry; /** Signal whether any state to restore was found */ private final boolean restored; @@ -55,7 +56,7 @@ public StateInitializationContextImpl( KeyedStateStore keyedStateStore, Collection keyGroupsStateHandles, Collection operatorStateHandles, - ClosableRegistry closableRegistry) { + CloseableRegistry closableRegistry) { this.restored = restored; this.closableRegistry = Preconditions.checkNotNull(closableRegistry); @@ -87,7 +88,7 @@ public Collection getKeyGroupsStateHandles() { return keyGroupsStateHandles; } - public ClosableRegistry getClosableRegistry() { + public CloseableRegistry getClosableRegistry() { return closableRegistry; } @@ -137,14 +138,14 @@ public void close() { private static class KeyGroupStreamIterator implements Iterator { private final Iterator stateHandleIterator; - private final ClosableRegistry closableRegistry; + private final CloseableRegistry closableRegistry; private KeyGroupsStateHandle currentStateHandle; private FSDataInputStream currentStream; private Iterator> currentOffsetsIterator; public KeyGroupStreamIterator( - Iterator stateHandleIterator, ClosableRegistry closableRegistry) { + Iterator stateHandleIterator, CloseableRegistry closableRegistry) { this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); this.closableRegistry = Preconditions.checkNotNull(closableRegistry); @@ -200,7 +201,7 @@ private static class OperatorStateStreamIterator implements Iterator stateHandleIterator; - private final ClosableRegistry closableRegistry; + private final CloseableRegistry closableRegistry; private OperatorStateHandle currentStateHandle; private FSDataInputStream currentStream; @@ -210,7 +211,7 @@ private static class OperatorStateStreamIterator implements Iterator stateHandleIterator, - ClosableRegistry closableRegistry) { + CloseableRegistry closableRegistry) { this.stateName = Preconditions.checkNotNull(stateName); this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index d632529de49f7..ce8a6c4c3b516 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -42,7 +43,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be * obtained from and managed by the stream task. */ - private final ClosableRegistry closableRegistry; + private final CloseableRegistry closableRegistry; private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream; private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream; @@ -62,7 +63,7 @@ public StateSnapshotContextSynchronousImpl( long checkpointTimestamp, CheckpointStreamFactory streamFactory, KeyGroupRange keyGroupRange, - ClosableRegistry closableRegistry) { + CloseableRegistry closableRegistry) { this.checkpointId = checkpointId; this.checkpointTimestamp = checkpointTimestamp; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 29e905cceb413..b61c52d6c4146 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -43,9 +43,6 @@ public class FileStateHandle implements StreamStateHandle { /** The size of the state in the file */ private final long stateSize; - /** Cached file system handle */ - private transient FileSystem fs; - /** * Creates a new file state for the given file path. * @@ -79,13 +76,17 @@ public FSDataInputStream openInputStream() throws IOException { */ @Override public void discardState() throws Exception { - getFileSystem().delete(filePath, false); + + FileSystem fs = getFileSystem(); + + fs.delete(filePath, false); // send a call to delete the checkpoint directory containing the file. This will // fail (and be ignored) when some files still exist try { - getFileSystem().delete(filePath.getParent(), false); - } catch (IOException ignored) {} + fs.delete(filePath.getParent(), false); + } catch (IOException ignored) { + } } /** @@ -106,10 +107,7 @@ public long getStateSize() throws IOException { * @throws IOException Thrown if the file system cannot be accessed. */ private FileSystem getFileSystem() throws IOException { - if (fs == null) { - fs = FileSystem.get(filePath.toUri()); - } - return fs; + return FileSystem.get(filePath.toUri()); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 3254fc1fbe300..c794f560f308a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -538,6 +539,9 @@ else if (current == ExecutionState.CANCELING) { // check for canceling as a shortcut // ---------------------------- + // init closeable registry for this task + FileSystem.createFileSystemCloseableRegistryForTask(); + // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes LOG.info("Loading JAR files for task " + taskNameWithSubtask); @@ -758,6 +762,7 @@ else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { // remove all files in the distributed cache removeCachedFiles(distributedCacheEntries, fileCache); + FileSystem.disposeFileSystemCloseableRegistryForTask(); notifyFinalState(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 4aaad71a3de30..659590130d801 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; @@ -36,7 +37,6 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.ClosableRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -161,7 +161,7 @@ public abstract class StreamTask> /** The currently active background materialization threads */ - private final ClosableRegistry cancelables = new ClosableRegistry(); + private final CloseableRegistry cancelables = new CloseableRegistry(); /** Flag to mark the task "in operation", in which case check * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */ @@ -949,7 +949,7 @@ public void close() { } } - public ClosableRegistry getCancelables() { + public CloseableRegistry getCancelables() { return cancelables; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index 75c2261da59b4..cd940760ad0dc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -20,13 +20,13 @@ import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.ClosableRegistry; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; @@ -58,7 +58,7 @@ public class StateInitializationContextImplTest { static final int NUM_HANDLES = 10; private StateInitializationContextImpl initializationContext; - private ClosableRegistry closableRegistry; + private CloseableRegistry closableRegistry; private int writtenKeyGroups; private Set writtenOperatorStates; @@ -70,7 +70,7 @@ public void setUp() throws Exception { this.writtenKeyGroups = 0; this.writtenOperatorStates = new HashSet<>(); - this.closableRegistry = new ClosableRegistry(); + this.closableRegistry = new CloseableRegistry(); OperatorStateStore stateStore = mock(OperatorStateStore.class); ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java index 0ee839e8a7cd5..2b2df4c06b04a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.ClosableRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; @@ -35,7 +35,7 @@ public class StateSnapshotContextSynchronousImplTest { @Before public void setUp() throws Exception { - ClosableRegistry closableRegistry = new ClosableRegistry(); + CloseableRegistry closableRegistry = new CloseableRegistry(); CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(1024); KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2); this.snapshotContext = new StateSnapshotContextSynchronousImpl(42, 4711, streamFactory, keyGroupRange, closableRegistry); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 23a31d52eddec..830cd6faadb0e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.ClosableRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; @@ -65,7 +65,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Base class for {@code AbstractStreamOperator} test harnesses. @@ -86,7 +88,7 @@ public class AbstractStreamOperatorTestHarness { final Environment environment; - ClosableRegistry closableRegistry; + CloseableRegistry closableRegistry; // use this as default for tests protected AbstractStateBackend stateBackend = new MemoryStateBackend(); @@ -115,7 +117,7 @@ public AbstractStreamOperatorTestHarness( this.config = new StreamConfig(underlyingConfig); this.config.setCheckpointingEnabled(true); this.executionConfig = new ExecutionConfig(); - this.closableRegistry = new ClosableRegistry(); + this.closableRegistry = new CloseableRegistry(); this.checkpointLock = new Object(); environment = new MockEnvironment( diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 5a6417329010d..09de67f9c1c70 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -114,6 +114,7 @@ public static void setup() throws Exception { public static void teardown() { if (cluster != null) { cluster.shutdown(); + cluster.awaitTermination(); } }