Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions docs/apis/streaming/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1819,14 +1819,14 @@ of each element on the standard out / standard error stream. Optionally, a pref
prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
greater than 1, the output will also be prepended with the identifier of the task which produced the output.

- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
custom object-to-bytes conversion.

- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`

- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
Apache Kafka) that are implemented as sink functions.

</div>
<div data-lang="scala" markdown="1">

Expand All @@ -1847,7 +1847,7 @@ of each element on the standard out / standard error stream. Optionally, a pref
prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
greater than 1, the output will also be prepended with the identifier of the task which produced the output.

- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
custom object-to-bytes conversion.

- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`
Expand All @@ -1858,6 +1858,17 @@ greater than 1, the output will also be prepended with the identifier of the tas
</div>
</div>

Note that the `write*()` methods on `DataStream` are mainly intended for debugging purposes.
They do not participate in Flink's checkpointing, which means these functions usually have
at-least-once semantics. How data is flushed to the target system depends on the implementation of the
OutputFormat. This means that not all elements sent to the OutputFormat immediately show up
in the target system. Also, in failure cases, those records might be lost.

For reliable, exactly-once delivery of a stream into a file system, use the `flink-connector-filesystem`.
Also, custom implementations through the `.addSink(...)` method can participate in Flink's checkpointing
for exactly-once semantics.



{% top %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public class HBaseWriteStreamExample {

public static void main(String[] args) {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();

Expand All @@ -64,14 +64,9 @@ public void cancel() {
isRunning = false;
}
});
dataStream.write(new HBaseOutputFormat(), 0L);
dataStream.writeUsingOutputFormat(new HBaseOutputFormat());

try {
env.execute();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
env.execute();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static void main(String[] args) throws Exception {

// emit results
if (fileOutput) {
numbers.writeAsText(outputPath, 1);
numbers.writeAsText(outputPath);
} else {
numbers.print();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static void main(String[] args) throws Exception {

// emit result
if (fileOutput) {
joinedStream.writeAsText(outputPath, 1);
joinedStream.writeAsText(outputPath);
} else {
joinedStream.print();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
*/
public class IncrementalLearningSkeleton {

private static DataStream<Integer> trainingData = null;
private static DataStream<Integer> newData = null;

// *************************************************************************
// PROGRAM
Expand All @@ -66,8 +64,8 @@ public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

trainingData = env.addSource(new FiniteTrainingDataSource());
newData = env.addSource(new FiniteNewDataSource());
DataStream<Integer> trainingData = env.addSource(new FiniteTrainingDataSource());
DataStream<Integer> newData = env.addSource(new FiniteNewDataSource());

// build new model on every second of new data
DataStream<Double[]> model = trainingData
Expand All @@ -80,7 +78,7 @@ public static void main(String[] args) throws Exception {

// emit result
if (fileOutput) {
prediction.writeAsText(outputPath, 1);
prediction.writeAsText(outputPath);
} else {
prediction.print();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
.sum(1);

if (fileOutput) {
counts.writeAsText(outputPath, 1);
counts.writeAsText(outputPath);
} else {
counts.print();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object SocketTextStreamWordCount {
.sum(1)

if (fileOutput) {
counts.writeAsText(outputPath, 1)
counts.writeAsText(outputPath)
} else {
counts print
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception {

// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, 1);
counts.writeAsCsv(outputPath);
} else {
counts.print();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void invoke() throws Exception {

ExecutionConfig executionConfig;
try {
ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
ExecutionConfig c = InstantiationUtil.readObjectFromConfig(
getJobConfiguration(),
ExecutionConfig.CONFIG_KEY,
getUserCodeClassLoader());
Expand All @@ -130,7 +130,6 @@ public void invoke() throws Exception {
boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

try {

// initialize local strategies
MutableObjectIterator<IT> input1;
switch (this.config.getInputLocalStrategy(0)) {
Expand Down
Loading