From 522ad09ec83cfffae18936decd0c49f33da19373 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 11 Aug 2016 21:26:01 +0800 Subject: [PATCH 1/2] enable rolling sink to customize hdfs client configuration --- .../streaming/connectors/fs/RollingSink.java | 59 ++++++++++--- .../connectors/fs/RollingSinkITCase.java | 85 ++++++++++++++++++- 2 files changed, 131 insertions(+), 13 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index 738857f74fd97..13b8291f6a2fd 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -284,9 +284,16 @@ public class RollingSink extends RichSinkFunction implements InputTypeConf * current part file path, the valid length of the in-progress files and pending part files. */ private transient BucketState bucketState; - - private transient org.apache.hadoop.conf.Configuration hadoopConf; - + + /** + * user defined HDFS parameters + */ + private Configuration hdfsConfig = null; + + /** + * The hdfs file system + */ + private transient FileSystem fs; /** * Creates a new {@code RollingSink} that writes files to the given base directory. 
* @@ -303,6 +310,20 @@ public RollingSink(String basePath) { this.writerTemplate = new StringWriter<>(); } + public RollingSink setHDFSConfig(Configuration config) { + this.hdfsConfig = new Configuration(); + hdfsConfig.addAll(config); + return this; + } + + public RollingSink setHDFSConfig(org.apache.hadoop.conf.Configuration config) { + this.hdfsConfig = new Configuration(); + for(Map.Entry entry : config) { + hdfsConfig.setString(entry.getKey(), entry.getValue()); + }; + return this; + } + @Override @SuppressWarnings("unchecked") public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { @@ -324,8 +345,7 @@ public void open(Configuration parameters) throws Exception { bucketState = new BucketState(); } - hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); + initFileSystem(); refTruncate = reflectTruncate(fs); // delete pending/in-progress files that might be left if we fail while @@ -358,6 +378,27 @@ public void open(Configuration parameters) throws Exception { } } + /** + * create a file system with the user defined hdfs config + * @throws IOException + */ + private void initFileSystem() throws IOException { + if(fs != null) { + return; + } + org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + if(hdfsConfig != null) { + String disableCacheName + = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()}); + hadoopConf.setBoolean(disableCacheName, true); + for (String key : hdfsConfig.keySet()) { + hadoopConf.set(key, hdfsConfig.getString(key, null)); + } + } + + fs = new Path(basePath).getFileSystem(hadoopConf); + } + @Override public void close() throws Exception { // boolean interrupted = Thread.interrupted(); @@ -420,8 +461,6 @@ private boolean shouldRoll() throws IOException { private void openNewPartFile() throws Exception { closeCurrentPartFile(); - FileSystem fs = new 
Path(basePath).getFileSystem(hadoopConf); - Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath)); if (!newBucketDirectory.equals(currentBucketDirectory)) { @@ -451,7 +490,6 @@ private void openNewPartFile() throws Exception { LOG.debug("Next part path is {}", currentPartPath.toString()); Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); - writer.open(fs, inProgressPath); isWriterOpen = true; } @@ -472,7 +510,6 @@ private void closeCurrentPartFile() throws Exception { if (currentPartPath != null) { Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix); - FileSystem fs = inProgressPath.getFileSystem(hadoopConf); fs.rename(inProgressPath, pendingPath); LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, @@ -547,7 +584,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix); - FileSystem fs = pendingPath.getFileSystem(hadoopConf); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location after complete checkpoint {}.", @@ -583,9 +619,8 @@ public void restoreState(BucketState state) { // we can clean all the pending files since they where renamed to final files // after this checkpoint was successfull bucketState.pendingFiles.clear(); - FileSystem fs = null; try { - fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration()); + initFileSystem(); } catch (IOException e) { LOG.error("Error while creating FileSystem in checkpoint restore.", e); throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e); diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index c3c8df55183e1..6c9be0adeaf4e 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -42,6 +42,7 @@ import org.apache.flink.util.NetUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -457,7 +458,68 @@ public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception { dataFileStream.close(); inStream.close(); } - + + + /** + * This tests user defined hdfs configuration + * @throws Exception + */ + @Test + public void testUserDefinedConfiguration() throws Exception { + final int NUM_ELEMENTS = 20; + final int PARALLELISM = 2; + final String outPath = hdfsURI + "/string-non-rolling-with-config"; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + DataStream> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS)) + .broadcast() + .filter(new OddEvenFilter()); + + Configuration conf = new Configuration(); + conf.set("io.file.buffer.size", "40960"); + RollingSink sink = new RollingSink(outPath) + .setHDFSConfig(conf) + .setWriter(new StreamWriterWithConfigCheck("io.file.buffer.size", "40960")) + .setBucketer(new NonRollingBucketer()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + source + .map(new MapFunction, String>() { + private static final long serialVersionUID = 1L; + 
@Override + public String map(Tuple2 value) throws Exception { + return value.f1; + } + }) + .addSink(sink); + + env.execute("RollingSink with configuration Test"); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + BufferedReader br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 0; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + + inStream = dfs.open(new Path(outPath + "/part-1-0")); + + br = new BufferedReader(new InputStreamReader(inStream)); + + for (int i = 1; i < NUM_ELEMENTS; i += 2) { + String line = br.readLine(); + Assert.assertEquals("message #" + i, line); + } + + inStream.close(); + } // we use this to synchronize the clock changes to elements being processed final static MultiShotLatch latch1 = new MultiShotLatch(); @@ -639,6 +701,27 @@ public void cancel() { } } + + private static class StreamWriterWithConfigCheck extends StringWriter { + private String key; + private String expect; + public StreamWriterWithConfigCheck(String key, String expect) { + this.key = key; + this.expect = expect; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, path); + Assert.assertEquals(expect, fs.getConf().get(key)); + } + + @Override + public Writer duplicate() { + return new StreamWriterWithConfigCheck<>(key, expect); + } + } + public static class OddEvenFilter extends RichFilterFunction> { private static final long serialVersionUID = 1L; From eb8785f6ffc1f4e354ac8094408aeb3f79cee313 Mon Sep 17 00:00:00 2001 From: "wenlong.lwl" Date: Thu, 27 Oct 2016 11:44:41 +0800 Subject: [PATCH 2/2] forward port changes from rollingsink to bucketsink --- .../fs/bucketing/BucketingSink.java | 55 ++++++++++-- .../fs/bucketing/BucketingSinkTest.java | 90 +++++++++++++++++++ 2 files changed, 136 insertions(+), 9 deletions(-) diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 52de00d2f3044..71bd57a827fa6 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -281,7 +281,15 @@ public class BucketingSink */ private transient State state; - private transient org.apache.hadoop.conf.Configuration hadoopConf; + /** + * user defined HDFS parameters + */ + private Configuration hdfsConfig = null; + + /** + * The hdfs file system + */ + private transient FileSystem fs; private transient Clock clock; @@ -302,6 +310,20 @@ public BucketingSink(String basePath) { this.writerTemplate = new StringWriter<>(); } + public BucketingSink setHDFSConfig(Configuration config) { + this.hdfsConfig = new Configuration(); + hdfsConfig.addAll(config); + return this; + } + + public BucketingSink setHDFSConfig(org.apache.hadoop.conf.Configuration config) { + this.hdfsConfig = new Configuration(); + for(Map.Entry entry : config) { + hdfsConfig.setString(entry.getKey(), entry.getValue()); + }; + return this; + } + @Override @SuppressWarnings("unchecked") public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { @@ -319,8 +341,7 @@ public void open(Configuration parameters) throws Exception { state = new State(); Path baseDirectory = new Path(basePath); - hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - FileSystem fs = baseDirectory.getFileSystem(hadoopConf); + initFileSystem(); refTruncate = reflectTruncate(fs); processingTimeService = @@ -369,6 +390,27 @@ public long currentTimeMillis() { } } + /** + * create a file 
system with the user defined hdfs config + * @throws IOException + */ + private void initFileSystem() throws IOException { + if(fs != null) { + return; + } + org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + if(hdfsConfig != null) { + String disableCacheName + = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()}); + hadoopConf.setBoolean(disableCacheName, true); + for (String key : hdfsConfig.keySet()) { + hadoopConf.set(key, hdfsConfig.getString(key, null)); + } + } + + fs = new Path(basePath).getFileSystem(hadoopConf); + } + @Override public void close() throws Exception { for (Map.Entry> entry : state.bucketStates.entrySet()) { @@ -456,8 +498,6 @@ private void checkForInactiveBuckets(long currentProcessingTime) throws Exceptio private void openNewPartFile(Path bucketPath, BucketState bucketState) throws Exception { closeCurrentPartFile(bucketState); - FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); - if (!fs.exists(bucketPath)) { try { if (fs.mkdirs(bucketPath)) { @@ -511,7 +551,6 @@ private void closeCurrentPartFile(BucketState bucketState) throws Exception { Path currentPartPath = new Path(bucketState.currentFile); Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix); - FileSystem fs = inProgressPath.getFileSystem(hadoopConf); fs.rename(inProgressPath, pendingPath); LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, @@ -589,7 +628,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix); - FileSystem fs = pendingPath.getFileSystem(hadoopConf); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending 
file {} to final location having completed checkpoint {}.", @@ -634,9 +672,8 @@ public State snapshotState(long checkpointId, long checkpointTimestamp) throw public void restoreState(State state) { this.state = state; - FileSystem fs; try { - fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration()); + initFileSystem(); } catch (IOException e) { LOG.error("Error while creating FileSystem in checkpoint restore.", e); throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e); diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index e4b0460c51ba8..fa22b9670be5a 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -34,11 +34,13 @@ import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; +import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.NetUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -497,4 +499,92 @@ public void testCustomBucketingInactiveBucketCleanup() throws Exception { Assert.assertEquals(4, numFiles); 
Assert.assertEquals(2, numInProgress); } + + /** + * This tests user defined hdfs configuration + * @throws Exception + */ + @Test + public void testUserDefinedConfiguration() throws Exception { + final String outPath = hdfsURI + "/string-non-rolling-with-config"; + final int numElements = 20; + + Map properties = new HashMap<>(); + Schema keySchema = Schema.create(Schema.Type.INT); + Schema valueSchema = Schema.create(Schema.Type.STRING); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString()); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true)); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); + + Configuration conf = new Configuration(); + conf.set("io.file.buffer.size", "40960"); + + BucketingSink> sink = new BucketingSink>(outPath) + .setHDFSConfig(conf) + .setWriter(new StreamWriterWithConfigCheck(properties, "io.file.buffer.size", "40960")) + .setBucketer(new BasePathBucketer>()) + .setPartPrefix("part") + .setPendingPrefix("") + .setPendingSuffix(""); + + OneInputStreamOperatorTestHarness, Object> testHarness = + createTestSink(sink); + + testHarness.setProcessingTime(0L); + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(Tuple2.of( + i, "message #" + Integer.toString(i) + ))); + } + + testHarness.close(); + + GenericData.setStringType(valueSchema, GenericData.StringType.String); + Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema); + + FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0")); + + SpecificDatumReader elementReader = new SpecificDatumReader<>(elementSchema); + DataFileStream dataFileStream = new DataFileStream<>(inStream, elementReader); + for (int i = 0; i < numElements; i++) { + 
AvroKeyValueSinkWriter.AvroKeyValue wrappedEntry = + new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next()); + int key = wrappedEntry.getKey(); + Assert.assertEquals(i, key); + String value = wrappedEntry.getValue(); + Assert.assertEquals("message #" + i, value); + } + + dataFileStream.close(); + inStream.close(); + } + + private static class StreamWriterWithConfigCheck extends AvroKeyValueSinkWriter { + private Map properties; + private String key; + private String expect; + public StreamWriterWithConfigCheck(Map properties, String key, String expect) { + super(properties); + this.properties = properties; + this.key = key; + this.expect = expect; + } + + @Override + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, path); + Assert.assertEquals(expect, fs.getConf().get(key)); + } + + @Override + public Writer> duplicate() { + return new StreamWriterWithConfigCheck<>(properties, key, expect); + } + } + }