Skip to content

Commit

Permalink
[FLINK-2419] [hotfix] addSink now uses transform + remove double chec…
Browse files Browse the repository at this point in the history
…kpoint commit at head operator
  • Loading branch information
gyfora committed Jul 28, 2015
1 parent 4b44e02 commit 78fd214
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1162,17 +1162,7 @@ protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int
* @return The closed DataStream.
*/
public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {

OneInputStreamOperator<OUT, Object> sinkOperator = new StreamSink<OUT>(clean(sinkFunction));

DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
sinkOperator);

streamGraph.addOperator(returnStream.getId(), sinkOperator, getType(), null, "Stream Sink");

this.connectGraph(this.copy(), returnStream.getId(), 0);

return returnStream;
return new DataStreamSink<OUT>((DataStream<OUT>) transform("StreamSink", null, new StreamSink<OUT>(clean(sinkFunction))));
}

private void validateUnion(Integer id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,18 @@ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exceptio

}

@SuppressWarnings({ "unchecked", "rawtypes" })
@SuppressWarnings("rawtypes")
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// we do nothing here so far. this should call commit on the source function, for example
synchronized (checkpointLock) {
if (streamOperator instanceof StatefulStreamOperator) {
((StatefulStreamOperator) streamOperator).notifyCheckpointComplete(checkpointId);
}

if (hasChainedOperators) {
for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
if (chainedOperator instanceof StatefulStreamOperator) {
((StatefulStreamOperator) chainedOperator).notifyCheckpointComplete(checkpointId);
}
for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
if (chainedOperator instanceof StatefulStreamOperator) {
((StatefulStreamOperator) chainedOperator).notifyCheckpointComplete(checkpointId);
}
}

}
}

Expand Down

0 comments on commit 78fd214

Please sign in to comment.