diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md index 31e6df986c6e1..ddac52f077e25 100644 --- a/docs/apis/streaming/index.md +++ b/docs/apis/streaming/index.md @@ -1819,14 +1819,14 @@ of each element on the standard out / standard error stream. Optionally, a pref prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output. -- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports +- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports custom object-to-bytes conversion. - `writeToSocket` - Writes elements to a socket according to a `SerializationSchema` - `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions. - +
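As orientation for reviewers, here is a minimal Java sketch (illustration only, not part of the patch) showing how the sink methods listed above are invoked on a `DataStream`; the output paths, host name, and port are placeholder assumptions:

```java
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SinkMethodsSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("to", "be", "or", "not");

		// Debugging sinks: print each element to stdout / stderr.
		stream.print();
		stream.printToErr();

		// Convenience text output; internally backed by an OutputFormat.
		stream.writeAsText("/tmp/sketch-text"); // placeholder path

		// The general form after this patch: any OutputFormat can be used.
		stream.writeUsingOutputFormat(new TextOutputFormat<String>(new Path("/tmp/sketch-format")));

		// Serialize elements to a socket (assumes a server is listening).
		stream.writeToSocket("localhost", 9999, new SimpleStringSchema()); // placeholder host/port

		env.execute("Sink methods sketch");
	}
}
```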
@@ -1847,7 +1847,7 @@ of each element on the standard out / standard error stream. Optionally, a pref prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output. -- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports +- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports custom object-to-bytes conversion. - `writeToSocket` - Writes elements to a socket according to a `SerializationSchema` @@ -1858,6 +1858,17 @@ greater than 1, the output will also be prepended with the identifier of the tas +Note that the `write*()` methods on `DataStream` are mainly intended for debugging purposes. +They do not participate in Flink's checkpointing, which means these functions usually have +at-least-once semantics. How data is flushed to the target system depends on the implementation of the +OutputFormat, so elements sent to the OutputFormat do not necessarily show up +in the target system immediately. Also, in failure cases, those records might be lost. + +For reliable, exactly-once delivery of a stream into a file system, use `flink-connector-filesystem`. +Also, custom implementations through the `.addSink(...)` method can participate in Flink's checkpointing +for exactly-once semantics. + + {% top %}
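To make the recommended alternative concrete, the following is a hedged Java sketch using the `RollingSink` from `flink-connector-filesystem`, assuming the connector API of this Flink version; the base path and checkpoint interval are placeholder values, not part of this patch:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.RollingSink;

public class ReliableFileSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The sink only gives exactly-once guarantees when checkpointing is enabled.
		env.enableCheckpointing(5000);

		DataStream<String> stream = env.fromElements("a", "b", "c");

		// RollingSink participates in checkpointing, unlike the write*() methods.
		stream.addSink(new RollingSink<String>("/tmp/rolling-output")); // placeholder base path

		env.execute("Reliable file sink sketch");
	}
}
```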
diff --git a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java index a6be1a61815b2..05398dbcc947b 100644 --- a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java +++ b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java @@ -41,7 +41,7 @@ */ public class HBaseWriteStreamExample { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); @@ -64,14 +64,9 @@ public void cancel() { isRunning = false; } }); - dataStream.write(new HBaseOutputFormat(), 0L); + dataStream.writeUsingOutputFormat(new HBaseOutputFormat()); - try { - env.execute(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + env.execute(); } /** diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index b6e1a612a66a4..31fa3fdd0c2f7 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -90,7 +90,7 @@ public static void main(String[] args) throws Exception { // emit results if (fileOutput) { - numbers.writeAsText(outputPath, 1); + numbers.writeAsText(outputPath); } else { numbers.print(); } diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 3355f1cff4819..2bb153cd0e090 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -87,7 +87,7 @@ public static void main(String[] args) throws Exception { // emit result if (fileOutput) { - joinedStream.writeAsText(outputPath, 1); + joinedStream.writeAsText(outputPath); } else { joinedStream.print(); } diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index 8f502dd35815e..3f38312d28d2d 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java @@ -50,8 +50,6 @@ */ public class IncrementalLearningSkeleton { - private static DataStream trainingData = null; - private static DataStream newData = null; // ************************************************************************* // PROGRAM @@ -66,8 +64,8 @@ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - trainingData = env.addSource(new FiniteTrainingDataSource()); - newData = env.addSource(new FiniteNewDataSource()); + DataStream trainingData = env.addSource(new FiniteTrainingDataSource()); + DataStream newData = env.addSource(new FiniteNewDataSource()); // build new model on every second of new data DataStream model = trainingData @@ -80,7 +78,7 @@ public static void main(String[] args) throws Exception { // emit result if (fileOutput) { - prediction.writeAsText(outputPath, 1); + prediction.writeAsText(outputPath); } else { prediction.print(); } diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java index cecabddaf722d..2c0bcda1b34df 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception { .sum(1); if (fileOutput) { - counts.writeAsText(outputPath, 1); + counts.writeAsText(outputPath); } else { counts.print(); } diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala index 9ec17d4611320..b319a03e458f8 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala @@ -61,7 +61,7 @@ object SocketTextStreamWordCount { .sum(1) if (fileOutput) { - counts.writeAsText(outputPath, 1) + counts.writeAsText(outputPath) } else { counts print } diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java index 41a674f2b3438..80b0ce0a099d1 100644 --- a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java +++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java @@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception { // emit 
result if(fileOutput) { - counts.writeAsCsv(outputPath, 1); + counts.writeAsCsv(outputPath); } else { counts.print(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 0645668ac3b82..6972e1d78afc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -114,7 +114,7 @@ public void invoke() throws Exception { ExecutionConfig executionConfig; try { - ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( + ExecutionConfig c = InstantiationUtil.readObjectFromConfig( getJobConfiguration(), ExecutionConfig.CONFIG_KEY, getUserCodeClassLoader()); @@ -130,7 +130,6 @@ public void invoke() throws Exception { boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled(); try { - // initialize local strategies MutableObjectIterator input1; switch (this.config.getInputLocalStrategy(0)) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 64d0821a50c4b..7e9c308d25157 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -49,7 +49,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.TimestampExtractor; -import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis; +import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SocketClientSink; @@ -193,7 +193,7 @@ public final DataStream union(DataStream... streams) { unionedTransforms.add(newStream.getTransformation()); } - return new DataStream(this.environment, new UnionTransformation(unionedTransforms)); + return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms)); } /** @@ -208,7 +208,7 @@ public final DataStream union(DataStream... streams) { * @return The {@link SplitStream} */ public SplitStream split(OutputSelector outputSelector) { - return new SplitStream(this, clean(outputSelector)); + return new SplitStream<>(this, clean(outputSelector)); } /** @@ -222,7 +222,7 @@ public SplitStream split(OutputSelector outputSelector) { * @return The {@link ConnectedStreams}. */ public ConnectedStreams connect(DataStream dataStream) { - return new ConnectedStreams(environment, this, dataStream); + return new ConnectedStreams<>(environment, this, dataStream); } /** @@ -235,7 +235,7 @@ public ConnectedStreams connect(DataStream dataStream) { * @return The {@link DataStream} with partitioned state (i.e. KeyedStream) */ public KeyedStream keyBy(KeySelector key) { - return new KeyedStream(this, clean(key)); + return new KeyedStream<>(this, clean(key)); } /** @@ -250,7 +250,7 @@ public KeyedStream keyBy(int... 
fields) { if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType())); } else { - return keyBy(new Keys.ExpressionKeys(fields, getType())); + return keyBy(new Keys.ExpressionKeys<>(fields, getType())); } } @@ -266,11 +266,11 @@ public KeyedStream keyBy(int... fields) { * @return The {@link DataStream} with partitioned state (i.e. KeyedStream) **/ public KeyedStream keyBy(String... fields) { - return keyBy(new Keys.ExpressionKeys(fields, getType())); + return keyBy(new Keys.ExpressionKeys<>(fields, getType())); } private KeyedStream keyBy(Keys keys) { - return new KeyedStream(this, clean(KeySelectorUtil.getSelectorForKeys(keys, + return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))); } @@ -288,7 +288,7 @@ public DataStream partitionByHash(int... fields) { if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType())); } else { - return partitionByHash(new Keys.ExpressionKeys(fields, getType())); + return partitionByHash(new Keys.ExpressionKeys<>(fields, getType())); } } @@ -303,7 +303,7 @@ public DataStream partitionByHash(int... fields) { * */ public DataStream partitionByHash(String... fields) { - return partitionByHash(new Keys.ExpressionKeys(fields, getType())); + return partitionByHash(new Keys.ExpressionKeys<>(fields, getType())); } /** @@ -316,7 +316,7 @@ public DataStream partitionByHash(String... fields) { * @return The partitioned DataStream */ public DataStream partitionByHash(KeySelector keySelector) { - return setConnectionType(new HashPartitioner(clean(keySelector))); + return setConnectionType(new HashPartitioner<>(clean(keySelector))); } //private helper method for partitioning @@ -326,7 +326,7 @@ private DataStream partitionByHash(Keys keys) { getType(), getExecutionConfig())); - return setConnectionType(new HashPartitioner(keySelector)); + return setConnectionType(new HashPartitioner<>(keySelector)); } /** @@ -340,7 +340,7 @@ private DataStream partitionByHash(Keys keys) { * @return The partitioned DataStream. */ public DataStream partitionCustom(Partitioner partitioner, int field) { - Keys.ExpressionKeys outExpressionKeys = new Keys.ExpressionKeys(new int[]{field}, getType()); + Keys.ExpressionKeys outExpressionKeys = new Keys.ExpressionKeys<>(new int[]{field}, getType()); return partitionCustom(partitioner, outExpressionKeys); } @@ -355,7 +355,7 @@ public DataStream partitionCustom(Partitioner partitioner, int field) * @return The partitioned DataStream. 
*/ public DataStream partitionCustom(Partitioner partitioner, String field) { - Keys.ExpressionKeys outExpressionKeys = new Keys.ExpressionKeys(new String[]{field}, getType()); + Keys.ExpressionKeys outExpressionKeys = new Keys.ExpressionKeys<>(new String[]{field}, getType()); return partitionCustom(partitioner, outExpressionKeys); } @@ -376,7 +376,7 @@ public DataStream partitionCustom(Partitioner partitioner, String fiel * @see KeySelector */ public DataStream partitionCustom(Partitioner partitioner, KeySelector keySelector) { - return setConnectionType(new CustomPartitionerWrapper(clean(partitioner), + return setConnectionType(new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector))); } @@ -385,7 +385,7 @@ private DataStream partitionCustom(Partitioner partitioner, Keys ke KeySelector keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig()); return setConnectionType( - new CustomPartitionerWrapper( + new CustomPartitionerWrapper<>( clean(partitioner), clean(keySelector))); } @@ -499,7 +499,7 @@ public DataStream global() { */ @PublicEvolving public IterativeStream iterate() { - return new IterativeStream(this, 0); + return new IterativeStream<>(this, 0); } /** @@ -535,7 +535,7 @@ public IterativeStream iterate() { */ @PublicEvolving public IterativeStream iterate(long maxWaitTimeMillis) { - return new IterativeStream(this, maxWaitTimeMillis); + return new IterativeStream<>(this, maxWaitTimeMillis); } /** @@ -557,7 +557,7 @@ public IterativeStream iterate(long maxWaitTimeMillis) { TypeInformation outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true); - return transform("Map", outType, new StreamMap(clean(mapper))); + return transform("Map", outType, new StreamMap<>(clean(mapper))); } /** @@ -581,7 +581,7 @@ public IterativeStream iterate(long maxWaitTimeMillis) { TypeInformation outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true); - return transform("Flat Map", outType, new StreamFlatMap(clean(flatMapper))); + return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper))); } @@ -600,7 +600,7 @@ public IterativeStream iterate(long maxWaitTimeMillis) { * @return The filtered DataStream. */ public SingleOutputStreamOperator filter(FilterFunction filter) { - return transform("Filter", getType(), new StreamFilter(clean(filter))); + return transform("Filter", getType(), new StreamFilter<>(clean(filter))); } @@ -623,7 +623,7 @@ public IterativeStream iterate(long maxWaitTimeMillis) { */ @PublicEvolving public SingleOutputStreamOperator project(int... 
fieldIndexes) { - return new StreamProjection(this, fieldIndexes).projectTupleX(); + return new StreamProjection<>(this, fieldIndexes).projectTupleX(); } /** @@ -774,7 +774,7 @@ public AllWindowedStream windowAll(WindowAssigner print() { - PrintSinkFunction printFunction = new PrintSinkFunction(); + PrintSinkFunction printFunction = new PrintSinkFunction<>(); return addSink(printFunction); } @@ -789,7 +789,7 @@ public DataStreamSink print() { */ @PublicEvolving public DataStreamSink printToErr() { - PrintSinkFunction printFunction = new PrintSinkFunction(true); + PrintSinkFunction printFunction = new PrintSinkFunction<>(true); return addSink(printFunction); } @@ -807,29 +807,9 @@ public DataStreamSink printToErr() { */ @PublicEvolving public DataStreamSink writeAsText(String path) { - return write(new TextOutputFormat(new Path(path)), 0L); + return writeUsingOutputFormat(new TextOutputFormat(new Path(path))); } - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, in every millis milliseconds. - * - *
<p>
- * For every element of the DataStream the result of {@link Object#toString()} - * is written. - * - * @param path - * The path pointing to the location the text file is written to. - * @param millis - * The file update frequency. - * - * @return The closed DataStream. - */ - @PublicEvolving - public DataStreamSink writeAsText(String path, long millis) { - TextOutputFormat tof = new TextOutputFormat(new Path(path)); - return write(tof, millis); - } /** * Writes a DataStream to the file specified by path in text format. @@ -848,34 +828,11 @@ public DataStreamSink writeAsText(String path, long millis) { */ @PublicEvolving public DataStreamSink writeAsText(String path, WriteMode writeMode) { - TextOutputFormat tof = new TextOutputFormat(new Path(path)); + TextOutputFormat tof = new TextOutputFormat<>(new Path(path)); tof.setWriteMode(writeMode); - return write(tof, 0L); + return writeUsingOutputFormat(tof); } - /** - * Writes a DataStream to the file specified by path in text format. - * - *
<p>
- * For every element of the DataStream the result of {@link Object#toString()} - * is written. - * - * @param path - * The path pointing to the location the text file is written to - * @param writeMode - * Controls the behavior for existing files. Options are - * NO_OVERWRITE and OVERWRITE. - * @param millis - T the file update frequency - * - * @return The closed DataStream. - */ - @PublicEvolving - public DataStreamSink writeAsText(String path, WriteMode writeMode, long millis) { - TextOutputFormat tof = new TextOutputFormat(new Path(path)); - tof.setWriteMode(writeMode); - return write(tof, millis); - } /** * Writes a DataStream to the file specified by the path parameter. @@ -891,28 +848,9 @@ public DataStreamSink writeAsText(String path, WriteMode writeMode, long mill */ @PublicEvolving public DataStreamSink writeAsCsv(String path) { - return writeAsCsv(path, null, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + return writeAsCsv(path, null, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); } - /** - * Writes a DataStream to the file specified by the path parameter. The - * writing is performed periodically, in every millis milliseconds. - * - *
<p>
- * For every field of an element of the DataStream the result of {@link Object#toString()} - * is written. This method can only be used on data streams of tuples. - * - * @param path - * the path pointing to the location the text file is written to - * @param millis - * the file update frequency - * - * @return the closed DataStream - */ - @PublicEvolving - public DataStreamSink writeAsCsv(String path, long millis) { - return writeAsCsv(path, null, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); - } /** * Writes a DataStream to the file specified by the path parameter. @@ -931,7 +869,7 @@ public DataStreamSink writeAsCsv(String path, long millis) { */ @PublicEvolving public DataStreamSink writeAsCsv(String path, WriteMode writeMode) { - return writeAsCsv(path, writeMode, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + return writeAsCsv(path, writeMode, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); } /** @@ -947,31 +885,6 @@ public DataStreamSink writeAsCsv(String path, WriteMode writeMode) { * @param writeMode * Controls the behavior for existing files. Options are * NO_OVERWRITE and OVERWRITE. - * @param millis - * the file update frequency - * - * @return the closed DataStream - */ - @PublicEvolving - public DataStreamSink writeAsCsv(String path, WriteMode writeMode, long millis) { - return writeAsCsv(path, writeMode, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); - } - - /** - * Writes a DataStream to the file specified by the path parameter. The - * writing is performed periodically every millis milliseconds. - * - *
<p>
- * For every field of an element of the DataStream the result of {@link Object#toString()} - * is written. This method can only be used on data streams of tuples. - * - * @param path - * the path pointing to the location the text file is written to - * @param writeMode - * Controls the behavior for existing files. Options are - * NO_OVERWRITE and OVERWRITE. - * @param millis - * the file update frequency * @param rowDelimiter * the delimiter for two rows * @param fieldDelimiter @@ -984,14 +897,13 @@ public DataStreamSink writeAsCsv(String path, WriteMode writeMode, long milli public DataStreamSink writeAsCsv( String path, WriteMode writeMode, - long millis, String rowDelimiter, String fieldDelimiter) { Preconditions.checkArgument( getType().isTupleType(), "The writeAsCsv() method can only be used on data streams of tuples."); - CsvOutputFormat of = new CsvOutputFormat( + CsvOutputFormat of = new CsvOutputFormat<>( new Path(path), rowDelimiter, fieldDelimiter); @@ -1000,7 +912,7 @@ public DataStreamSink writeAsCsv( of.setWriteMode(writeMode); } - return write((OutputFormat) of, millis); + return writeUsingOutputFormat((OutputFormat) of); } /** @@ -1017,7 +929,7 @@ public DataStreamSink writeAsCsv( */ @PublicEvolving public DataStreamSink writeToSocket(String hostName, int port, SerializationSchema schema) { - DataStreamSink returnStream = addSink(new SocketClientSink(hostName, port, schema, 0)); + DataStreamSink returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0)); returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port return returnStream; } @@ -1025,13 +937,16 @@ public DataStreamSink writeToSocket(String hostName, int port, SerializationS /** * Writes the dataStream into an output, described by an OutputFormat. * + * The output is not participating in Flink's checkpointing! + * + * For writing to a file system periodically, the use of the "flink-connector-filesystem" is recommended. + * * @param format The output format - * @param millis the write frequency * @return The closed DataStream */ @PublicEvolving - public DataStreamSink write(OutputFormat format, long millis) { - return addSink(new FileSinkFunctionByMillis(format, millis)); + public DataStreamSink writeUsingOutputFormat(OutputFormat format) { + return addSink(new OutputFormatSinkFunction<>(format)); } /** @@ -1054,7 +969,7 @@ public DataStreamSink write(OutputFormat format, long millis) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); - OneInputTransformation resultTransform = new OneInputTransformation( + OneInputTransformation resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operator, @@ -1077,7 +992,7 @@ public DataStreamSink write(OutputFormat format, long millis) { * @return The modified DataStream. 
*/ protected DataStream setConnectionType(StreamPartitioner partitioner) { - return new DataStream(this.getExecutionEnvironment(), new PartitionTransformation(this.getTransformation(), partitioner)); + return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner)); } /** @@ -1099,9 +1014,9 @@ public DataStreamSink addSink(SinkFunction sinkFunction) { ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() ); } - StreamSink sinkOperator = new StreamSink(clean(sinkFunction)); + StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction)); - DataStreamSink sink = new DataStreamSink(this, sinkOperator); + DataStreamSink sink = new DataStreamSink<>(this, sinkOperator); getExecutionEnvironment().addOperator(sink.getTransformation()); return sink; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java deleted file mode 100644 index c7a0385d26441..0000000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions.sink; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.io.OutputFormat; - -/** - * Implementation of FileSinkFunction. Writes tuples to file in every millis - * milliseconds. - * - * @param - * Input type - */ -@PublicEvolving -public class FileSinkFunctionByMillis extends FileSinkFunction { - private static final long serialVersionUID = 1L; - - private final long millis; - private long lastTime; - - public FileSinkFunctionByMillis(OutputFormat format, long millis) { - super(format); - this.millis = millis; - lastTime = System.currentTimeMillis(); - } - - /** - * Condition for writing the contents of tupleList and clearing it. - * - * @return value of the updating condition - */ - @Override - protected boolean updateCondition() { - return System.currentTimeMillis() - lastTime >= millis; - } - - /** - * Statements to be executed after writing a batch goes here. 
- */ - @Override - protected void resetParameters() { - tupleList.clear(); - lastTime = System.currentTimeMillis(); - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java similarity index 52% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java index b732198ad1faa..16ec3f5857577 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.functions.sink; import java.io.IOException; -import java.util.ArrayList; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; @@ -34,28 +33,21 @@ /** * Simple implementation of the SinkFunction writing tuples in the specified - * OutputFormat format. Tuples are collected to a list and written to the file - * periodically. The target path and the overwrite mode are pre-packaged in - * format. + * OutputFormat format. * - * @param - * Input type + * @param Input type */ @PublicEvolving -public abstract class FileSinkFunction extends RichSinkFunction implements - InputTypeConfigurable { +public class OutputFormatSinkFunction extends RichSinkFunction implements InputTypeConfigurable { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class); + private static final Logger LOG = LoggerFactory.getLogger(OutputFormatSinkFunction.class); - protected ArrayList tupleList = new ArrayList(); - protected volatile OutputFormat format; - protected volatile boolean cleanupCalled = false; - protected int indexInSubtaskGroup; - protected int currentNumberOfSubtasks; + private OutputFormat format; + private boolean cleanupCalled = false; - public FileSinkFunction(OutputFormat format) { + public OutputFormatSinkFunction(OutputFormat format) { this.format = format; } @@ -63,8 +55,8 @@ public FileSinkFunction(OutputFormat format) { public void open(Configuration parameters) throws Exception { RuntimeContext context = getRuntimeContext(); format.configure(parameters); - indexInSubtaskGroup = context.getIndexOfThisSubtask(); - currentNumberOfSubtasks = context.getNumberOfParallelSubtasks(); + int indexInSubtaskGroup = context.getIndexOfThisSubtask(); + int currentNumberOfSubtasks = context.getNumberOfParallelSubtasks(); format.open(indexInSubtaskGroup, currentNumberOfSubtasks); } @@ -78,66 +70,33 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi @Override public void invoke(IN record) throws Exception { - tupleList.add(record); - if (updateCondition()) { - flush(); + try { + format.writeRecord(record); + } catch (Exception ex) { + cleanup(); + throw ex; } } @Override public void close() throws IOException { - if (!tupleList.isEmpty()) { - flush(); - } try { format.close(); } catch (Exception ex) { - if (LOG.isErrorEnabled()) { - LOG.error("Error while writing element.", ex); - } - try { - if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { - cleanupCalled = true; - ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); - } - } 
catch (Throwable t) { - LOG.error("Cleanup on error failed.", t); - } + cleanup(); + throw ex; } } - protected void flush() { try { - for (IN rec : tupleList) { - format.writeRecord(rec); + private void cleanup() { try { + if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { + cleanupCalled = true; + ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); } - } catch (Exception ex) { - try { - if (LOG.isErrorEnabled()) { - LOG.error("Error while writing element.", ex); - } - if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { - cleanupCalled = true; - ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); - } - } catch (Throwable t) { - LOG.error("Cleanup on error failed.", t); - } - throw new RuntimeException(ex); + } catch (Throwable t) { + LOG.error("Cleanup on error failed.", t); } - resetParameters(); } - /** - * Condition for writing the contents of tupleList and clearing it. - * - * @return value of the updating condition - */ - protected abstract boolean updateCondition(); - - /** - * Statements to be executed after writing a batch goes here. - */ - protected abstract void resetParameters(); - }
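With `FileSinkFunctionByMillis` gone, `OutputFormatSinkFunction` hands every record straight to `OutputFormat.writeRecord()`; there is no buffering and no periodic flush condition anymore. Below is a hedged sketch of a trivial custom format driven through the renamed method; the class name and the stdout target are illustrative assumptions, not part of this patch:

```java
import java.io.IOException;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StdoutOutputFormatSketch implements OutputFormat<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public void configure(Configuration parameters) {
		// Nothing to configure in this sketch.
	}

	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		// A real format would open its file or connection here.
	}

	@Override
	public void writeRecord(String record) throws IOException {
		// Invoked once per element by OutputFormatSinkFunction; no batching.
		System.out.println(record);
	}

	@Override
	public void close() throws IOException {
		// A real format would flush and release resources here.
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.fromElements("one", "two", "three")
			.writeUsingOutputFormat(new StdoutOutputFormatSketch());
		env.execute("Custom OutputFormat sketch");
	}
}
```

This mirrors the sink function's new life cycle: `configure()` and `open()` at start, `writeRecord()` per element, `close()` at shutdown, with `tryCleanupOnError()` invoked on failure.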
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 04c1980aa7a94..f66644ee19f01 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -738,21 +738,9 @@ class DataStream[T](stream: JavaStream[T]) { */ @PublicEvolving def writeAsText(path: String): DataStreamSink[T] = - stream.writeAsText(path, 0L) + stream.writeAsText(path) + - /** - * Writes a DataStream to the file specified by path in text format. The - * writing is performed periodically, every millis milliseconds. For - * every element of the DataStream the result of .toString - * is written. - * - * @param path The path pointing to the location the text file is written to - * @param millis The file update frequency - * @return The closed DataStream - */ - @PublicEvolving - def writeAsText(path: String, millis: Long): DataStreamSink[T] = - stream.writeAsText(path, millis) /** * Writes a DataStream to the file specified by path in text format. For * every element of the DataStream the result of .toString is * written. @@ -772,30 +760,6 @@ class DataStream[T](stream: JavaStream[T]) { } } - /** - * Writes a DataStream to the file specified by path in text format. The writing is performed - * periodically every millis milliseconds. - * - * @param path The path pointing to the location the text file is written to - * @param writeMode Controls the behavior for existing files. Options are NO_OVERWRITE and - * OVERWRITE. - * @param millis The file update frequency - * @return The closed DataStream - */ - @PublicEvolving - def writeAsText( - path: String, - writeMode: FileSystem.WriteMode, - millis: Long) - : DataStreamSink[T] = { - if (writeMode != null) { - stream.writeAsText(path, writeMode, millis) - } else { - stream.writeAsText(path, millis) - } - } - /** * Writes the DataStream in CSV format to the file specified by the path parameter. The writing * is performed periodically every millis milliseconds. @@ -808,25 +772,6 @@ class DataStream[T](stream: JavaStream[T]) { writeAsCsv( path, null, - 0L, - ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER, - ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER) - } - - /** - * Writes the DataStream in CSV format to the file specified by the path parameter. The writing - * is performed periodically every millis milliseconds. - * - * @param path Path to the location of the CSV file - * @param millis File update frequency - * @return The closed DataStream - */ - @PublicEvolving - def writeAsCsv(path: String, millis: Long): DataStreamSink[T] = { - writeAsCsv( - path, - null, - millis, ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER, ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER) } @@ -844,26 +789,6 @@ class DataStream[T](stream: JavaStream[T]) { writeAsCsv( path, writeMode, - 0L, - ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER, - ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER) - } - - /** - * Writes the DataStream in CSV format to the file specified by the path parameter. The writing - * is performed periodically every millis milliseconds. - * - * @param path Path to the location of the CSV file - * @param writeMode Controls whether an existing file is overwritten or not - * @param millis File update frequency - * @return The closed DataStream - */ - @PublicEvolving - def writeAsCsv(path: String, writeMode: FileSystem.WriteMode, millis: Long): DataStreamSink[T] = { - writeAsCsv( - path, - writeMode, - millis, ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER, ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER) } @@ -874,7 +799,6 @@ class DataStream[T](stream: JavaStream[T]) { * * @param path Path to the location of the CSV file * @param writeMode Controls whether an existing file is overwritten or not - * @param millis File update frequency * @param rowDelimiter Delimiter for consecutive rows * @param fieldDelimiter Delimiter for consecutive fields * @return The closed DataStream @@ -883,7 +807,6 @@ class DataStream[T](stream: JavaStream[T]) { def writeAsCsv( path: String, writeMode: FileSystem.WriteMode, - millis: Long, rowDelimiter: String, fieldDelimiter: String) : DataStreamSink[T] = { @@ -892,16 +815,15 @@ class DataStream[T](stream: JavaStream[T]) { if (writeMode != null) { of.setWriteMode(writeMode) } - stream.write(of.asInstanceOf[OutputFormat[T]], millis) + stream.writeUsingOutputFormat(of.asInstanceOf[OutputFormat[T]]) } /** - * Writes a DataStream using the given [[OutputFormat]]. The - * writing is performed periodically, in every millis milliseconds. + * Writes a DataStream using the given [[OutputFormat]].
*/ @PublicEvolving - def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = { - stream.write(format, millis) + def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T] = { + stream.writeUsingOutputFormat(format) } /** diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java index 10dd5d315dc3d..5dbd9f9683c0d 100644 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java +++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java @@ -58,7 +58,7 @@ public void testPath() throws Exception { @Test public void testPathMillis() throws Exception { - OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, 1); + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath); } @Test @@ -68,12 +68,12 @@ public void testPathWriteMode() throws Exception { @Test public void testPathWriteModeMillis() throws Exception { - OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1); + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE); } @Test public void testPathWriteModeMillisDelimiter() throws Exception { - OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ","); + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, "\n", ","); } @Test @@ -91,7 +91,7 @@ public void failPathWriteMode() throws Exception { public void failPathWriteModeMillis() throws Exception { OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath); try { - OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1); + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE); fail("File should exist"); } catch (Exception e) { assertTrue(e.getCause().getMessage().contains("File already exists")); @@ -102,7 +102,7 @@ public void failPathWriteModeMillis() throws Exception { public void failPathWriteModeMillisDelimiter() throws Exception { OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath); try { - OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1, "\n", ","); + OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, "\n", ","); fail("File should exist."); } catch (Exception e) { assertTrue(e.getCause().getMessage().contains("File already exists")); diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java index d2cc6dbb61120..0adfaad583ae1 100644 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java +++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java @@ -53,20 +53,12 @@ public void testPath() throws Exception { OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath); } - @Test - public void testPathMillis() throws Exception { - OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, 
resultPath, 1); - } @Test public void testPathWriteMode() throws Exception { OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE); } - @Test - public void testPathWriteModeMillis() throws Exception { - OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1); - } @Test public void failPathWriteMode() throws Exception { @@ -79,17 +71,6 @@ public void failPathWriteMode() throws Exception { } } - @Test - public void failPathWriteModeMillis() throws Exception { - OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath); - try { - OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath, FileSystem.WriteMode.NO_OVERWRITE, 1); - fail("File should exist."); - } catch (Exception e) { - assertTrue(e.getCause().getMessage().contains("File already exists")); - } - } - @After public void closeFile() throws Exception { compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala index e0a9a9c09a593..3b47429f22727 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala @@ -46,15 +46,6 @@ object OutputFormatTestPrograms { env.execute("Scala WordCountToText") } - def wordCountToText(input : String, outputPath : String, millis : Long) : Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.fromElements(input) - - val counts = wordCountProgram(text) - counts.writeAsText(outputPath, millis) - - env.execute("Scala WordCountToText") - } def wordCountToText( input : String, @@ -70,20 +61,6 @@ object OutputFormatTestPrograms { env.execute("Scala WordCountToText") } - def wordCountToText( - input : String, - outputPath : String, - writeMode : FileSystem.WriteMode, - millis : Long) : Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.fromElements(input) - - val counts = wordCountProgram(text) - - counts.writeAsText(outputPath, writeMode, millis) - - env.execute("Scala WordCountToText") - } def wordCountToCsv(input : String, outputPath : String) : Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -96,16 +73,6 @@ object OutputFormatTestPrograms { env.execute("Scala WordCountToCsv") } - def wordCountToCsv(input : String, outputPath : String, millis : Long) : Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.fromElements(input) - - val counts = wordCountProgram(text) - - counts.writeAsCsv(outputPath, millis) - - env.execute("Scala WordCountToCsv") - } def wordCountToCsv( input : String, @@ -121,26 +88,11 @@ object OutputFormatTestPrograms { env.execute("Scala WordCountToCsv") } - def wordCountToCsv( - input : String, - outputPath : String, - writeMode : FileSystem.WriteMode, - millis : Long) : Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.fromElements(input) - - val counts = wordCountProgram(text) - - counts.writeAsCsv(outputPath, writeMode, millis) - - env.execute("Scala WordCountToCsv") - } def wordCountToCsv( input : String, outputPath : String, writeMode : FileSystem.WriteMode, - millis : 
Long, rowDelimiter: String, fieldDelimiter: String) : Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -148,7 +100,7 @@ object OutputFormatTestPrograms { val counts = wordCountProgram(text) - counts.writeAsCsv(outputPath, writeMode, millis, rowDelimiter, fieldDelimiter) + counts.writeAsCsv(outputPath, writeMode, rowDelimiter, fieldDelimiter) env.execute("Scala WordCountToCsv") } diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 922fc43ffdd9f..49e18e04b69ec 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -173,7 +173,7 @@ public void testStreaming() throws Exception { DataStream input = env.fromCollection(inputData); input .flatMap(new NotifyingMapper()) - .write(new NotifyingOutputFormat(), 1000).disableChaining(); + .writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining(); jobGraph = env.getStreamGraph().getJobGraph();
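The updated documentation also points to custom sinks added via `.addSink(...)` that participate in checkpointing. The sketch below is a hedged illustration, assuming this Flink version's `org.apache.flink.streaming.api.checkpoint.Checkpointed` interface; the class name, batch size, and delivery target are illustrative, not part of this patch:

```java
import java.util.ArrayList;

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CheckpointedBufferingSink extends RichSinkFunction<String>
		implements Checkpointed<ArrayList<String>> {

	private static final long serialVersionUID = 1L;

	private ArrayList<String> buffer = new ArrayList<>();

	@Override
	public void invoke(String value) throws Exception {
		buffer.add(value);
		if (buffer.size() >= 100) {
			// Deliver the batch to the external system here; for true
			// exactly-once the target must be idempotent or transactional.
			buffer.clear();
		}
	}

	@Override
	public ArrayList<String> snapshotState(long checkpointId, long checkpointTimestamp) {
		// The un-delivered buffer becomes part of the checkpoint.
		return buffer;
	}

	@Override
	public void restoreState(ArrayList<String> state) {
		// After a failure, buffered-but-undelivered elements are restored.
		buffer = state;
	}
}
```

Used as `stream.addSink(new CheckpointedBufferingSink())` with checkpointing enabled, the buffered elements survive failures, which is exactly the guarantee the deprecated `write*()` path could not give.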