From b19cfc6332d2034ac899f7c33cf1dd9278f81538 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 25 Jul 2017 17:19:25 +0200 Subject: [PATCH 1/2] [FLINK-7265] [core] Introduce FileSystemKind and ConsistencyLevel for FileSystem These describe the characteristics of the file system, such as consistency and support for directories and efficient directory operations. --- .../flink/core/fs/ConsistencyLevel.java | 119 ++++++++++++++++++ .../org/apache/flink/core/fs/FileSystem.java | 5 + .../apache/flink/core/fs/FileSystemKind.java | 111 ++++++++++++++++ .../core/fs/SafetyNetWrapperFileSystem.java | 5 + .../flink/core/fs/local/LocalFileSystem.java | 10 ++ .../flink/core/fs/FileSystemKindTest.java | 68 ++++++++++ .../core/fs/local/LocalFileSystemTest.java | 13 ++ .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++++++++++++++ .../runtime/fs/hdfs/HadoopFileSystem.java | 45 +++++++ .../runtime/fs/maprfs/MapRFileSystem.java | 7 +- 10 files changed, 483 insertions(+), 1 deletion(-) create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/ConsistencyLevel.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java create mode 100644 flink-core/src/test/java/org/apache/flink/core/fs/FileSystemKindTest.java create mode 100644 flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ConsistencyLevel.java b/flink-core/src/main/java/org/apache/flink/core/fs/ConsistencyLevel.java new file mode 100644 index 0000000000000..7072deeaf6b25 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/ConsistencyLevel.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An enumeration describing the level of consistency offered by a {@link FileSystem}. + * + *

The consistency levels described here make statements about the visibility + * of file existence and file contents in the presence of new file creation, + * file deletion, file renaming, and directory listing. + * + *

An operation is defined as consistent if the following holds: After the function + * call triggering the operation returns, its result is immediately reflected in + * in the view presented to any other party calling a file system function. + * + *

Please note that these levels do not make any statements about the effects or visibility of + * file content modification or file appends. In fact, content modification or appending are + * not supported in various file systems. + * + *

Some of these consistency levels indicate that the storage system does not actually + * qualify to be called a FileSystem, but rather a blob-/object store. + */ +@PublicEvolving +public enum ConsistencyLevel { + + /** + * This consistency level only guarantees that files are visible with a consistent + * view of their contents after their initial creation, once the writing stream has been closed. + * Any modifications, renames, deletes are not guaranteed to be immediately visible in a + * consistent manner. + * + *

To access a file/object consistently, the full path/key must be provided. Enumeration + * of files/objects is not consistent. + * + *

An example of a storage system with this consistency level is Amazon's S3 object store. + * + *

This is the weakest consistency level with which Flink's checkpointing can work. + * + * Summary: + *

+ */ + READ_AFTER_CREATE, + + /** + * This consistency level guarantees that files are visible with a consistent + * view of their contents after their initial creation, and after renaming them. + * The non-existence is consistently visible after delete operations. + * + *

To access a file/object consistently, the full path/key must be provided. Enumeration + * of files/objects is not necessarily consistent. + * + * Summary: + *

+ */ + CONSISTENT_RENAME_DELETE, + + /** + * This consistency level guarantees that files are visible with a consistent + * view of their contents after their initial creation, as well as after renaming operations. + * File deletion is immediately visible to all parties. + * + *

Directory listings are consistent, meaning after file creation/rename/delete, the file + * existence or non-existence is reflected when enumerating the parent directory's contents. + * + *

An example of storage systems and file systems falling under this consistency level are + * HDFS, MapR FS, and the Windows file systems. + * + * Summary: + *

+ */ + CONSISTENT_LIST_RENAME_DELETE, + + /** + * This consistency level has all guarantees of {@link ConsistencyLevel#CONSISTENT_LIST_RENAME_DELETE} + * and supports in addition atomic renames of files. + * + * Summary: + * + */ + POSIX_STYLE_CONSISTENCY +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index fab0f4d67cb0e..e76992d325804 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -659,6 +659,11 @@ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { */ public abstract boolean isDistributedFS(); + /** + * Gets a description of the characteristics of this file system. + */ + public abstract FileSystemKind getKind(); + // ------------------------------------------------------------------------ // output directory initialization // ------------------------------------------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java new file mode 100644 index 0000000000000..5a49c16993203 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An enumeration defining the kind and characteristics of a {@link FileSystem}. + * + *

Note that the categorization here makes statements only about consistency and + * directory handling. It explicitly does not look at the ability to modify files, + * which we assume for no operation going through Flink's File System abstraction, at the moment. + * This might change in the future. + */ +@PublicEvolving +public enum FileSystemKind { + + /** + * A POSIX compliant file system, as for example found on UNIX / Linux. + * + *

Posix file systems support directories, a consistent view, atomic renames, + * and deletion of open files. + */ + POSIX_COMPLIANT(ConsistencyLevel.POSIX_STYLE_CONSISTENCY), + + /** + * A file system that gives a consistent view of its contents. + * + *

File systems in this category are for example Windows file systems, + * HDFS, or MapR FS. They support directories, a consistent view, but not + * necessarily atomic file renaming, or deletion of open files. + */ + CONSISTENT_FILESYSTEM(ConsistencyLevel.CONSISTENT_LIST_RENAME_DELETE), + + /** + * A consistent object store (not an actual file system). + * + *

"File systems" of this kind support no real directories, but and no consistent + * renaming and delete operations. + */ + CONSISTENT_OBJECT_STORE(ConsistencyLevel.CONSISTENT_RENAME_DELETE), + + /** + * An eventually consistent object store (not an actual file system), like + * Amazon's S3. + * + *

"File systems" of this kind support no real directories and no consistent + * renaming and delete operations. + */ + EVENTUALLY_CONSISTENT_OBJECT_STORE(ConsistencyLevel.READ_AFTER_CREATE); + + // ------------------------------------------------------------------------ + + private final ConsistencyLevel consistencyLevel; + + FileSystemKind(ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + /** + * Gets the consistency level of the file system, which describes how consistently + * visible operations like file creation, deletion, and directory listings are. + */ + public ConsistencyLevel consistencyLevel() { + return consistencyLevel; + } + + + /** + * Checks whether the file system support real directories, or whether it actually only + * encodes a hierarchy into the file names. + */ + public boolean supportsRealDirectories() { + return this != EVENTUALLY_CONSISTENT_OBJECT_STORE && this != CONSISTENT_OBJECT_STORE; + } + + /** + * Checks whether the file system supports efficient recursive directory deletes, or + * whether the + */ + public boolean supportsEfficientRecursiveDeletes() { + return this != EVENTUALLY_CONSISTENT_OBJECT_STORE && this != CONSISTENT_OBJECT_STORE; + } + + /** + * Checks whether the file system supports to delete files from a directory, while there + * are still open streams to the file. + * + *

File systems that do not support that operation typically throw an exception when trying + * to delete that file. + */ + public boolean supportsDeleteOfOpenFiles() { + return this == POSIX_COMPLIANT; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index 1dacafd2d9428..63a263a63e6e5 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -145,6 +145,11 @@ public boolean isDistributedFS() { return unsafeFileSystem.isDistributedFS(); } + @Override + public FileSystemKind getKind() { + return unsafeFileSystem.getKind(); + } + @Override public FileSystem getWrappedDelegate() { return unsafeFileSystem; diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java index 0e3e9f3baa275..288a3f3307a9c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java @@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.util.OperatingSystem; @@ -289,4 +290,13 @@ public boolean rename(final Path src, final Path dst) throws IOException { public boolean isDistributedFS() { return false; } + + @Override + public FileSystemKind getKind() { + return OperatingSystem.isLinux() || + OperatingSystem.isMac() || + OperatingSystem.isSolaris() || + OperatingSystem.isFreeBSD() ? + FileSystemKind.POSIX_COMPLIANT : FileSystemKind.CONSISTENT_FILESYSTEM; + } } diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemKindTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemKindTest.java new file mode 100644 index 0000000000000..31c1319d4663f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemKindTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FileSystemKindTest { + + @Test + public void testPosixFileSystemKind() { + final FileSystemKind kind = FileSystemKind.POSIX_COMPLIANT; + assertEquals(ConsistencyLevel.POSIX_STYLE_CONSISTENCY, kind.consistencyLevel()); + + assertTrue(kind.supportsDeleteOfOpenFiles()); + assertTrue(kind.supportsRealDirectories()); + assertTrue(kind.supportsEfficientRecursiveDeletes()); + } + + @Test + public void testConsistentFileSystemKind() { + final FileSystemKind kind = FileSystemKind.CONSISTENT_FILESYSTEM; + + assertEquals(ConsistencyLevel.CONSISTENT_LIST_RENAME_DELETE, kind.consistencyLevel()); + assertFalse(kind.supportsDeleteOfOpenFiles()); + assertTrue(kind.supportsRealDirectories()); + assertTrue(kind.supportsEfficientRecursiveDeletes()); + } + + @Test + public void testConsistentObjectStoreKind() { + final FileSystemKind kind = FileSystemKind.CONSISTENT_OBJECT_STORE; + + assertEquals(ConsistencyLevel.CONSISTENT_RENAME_DELETE, kind.consistencyLevel()); + assertFalse(kind.supportsDeleteOfOpenFiles()); + assertFalse(kind.supportsRealDirectories()); + assertFalse(kind.supportsEfficientRecursiveDeletes()); + } + + @Test + public void testEventuallyConsistentObjectStoreKind() { + final FileSystemKind kind = FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE; + + assertEquals(ConsistencyLevel.READ_AFTER_CREATE, kind.consistencyLevel()); + assertFalse(kind.supportsDeleteOfOpenFiles()); + assertFalse(kind.supportsRealDirectories()); + assertFalse(kind.supportsEfficientRecursiveDeletes()); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java index 2312ee9856ef3..8c113ef5d26d9 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java @@ -34,10 +34,12 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.util.FileUtils; +import org.apache.flink.util.OperatingSystem; import org.junit.Assume; import org.junit.Rule; import org.junit.Test; @@ -312,4 +314,15 @@ public void testRenameToNonEmptyTargetDir() throws IOException { assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI()))); assertTrue(new File(dstFolder, srcFile.getName()).exists()); } + + @Test + public void testKind() { + final FileSystem fs = FileSystem.getLocalFileSystem(); + + if (OperatingSystem.isWindows()) { + assertEquals(FileSystemKind.CONSISTENT_FILESYSTEM, fs.getKind()); + } else { + assertEquals(FileSystemKind.POSIX_COMPLIANT, fs.getKind()); + } + } } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java new file mode 100644 index 0000000000000..141a40cfc053e --- /dev/null +++ b/flink-fs-tests/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsKindTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.fs.hdfs; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for extracting the {@link FileSystemKind} from file systems that Flink + * accesses through Hadoop's File System interface. + * + *

This class needs to be in this package, because it accesses package private methods + * from the HDFS file system wrapper class. + */ +public class HdfsKindTest extends TestLogger { + + @Test + public void testHdfsKind() throws IOException { + final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem(); + assertEquals(FileSystemKind.CONSISTENT_FILESYSTEM, fs.getKind()); + } + + @Test + public void testS3Kind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3.S3FileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path"); + return; + } + + final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, s3.getKind()); + } + + @Test + public void testS3nKind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path"); + return; + } + + final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, s3n.getKind()); + } + + @Test + public void testS3aKind() throws IOException { + try { + Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem"); + } catch (ClassNotFoundException ignored) { + // not in the classpath, cannot run this test + log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path"); + return; + } + + final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem(); + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, s3a.getKind()); + } + + @Test + public void testS3fileSystemSchemes() { + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3")); + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n")); + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a")); + assertEquals(FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS")); + } + + @Test + public void testViewFs() { + assertEquals(FileSystemKind.CONSISTENT_FILESYSTEM, HadoopFileSystem.getKindForScheme("viewfs")); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 7ab7ab7705037..a2775d587d6b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -23,11 +23,13 @@ import org.apache.flink.core.fs.BlockLocation; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.HadoopFileSystemWrapper; import org.apache.flink.core.fs.Path; import org.apache.flink.util.InstantiationUtil; import org.apache.hadoop.conf.Configuration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +38,7 @@ import java.lang.reflect.Method; import java.net.URI; import java.net.UnknownHostException; +import java.util.Locale; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -62,6 +65,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst private final org.apache.hadoop.fs.FileSystem fs; + private FileSystemKind fsKind; + /** * Creates a new DistributedFileSystem object to access HDFS, based on a class name @@ -456,6 +461,14 @@ public boolean isDistributedFS() { return true; } + @Override + public FileSystemKind getKind() { + if (fsKind == null) { + fsKind = getKindForScheme(this.fs.getUri().getScheme()); + } + return fsKind; + } + @Override public Class getHadoopWrapperClassNameForFileSystem(String scheme) { Configuration hadoopConf = getHadoopConfiguration(); @@ -471,4 +484,36 @@ public Class getHadoopWrapperClassNameForFileSystem(String scheme) { } return clazz; } + + /** + * Gets the kind of the file system from its scheme. + * + *

Implementation node: Initially, especially within the Flink 1.3.x line + * (in order to not break backwards compatibility), we must only label file systems + * as 'inconsistent' or as 'not proper filesystems' if we are sure about it. + * Otherwise, we cause regression for example in the performance and cleanup handling + * of checkpoints. + * For that reason, we initially mark some filesystems as 'eventually consistent' or + * as 'object stores', and leave the others as 'consistent file systems'. + */ + static FileSystemKind getKindForScheme(String scheme) { + scheme = scheme.toLowerCase(Locale.US); + + if (scheme.startsWith("s3") || scheme.startsWith("emr")) { + // the Amazon S3 storage + return FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE; + } + else if (scheme.startsWith("http") || scheme.startsWith("ftp")) { + // file servers instead of file systems + // they might actually be consistent, but we have no hard guarantees + // currently to rely on that + return FileSystemKind.EVENTUALLY_CONSISTENT_OBJECT_STORE; + } + else { + // the remainder should include hdfs, kosmos, ceph, ... + // this also includes federated HDFS (viewfs). + return FileSystemKind.CONSISTENT_FILESYSTEM; + } + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java index 275e492758eb2..8c8a7105cb756 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation; import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream; @@ -382,7 +383,11 @@ public long getDefaultBlockSize() { @Override public boolean isDistributedFS() { - return true; } + + @Override + public FileSystemKind getKind() { + return FileSystemKind.CONSISTENT_FILESYSTEM; + } } From 88159763a3f784db368bd6e22b0543ae2b82dd05 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 25 Jul 2017 17:26:38 +0200 Subject: [PATCH 2/2] [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores This closes #4397 --- .../apache/flink/runtime/fs/hdfs/HadoopFileSystem.java | 4 ++-- .../flink/runtime/state/filesystem/FileStateHandle.java | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index a2775d587d6b1..3c5a9cc26da38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -487,8 +487,8 @@ public Class getHadoopWrapperClassNameForFileSystem(String scheme) { /** * Gets the kind of the file system from its scheme. - * - *

Implementation node: Initially, especially within the Flink 1.3.x line + * + *

Implementation note: Initially, especially within the Flink 1.3.x line * (in order to not break backwards compatibility), we must only label file systems * as 'inconsistent' or as 'not proper filesystems' if we are sure about it. * Otherwise, we cause regression for example in the performance and cleanup handling diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index bdf3f42aaaddc..aeca1669d073f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -77,14 +77,15 @@ public FSDataInputStream openInputStream() throws IOException { */ @Override public void discardState() throws Exception { - FileSystem fs = getFileSystem(); fs.delete(filePath, false); - try { - FileUtils.deletePathIfEmpty(fs, filePath.getParent()); - } catch (Exception ignored) {} + if (fs.getKind().supportsRealDirectories()) { + try { + FileUtils.deletePathIfEmpty(fs, filePath.getParent()); + } catch (Exception ignored) {} + } } /**