
Commit

[FLINK-2550] Rename ConnectedDataStream to ConnectedStreams, Remove some operations

The removed operations are tricky and some of them do not work
correctly. For now, co-reduce, stream-cross and stream-join are
removed.

I'm planning to add a new join implementation based on tagged union
that uses the new windowing code.
aljoscha committed Oct 5, 2015
1 parent 9e6e0ae commit 23d8e26
Showing 29 changed files with 514 additions and 1,888 deletions.
22 changes: 11 additions & 11 deletions docs/apis/streaming_guide.md
@@ -991,9 +991,9 @@ dataStream1 cross dataStream2 onWindow (windowing_params)
### Co operators

Co operators allow users to jointly transform two `DataStream`s of different types, providing a simple way to jointly manipulate streams with shared state. They are designed for joint stream transformations where a union is not appropriate because the data types differ, or where the user needs to explicitly track the origin of individual elements.
Co operators can be applied to `ConnectedDataStream`s which represent two `DataStream`s of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`.
Co operators can be applied to `ConnectedStreams` which represent two `DataStream`s of possibly different types. `ConnectedStreams` can be created by calling the `connect(otherDataStream)` method of a `DataStream`.

#### Map on ConnectedDataStream
#### Map on ConnectedStreams
Applies a CoMap transformation on two separate DataStreams, mapping them to a common output type. The transformation calls a `CoMapFunction.map1()` for each element of the first input and `CoMapFunction.map2()` for each element of the second input. Each CoMapFunction call returns exactly one element.
A CoMap operator that outputs true if an Integer value is received and false if a String value is received:

@@ -1032,8 +1032,8 @@ val dataStream2 : DataStream[String] = ...
</div>
</div>
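The CoMap dispatch pattern described above (the concrete example is collapsed in this diff) can be illustrated without Flink. The `CoMap` interface and `apply` driver below are hypothetical stand-ins for `CoMapFunction` and the runtime's per-input dispatch, sketched under the assumption that each input element produces exactly one output:

```java
import java.util.ArrayList;
import java.util.List;

public class CoMapSketch {
    // Hypothetical stand-in for Flink's CoMapFunction: one method per input stream.
    interface CoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value);
        OUT map2(IN2 value);
    }

    // Applies map1 to every element of the first input and map2 to every
    // element of the second, collecting all results into one output list.
    static <IN1, IN2, OUT> List<OUT> apply(List<IN1> first, List<IN2> second,
                                           CoMap<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN1 v : first) out.add(fn.map1(v));
        for (IN2 v : second) out.add(fn.map2(v));
        return out;
    }

    public static void main(String[] args) {
        // The example from the text: true for Integers, false for Strings.
        List<Boolean> result = apply(List.of(1, 2), List.of("a"),
                new CoMap<Integer, String, Boolean>() {
                    public Boolean map1(Integer value) { return true; }
                    public Boolean map2(String value) { return false; }
                });
        System.out.println(result); // [true, true, false]
    }
}
```

In the real API the two inputs are interleaved by the runtime rather than processed one list after the other; the sketch only shows the typing and dispatch.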

#### FlatMap on ConnectedDataStream
The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface.
#### FlatMap on ConnectedStreams
The FlatMap operator for `ConnectedStreams` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -1072,11 +1072,11 @@ val dataStream2 : DataStream[String] = ...
</div>
</div>
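The difference from CoMap is the collector: each call may emit any number of elements. As a library-free sketch (the `CoFlatMap` interface and `apply` driver are hypothetical stand-ins for `CoFlatMapFunction` and the `Collector`, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CoFlatMapSketch {
    // Hypothetical stand-in for CoFlatMapFunction: each call may emit zero or
    // more elements through the collector instead of returning exactly one.
    interface CoFlatMap<IN1, IN2, OUT> {
        void flatMap1(IN1 value, Consumer<OUT> out);
        void flatMap2(IN2 value, Consumer<OUT> out);
    }

    static <IN1, IN2, OUT> List<OUT> apply(List<IN1> first, List<IN2> second,
                                           CoFlatMap<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN1 v : first) fn.flatMap1(v, out::add);
        for (IN2 v : second) fn.flatMap2(v, out::add);
        return out;
    }

    public static void main(String[] args) {
        // Integers emit one string each; strings are split into several words.
        List<String> result = apply(List.of(4, 2), List.of("a b"),
                new CoFlatMap<Integer, String, String>() {
                    public void flatMap1(Integer v, Consumer<String> out) {
                        out.accept(v.toString());
                    }
                    public void flatMap2(String v, Consumer<String> out) {
                        for (String w : v.split(" ")) out.accept(w);
                    }
                });
        System.out.println(result); // [4, 2, a, b]
    }
}
```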

#### WindowReduce on ConnectedDataStream
#### WindowReduce on ConnectedStreams
The windowReduce operator applies a user-defined `CoWindowFunction` to time-aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
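Time alignment means both streams are bucketed by the same window boundaries, so the function sees the elements of both sides that fall into the same interval. A library-free sketch of that bucketing (`Event`, `countPerWindow`, and the tumbling-window key `timestamp / windowMillis` are hypothetical illustrations, not the `CoWindowFunction` API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CoWindowSketch {
    // An element with an explicit timestamp in milliseconds.
    record Event<T>(long ts, T value) {}

    // Buckets both inputs into aligned tumbling windows of size windowMillis
    // and, per window, counts the elements contributed by each side.
    static Map<Long, int[]> countPerWindow(List<Event<String>> a,
                                           List<Event<Integer>> b,
                                           long windowMillis) {
        Map<Long, int[]> counts = new TreeMap<>();
        for (Event<String> e : a)
            counts.computeIfAbsent(e.ts() / windowMillis, k -> new int[2])[0]++;
        for (Event<Integer> e : b)
            counts.computeIfAbsent(e.ts() / windowMillis, k -> new int[2])[1]++;
        return counts;
    }

    public static void main(String[] args) {
        List<Event<String>> a = List.of(new Event<>(100L, "x"), new Event<>(1200L, "y"));
        List<Event<Integer>> b = List.of(new Event<>(150L, 1), new Event<>(180L, 2));
        // Window 0 covers [0, 1000): one element from a, two from b.
        System.out.println(Arrays.toString(countPerWindow(a, b, 1000L).get(0L))); // [1, 2]
    }
}
```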

#### Reduce on ConnectedDataStream
The Reduce operator for the `ConnectedDataStream` applies a group-reduce transformation on the grouped joined data streams and then maps the reduced elements to a common output type. It works only for connected data streams where the inputs are grouped.
#### Reduce on ConnectedStreams
The Reduce operator for `ConnectedStreams` applies a group-reduce transformation on the grouped joined data streams and then maps the reduced elements to a common output type. It works only for connected data streams where the inputs are grouped.

### Output splitting
<div class="codetabs" markdown="1">
@@ -1188,7 +1188,7 @@ To use this functionality the user needs to add the maxWaitTimeMillis parameter
By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the `closeWith` method.

#### Iteration head as a co-operator
The user can also treat the input and feedback stream of a streaming iteration as a `ConnectedDataStream`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.
The user can also treat the input and feedback stream of a streaming iteration as `ConnectedStreams`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.

To use this feature the user needs to call the `withFeedbackType(type)` method of the iterative data stream and pass the type of the feedback stream:

@@ -1224,13 +1224,13 @@ To use this functionality the user needs to add the maxWaitTimeMillis parameter
By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the `iterate` method.

#### Iteration head as a co-operator
The user can also treat the input and feedback stream of a streaming iteration as a `ConnectedDataStream`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.
The user can also treat the input and feedback stream of a streaming iteration as `ConnectedStreams`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback.

To use this feature the user needs to implement a step function that operates on a `ConnectedDataStream` and pass it to the `iterate(…)` call.
To use this feature the user needs to implement a step function that operates on `ConnectedStreams` and pass it to the `iterate(…)` call.

{% highlight scala %}
val iteratedStream = someDataStream.iterate(
stepFunction: ConnectedDataStream[T, F] => (DataStream[F], DataStream[R]),
stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
maxWaitTimeMillis)
{% endhighlight %}
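The control flow behind that signature can be sketched without Flink. The loop below is a hypothetical stand-in for the iteration runtime (not Flink API): the input type is `String` and the feedback type is `Integer`, to make the type change that `withFeedbackType` enables visible.

```java
import java.util.ArrayList;
import java.util.List;

public class FeedbackTypeSketch {
    // Doubles feedback values until they reach the threshold, then emits them.
    // The input type (String) differs from the feedback type (Integer), which
    // is exactly what the two-type step function makes possible.
    static List<Integer> run(List<String> input, int threshold) {
        // First pass: convert the original input into the feedback type.
        List<Integer> feedback = new ArrayList<>();
        for (String s : input) feedback.add(Integer.parseInt(s));

        // Later passes: the step sees only feedback elements; values still
        // below the threshold are fed back, the rest leave the loop as output.
        List<Integer> output = new ArrayList<>();
        while (!feedback.isEmpty()) {
            List<Integer> next = new ArrayList<>();
            for (int v : feedback) {
                int doubled = v * 2;
                if (doubled < threshold) next.add(doubled);
                else output.add(doubled);
            }
            feedback = next;
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("3", "50"), 100)); // [100, 192]
    }
}
```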

@@ -29,7 +29,7 @@
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
//
// @SuppressWarnings("unused")
// DataStream<String> dataStream1 = env.addSource(
// DataStream<String> inputStream1 = env.addSource(
// new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
// new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
//
