Skip to content

Commit

Permalink
[FLINK-24371][datastream] Call preCommit on SinkWriter although no co…
Browse files Browse the repository at this point in the history
…mmitter is available

Before this change, if no committer followed the SinkWriter in the
pipeline, preCommit (i.e. SinkWriter#prepareCommit) was never called. With
this change, preCommit is called in that case as well, but its return
values are still discarded.
  • Loading branch information
Fabian Paul authored and AHeise committed Sep 28, 2021
1 parent aacdf55 commit 770684b
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ default void writeWatermark(Watermark watermark) throws IOException, Interrupted
*
* <p>This will be called before we checkpoint the Writer's state in Streaming execution mode.
*
* <p>In case the sink has no explicit committer, this method is still called to allow the
* writer to implement a 1-phase commit protocol.
*
* @param flush Whether to flush the un-staged data or not
* @return The data that is ready to commit.
* @throws IOException if it fails to prepare for a commit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -72,9 +70,8 @@ protected abstract void retry(List<RecoverT> recoveredCommittables)
throws IOException, InterruptedException;

@Override
public List<OutputT> processCommittables(
SupplierWithException<List<InputT>, Exception> committableSupplier) throws Exception {
this.committables.addAll(committableSupplier.get());
public List<OutputT> processCommittables(List<InputT> committables) {
this.committables.addAll(committables);
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -51,11 +50,9 @@ public BatchCommitterHandler(
}

@Override
public List<OutputT> processCommittables(
SupplierWithException<List<InputT>, Exception> committableSupplier) throws Exception {
List<InputT> committables = committableSupplier.get();
super.processCommittables(() -> committables);
return chainedHandler.processCommittables(() -> committables);
public List<OutputT> processCommittables(List<InputT> committables) {
super.processCommittables(committables);
return chainedHandler.processCommittables(committables);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -44,16 +43,15 @@ default void snapshotState(StateSnapshotContext context) throws Exception {}

/**
* Processes the committables by either directly transforming them or by adding them to the
* internal state of this handler. The supplier should only be queried once.
* internal state of this handler.
*
* @return a list of output committables that are sent downstream.
*/
List<OutputT> processCommittables(
SupplierWithException<List<InputT>, Exception> committableSupplier) throws Exception;
List<OutputT> processCommittables(List<InputT> committables);

/**
* Called when no more committables are going to be added through {@link
* #processCommittables(SupplierWithException)}.
* #processCommittables(List)}.
*
* @return a list of output committables that are sent downstream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
@Override
public void processElement(StreamRecord<byte[]> element) throws Exception {
committerHandler.processCommittables(
() ->
Collections.singletonList(
SimpleVersionedSerialization.readVersionAndDeSerialize(
inputSerializer, element.getValue())));
Collections.singletonList(
SimpleVersionedSerialization.readVersionAndDeSerialize(
inputSerializer, element.getValue())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.util.List;

Expand All @@ -31,9 +29,8 @@ class ForwardCommittingHandler<CommT> extends AbstractCommitterHandler<CommT, Co
ForwardCommittingHandler() {}

@Override
public List<CommT> processCommittables(
SupplierWithException<List<CommT>, Exception> committableSupplier) throws Exception {
return committableSupplier.get();
public List<CommT> processCommittables(List<CommT> committables) {
return committables;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.util.function.SupplierWithException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand All @@ -33,8 +31,7 @@ static <InputT, OutputT> CommitterHandler<InputT, OutputT> getInstance() {
}

@Override
public List<Object> processCommittables(
SupplierWithException<List<Object>, Exception> committableSupplier) {
public List<Object> processCommittables(List<Object> committables) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ public void processElement(StreamRecord<InputT> element) throws Exception {
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
if (!endOfInput) {
emitCommittables(
committerHandler.processCommittables(() -> sinkWriter.prepareCommit(false)));
emitCommittables(committerHandler.processCommittables(sinkWriter.prepareCommit(false)));
}
}

Expand All @@ -194,8 +193,7 @@ public void processWatermark(Watermark mark) throws Exception {
@Override
public void endInput() throws Exception {
endOfInput = true;
emitCommittables(
committerHandler.processCommittables(() -> sinkWriter.prepareCommit(true)));
emitCommittables(committerHandler.processCommittables(sinkWriter.prepareCommit(true)));
emitCommittables(committerHandler.endOfInput());
commitRetrier.retryIndefinitely();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Test stateful and stateless {@link SinkWriterStateHandler} in {@link SinkOperator}. */
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -318,6 +319,58 @@ public void loadPreviousSinkState() throws Exception {
containsInAnyOrder(expectedOutput2.toArray()));
}

/**
 * Checks that {@code prepareCommit} is invoked on the writer during {@code
 * prepareSnapshotPreBarrier} even when the harness is created without a committer, and that
 * the elements and watermark processed before the snapshot are still visible on the writer.
 */
@Test
public void receivePreCommitWithoutCommitter() throws Exception {
    final long baseTimestamp = 0;
    final long checkpointId = 1L;
    final PreBarrierSinkWriter sinkWriter = new PreBarrierSinkWriter();

    // "false" requests a harness whose sink operator is built without a committer.
    final OneInputStreamOperatorTestHarness<Integer, byte[]> harness =
            createTestHarness(sinkWriter, false);
    harness.open();

    harness.processWatermark(baseTimestamp);
    harness.processElement(1, baseTimestamp + 1);
    harness.processElement(2, baseTimestamp + 2);

    harness.prepareSnapshotPreBarrier(checkpointId);
    // The pre-barrier hook must already have triggered prepareCommit on the writer.
    assertTrue(sinkWriter.hasReceivedPreCommit());
    harness.snapshot(checkpointId, 1L);

    final String firstRecord = Tuple3.of(1, baseTimestamp + 1, baseTimestamp).toString();
    final String secondRecord = Tuple3.of(2, baseTimestamp + 2, baseTimestamp).toString();
    assertThat(sinkWriter.getElements(), contains(firstRecord, secondRecord));

    final org.apache.flink.api.common.eventtime.Watermark expectedWatermark =
            new org.apache.flink.api.common.eventtime.Watermark(baseTimestamp);
    assertThat(sinkWriter.getWatermarks(), contains(expectedWatermark));
}

/**
 * Test writer that remembers whether {@link #prepareCommit(boolean)} has been invoked and
 * exposes the elements and watermarks held by the parent writer for assertions.
 */
private static class PreBarrierSinkWriter extends TestSink.DefaultSinkWriter<Integer> {

    /** Flipped to {@code true} by the first call to {@link #prepareCommit(boolean)}. */
    private boolean receivedPreCommit;

    /** Returns the {@code elements} list of the parent writer. */
    public List<String> getElements() {
        return elements;
    }

    /** Returns the {@code watermarks} list of the parent writer. */
    public List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks() {
        return watermarks;
    }

    /** Returns whether {@link #prepareCommit(boolean)} has been called at least once. */
    public boolean hasReceivedPreCommit() {
        return this.receivedPreCommit;
    }

    /**
     * Records the invocation and produces no committables.
     *
     * @param flush ignored by this implementation
     * @return always an empty list
     */
    @Override
    public List<String> prepareCommit(boolean flush) {
        this.receivedPreCommit = true;
        return Collections.emptyList();
    }
}

/** A {@link SinkWriter} that buffers elements and snapshots them when asked. */
private static class SnapshottingBufferingSinkWriter extends BufferingSinkWriter {
public static final int NOT_SNAPSHOTTED = -1;
Expand Down Expand Up @@ -417,8 +470,13 @@ public void onProcessingTime(long time) throws IOException {

/**
 * Creates a test harness for the given writer with the committer enabled, by delegating to the
 * two-argument overload.
 *
 * @param writer the sink writer the harness should drive
 * @return the harness produced by the two-argument overload
 */
private OneInputStreamOperatorTestHarness<Integer, byte[]> createTestHarness(
        TestSink.DefaultSinkWriter<Integer> writer) throws Exception {
    final boolean withCommitter = true;
    return createTestHarness(writer, withCommitter);
}

private OneInputStreamOperatorTestHarness<Integer, byte[]> createTestHarness(
TestSink.DefaultSinkWriter<Integer> writer, boolean withCommitter) throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new SinkOperatorFactory<>(getBuilder(writer).build(), false, true),
new SinkOperatorFactory<>(getBuilder(writer).build(), false, withCommitter),
IntSerializer.INSTANCE);
}

Expand Down

0 comments on commit 770684b

Please sign in to comment.