From 7843c2ffb44f99967dc71746ac1c79b04a74fe80 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 5 Oct 2017 11:27:48 +0200 Subject: [PATCH 1/2] [FLINK-7766] [file system sink] Drop obsolete reflective hflush calls This was done reflectively before for Hadoop 1 compatibility. Since Hadoop 1 is no longer supported, this is obsolete now. --- .../connectors/fs/StreamWriterBase.java | 81 +------------------ 1 file changed, 2 insertions(+), 79 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java index 3e9eb11abbb0e..0a3a60362f166 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java @@ -18,38 +18,24 @@ package org.apache.flink.streaming.connectors.fs; -import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; - import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.EnumSet; /** * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}. */ public abstract class StreamWriterBase implements Writer { - private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class); + private static final long serialVersionUID = 2L; /** * The {@code FSDataOutputStream} for the current part file. */ private transient FSDataOutputStream outStream; - /** - * We use reflection to get the hflush method or use sync as a fallback. - * The idea for this and the code comes from the Flume HDFS Sink. - */ - private transient Method refHflushOrSync; - /** * Returns the current output stream, if the stream is open. */ @@ -60,74 +46,12 @@ protected FSDataOutputStream getStream() { return outStream; } - /** - * If hflush is available in this version of HDFS, then this method calls - * hflush, else it calls sync. - * - *

Note: This code comes from Flume - * - * @param os - The stream to flush/sync - * @throws java.io.IOException - */ - protected void hflushOrSync(FSDataOutputStream os) throws IOException { - try { - // At this point the refHflushOrSync cannot be null, - // since register method would have thrown if it was. - this.refHflushOrSync.invoke(os); - - if (os instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - } - } catch (InvocationTargetException e) { - String msg = "Error while trying to hflushOrSync!"; - LOG.error(msg + " " + e.getCause()); - Throwable cause = e.getCause(); - if (cause != null && cause instanceof IOException) { - throw (IOException) cause; - } - throw new RuntimeException(msg, e); - } catch (Exception e) { - String msg = "Error while trying to hflushOrSync!"; - LOG.error(msg + " " + e); - throw new RuntimeException(msg, e); - } - } - - /** - * Gets the hflush call using reflection. Fallback to sync if hflush is not available. - * - *

Note: This code comes from Flume - */ - private Method reflectHflushOrSync(FSDataOutputStream os) { - Method m = null; - if (os != null) { - Class fsDataOutputStreamClass = os.getClass(); - try { - m = fsDataOutputStreamClass.getMethod("hflush"); - } catch (NoSuchMethodException ex) { - LOG.debug("HFlush not found. Will use sync() instead"); - try { - m = fsDataOutputStreamClass.getMethod("sync"); - } catch (Exception ex1) { - String msg = "Neither hflush not sync were found. That seems to be " + - "a problem!"; - LOG.error(msg); - throw new RuntimeException(msg, ex1); - } - } - } - return m; - } - @Override public void open(FileSystem fs, Path path) throws IOException { if (outStream != null) { throw new IllegalStateException("Writer has already been opened"); } outStream = fs.create(path, false); - if (refHflushOrSync == null) { - refHflushOrSync = reflectHflushOrSync(outStream); - } } @Override @@ -135,7 +59,7 @@ public long flush() throws IOException { if (outStream == null) { throw new IllegalStateException("Writer is not open"); } - hflushOrSync(outStream); + outStream.hflush(); return outStream.getPos(); } @@ -155,5 +79,4 @@ public void close() throws IOException { outStream = null; } } - } From bad3df54d20677157f48c3ee1f3251d2c4bce8ba Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 5 Oct 2017 11:26:13 +0200 Subject: [PATCH 2/2] [FLINK-7767] [file system sinks] Avoid loading Hadoop conf dynamically at runtime --- .../streaming/connectors/fs/RollingSink.java | 21 ++-- .../connectors/fs/SequenceFileWriter.java | 3 +- .../fs/bucketing/BucketingSink.java | 117 ++++++++++++++++-- .../fs/bucketing/BucketingSinkTest.java | 1 + .../runtime/fs/hdfs/HadoopFileSystem.java | 26 +--- 5 files changed, 116 insertions(+), 52 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index e5758e8920fdb..9e547758dfaa1 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -34,13 +33,17 @@ import org.apache.flink.util.Preconditions; import org.apache.commons.lang3.time.StopWatch; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; @@ -279,6 +282,7 @@ public class RollingSink extends RichSinkFunction /** * User-defined FileSystem parameters. */ + @Nullable private Configuration fsConfig; /** @@ -382,19 +386,10 @@ public void open(Configuration parameters) throws Exception { * @throws IOException */ private void initFileSystem() throws IOException { - if (fs != null) { - return; + if (fs == null) { + Path path = new Path(basePath); + fs = BucketingSink.createHadoopFileSystem(path, fsConfig); } - org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - if (fsConfig != null) { - String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme()); - hadoopConf.setBoolean(disableCacheName, true); - for (String key : fsConfig.keySet()) { - hadoopConf.set(key, fsConfig.getString(key, null)); - } - } - - fs = new Path(basePath).getFileSystem(hadoopConf); } @Override diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java index 901589f697729..6f805448aae78 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; import org.apache.hadoop.conf.Configuration; @@ -90,7 +89,7 @@ public void open(FileSystem fs, Path path) throws IOException { CompressionCodec codec = null; - Configuration conf = HadoopFileSystem.getHadoopConfiguration(); + Configuration conf = fs.getConf(); if (!compressionCodecName.equals("None")) { CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index cc924a4a2056f..55400c6ce19fa 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -42,17 +42,22 @@ import org.apache.flink.util.Preconditions; import org.apache.commons.lang3.time.StopWatch; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -287,6 +292,7 @@ public class BucketingSink /** * User-defined FileSystem parameters. */ + @Nullable private Configuration fsConfig; /** @@ -402,19 +408,10 @@ public long currentTimeMillis() { * @throws IOException */ private void initFileSystem() throws IOException { - if (fs != null) { - return; + if (fs == null) { + Path path = new Path(basePath); + fs = createHadoopFileSystem(path, fsConfig); } - org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - if (fsConfig != null) { - String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme()); - hadoopConf.setBoolean(disableCacheName, true); - for (String key : fsConfig.keySet()) { - hadoopConf.set(key, fsConfig.getString(key, null)); - } - } - - fs = new Path(basePath).getFileSystem(hadoopConf); } @Override @@ -1113,4 +1110,100 @@ public String toString() { this.lastWrittenToTime = lastWrittenToTime; } } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + public static FileSystem createHadoopFileSystem( + Path path, + @Nullable Configuration extraUserConf) throws IOException { + + // try to get the Hadoop File System via the Flink File Systems + // that way we get the proper configuration + + final org.apache.flink.core.fs.FileSystem flinkFs = org.apache.flink.core.fs.FileSystem.get(path.toUri()); + final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ? + ((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null; + + // fast path: if the Flink file system wraps Hadoop anyways and we need no extra config, + // then we use it directly + if (extraUserConf == null && hadoopFs != null) { + return hadoopFs; + } + else { + // we need to re-instantiate the Hadoop file system, because we either have + // a special config, or the Path gave us a Flink FS that is not backed by + // Hadoop (like file://) + + final org.apache.hadoop.conf.Configuration hadoopConf; + if (hadoopFs != null) { + // have a Hadoop FS but need to apply extra config + hadoopConf = hadoopFs.getConf(); + } + else { + // the Path gave us a Flink FS that is not backed by Hadoop (like file://) + // we need to get access to the Hadoop file system first + + // we access the Hadoop FS in Flink, which carries the proper + // Hadoop configuration. we should get rid of this once the bucketing sink is + // properly implemented against Flink's FS abstraction + + URI genericHdfsUri = URI.create("hdfs://localhost:12345/"); + org.apache.flink.core.fs.FileSystem accessor = + org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(genericHdfsUri); + + if (!(accessor instanceof HadoopFileSystem)) { + throw new IOException( + "Cannot instantiate a Hadoop file system to access the Hadoop configuration. " + + "FS for hdfs:// is " + accessor.getClass().getName()); + } + + hadoopConf = ((HadoopFileSystem) accessor).getHadoopFileSystem().getConf(); + } + + // finalize the configuration + + final org.apache.hadoop.conf.Configuration finalConf; + if (extraUserConf == null) { + finalConf = hadoopConf; + } + else { + finalConf = new org.apache.hadoop.conf.Configuration(hadoopConf); + + for (String key : extraUserConf.keySet()) { + finalConf.set(key, extraUserConf.getString(key, null)); + } + } + + // we explicitly re-instantiate the file system here in order to make sure + // that the configuration is applied. + + URI fsUri = path.toUri(); + final String scheme = fsUri.getScheme(); + final String authority = fsUri.getAuthority(); + + if (scheme == null && authority == null) { + fsUri = FileSystem.getDefaultUri(finalConf); + } + else if (scheme != null && authority == null) { + URI defaultUri = FileSystem.getDefaultUri(finalConf); + if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) { + fsUri = defaultUri; + } + } + + final Class fsClass = FileSystem.getFileSystemClass(fsUri.getScheme(), finalConf); + final FileSystem fs; + try { + fs = fsClass.newInstance(); + } + catch (Exception e) { + throw new IOException("Cannot instantiate the Hadoop file system", e); + } + + fs.initialize(fsUri, finalConf); + return fs; + } + } } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index d6852efad2200..695b6962abdc2 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -52,6 +52,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index fd6b9da2b3093..4ebf4bce14357 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -18,12 +18,10 @@ package org.apache.flink.runtime.fs.hdfs; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.BlockLocation; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.util.HadoopUtils; import java.io.IOException; import java.net.URI; @@ -116,6 +114,7 @@ public HadoopDataInputStream open(final Path f) throws IOException { } @Override + @SuppressWarnings("deprecation") public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize, final short replication, final long blockSize) throws IOException { final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create( @@ -169,27 +168,4 @@ public long getDefaultBlockSize() { public boolean isDistributedFS() { return true; } - - // ------------------------------------------------------------------------ - // Miscellaneous Utilities - // ------------------------------------------------------------------------ - - /** - * Returns a new Hadoop Configuration object using the path to the hadoop conf configured - * in the main configuration (flink-conf.yaml). - * This method is public because its being used in the HadoopDataSource. - * - * @deprecated This method should not be used, because it dynamically (and possibly incorrectly) - * re-loads the Flink configuration. - * Use {@link HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)} - * instead. - */ - @Deprecated - public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() { - - org.apache.flink.configuration.Configuration flinkConfiguration = - GlobalConfiguration.loadConfiguration(); - - return HadoopUtils.getHadoopConfiguration(flinkConfiguration); - } }