diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index a0b31297139f5..36a8c9f5d4974 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -70,8 +70,9 @@ * while (running) { * Message msg = queue.retrieve(); * synchronized (ctx.getCheckpointLock()) { - * ctx.collect(msg.getMessageData()); - * addId(msg.getMessageId()); + * if (addId(msg.getMessageId())) { + * ctx.collect(msg.getMessageData()); + * } * } * } * } @@ -187,7 +188,8 @@ public void close() throws Exception { protected abstract void acknowledgeIDs(long checkpointId, Set uIds); /** - * Adds an ID to be stored with the current checkpoint. + * Adds an ID to be stored with the current checkpoint. In order to achieve exactly-once guarantees, implementing + * classes should only emit records with IDs for which this method return true. * @param uid The ID to add. * @return True if the id has not been processed previously. */