Skip to content

Commit

Permalink
[FLINK-11668][checkpointing] Allow sources to advance to MAX_WATERMARK.
Browse files Browse the repository at this point in the history
Allows the sources to inject a MAX_WATERMARK before processing a
synchronous savepoint. This is needed for the "drain" functionality,
as described in FLIP-34.
  • Loading branch information
kl0u committed Apr 17, 2019
1 parent f22dd5b commit 093ddc1
Show file tree
Hide file tree
Showing 25 changed files with 363 additions and 49 deletions.
Expand Up @@ -50,7 +50,7 @@
* the initial state structure by the Garbage Collector.
*
* <p>Any subclass that supports recoverable state and participates in
* checkpointing needs to override {@link #triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
* checkpointing needs to override {@link #triggerCheckpoint(CheckpointMetaData, CheckpointOptions, boolean)},
* {@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetrics)},
* {@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointComplete(long)}.
*/
Expand Down Expand Up @@ -203,10 +203,12 @@ public ExecutionConfig getExecutionConfig() {
*
* @param checkpointMetaData Meta data about this checkpoint
* @param checkpointOptions Options for performing this checkpoint
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers
*
* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
*/
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
throw new UnsupportedOperationException(String.format("triggerCheckpoint not supported by %s", this.getClass().getName()));
}

Expand Down
Expand Up @@ -709,7 +709,7 @@ public CompletableFuture<Acknowledge> triggerCheckpoint(
final Task task = taskSlotTable.getTask(executionAttemptID);

if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, false);

return CompletableFuture.completedFuture(Acknowledge.get());
} else {
Expand Down
Expand Up @@ -1164,11 +1164,14 @@ public void triggerPartitionProducerStateCheck(
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
* @param checkpointOptions Options for performing this checkpoint.
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers.
*/
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
final CheckpointOptions checkpointOptions,
final boolean advanceToEndOfEventTime) {

final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
Expand All @@ -1188,7 +1191,7 @@ public void run() {
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

try {
boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
Expand Down
Empty file.
Expand Up @@ -92,7 +92,7 @@ public class TaskAsyncCallTest extends TestLogger {
private static OneShotLatch awaitLatch;

/**
* Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)}
* Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions, boolean)}
* was called {@link #numCalls} times.
*/
private static OneShotLatch triggerLatch;
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testCheckpointCallsInOrder() throws Exception {
awaitLatch.await();

for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
}

triggerLatch.await();
Expand All @@ -165,7 +165,7 @@ public void testMixedAsyncCallsInOrder() throws Exception {
awaitLatch.await();

for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation());
task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
task.notifyCheckpointComplete(i);
}

Expand Down Expand Up @@ -195,7 +195,7 @@ public void testThrowExceptionIfStopInvokedWithNotStoppableTask() throws Excepti
}

/**
* Asserts that {@link AbstractInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
* Asserts that {@link AbstractInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions, boolean)},
* {@link AbstractInvokable#notifyCheckpointComplete(long)}, and {@link StoppableTask#stop()} are
* invoked by a thread whose context class loader is set to the user code class loader.
*/
Expand All @@ -209,7 +209,7 @@ public void testSetsUserCodeClassLoader() throws Exception {

awaitLatch.await();

task.triggerCheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation());
task.triggerCheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
triggerLatch.await();

task.notifyCheckpointComplete(1);
Expand Down Expand Up @@ -319,7 +319,7 @@ public void invoke() throws Exception {
}

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
lastCheckpointId++;
if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
if (lastCheckpointId == numCalls) {
Expand Down Expand Up @@ -371,10 +371,10 @@ public ContextClassLoaderInterceptingInvokable(Environment environment) {
}

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
classLoaders.add(Thread.currentThread().getContextClassLoader());

return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
return super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
}

@Override
Expand Down
Expand Up @@ -226,7 +226,7 @@ public void declineCheckpoint(
}
}

task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpointWithDefaultLocation());
task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpointWithDefaultLocation(), false);

testHarness.processElement(new StreamRecord<>("Wohoo", 0));

Expand Down Expand Up @@ -342,7 +342,8 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(Checkpointe

task.triggerCheckpoint(
new CheckpointMetaData(42, 17),
CheckpointOptions.forCheckpointWithDefaultLocation());
CheckpointOptions.forCheckpointWithDefaultLocation(),
false);

testHarness.processElement(new StreamRecord<>("Wohoo", 0));
blockerCheckpointStreamFactory.getWaiterLatch().await();
Expand Down
Expand Up @@ -47,6 +47,8 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends Abstract

private transient volatile boolean canceledOrStopped = false;

private transient volatile boolean hasSentMaxWatermark = false;

public StreamSource(SRC sourceFunction) {
super(sourceFunction);

Expand Down Expand Up @@ -96,7 +98,7 @@ public void run(final Object lockingObject,
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
advanceToEndOfEventTime();
}
} finally {
// make sure that the context is closed in any case
Expand All @@ -107,6 +109,13 @@ public void run(final Object lockingObject,
}
}

/**
 * Emits {@link Watermark#MAX_WATERMARK} through the source context unless it has
 * already been emitted by an earlier call.
 *
 * <p>The {@code hasSentMaxWatermark} flag is only set after the emission has returned,
 * so a call that fails while emitting leaves the flag untouched.
 *
 * <p>NOTE(review): the check and the update of the flag are two separate steps;
 * presumably callers serialize invocations of this method - confirm.
 */
public void advanceToEndOfEventTime() {
	if (hasSentMaxWatermark) {
		// the final watermark has already been forwarded, nothing left to do
		return;
	}

	ctx.emitWatermark(Watermark.MAX_WATERMARK);
	hasSentMaxWatermark = true;
}

public void cancel() {
// important: marking the source as stopped has to happen before the function is stopped.
// the flag that tracks this status is volatile, so the memory model also guarantees
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void triggerCheckpoint(long checkpointId) throws FlinkException {
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

try {
SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions, false);
}
catch (RuntimeException | FlinkException e) {
throw e;
Expand All @@ -87,6 +87,11 @@ public void triggerCheckpoint(long checkpointId) throws FlinkException {
}
}

/**
* Delegates the request to the head operator of this task by calling its
* {@code advanceToEndOfEventTime()} method.
*
* <p>NOTE(review): the head operator is presumably the source operator that emits the
* {@code MAX_WATERMARK} - confirm against the declaration of {@code headOperator}.
*
* @throws Exception declared by the overridden method; this body itself only delegates
*/
@Override
protected void advanceToEndOfEventTime() throws Exception {
headOperator.advanceToEndOfEventTime();
}

@Override
protected void cleanup() {
// does not hold any resources, so no cleanup needed
Expand All @@ -104,14 +109,19 @@ protected void cancelTask() throws Exception {
}
}

/**
* Finishes this source task by running the same routine as {@link #cancelTask()}.
*
* @throws Exception if {@link #cancelTask()} throws
*/
@Override
protected void finishTask() throws Exception {
// finishing a source is implemented as a plain delegation to the cancellation routine
cancelTask();
}

// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
if (!externallyInducedCheckpoints) {
return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
return super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
}
else {
// we do not trigger checkpoints here, we simply state whether we can trigger them
Expand Down
Expand Up @@ -228,6 +228,32 @@ protected StreamTask(

protected abstract void cancelTask() throws Exception;

/**
* Emits the {@link org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK MAX_WATERMARK}
* so that all registered timers are fired.
*
* <p>This is used by the source task when the job is {@code TERMINATED}. In that case,
* we want all the timers registered throughout the pipeline to fire and the related
* state (e.g. windows) to be flushed.
*
* <p>For tasks other than the source task, this method does nothing.
*
* @throws Exception declared so that overriding implementations may propagate failures;
*                   this default implementation never throws
*/
protected void advanceToEndOfEventTime() throws Exception {
// intentionally empty: no-op by default, overridden by tasks that need this hook
}

/**
* Instructs the task to go through its normal termination routine, i.e. exit the run-loop
* and call {@link StreamOperator#close()} and {@link StreamOperator#dispose()} on its operators.
*
* <p>This is used by the source task to get out of the run-loop when the job is stopped with a savepoint.
*
* <p>For tasks other than the source task, this method does nothing.
*
* @throws Exception declared so that overriding implementations may propagate failures;
*                   this default implementation never throws
*/
protected void finishTask() throws Exception {
// intentionally empty: no-op by default, overridden by tasks that need this hook
}

// ------------------------------------------------------------------------
// Core work methods of the Stream Task
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -564,14 +590,18 @@ RecordWriterOutput<?>[] getStreamOutputs() {
// ------------------------------------------------------------------------

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
public boolean triggerCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) throws Exception {

try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);

return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
Expand All @@ -593,7 +623,7 @@ public void triggerCheckpointOnBarrier(
CheckpointMetrics checkpointMetrics) throws Exception {

try {
performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false);
}
catch (CancelTaskException e) {
LOG.info("Operator {} was cancelled while performing checkpoint {}.",
Expand Down Expand Up @@ -622,7 +652,8 @@ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
CheckpointMetrics checkpointMetrics,
boolean advanceToEndOfTime) throws Exception {

LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
Expand All @@ -635,6 +666,10 @@ private boolean performCheckpoint(

if (checkpointOptions.getCheckpointType().isSynchronous()) {
syncSavepointLatch.setCheckpointId(checkpointId);

if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}

// All of the following steps happen as an atomic step from the perspective of barriers and
Expand Down Expand Up @@ -686,7 +721,13 @@ private boolean performCheckpoint(
}

if (isRunning && syncSavepointLatch.isSet()) {
syncSavepointLatch.blockUntilCheckpointIsAcknowledged();

final boolean checkpointWasAcked =
syncSavepointLatch.blockUntilCheckpointIsAcknowledged();

if (checkpointWasAcked) {
finishTask();
}
}

return result;
Expand Down
Expand Up @@ -251,7 +251,8 @@ public void run() {
runStarted.await();
if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
new CheckpointMetaData(0, System.currentTimeMillis()),
CheckpointOptions.forCheckpointWithDefaultLocation())) {
CheckpointOptions.forCheckpointWithDefaultLocation(),
false)) {
LifecycleTrackingStreamSource.runFinish.trigger();
}
} catch (Exception e) {
Expand Down
Expand Up @@ -543,7 +543,7 @@ public void testStateSnapshotAndRestore() throws Exception {

final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);

task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);

taskStateManagerMock.getWaitForReportLatch().await();

Expand Down Expand Up @@ -584,7 +584,7 @@ public void testStateSnapshotAndRestore() throws Exception {
restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));

// trigger the checkpoint while processing stream elements
restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forCheckpointWithDefaultLocation());
restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forCheckpointWithDefaultLocation(), false);

restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));

Expand Down
Expand Up @@ -1373,7 +1373,8 @@ public void invoke() {
@Override
public boolean triggerCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions) throws Exception {
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) throws Exception {
throw new UnsupportedOperationException("should never be called");
}

Expand Down
Expand Up @@ -504,7 +504,7 @@ public void invoke() {
}

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
throw new UnsupportedOperationException("should never be called");
}

Expand Down
Expand Up @@ -526,7 +526,7 @@ public void testSnapshottingAndRestoring() throws Exception {

CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);

while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {}
while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false)) {}

// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
Expand Down
Expand Up @@ -268,7 +268,7 @@ private void triggerCheckpoint(

testHarness.taskStateManager.setWaitForReportLatch(new OneShotLatch());

while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {}
while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false)) {}

testHarness.taskStateManager.getWaitForReportLatch().await();
long reportedCheckpointId = testHarness.taskStateManager.getReportedCheckpointId();
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void testCheckpointsTriggeredBySource() throws Exception {
ready.await();

// now send an external trigger that should be ignored
assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(32, 829), CheckpointOptions.forCheckpointWithDefaultLocation()));
assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(32, 829), CheckpointOptions.forCheckpointWithDefaultLocation(), false));

// step by step let the source thread emit elements
sync.trigger();
Expand All @@ -101,7 +101,7 @@ public void testCheckpointsTriggeredBySource() throws Exception {
verifyNextElement(testHarness.getOutput(), 4L);

// now send a regular trigger command that should be ignored
assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(34, 900), CheckpointOptions.forCheckpointWithDefaultLocation()));
assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(34, 900), CheckpointOptions.forCheckpointWithDefaultLocation(), false));

sync.trigger();
verifyNextElement(testHarness.getOutput(), 5L);
Expand Down

0 comments on commit 093ddc1

Please sign in to comment.