---
title: "Flink DataStream API Programming Guide"
is_beta: false
---

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may, for example, write the data to files or to standard output (e.g., the command line terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.

In order to create your own Flink DataStream program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections act as references for additional operations and advanced features.

* This will be replaced by the TOC
{:toc}

Example Program

The following program is a complete, working example of a streaming window word count application that counts the words coming from a web socket in 5-second windows. You can copy & paste the code to run it locally.

{% highlight java %}
public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
{% endhighlight %}

{% highlight scala %}
object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.of(5, TimeUnit.SECONDS))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}
{% endhighlight %}

To run the example program, start the input stream with netcat first from a terminal:

nc -lk 9999

Just type some words, hitting return for a new word. These will be the input to the word count program. If you want to see counts greater than 1, type the same word repeatedly within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).

Back to top

Linking with Flink

To write programs with Flink, you need to include the Flink DataStream library corresponding to your programming language in your project.

The simplest way to do this is to use one of the quickstart scripts: either for [Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:

{% highlight bash %}
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion={{site.version }}
{% endhighlight %}
{% highlight bash %}
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion={{site.version }}
{% endhighlight %}

The archetypes work for stable releases and preview versions (-SNAPSHOT).

If you want to add Flink to an existing Maven project, add the following entry to your dependencies section in the pom.xml file of your project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>{{site.version }}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}
{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala</artifactId>
  <version>{{site.version }}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}

In order to create your own Flink program, we encourage you to start with the program skeleton and gradually add your own transformations.

Back to top

Program Skeleton


As presented in the example, Flink DataStream programs look like regular Java programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of these steps; please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

{% highlight java %}
getExecutionEnvironment()

createLocalEnvironment()
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration customConfiguration)

createRemoteEnvironment(String host, int port, String... jarFiles)
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
{% endhighlight %}

Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment() will return an execution environment for executing your program on a cluster.

For specifying data sources, the execution environment has several methods to read from files, sockets, and external systems. To just read data from a socket (useful also for debugging), you can use:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> lines = env.socketTextStream("localhost", 9999);
{% endhighlight %}

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a socket, transform again, combine with other DataStreams, or push to an external system (e.g., a message queue, or a file system). You apply transformations by calling methods on DataStream with your own custom transformation functions. For example, a map transformation looks like this:

{% highlight java %}
DataStream<String> input = ...;

DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
{% endhighlight %}

This will create a new DataStream by converting every String in the original stream to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.

{% highlight java %}
writeAsText(String path, ...)
writeAsCsv(String path, ...)
writeToSocket(String hostname, int port, ...)

print()

addSink(...)
{% endhighlight %}

Once you have specified the complete program, you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.

{% highlight java %} env.execute(); {% endhighlight %}


As presented in the example, Flink DataStream programs look like regular Scala programs with a main() method. Each program consists of the same basic parts:

  1. Obtaining a StreamExecutionEnvironment,
  2. Connecting to data stream sources,
  3. Specifying transformations on the data streams,
  4. Specifying output for the processed data,
  5. Executing the program.

We will now give an overview of each of these steps; please refer to the respective sections for more details.

The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:

{% highlight scala %}
def getExecutionEnvironment

def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())

def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
{% endhighlight %}

Typically, you only need to use getExecutionEnvironment, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular program, it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment will return an execution environment for executing your program on a cluster.

For specifying data sources, the execution environment has several methods to read from files, sockets, and external systems. To just read data from a socket (useful also for debugging), you can use:

{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment

val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
{% endhighlight %}

This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.

Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a file, transform again, combine with other DataStreams, or push to an external system. You apply transformations by calling methods on DataStream with your own custom transformation function. For example, a map transformation looks like this:

{% highlight scala %} val input: DataStream[String] = ...

val mapped = input.map { x => x.toInt } {% endhighlight %}

This will create a new DataStream by converting every String in the original stream to an Integer. For more information and a list of all the transformations, please refer to Transformations.

Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.

{% highlight scala %}
writeAsText(path: String, ...)
writeAsCsv(path: String, ...)
writeToSocket(hostname: String, port: Int, ...)

print()

addSink(...)
{% endhighlight %}

Once you have specified the complete program, you need to trigger the program execution by calling execute on the StreamExecutionEnvironment. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.

{% highlight scala %} env.execute() {% endhighlight %}

Back to top

DataStream Abstraction

A DataStream is a possibly unbounded, immutable collection of data items of the same type.

Transformations may return different subtypes of DataStream, allowing for specialized transformations. For example, the keyBy(...) method returns a KeyedStream, which is a stream of data that is logically partitioned by a certain key and can be further windowed.

Back to top

Lazy Evaluation

All Flink DataStream programs are executed lazily: When the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program's plan. The operations are actually executed when the execution is explicitly triggered by an execute() call on the StreamExecutionEnvironment object. Whether the program is executed locally or on a cluster depends on the type of StreamExecutionEnvironment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
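For illustration, here is a minimal sketch (not from the original guide) showing that transformations only run once execute() is called:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// This map() call only adds an operation to the program's plan;
// nothing is computed yet.
DataStream<Integer> doubled = env
        .fromElements(1, 2, 3)
        .map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) {
                return 2 * value;
            }
        });

doubled.print(); // also just declares a sink in the plan

env.execute();   // only now is the pipeline actually executed
{% endhighlight %}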

Back to top

Transformations

Data transformations transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated topologies.

This section gives a description of all the available transformations.


Transformation Description
Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

{% highlight java %}
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
{% endhighlight %}
FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:

{% highlight java %}
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}
Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

{% highlight java %}
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
{% endhighlight %}
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

{% highlight java %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0)         // Key by the first element of a Tuple
{% endhighlight %}
Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

{% highlight java %}
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
{% endhighlight %}

Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

{% highlight java %}
DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).

{% highlight java %}
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
{% endhighlight %}
Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
{% highlight java %}
dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
{% endhighlight %}

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

{% highlight java %} dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data {% endhighlight %}
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight java %}
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});
{% endhighlight %}
Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

{% highlight java %}
windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
{% endhighlight %}
Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

{% highlight java %}
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
{% endhighlight %}
Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).

{% highlight java %}
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
{% endhighlight %}
Union
DataStream* → DataStream

Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will still only get each element once.

{% highlight java %} dataStream.union(otherStream1, otherStream2, ...); {% endhighlight %}
Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

{% highlight java %} dataStream.join(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply (new JoinFunction () {...}); {% endhighlight %}
Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

{% highlight java %} dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply (new CoGroupFunction () {...}); {% endhighlight %}
Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.

{% highlight java %} DataStream someStream = //... DataStream otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); {% endhighlight %}

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

{% highlight java %}
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
{% endhighlight %}

Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.
{% highlight java %}
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
{% endhighlight %}

Select
SplitStream → DataStream

Select one or more streams from a split stream.
{% highlight java %}
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
{% endhighlight %}

Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description. {% highlight java %} IterativeStream iteration = initialStream.iterate(); DataStream iterationBody = iteration.map (/do something/); DataStream feedback = iterationBody.filter(new FilterFunction(){ @Override public boolean filter(Integer value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream output = iterationBody.filter(new FilterFunction(){ @Override public boolean filter(Integer value) throws Exception { return value <= 0; } }); {% endhighlight %}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.
{% highlight java %}
stream.assignTimestamps(new TimestampExtractor() {...});
{% endhighlight %}


Transformation Description
Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

{% highlight scala %}
dataStream.map { x => x * 2 }
{% endhighlight %}
FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:

{% highlight scala %}
dataStream.flatMap { str => str.split(" ") }
{% endhighlight %}
Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

{% highlight scala %} dataStream.filter { _ != 0 } {% endhighlight %}
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

{% highlight scala %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0)         // Key by the first element of a Tuple
{% endhighlight %}
Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

{% highlight scala %} keyedStream.reduce { _ + _ } {% endhighlight %}

Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

{% highlight scala %} val result: DataStream[String] = keyedStream.fold("start", (str, i) => { str + "-" + i }) {% endhighlight %}

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).

{% highlight scala %}
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
{% endhighlight %}
Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.
{% highlight scala %}
dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
{% endhighlight %}

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

{% highlight scala %} dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data {% endhighlight %}
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

{% highlight scala %} windowedStream.apply { applyFunction } {% endhighlight %}
Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

{% highlight scala %} windowedStream.reduce { _ + _ } {% endhighlight %}
Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

{% highlight scala %} val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i }) {% endhighlight %}
Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).

{% highlight scala %}
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
{% endhighlight %}
Union
DataStream* → DataStream

Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will still only get each element once.

{% highlight scala %} dataStream.union(otherStream1, otherStream2, ...) {% endhighlight %}
Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

{% highlight scala %} dataStream.join(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply { ... } {% endhighlight %}
Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

{% highlight scala %} dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply {} {% endhighlight %}
Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

{% highlight scala %}
val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
{% endhighlight %}

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

{% highlight scala %}
connectedStreams.map(
  (_ : Int) => true,
  (_ : String) => false
)
connectedStreams.flatMap(
  (num : Int) => Seq(num.toString),
  (str : String) => str.split(" ")
)
{% endhighlight %}
Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.
{% highlight scala %}
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
{% endhighlight %}

Select
SplitStream → DataStream

Select one or more streams from a split stream.
{% highlight scala %}
val even = split select "even"
val odd = split select "odd"
val all = split.select("even", "odd")
{% endhighlight %}

Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description. {% highlight java %} initialStream. iterate { iteration => { val iterationBody = iteration.map {/do something/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } IterativeStream iteration = initialStream.iterate(); DataStream iterationBody = iteration.map (/do something/); DataStream feedback = iterationBody.filter ( _ > 0); iteration.closeWith(feedback); {% endhighlight %}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See working with time. {% highlight scala %} stream.assignTimestamps { timestampExtractor } {% endhighlight %}

The following transformations are available on data streams of Tuples:


Transformation Description
Project
DataStream → DataStream

Selects a subset of fields from the tuples.
{% highlight java %}
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2, 0);
{% endhighlight %}


Transformation Description
Project
DataStream → DataStream

Selects a subset of fields from the tuples.
{% highlight scala %}
val in: DataStream[(Int, Double, String)] = // [...]
val out = in.project(2, 0)
{% endhighlight %}

Physical partitioning

Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.


Transformation Description
Hash partitioning
DataStream → DataStream

Identical to keyBy but returns a DataStream instead of a KeyedStream. {% highlight java %} dataStream.partitionByHash("someKey"); dataStream.partitionByHash(0); {% endhighlight %}

Custom partitioning
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element. {% highlight java %} dataStream.partitionCustom(new Partitioner(){...}, "someKey"); dataStream.partitionCustom(new Partitioner(){...}, 0); {% endhighlight %}

Random partitioning
DataStream → DataStream

Partitions elements randomly according to a uniform distribution. {% highlight java %} dataStream.partitionRandom(); {% endhighlight %}

Rebalancing (Round-robin partitioning)
DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. {% highlight java %} dataStream.rebalance(); {% endhighlight %}

Broadcasting
DataStream → DataStream

Broadcasts elements to every partition. {% highlight java %} dataStream.broadcast(); {% endhighlight %}


Transformation Description
Hash partitioning
DataStream → DataStream

Identical to keyBy but returns a DataStream instead of a KeyedStream.
{% highlight scala %}
dataStream.partitionByHash("someKey")
dataStream.partitionByHash(0)
{% endhighlight %}

Custom partitioning
DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.
{% highlight scala %}
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
{% endhighlight %}

Random partitioning
DataStream → DataStream

Partitions elements randomly according to a uniform distribution. {% highlight scala %} dataStream.partitionRandom() {% endhighlight %}

Rebalancing (Round-robin partitioning)
DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. {% highlight scala %} dataStream.rebalance() {% endhighlight %}

Broadcasting
DataStream → DataStream

Broadcasts elements to every partition. {% highlight scala %} dataStream.broadcast() {% endhighlight %}

Task chaining and resource groups

Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.


Transformation Description
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. {% highlight java %} someStream.filter(...).map(...).startNewChain().map(...); {% endhighlight %}

Disable chaining

Do not chain the map operator {% highlight java %} someStream.map(...).disableChaining(); {% endhighlight %}

Start a new resource group

Start a new resource group containing the map and the subsequent operators. {% highlight java %} someStream.filter(...).startNewResourceGroup(); {% endhighlight %}

Isolate resources

Isolate the operator in its own slot. {% highlight java %} someStream.map(...).isolateResources(); {% endhighlight %}


Transformation Description
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. {% highlight scala %} someStream.filter(...).map(...).startNewChain().map(...) {% endhighlight %}

Disable chaining

Do not chain the map operator {% highlight scala %} someStream.map(...).disableChaining() {% endhighlight %}

Start a new resource group

Start a new resource group containing the map and the subsequent operators. {% highlight scala %} someStream.filter(...).startNewResourceGroup() {% endhighlight %}

Isolate resources

Isolate the operator in its own slot. {% highlight scala %} someStream.map(...).isolateResources() {% endhighlight %}

Back to top

Specifying Keys

The keyBy transformation requires that a key is defined on its argument DataStream.

A DataStream is keyed as follows:
{% highlight java %}
DataStream<...> input = // [...]
DataStream<...> windowed = input
    .keyBy(/* define key here */)
    .window(/* define window here */);
{% endhighlight %}

The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data stream types into keys and values. Keys are "virtual": they are defined as functions over the actual data to guide the grouping operator.

See the relevant section of the DataSet API documentation on how to specify keys. Just replace DataSet with DataStream, and groupBy with keyBy.
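As a brief sketch (the tuple type and stream name are hypothetical), a key can also be specified as a KeySelector function over the elements:

{% highlight java %}
// Keying a stream of Tuple2<String, Integer> by its String field using
// a KeySelector function rather than a field index or field name.
DataStream<Tuple2<String, Integer>> wordCounts = // [...]

KeyedStream<Tuple2<String, Integer>, String> keyed = wordCounts
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) {
                return value.f0; // the word serves as the (virtual) key
            }
        });
{% endhighlight %}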

Passing Functions to Flink

Some transformations take user-defined functions as arguments.

See the relevant section of the DataSet API documentation.
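As a quick sketch (the stream and logic are made up for illustration), a function can be passed as an anonymous class, or as a Java 8 lambda where the function interface permits it:

{% highlight java %}
// Anonymous class implementing the function interface:
DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return value.length();
    }
});

// Equivalent Java 8 lambda (type hints may be needed in some cases):
DataStream<Integer> lengths2 = lines.map(value -> value.length());
{% endhighlight %}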

Back to top

Data Types

Flink places some restrictions on the type of elements that are used in DataStreams and in results of transformations. The reason for this is that the system analyzes the types to determine efficient execution strategies.

See the relevant section of the DataSet API documentation.
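As a rough sketch of a type that satisfies these restrictions (the class is hypothetical), a POJO with public fields and a no-argument constructor lets Flink analyze the type and reference fields by name:

{% highlight java %}
// A POJO element type: public fields plus a no-argument constructor
// allow Flink to analyze the type, e.g. for keyBy("word").
public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}
{% endhighlight %}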

Back to top

Data Sources


Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - Creates a stream by appending elements when there are changes to a file.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.

  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details; a minimal custom source is sketched below.
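A minimal sketch of such a custom (non-parallel) source, with made-up counting logic:

{% highlight java %}
// A non-parallel source that emits an increasing counter until cancelled.
public class CounterSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (isRunning) {
            ctx.collect(counter++);
            Thread.sleep(10); // throttle this illustrative source
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
{% endhighlight %}

It would be attached with env.addSource(new CounterSource()).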

Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.

There are several predefined stream sources accessible from the StreamExecutionEnvironment:

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readFile(path) / Any input format - Reads files as dictated by the input format.

  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.

  • readFileStream - Creates a stream by appending elements when there are changes to a file.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Seq) - Creates a data stream from a Scala Seq. All elements in the collection must be of the same type.

  • fromCollection(Iterator) - Creates a data stream from an iterator.

  • fromElements(elements: _*) - Creates a data stream from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator) - Creates a data stream from an iterator, in parallel.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...)). See connectors for more details.

Back to top

Execution Configuration

The StreamExecutionEnvironment also contains the ExecutionConfig, which allows setting job-specific configuration values for the runtime.

See the relevant section of the DataSet API documentation.

Parameters in the ExecutionConfig that pertain specifically to the DataStream API are:

  • enableTimestamps() / disableTimestamps(): Attach a timestamp to each event emitted from a source. areTimestampsEnabled() returns the current value.

  • setAutoWatermarkInterval(long milliseconds): Set the interval for automatic watermark emission. You can get the current value with long getAutoWatermarkInterval(). A short configuration example follows below.
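For example, a short sketch of setting these values (the one-second interval is an arbitrary choice):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ExecutionConfig config = env.getConfig();
config.enableTimestamps();             // attach timestamps to events at the sources
config.setAutoWatermarkInterval(1000); // emit watermarks every 1000 ms (arbitrary)
{% endhighlight %}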

Back to top

Data Sinks


Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions. A minimal custom sink is sketched below.
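As a hedged sketch of the addSink route (the sink body is made up; real connectors ship with Flink):

{% highlight java %}
// A trivial custom sink; a real implementation would hand each record
// to an external system's client.
public class PrintlnSink implements SinkFunction<String> {
    @Override
    public void invoke(String value) throws Exception {
        System.out.println("sinking: " + value);
    }
}

// Usage:
// stream.addSink(new PrintlnSink());
{% endhighlight %}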


Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.

  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.

  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.

  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.

  • writeToSocket - Writes elements to a socket according to a SerializationSchema

  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Back to top

Debugging

Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.

Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section gives some hints on how to ease the development of Flink programs.

Local Execution Environment

A LocalStreamEnvironment starts a Flink system within the same JVM process it was created in. If you start the LocalStreamEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.

A LocalStreamEnvironment is created and used as follows:

{% highlight java %} final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */); // build your program

env.execute(); {% endhighlight %}

{% highlight scala %} val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */) // build your program

env.execute() {% endhighlight %}

Collection Data Sources

Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.

Collection data sources can be used as follows:

{% highlight java %} final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
{% endhighlight %}

{% highlight scala %} val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
{% endhighlight %}

Note: Currently, the collection data source requires that data types and iterators implement Serializable. Furthermore, collection data sources cannot be executed in parallel (parallelism = 1).

Back to top

Windows

Working with Time

Windows are typically groups of events within a certain time period. Reasoning about time and windows assumes a definition of time. Flink has support for three kinds of time:

  • Processing time: Processing time is simply the wall clock time of the machine that happens to be executing the transformation. Processing time is the simplest notion of time and provides the best performance. However, in distributed and asynchronous environments processing time does not provide determinism.

  • Event time: Event time is the time that each individual event occurred. This time is typically embedded within the records before they enter Flink, or can be extracted from their contents. When using event time, out-of-order events can be properly handled. For example, an event with a lower timestamp may arrive after an event with a higher timestamp, but transformations will handle these events correctly. Event time processing provides predictable results, but incurs more latency, as out-of-order events need to be buffered.

  • Ingestion time: Ingestion time is the time that events enter Flink. In particular, the timestamp of an event is assigned by the source operator as the current wall clock time of the machine that executes the source task at the time the records enter the Flink source. Ingestion time is more predictable than processing time, and gives lower latencies than event time as the latency does not depend on external systems. Ingestion time provides thus a middle ground between processing time and event time. Ingestion time is a special case of event time (and indeed, it is treated by Flink identically to event time).

When dealing with event time, transformations need to avoid indefinite wait times for events to arrive. Watermarks provide the mechanism to control the event time-processing time skew. Watermarks are emitted by the sources. A watermark with a certain timestamp denotes the knowledge that no event with timestamp lower than the timestamp of the watermark will ever arrive.

You can specify the semantics of time in a Flink DataStream program using the StreamExecutionEnvironment, as follows:

{% highlight java %}
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{% endhighlight %}
{% highlight scala %}
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}

The default value is TimeCharacteristic.ProcessingTime, so in order to write a program with processing time semantics nothing needs to be specified (e.g., the first example in this guide follows processing time semantics).

In order to work with event time semantics, you need to follow three steps:

  • Set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  • Use DataStream.assignTimestamps(...) in order to tell Flink how timestamps relate to events (e.g., which record field is the timestamp)

  • Set enableTimestamps(), as well as the interval for watermark emission (setAutoWatermarkInterval(long milliseconds)), in ExecutionConfig.

For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned by the system that generates these data streams), and we know that the lag between the current processing time and the timestamp of an event is never more than 1 second:

{% highlight java %}
DataStream<Tuple4<Long, Integer, Double, String>> stream = //...
stream.assignTimestamps(new TimestampExtractor<Tuple4<Long, Integer, Double, String>>() {
    @Override
    public long extractTimestamp(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0;
    }

    @Override
    public long extractWatermark(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0 - 1000;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
});
{% endhighlight %}

{% highlight scala %}
val stream: DataStream[(Long, Int, Double, String)] = null
stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long =
    element._1

  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long =
    element._1 - 1000

  override def getCurrentWatermark: Long = Long.MinValue
})
{% endhighlight %}

If you know that timestamps of events are always ascending, i.e., elements arrive in order, you can use the AscendingTimestampExtractor, and the system generates watermarks automatically:

{% highlight java %}
DataStream<Tuple4<Long, Integer, Double, String>> stream = //...
stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long, Integer, Double, String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple4<Long, Integer, Double, String> element, long currentTimestamp) {
        return element.f0;
    }
});
{% endhighlight %}
{% highlight scala %} stream.extractAscendingTimestamp(record => record._1) {% endhighlight %}

In order to write a program with ingestion time semantics, you need to set env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime). You can think of this setting as a shortcut for writing a TimestampExtractor which assigns timestamps to events at the sources based on the current source wall-clock time. Flink injects this timestamp extractor automatically.

Windows on Keyed Data Streams

Flink offers a variety of methods for defining windows on a KeyedStream. All of these group elements per key, i.e., each window will contain elements with the same key value.

Basic Window Constructs

Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows for common use cases. See first if your use case can be served by the pre-defined windows below before moving to defining your own windows.


Transformation Description
Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). {% highlight java %} keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS)); {% endhighlight %}

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds) The notion of time is specified by the selected TimeCharacteristic (see time). {% highlight java %} keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)); {% endhighlight %}

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. {% highlight java %} keyedStream.countWindow(1000); {% endhighlight %}

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements). {% highlight java %} keyedStream.countWindow(1000, 100) {% endhighlight %}


Transformation Description
Tumbling time window
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). {% highlight scala %} keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS)) {% endhighlight %}

Sliding time window
KeyedStream → WindowedStream

Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds) The notion of time is specified by the selected TimeCharacteristic (see time). {% highlight scala %} keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) {% endhighlight %}

Tumbling count window
KeyedStream → WindowedStream

Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. {% highlight scala %} keyedStream.countWindow(1000) {% endhighlight %}

Sliding count window
KeyedStream → WindowedStream

Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements). {% highlight scala %} keyedStream.countWindow(1000, 100) {% endhighlight %}

Advanced Window Constructs

The general mechanism can define more powerful windows at the cost of more verbose syntax. For example, below is a window definition where windows hold the elements of the last 5 seconds and slide every 1 second, but the execution of the window function is triggered when 100 elements have been added to the window, and every time execution is triggered, 10 elements are retained in the window:

{% highlight java %}
keyedStream
    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
{% endhighlight %}
{% highlight scala %}
keyedStream
  .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(CountTrigger.of(100))
  .evictor(CountEvictor.of(10))
{% endhighlight %}

The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) a Trigger (optionally), and (3) an Evictor (optionally).

The WindowAssigner defines how incoming elements are assigned to windows. A window is a logical group of elements that has a begin-value and an end-value, corresponding to a begin-time and end-time. Elements with a timestamp (according to some notion of time, as described above) within these values are part of the window.

For example, the SlidingTimeWindows assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that time starts from 0 and is measured in milliseconds. Then, we have 6 windows that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your own window types by extending the WindowAssigner class.
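To make the assignment concrete, the following plain-Java sketch (an illustration of the arithmetic only, not the Flink API) computes which sliding windows a given timestamp falls into, assuming windows aligned to timestamp 0:

{% highlight java %}
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Returns the [start, end] pairs of all windows of length `size` that
    // advance by `slide` (both in milliseconds) and contain `timestamp`.
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // latest window start at or before the element
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            if (start >= 0) {
                windows.add(new long[] { start, start + size });
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // An element with timestamp 2000 in 5-second windows sliding by 1 second
        // falls into [2000,7000], [1000,6000], and [0,5000], as described above.
        for (long[] w : windowsFor(2000, 5000, 1000)) {
            System.out.println("[" + w[0] + ", " + w[1] + "]");
        }
    }
}
{% endhighlight %}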

Transformation Description
Global window
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

{% highlight java %} stream.window(GlobalWindows.create()); {% endhighlight %}
Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. The notion of time is picked from the specified TimeCharacteristic (see time). The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received; for processing time, when the current processing time exceeds its end-value.

{% highlight java %} stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))); {% endhighlight %}
Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received; for processing time, when the current processing time exceeds its end-value.

{% highlight java %} stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))); {% endhighlight %}
Transformation Description
Global window
KeyedStream → WindowedStream

All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.

{% highlight scala %} stream.window(GlobalWindows.create) {% endhighlight %}
Tumbling time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received; for processing time, when the current processing time exceeds its end-value.

{% highlight scala %} stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) {% endhighlight %}
Sliding time windows
KeyedStream → WindowedStream

Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with a value higher than its end-value is received; for processing time, when the current processing time exceeds its end-value.

{% highlight scala %} stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) {% endhighlight %}

The Trigger specifies when the function that comes after the window clause (e.g., sum, count) is evaluated ("fires") for each window. If a trigger is not specified, a default trigger for each window type is used (it is part of the definition of the WindowAssigner). Flink comes bundled with a set of triggers that you can use if the defaults do not fit the application. You can also write your own trigger by implementing the Trigger interface. Note that specifying a trigger will override the default trigger of the window assigner.
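As a sketch of the general shape of a custom trigger, here is one that evaluates the window function on every arriving element. The method names and TriggerResult values are assumed to match this version's Trigger interface, so treat this as an illustration rather than a drop-in class:

{% highlight java %}
public class EveryElementTrigger<T, W extends Window> implements Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
        // fire the window function for every arriving element, keeping the contents
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // this trigger registers no processing-time timers
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // this trigger registers no event-time timers
    }
}
{% endhighlight %}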

Transformation Description
Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.

{% highlight java %} windowedStream.trigger(ProcessingTimeTrigger.create()); {% endhighlight %}
Watermark trigger

A window is fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.

{% highlight java %} windowedStream.trigger(EventTimeTrigger.create()); {% endhighlight %}
Continuous processing time trigger

A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.

{% highlight java %} windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); {% endhighlight %}
Continuous watermark time trigger

A window is periodically considered for being fired (every 5 seconds in the example). A window is actually fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are retained.

{% highlight java %} windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); {% endhighlight %}
Count trigger

A window is fired when it has more than a certain number of elements (1000 below). The elements of the triggered window are retained.

{% highlight java %} windowedStream.trigger(CountTrigger.of(1000)); {% endhighlight %}
Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

{% highlight java %} windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000))); {% endhighlight %}
Delta trigger

A window fires when the delta between the value of the last added element and the value of the first element in the window, as computed by a `DeltaFunction`, exceeds a given threshold (5000.0 in the example).

{% highlight java %}
windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldDataPoint, Double newDataPoint) {
        return newDataPoint - oldDataPoint;
    }
}));
{% endhighlight %}
Transformation Description
Processing time trigger

A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.

{% highlight scala %} windowedStream.trigger(ProcessingTimeTrigger.create) {% endhighlight %}
Watermark trigger

A window is fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.

{% highlight scala %} windowedStream.trigger(EventTimeTrigger.create) {% endhighlight %}
Continuous processing time trigger

A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.

{% highlight scala %} windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))) {% endhighlight %}
Continuous watermark time trigger

A window is periodically considered for being fired (every 5 seconds in the example). A window is actually fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are retained.

{% highlight scala %} windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))) {% endhighlight %}
Count trigger

A window is fired when it has more than a certain number of elements (1000 below). The elements of the triggered window are retained.

{% highlight scala %} windowedStream.trigger(CountTrigger.of(1000)) {% endhighlight %}
Purging trigger

Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.

{% highlight scala %} windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000))) {% endhighlight %}
Delta trigger

A window fires when the delta between the value of the last added element and the value of the first element in the window, as computed by a `DeltaFunction`, exceeds a given threshold (5000.0 in the example).

{% highlight scala %}
windowedStream.trigger(DeltaTrigger.of(5000.0, new DeltaFunction[Double] {
  override def getDelta(oldDataPoint: Double, newDataPoint: Double): Double = newDataPoint - oldDataPoint
}))
{% endhighlight %}

After the trigger fires, and before the function (e.g., sum, count) is applied to the window contents, an optional Evictor removes some elements from the beginning of the window; the remaining elements are then passed on to the function. Flink comes bundled with a set of evictors. You can also write your own evictor by implementing the Evictor interface.
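Conceptually, an evictor trims elements from the front of the window buffer before the window function sees them. The following plain-Java sketch illustrates the semantics of count-based eviction (it mirrors what CountEvictor.of(retain) does, but is not the Flink Evictor interface):

{% highlight java %}
import java.util.List;

public class CountEvictionSketch {

    // Keep only the last `retain` elements of the window contents,
    // evicting everything that arrived before them.
    static <T> List<T> evict(List<T> windowContents, int retain) {
        int from = Math.max(0, windowContents.size() - retain);
        return windowContents.subList(from, windowContents.size());
    }
}
{% endhighlight %}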

Transformation Description
Time evictor

Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).

{% highlight java %} triggeredStream.evict(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))); {% endhighlight %}
Count evictor

Retain 1000 elements from the end of the window backwards, evicting all others.

{% highlight java %} triggeredStream.evict(CountEvictor.of(1000)); {% endhighlight %}
Delta evictor

Starting from the beginning of the window, evicts every element whose delta to the last element of the window, as computed by the given DeltaFunction, exceeds the given threshold; the remaining elements are kept.

{% highlight java %}
triggeredStream.evict(DeltaEvictor.of(5000.0, new DeltaFunction<Double>() {
    @Override
    public double getDelta(Double oldValue, Double newValue) {
        return newValue - oldValue;
    }
}));
{% endhighlight %}
Transformation Description
Time evictor

Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).

{% highlight scala %} triggeredStream.evict(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) {% endhighlight %}
Count evictor

Retain 1000 elements from the end of the window backwards, evicting all others.

{% highlight scala %} triggeredStream.evict(CountEvictor.of(1000)) {% endhighlight %}
Delta evictor

Starting from the beginning of the window, evicts every element whose delta to the last element of the window, as computed by the given DeltaFunction, exceeds the given threshold; the remaining elements are kept.

{% highlight scala %}
triggeredStream.evict(DeltaEvictor.of(5000.0, new DeltaFunction[Double] {
  override def getDelta(oldValue: Double, newValue: Double): Double = newValue - oldValue
}))
{% endhighlight %}

Recipes for Building Windows

The mechanism of window assigner, trigger, and evictor is very powerful, and it allows you to define many different kinds of windows. Flink's basic window constructs are, in fact, syntactic sugar on top of the general mechanism. Below is how some common types of windows can be constructed using the general mechanism:

Window type Definition
Tumbling count window
{% highlight java %} stream.countWindow(1000) {% endhighlight %}
{% highlight java %}
stream.window(GlobalWindows.create())
    .trigger(CountTrigger.of(1000))
    .evict(CountEvictor.of(1000))
{% endhighlight %}
Sliding count window
{% highlight java %} stream.countWindow(1000, 100) {% endhighlight %}
{% highlight java %}
stream.window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))
    .evict(CountEvictor.of(1000))
{% endhighlight %}
Tumbling event time window
{% highlight java %} stream.timeWindow(Time.of(5, TimeUnit.SECONDS)) {% endhighlight %}
{% highlight java %}
stream.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
    .trigger(EventTimeTrigger.create())
{% endhighlight %}
Sliding event time window
{% highlight java %} stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) {% endhighlight %}
{% highlight java %} stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) .trigger(EventTimeTrigger.create()) {% endhighlight %}
Tumbling processing time window
{% highlight java %} stream.timeWindow(Time.of(5, TimeUnit.SECONDS)) {% endhighlight %}
{% highlight java %}
stream.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
    .trigger(ProcessingTimeTrigger.create())
{% endhighlight %}
Sliding processing time window
{% highlight java %} stream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) {% endhighlight %}
{% highlight java %} stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) .trigger(ProcessingTimeTrigger.create()) {% endhighlight %}

Windows on Unkeyed Data Streams

You can also define windows on regular (non-keyed) data streams using the windowAll transformation. These windowed data streams have all the capabilities of keyed windowed data streams, but are evaluated at a single task (and hence at a single computing node). The syntax for defining triggers and evictors is exactly the same:

{% highlight java %}
nonKeyedStream
    .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(CountTrigger.of(100))
    .evictor(CountEvictor.of(10));
{% endhighlight %}
{% highlight scala %}
nonKeyedStream
  .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
  .trigger(CountTrigger.of(100))
  .evictor(CountEvictor.of(10))
{% endhighlight %}

Basic window definitions are also available for windows on non-keyed streams:


Transformation Description
Tumbling time window all
DataStream → WindowedStream

Defines a window of 5 seconds that "tumbles". This means that elements are grouped according to their timestamp in groups of 5-second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment. {% highlight java %} nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS)); {% endhighlight %}

Sliding time window all
DataStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are grouped according to their timestamp in groups of 5-second duration, and elements can belong to more than one window (consecutive windows overlap by 4 seconds). The notion of time used is controlled by the StreamExecutionEnvironment. {% highlight java %} nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)); {% endhighlight %}

Tumbling count window all
DataStream → WindowedStream

Defines a window of 1000 elements that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. {% highlight java %} nonKeyedStream.countWindowAll(1000); {% endhighlight %}

Sliding count window all
DataStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements). {% highlight java %} nonKeyedStream.countWindowAll(1000, 100); {% endhighlight %}


Transformation Description
Tumbling time window all
DataStream → WindowedStream

Defines a window of 5 seconds that "tumbles". This means that elements are grouped according to their timestamp in groups of 5-second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment. {% highlight scala %} nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS)) {% endhighlight %}

Sliding time window all
DataStream → WindowedStream

Defines a window of 5 seconds that "slides" by 1 second. This means that elements are grouped according to their timestamp in groups of 5-second duration, and elements can belong to more than one window (consecutive windows overlap by 4 seconds). The notion of time used is controlled by the StreamExecutionEnvironment. {% highlight scala %} nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) {% endhighlight %}

Tumbling count window all
DataStream → WindowedStream

Defines a window of 1000 elements that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. {% highlight scala %} nonKeyedStream.countWindowAll(1000) {% endhighlight %}

Sliding count window all
DataStream → WindowedStream

Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements). {% highlight scala %} nonKeyedStream.countWindowAll(1000, 100) {% endhighlight %}

Back to top

Execution Parameters

Fault Tolerance

Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a persistent or durable source that can be asked for prior records again (Apache Kafka is a good example of such a durable source).

The checkpointing mechanism stores the progress in the source as well as the user-defined state (see Working with State) consistently to provide exactly once processing guarantees.

To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other parameters for checkpointing include:

  • Number of retries: The setNumberOfExecutionRetries() method defines how many times the job is restarted after a failure. When checkpointing is activated, but this value is not explicitly set, the job is restarted infinitely often.
  • exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) applications.
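For example, a configuration sketch combining these options (the CheckpointingMode argument shown for selecting the guarantee level is an assumption of this sketch):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// draw a checkpoint every 10 seconds with exactly-once guarantees
// (the CheckpointingMode overload is assumed here)
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

// restart the job at most 3 times after a failure, instead of infinitely often
env.setNumberOfExecutionRetries(3);
{% endhighlight %}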

The docs on streaming fault tolerance describe in detail the technique behind Flink's streaming fault tolerance mechanism.

Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:

Source Guarantees Notes
Apache Kafka exactly once Use the appropriate Kafka connector for your version
RabbitMQ at most once
Twitter Streaming API at most once
Collections at most once
Files at least once On failure, the file will be read again from the beginning
Sockets at most once

To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:

Sink Guarantees Notes
HDFS rolling sink exactly once Implementation depends on Hadoop version
Elasticsearch at least once
Kafka producer at least once
File sinks at least once
Socket sinks at least once
Standard output at least once

Parallelism

You can control the number of parallel instances created for each operator by calling the operator.setParallelism(int) method.
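For example (MyMapper stands for any user-defined function):

{% highlight java %}
// run this map operator with 5 parallel instances, independent of the default parallelism
DataStream<Long> mapped = input.map(new MyMapper()).setParallelism(5);
{% endhighlight %}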

Controlling Latency

By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis) on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.

Usage:

{% highlight java %}
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
{% endhighlight %}

{% highlight scala %}
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1, 10).map(myMap).setBufferTimeout(timeoutMillis)
{% endhighlight %}

To maximize throughput, set setBufferTimeout(-1) which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.

Back to top

Working with State

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators. You can make every transformation (map, filter, etc) stateful by declaring local variables or using Flink's state interface. You can register any local variable as managed state by implementing an interface. In this case, and also in the case of using Flink's native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

The end effect is that updates to any form of state are the same under failure-free execution and execution under failures.

First, we look at how to make local variables consistent under failures, and then we look at Flink's state interface.

By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system), which can be configured in the flink-conf.yaml or via StreamExecutionEnvironment.setStateBackend(…).
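For example, a sketch that switches checkpoint storage to a file system (the FsStateBackend class name and the URI are assumptions of this sketch):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep checkpoint data on HDFS instead of in the JobManager's memory
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
{% endhighlight %}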

Checkpointing Local Variables

Local variables can be checkpointed by using the Checkpointed interface.

When the user-defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods will be executed to draw and restore function state.

In addition to that, user functions can also implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.
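A sketch of how a sink might use such notifications to commit buffered results to an external system (the buffering strategy and the commit helper are hypothetical):

{% highlight java %}
import java.util.ArrayList;
import java.util.List;

public class BufferingSink implements SinkFunction<String>, CheckpointNotifier {

    // records received since the last completed checkpoint; kept in memory here
    // purely for illustration (a real sink would also checkpoint this buffer)
    private final List<String> pending = new ArrayList<>();

    @Override
    public void invoke(String value) {
        pending.add(value);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // a notification may be lost on failure, so a commit must be able to
        // cover the effects of several checkpoints at once
        commitToExternalSystem(pending); // hypothetical helper
        pending.clear();
    }

    private void commitToExternalSystem(List<String> records) {
        // e.g., issue one transaction against the external store (not shown)
    }
}
{% endhighlight %}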

For example, here is the same counting reduce function shown below for the key/value state, implemented using the Checkpointed interface instead:

{% highlight java %} public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {

// persistent counter
private long counter = 0;

@Override
public Long reduce(Long value1, Long value2) {
    counter++;
    return value1 + value2;
}

// regularly persists state during normal operation
@Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) {
    return counter;
}

// restores state on recovery from failure
@Override
public void restoreState(Long state) {
    counter = state;
}

} {% endhighlight %}

Using the Key/Value State Interface

The state interface gives access to key/value states, which are a collection of key/value pairs. Because the state is partitioned by the keys (distributed across workers), it can only be used on a KeyedStream, created via stream.keyBy(…) (which also means that it is usable in all types of functions on keyed windows).

The handle to the state can be obtained from the function's RuntimeContext. The state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.

The following code sample shows how to use the key/value state inside a reduce function. When creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).

{% highlight java %} public class CounterSum extends RichReduceFunction<Long> {

/** The state handle */
private OperatorState<Long> counter;

@Override
public Long reduce(Long value1, Long value2) {
    counter.update(counter.value() + 1);
    return value1 + value2;
}

@Override
public void open(Configuration config) {
    counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
}

} {% endhighlight %}

State updated by this is usually kept locally inside the Flink process (unless one explicitly configures an external state backend). This means that lookups and updates are process-local and thus very fast.

The important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the keyBy() function), making the key partitioning transparent to Flink. That allows the system to efficiently restore and redistribute keys and state.

The Scala API has shortcuts for stateful map() or flatMap() functions on KeyedStream, which pass the state of the current key as an option directly into the function and return the result together with a state update:

{% highlight scala %} val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ((in._1, c), Some(c + in._2))
      case None => ((in._1, 0), Some(in._2))
    }) {% endhighlight %}

Stateful Source Functions

Stateful sources require a bit more care compared to other operators. In order to make the updates to the state and the output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source's context.

{% highlight java %} public static class CounterSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {

/**  current offset for exactly once semantics */
private long offset;

/** flag for job cancellation */
private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Long> ctx) {
    final Object lock = ctx.getCheckpointLock();
    
    while (isRunning) {
        // output and state update are atomic
        synchronized (lock) {
            ctx.collect(offset);
            offset += 1;
        }
    }
}

@Override
public void cancel() {
    isRunning = false;
}

@Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) {
    return offset;

}

@Override
public void restoreState(Long state) {
    offset = state;
}

} {% endhighlight %}

Some operators might need to know when a checkpoint has been fully acknowledged by Flink in order to communicate that with the outside world. In this case see the org.apache.flink.streaming.api.checkpoint.CheckpointNotifier interface.

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Back to top

Iterations


Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream, using a split transformation or a filter. Here, we show an example using filters. First, we define an IterativeStream:

{% highlight java %} IterativeStream iteration = input.iterate(); {% endhighlight %}

Then, we specify the logic that will be executed inside the loop using a series of transformations (here a simple map transformation):

{% highlight java %} DataStream iterationBody = iteration.map(/* this is executed many times */); {% endhighlight %}

To close an iteration and define the iteration tail, call the closeWith(feedbackStream) method of the IterativeStream. The DataStream given to the closeWith function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back from the part of the stream that is propagated forward. These filters can, e.g., define the "termination" logic, where an element is allowed to propagate downstream rather than being fed back.

{% highlight java %}
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream output = iterationBody.filter(/* some other part of the stream */);
{% endhighlight %}

By default, the partitioning of the feedback stream is automatically set to be the same as the input of the iteration head. To override this, the user can set an optional boolean flag in the closeWith method.
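A sketch of the call (the position and meaning of the flag are as assumed here):

{% highlight java %}
// feed the stream back while keeping its own partitioning,
// rather than adopting the partitioning of the iteration head
iteration.closeWith(feedbackStream, true);
{% endhighlight %}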

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

{% highlight java %}
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value > 0);
    }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value <= 0);
    }
});
{% endhighlight %}


Iterative streaming programs implement a step function and embed it into an IterativeStream. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream, using a split transformation or a filter. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are separated from the elements that are forwarded downstream using filters.

{% highlight scala %}
val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */),
     iterationBody.filter(/* some other part of the stream */))
  })
{% endhighlight %}

By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith method.

For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:

{% highlight scala %} val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map(v => v - 1)
    val stillGreaterThanZero = minusOne.filter(_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }) {% endhighlight %}

Back to top

Connectors

Connectors provide code for interfacing with various third-party systems.

Currently these systems are supported:

  • Apache Kafka
  • Elasticsearch
  • Hadoop FileSystem
  • RabbitMQ
  • Twitter Streaming API

To run an application using one of these connectors, additional third-party components usually need to be installed and launched, e.g. the servers for the message queues. Further instructions can be found in the corresponding subsections. Docker containers encapsulating these services are also provided to help users get started with the connectors.

Apache Kafka

This connector provides access to event streams served by Apache Kafka.

Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics. The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group offset tracking, but tracks and checkpoints these offsets internally as well.

Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the FlinkKafkaConsumer082 (part of flink-connector-kafka) is appropriate.

Maven Dependency Supported since Class name Kafka version Notes
flink-connector-kafka 0.9.1, 0.10 FlinkKafkaConsumer081 0.8.1 Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka 0.9.1, 0.10 FlinkKafkaConsumer082 0.8.2 Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.

Then, import the connector in your maven project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing Apache Kafka

  • Follow the instructions from Kafka's quickstart to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
  • On 32 bit computers this problem may occur.
  • If the Kafka and Zookeeper servers are running on a remote machine, then the advertised.host.name setting in the config/server.properties file must be set to the machine's IP address.

Kafka Consumer

The standard FlinkKafkaConsumer082 is a Kafka consumer providing access to one topic. Its constructor takes the following parameters:

  1. The topic name
  2. A DeserializationSchema
  3. Properties for the Kafka consumer. The following properties are required:
  • "bootstrap.servers" (comma separated list of Kafka brokers)
  • "zookeeper.connect" (comma separated list of Zookeeper servers)
  • "group.id" the id of the consumer group

Example:

{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));

stream.print();
{% endhighlight %}
{% highlight scala %}
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))

stream.print
{% endhighlight %}

Kafka Consumers and Fault Tolerance

With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.

The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.

To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:

{% highlight java %} final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %}
{% highlight scala %} val env = StreamExecutionEnvironment.getExecutionEnvironment() env.enableCheckpointing(5000) // checkpoint every 5000 msecs {% endhighlight %}

Also note that Flink can only restart the topology if enough processing slots are available. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.

Kafka Producer

The FlinkKafkaProducer writes data to a Kafka topic. The producer can specify a custom partitioner that assigns records to partitions.

Example:

{% highlight java %} stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema())); {% endhighlight %}
{% highlight scala %} stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema())) {% endhighlight %}

You can also pass a custom Kafka producer configuration to the FlinkKafkaProducer constructor. Please refer to the Apache Kafka documentation for details on how to configure Kafka producers.
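For example, a sketch that passes custom producer settings (the constructor variant taking a Properties object is an assumption here; the settings themselves are standard Kafka producer options):

{% highlight java %}
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");
producerConfig.setProperty("acks", "all"); // wait for all replicas to acknowledge

stream.addSink(new FlinkKafkaProducer<String>("my-topic", new SimpleStringSchema(), producerConfig));
{% endhighlight %}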

Back to top

Elasticsearch

This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating a Sink for writing to your cluster.

Elasticsearch Sink

The connector provides a Sink that can send data to an Elasticsearch Index.

The sink can use two different methods for communicating with Elasticsearch:

  1. An embedded Node
  2. The TransportClient

See here for information about the differences between the two modes.

This code shows how to create a sink that uses an embedded Node for communication:

{% highlight java %}
DataStream<String> input = ...;

Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));
{% endhighlight %}

{% highlight scala %}
val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))
{% endhighlight %}

Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter that must correspond to the name of your cluster.

Internally, the sink uses a BulkProcessor to send index requests to the cluster. This will buffer elements before sending a request to the cluster. The behaviour of the BulkProcessor can be configured using these config keys:

  • bulk.flush.max.actions: Maximum amount of elements to buffer
  • bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer
  • bulk.flush.interval.ms: Interval at which to flush data regardless of the other two settings in milliseconds
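For example, a more production-like buffering configuration than the flush-per-element setting used above (the values are illustrative):

{% highlight java %}
Map<String, String> config = Maps.newHashMap();
config.put("cluster.name", "my-cluster-name");

// buffer up to 500 actions or 5 MB of data, and flush at least once per second
config.put("bulk.flush.max.actions", "500");
config.put("bulk.flush.max.size.mb", "5");
config.put("bulk.flush.interval.ms", "1000");
{% endhighlight %}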

This example code does the same, but with a TransportClient:

{% highlight java %}
DataStream<String> input = ...;

Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress("node-1", 9300));
transports.add(new InetSocketTransportAddress("node-2", 9300));

input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }
}));
{% endhighlight %}

{% highlight scala %}
val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

val transports = new util.ArrayList[TransportAddress]
transports.add(new InetSocketTransportAddress("node-1", 9300))
transports.add(new InetSocketTransportAddress("node-2", 9300))

input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))
{% endhighlight %}

The difference is that we now need to provide a list of Elasticsearch Nodes to which the sink should connect using a TransportClient.

More information about Elasticsearch can be found here.

Back to top

Hadoop FileSystem

This connector provides a Sink that writes rolling files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem</artifactId>
  <version>{{site.version}}</version>
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Rolling File Sink

The rolling behaviour as well as the writing can be configured, but we will get to that later. This is how you can create a default rolling sink:

{% highlight java %}
DataStream<String> input = ...;

input.addSink(new RollingSink<String>("/base/path"));
{% endhighlight %}

{% highlight scala %} val input: DataStream[String] = ...

input.addSink(new RollingSink("/base/path"))

{% endhighlight %}

The only required parameter is the base path where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, writer and batch size.

By default the rolling sink will use the pattern "yyyy-MM-dd--HH" to name the rolling buckets. This pattern is passed to SimpleDateFormat with the current system time to form a bucket path. A new bucket will be created whenever the bucket path changes. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: Each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. To specify a custom bucketer use setBucketer() on a RollingSink.

The default writer is StringWriter. This will call toString() on the incoming elements and write them to part files, separated by newline. To specify a custom writer use setWriter() on a RollingSink. If you want to write Hadoop SequenceFiles you can use the provided SequenceFileWriter which can also be configured to use compression.

The last configuration option is the batch size. This specifies when a part file should be closed and a new one started. (The default part file size is 384 MB).

Example:

{% highlight java %}
DataStream<Tuple2<IntWritable, Text>> input = ...;

RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB

input.addSink(sink);
{% endhighlight %}

{% highlight scala %}
val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB

input.addSink(sink)
{% endhighlight %}

This will create a sink that writes to bucket files that follow this schema:

/base/path/{date-time}/part-{parallel-task}-{count}

Where date-time is the string that we get from the date/time format, parallel-task is the index of the parallel sink instance, and count is the running number of part files that were created because of the batch size.

For in-depth information, please refer to the JavaDoc for RollingSink.

Back to top

RabbitMQ

This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Installing RabbitMQ

Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.

RabbitMQ Source

A class providing an interface for receiving data from RabbitMQ.

The following have to be provided to the RMQSource(…) constructor, in order:

  1. The hostname
  2. The queue name
  3. Deserialization schema

Example:

{% highlight java %}
DataStream<String> stream = env
    .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()));

stream.print();
{% endhighlight %}
{% highlight scala %}
val stream = env
  .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))

stream.print
{% endhighlight %}

RabbitMQ Sink

A class providing an interface for sending data to RabbitMQ.

The following have to be provided to the RMQSink(…) constructor, in order:

  1. The hostname
  2. The queue name
  3. Serialization schema

Example:

{% highlight java %} stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer())); {% endhighlight %}
{% highlight scala %} stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer)) {% endhighlight %}

More about RabbitMQ can be found here.

Back to top

Twitter Streaming API

The Twitter Streaming API provides access to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in TwitterSource class for establishing a connection to this stream. To use this connector, add the following dependency to your project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.

Authentication

In order to connect to the Twitter stream, the user has to register their program and acquire the necessary information for authentication. The process is described below.

Acquiring the authentication information

First of all, a Twitter account is needed. Sign up for free at twitter.com/signup or sign in at Twitter's Application Management and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions. After selecting the application, the API key and API secret (called consumerKey and consumerSecret in TwitterSource, respectively) are located on the "API Keys" tab. The necessary OAuth Access Token data (token and secret in TwitterSource) can be generated and acquired on the "Keys and Access Tokens" tab. Remember to keep these pieces of information secret and do not push them to public repositories.

Accessing the authentication information

Create a properties file, and pass its path in the constructor of TwitterSource. The content of the file should be similar to this:

#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***

Constructors

The TwitterSource class has two constructors.

  1. public TwitterSource(String authPath, int numberOfTweets); to emit a finite number of tweets
  2. public TwitterSource(String authPath); for streaming

Both constructors expect a String authPath argument determining the location of the properties file containing the authentication information. In the first case, numberOfTweets determines how many tweets the source emits.

Usage

In contrast to other connectors, the TwitterSource depends on no additional services. For example the following code should run gracefully:

{% highlight java %} DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties")); {% endhighlight %}
{% highlight scala %} streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties")) {% endhighlight %}

The TwitterSource emits strings containing JSON objects. To retrieve information from the JSON, you can add a FlatMap or a Map function that handles it. For example, the examples include the abstract class JSONParseFlatMap, an extension of FlatMapFunction that has a

{% highlight java %} String getField(String jsonText, String field); {% endhighlight %}
{% highlight scala %} getField(jsonText : String, field : String) : String {% endhighlight %}

method which can be used to acquire the value of a given field.
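For example, here is a sketch that extracts one field from each tweet (the "lang" field and the generic signature of JSONParseFlatMap are assumptions here):

{% highlight java %}
DataStream<String> languages = streamSource.flatMap(new JSONParseFlatMap<String, String>() {
    @Override
    public void flatMap(String tweet, Collector<String> out) throws Exception {
        // getField is assumed to return the value of the given JSON field
        out.collect(getField(tweet, "lang"));
    }
});
{% endhighlight %}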

There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.

Example

TwitterStream is an example of how to use TwitterSource. It implements a language frequency counter program.

Back to top

Docker containers for connectors

A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the docker container while the example topology can be run on the user's computer.

Installing Docker

The official Docker installation guide can be found here. After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.

Creating a jar with all the dependencies

For the easiest setup, create a jar with all the dependencies of the flink-streaming-connectors project.

cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly
This creates an assembly jar under flink-streaming-connectors/target.

RabbitMQ

Pull the docker image:
sudo docker pull flinkstreaming/flink-connectors-rabbitmq

To run the container, type:

sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq

Now a terminal has started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost's and the Docker container's ports so RabbitMQ can communicate with the application through these.

To start the RabbitMQ server:

sudo /etc/init.d/rabbitmq-server start

To launch the example on the host computer, execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
> log.txt 2> errorlog.txt

There are two connectors in the example. One that sends messages to RabbitMQ, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:

<DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ

Apache Kafka

Pull the image:

sudo docker pull flinkstreaming/flink-connectors-kafka

To run the container type:

sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
flinkstreaming/flink-connectors-kafka

Now a terminal has started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost's and the Docker container's ports so Kafka can communicate with the application through these. First start a zookeeper in the background:

/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
> zookeeperlog.txt &

Then start the kafka server in the background:

/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
 > serverlog.txt 2> servererr.txt &

To launch the example on the host computer execute:

java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
> log.txt 2> errorlog.txt

In the example there are two connectors. One that sends messages to Kafka, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:

<DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (3) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (4) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (5) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (6) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka

Back to top

Program Packaging & Distributed Execution

See the relevant section of the DataSet API documentation.

Back to top

Parallel Execution

See the relevant section of the DataSet API documentation.

Back to top

Execution Plans

See the relevant section of the DataSet API documentation.

Back to top