From 02eef87dc2bfaa6737efad023916898719d34fe2 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 16 Feb 2017 19:24:43 +0800 Subject: [PATCH] change checkpoint dir permission to 700 --- flink-core/pom.xml | 6 ++++++ .../main/java/org/apache/flink/core/fs/FileSystem.java | 2 ++ .../flink/core/fs/SafetyNetWrapperFileSystem.java | 7 ++++++- .../apache/flink/core/fs/local/LocalFileSystem.java | 10 ++++++++++ .../apache/flink/runtime/fs/hdfs/HadoopFileSystem.java | 6 ++++++ .../apache/flink/runtime/fs/maprfs/MapRFileSystem.java | 6 ++++++ .../state/filesystem/FsCheckpointStreamFactory.java | 1 + 7 files changed, 37 insertions(+), 1 deletion(-) diff --git a/flink-core/pom.xml b/flink-core/pom.xml index e9738a21b7664..b3774efe68382 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -47,6 +47,12 @@ under the License. ${project.version} + + org.apache.flink + flink-shaded-hadoop2 + ${project.version} + + org.apache.commons diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index c3828fbdd0b5a..36d0ff6e75276 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -631,6 +631,8 @@ public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferS */ public abstract boolean rename(Path src, Path dst) throws IOException; + public abstract void setPermission(Path f, String perm) throws IOException; + /** * Returns true if this is a distributed file system, false otherwise. * diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index bf30b4f49a6d0..df642b0c763dd 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -128,6 +128,11 @@ public boolean rename(Path src, Path dst) throws IOException { return unsafeFileSystem.rename(src, dst); } + @Override + public void setPermission(Path p, String perm) throws IOException { + unsafeFileSystem.setPermission(p, perm); + } + @Override public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { return unsafeFileSystem.initOutPathLocalFS(outPath, writeMode, createDirectory); @@ -147,4 +152,4 @@ public boolean isDistributedFS() { public FileSystem getWrappedDelegate() { return unsafeFileSystem; } -} \ No newline at end of file +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java index acbf814da5611..f25ca96020901 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java @@ -34,6 +34,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.util.OperatingSystem; +import org.apache.hadoop.fs.permission.FsPermission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,8 @@ import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermissions; /** * The class LocalFile provides an implementation of the {@link FileSystem} interface @@ -264,6 +267,13 @@ public boolean rename(final Path src, final Path dst) throws IOException { return srcFile.renameTo(dstFile); } + @Override + public void setPermission(Path p, String perm) throws IOException { + // use Hadoop class to translate numbers into String + FsPermission fsPerm = new FsPermission(perm); + Files.setPosixFilePermissions(pathToFile(p).toPath(), PosixFilePermissions.fromString(fsPerm.toString())); + } + @Override public boolean isDistributedFS() { return false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 36dfa55c5c590..9228011218310 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.util.InstantiationUtil; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -452,6 +453,11 @@ public boolean rename(final Path src, final Path dst) throws IOException { new org.apache.hadoop.fs.Path(dst.toString())); } + @Override + public void setPermission(Path p, String perm) throws IOException { + fs.setPermission(new org.apache.hadoop.fs.Path(p.toString()), new FsPermission(perm)); + } + @SuppressWarnings("deprecation") @Override public long getDefaultBlockSize() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java index a7ef441d9a6da..7c7015bbd301f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.fs.permission.FsPermission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.core.fs.BlockLocation; @@ -372,6 +373,11 @@ public boolean rename(final Path src, final Path dst) throws IOException { new org.apache.hadoop.fs.Path(dst.toString())); } + @Override + public void setPermission(Path p, String perm) throws IOException { + fs.setPermission(new org.apache.hadoop.fs.Path(p.toString()), new FsPermission(perm)); + } + @SuppressWarnings("deprecation") @Override public long getDefaultBlockSize() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index 30b1da6a2ed46..5295f671eb3dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -104,6 +104,7 @@ public FsCheckpointStreamFactory( filesystem = basePath.getFileSystem(); filesystem.mkdirs(dir); + filesystem.setPermission(dir, "700"); // set permission for path. checkpointDirectory = dir; }