diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java index 1695e8adc4f09..814ffeeaebae3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java @@ -63,6 +63,9 @@ default void writeWatermark(Watermark watermark) throws IOException, Interrupted * *

This will be called before we checkpoint the Writer's state in Streaming execution mode. * + *

In case the sink has no explicit committer, this method is still called to allow the + * writer to implement a 1-phase commit protocol. + * * @param flush Whether flushing the un-staged data or not * @return The data is ready to commit. * @throws IOException if fail to prepare for a commit. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java index 9cb570c2c2775..5a8b9455e8ec2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.runtime.operators.sink; -import org.apache.flink.util.function.SupplierWithException; - import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -72,9 +70,8 @@ protected abstract void retry(List recoveredCommittables) throws IOException, InterruptedException; @Override - public List processCommittables( - SupplierWithException, Exception> committableSupplier) throws Exception { - this.committables.addAll(committableSupplier.get()); + public List processCommittables(List committables) { + this.committables.addAll(committables); return Collections.emptyList(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java index c6cd6bdbfdec9..8d158db6d6f73 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink.Committer; -import org.apache.flink.util.function.SupplierWithException; import java.io.IOException; import java.util.List; @@ -51,11 +50,9 @@ public BatchCommitterHandler( } @Override - public List processCommittables( - SupplierWithException, Exception> committableSupplier) throws Exception { - List committables = committableSupplier.get(); - super.processCommittables(() -> committables); - return chainedHandler.processCommittables(() -> committables); + public List processCommittables(List committables) { + super.processCommittables(committables); + return chainedHandler.processCommittables(committables); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java index 26ce1da954819..1bd192defa13d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java @@ -19,7 +19,6 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.util.function.SupplierWithException; import java.io.IOException; import java.io.Serializable; @@ -44,16 +43,15 @@ default void snapshotState(StateSnapshotContext context) throws Exception {} /** * Processes the committables by either directly transforming them or by adding them to the - * internal state of this handler. The supplier should only be queried once. + * internal state of this handler. * * @return a list of output committables that is send downstream. */ - List processCommittables( - SupplierWithException, Exception> committableSupplier) throws Exception; + List processCommittables(List committables); /** * Called when no more committables are going to be added through {@link - * #processCommittables(SupplierWithException)}. + * #processCommittables(List)}. * * @return a list of output committables that is send downstream. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index ee0a436cff0b8..c62c94473c9c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -99,10 +99,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { @Override public void processElement(StreamRecord element) throws Exception { committerHandler.processCommittables( - () -> - Collections.singletonList( - SimpleVersionedSerialization.readVersionAndDeSerialize( - inputSerializer, element.getValue()))); + Collections.singletonList( + SimpleVersionedSerialization.readVersionAndDeSerialize( + inputSerializer, element.getValue()))); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java index e54461150b529..9d892eaa1a138 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/ForwardCommittingHandler.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.runtime.operators.sink; -import org.apache.flink.util.function.SupplierWithException; - import java.io.IOException; import java.util.List; @@ -31,9 +29,8 @@ class ForwardCommittingHandler extends AbstractCommitterHandler processCommittables( - SupplierWithException, Exception> committableSupplier) throws Exception { - return committableSupplier.get(); + public List processCommittables(List committables) { + return committables; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java index bca5e9d064f56..579fe9866345e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/NoopCommitterHandler.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.runtime.operators.sink; -import org.apache.flink.util.function.SupplierWithException; - import java.io.IOException; import java.util.Collections; import java.util.List; @@ -33,8 +31,7 @@ static CommitterHandler getInstance() { } @Override - public List processCommittables( - SupplierWithException, Exception> committableSupplier) { + public List processCommittables(List committables) { return Collections.emptyList(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java index 72108aa91f25c..abcc670d71b82 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java @@ -171,8 +171,7 @@ public void processElement(StreamRecord element) throws Exception { public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { super.prepareSnapshotPreBarrier(checkpointId); if (!endOfInput) { - emitCommittables( - committerHandler.processCommittables(() -> sinkWriter.prepareCommit(false))); + emitCommittables(committerHandler.processCommittables(sinkWriter.prepareCommit(false))); } } @@ -194,8 +193,7 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void endInput() throws Exception { endOfInput = true; - emitCommittables( - committerHandler.processCommittables(() -> sinkWriter.prepareCommit(true))); + emitCommittables(committerHandler.processCommittables(sinkWriter.prepareCommit(true))); emitCommittables(committerHandler.endOfInput()); commitRetrier.retryIndefinitely(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java index f4d42d6993f87..59a67a66f2b8e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java @@ -54,6 +54,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Test stateful and stateless {@link SinkWriterStateHandler} in {@link SinkOperator}. */ @RunWith(Parameterized.class) @@ -318,6 +319,58 @@ public void loadPreviousSinkState() throws Exception { containsInAnyOrder(expectedOutput2.toArray())); } + @Test + public void receivePreCommitWithoutCommitter() throws Exception { + final long initialTime = 0; + + PreBarrierSinkWriter writer = new PreBarrierSinkWriter(); + final OneInputStreamOperatorTestHarness testHarness = + createTestHarness(writer, false); + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processElement(1, initialTime + 1); + testHarness.processElement(2, initialTime + 2); + + testHarness.prepareSnapshotPreBarrier(1L); + // Expect that preCommit was called + assertTrue(writer.hasReceivedPreCommit()); + testHarness.snapshot(1L, 1L); + + assertThat( + writer.getElements(), + contains( + Tuple3.of(1, initialTime + 1, initialTime).toString(), + Tuple3.of(2, initialTime + 2, initialTime).toString())); + + assertThat( + writer.getWatermarks(), + contains(new org.apache.flink.api.common.eventtime.Watermark(initialTime))); + } + + private static class PreBarrierSinkWriter extends TestSink.DefaultSinkWriter { + + private boolean receivedPreCommit = false; + + @Override + public List prepareCommit(boolean flush) { + receivedPreCommit = true; + return Collections.emptyList(); + } + + public boolean hasReceivedPreCommit() { + return receivedPreCommit; + } + + public List getWatermarks() { + return watermarks; + } + + public List getElements() { + return elements; + } + } + /** A {@link SinkWriter} buffers elements and snapshots them when asked. */ private static class SnapshottingBufferingSinkWriter extends BufferingSinkWriter { public static final int NOT_SNAPSHOTTED = -1; @@ -417,8 +470,13 @@ public void onProcessingTime(long time) throws IOException { private OneInputStreamOperatorTestHarness createTestHarness( TestSink.DefaultSinkWriter writer) throws Exception { + return createTestHarness(writer, true); + } + + private OneInputStreamOperatorTestHarness createTestHarness( + TestSink.DefaultSinkWriter writer, boolean withCommitter) throws Exception { return new OneInputStreamOperatorTestHarness<>( - new SinkOperatorFactory<>(getBuilder(writer).build(), false, true), + new SinkOperatorFactory<>(getBuilder(writer).build(), false, withCommitter), IntSerializer.INSTANCE); }