Skip to content
Permalink
Browse files
[FLINK-27251][checkpoint] Refactor the barrier alignment timer and de…
…fault priority sequence number
  • Loading branch information
1996fanrui authored and pnowojski committed May 25, 2022
1 parent 8a2a396 commit 10b7afae7423d75f94f397699b09deb9fbbdaca5
Showing 7 changed files with 97 additions and 59 deletions.
@@ -71,6 +71,8 @@ public class PipelinedSubpartition extends ResultSubpartition

private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);

private static final int DEFAULT_PRIORITY_SEQUENCE_NUMBER = -1;

// ------------------------------------------------------------------------

/**
@@ -171,7 +173,7 @@ private int add(BufferConsumer bufferConsumer, int partialRecordLength, boolean
checkNotNull(bufferConsumer);

final boolean notifyDataAvailable;
int prioritySequenceNumber = -1;
int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
int newBufferSize;
synchronized (buffers) {
if (isFinished || isReleased) {
@@ -191,9 +193,7 @@ private int add(BufferConsumer bufferConsumer, int partialRecordLength, boolean
newBufferSize = bufferSize;
}

if (prioritySequenceNumber != -1) {
notifyPriorityEvent(prioritySequenceNumber);
}
notifyPriorityEvent(prioritySequenceNumber);
if (notifyDataAvailable) {
notifyDataAvailable();
}
@@ -593,7 +593,7 @@ private void notifyDataAvailable() {

private void notifyPriorityEvent(int prioritySequenceNumber) {
final PipelinedSubpartitionView readView = this.readView;
if (readView != null) {
if (readView != null && prioritySequenceNumber != DEFAULT_PRIORITY_SEQUENCE_NUMBER) {
readView.notifyPriorityEvent(prioritySequenceNumber);
}
}
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.io.checkpointing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.clock.Clock;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;

/** Utility for barrier alignment. */
@Internal
public class BarrierAlignmentUtil {

public static long getTimerDelay(Clock clock, CheckpointBarrier announcedBarrier) {
long alignedCheckpointTimeout =
announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout();
long timePassedSinceCheckpointStart =
clock.absoluteTimeMillis() - announcedBarrier.getTimestamp();

return Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0);
}

public static DelayableTimer createRegisterTimerCallback(
MailboxExecutor mailboxExecutor, TimerService timerService) {
return (callable, delay) -> {
ScheduledFuture<?> scheduledFuture =
timerService.registerTimer(
timerService.getCurrentProcessingTime() + delay.toMillis(),
timestamp ->
mailboxExecutor.submit(
callable,
"Execute checkpoint barrier handler delayed action"));
return () -> scheduledFuture.cancel(false);
};
}

/** It can register a task to be executed some time later. */
public interface DelayableTimer {

/**
* Register a task to be executed some time later.
*
* @param callable the task to submit
* @param delay how long after the delay to execute the task
* @return the Cancellable, it can cancel the task.
*/
Cancellable registerTask(Callable<?> callable, Duration delay);
}

/** A handle to a delayed action which can be cancelled. */
public interface Cancellable {
void cancel();
}
}
@@ -212,9 +212,4 @@ boolean isDuringAlignment() {
protected final Clock getClock() {
return clock;
}

/** A handle to a delayed action which can be cancelled. */
interface Cancellable {
void cancel();
}
}
@@ -30,20 +30,15 @@
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.stream.Stream;

/**
@@ -160,7 +155,7 @@ private static SingleCheckpointBarrierHandler createBarrierHandler(
checkpointCoordinator,
clock,
numberOfChannels,
createRegisterTimerCallback(mailboxExecutor, timerService),
BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService),
enableCheckpointAfterTasksFinished,
inputs);
} else {
@@ -169,26 +164,12 @@ private static SingleCheckpointBarrierHandler createBarrierHandler(
toNotifyOnCheckpoint,
clock,
numberOfChannels,
createRegisterTimerCallback(mailboxExecutor, timerService),
BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService),
enableCheckpointAfterTasksFinished,
inputs);
}
}

private static BiFunction<Callable<?>, Duration, Cancellable> createRegisterTimerCallback(
MailboxExecutor mailboxExecutor, TimerService timerService) {
return (callable, delay) -> {
ScheduledFuture<?> scheduledFuture =
timerService.registerTimer(
timerService.getCurrentProcessingTime() + delay.toMillis(),
timestamp ->
mailboxExecutor.submit(
callable,
"Execute checkpoint barrier handler delayed action"));
return () -> scheduledFuture.cancel(false);
};
}

private static void registerCheckpointMetrics(
TaskIOMetricGroup taskIOMetricGroup, CheckpointBarrierHandler barrierHandler) {
taskIOMetricGroup.gauge(
@@ -27,6 +27,8 @@
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.DelayableTimer;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.clock.Clock;
@@ -44,9 +46,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED;
@@ -67,7 +67,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {

private final String taskName;
private final ControllerImpl context;
private final BiFunction<Callable<?>, Duration, Cancellable> registerTimer;
private final DelayableTimer registerTimer;
private final SubtaskCheckpointCoordinator subTaskCheckpointCoordinator;
private final CheckpointableInput[] inputs;

@@ -128,7 +128,7 @@ public static SingleCheckpointBarrierHandler unaligned(
SubtaskCheckpointCoordinator checkpointCoordinator,
Clock clock,
int numOpenChannels,
BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
DelayableTimer registerTimer,
boolean enableCheckpointAfterTasksFinished,
CheckpointableInput... inputs) {
return new SingleCheckpointBarrierHandler(
@@ -149,7 +149,7 @@ public static SingleCheckpointBarrierHandler aligned(
CheckpointableTask toNotifyOnCheckpoint,
Clock clock,
int numOpenChannels,
BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
DelayableTimer registerTimer,
boolean enableCheckpointAfterTasksFinished,
CheckpointableInput... inputs) {
return new SingleCheckpointBarrierHandler(
@@ -171,7 +171,7 @@ public static SingleCheckpointBarrierHandler alternating(
SubtaskCheckpointCoordinator checkpointCoordinator,
Clock clock,
int numOpenChannels,
BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
DelayableTimer registerTimer,
boolean enableCheckpointAfterTasksFinished,
CheckpointableInput... inputs) {
return new SingleCheckpointBarrierHandler(
@@ -195,7 +195,7 @@ private SingleCheckpointBarrierHandler(
int numOpenChannels,
BarrierHandlerState currentState,
boolean alternating,
BiFunction<Callable<?>, Duration, Cancellable> registerTimer,
DelayableTimer registerTimer,
CheckpointableInput[] inputs,
boolean enableCheckpointAfterTasksFinished) {
super(toNotifyOnCheckpoint, clock, enableCheckpointAfterTasksFinished);
@@ -308,15 +308,10 @@ public void processBarrierAnnouncement(
}

private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
long alignedCheckpointTimeout =
announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout();
long timePassedSinceCheckpointStart =
getClock().absoluteTimeMillis() - announcedBarrier.getTimestamp();

long timerDelay = Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0);
long timerDelay = BarrierAlignmentUtil.getTimerDelay(getClock(), announcedBarrier);

this.currentAlignmentTimer =
registerTimer.apply(
registerTimer.registerTask(
() -> {
long barrierId = announcedBarrier.getId();
try {
@@ -38,7 +38,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable;
import org.apache.flink.streaming.util.TestCheckpointedInputGateBuilder;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
@@ -57,7 +57,6 @@
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import static java.util.Collections.singletonList;
import static junit.framework.TestCase.assertTrue;
@@ -684,8 +683,8 @@ public void testNoActiveTimeoutAlignmentAfterClose() throws Exception {
ClockWithDelayedActions clockWithDelayedActions =
new ClockWithDelayedActions() {
@Override
public Cancellable apply(Callable<?> callable, Duration delay) {
super.apply(callable, delay);
public Cancellable registerTask(Callable<?> callable, Duration delay) {
super.registerTask(callable, delay);
// do not unregister timers on cancel
return () -> {};
}
@@ -1465,7 +1464,7 @@ public Callable<?> getCallable() {
}

private static class ClockWithDelayedActions extends Clock
implements BiFunction<Callable<?>, Duration, Cancellable> {
implements BarrierAlignmentUtil.DelayableTimer {

// must start at least at 100 ms, because ValidatingCheckpointHandler
// expects barriers to have positive timestamps
@@ -1474,7 +1473,7 @@ private static class ClockWithDelayedActions extends Clock
new PriorityQueue<>(Comparator.comparingLong(CallableWithTimestamp::getTimestamp));

@Override
public Cancellable apply(Callable<?> callable, Duration delay) {
public Cancellable registerTask(Callable<?> callable, Duration delay) {
CallableWithTimestamp callableWithTimestamp =
new CallableWithTimestamp(
clock.relativeTimeNanos() + delay.toNanos(), callable);
@@ -22,20 +22,14 @@
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.Cancellable;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;

/** A factory for creating instances of {@link SingleCheckpointBarrierHandler} for tests. */
public class TestBarrierHandlerFactory {
private final AbstractInvokable target;
private BiFunction<Callable<?>, Duration, Cancellable> actionRegistration =
(callable, delay) -> () -> {};
private BarrierAlignmentUtil.DelayableTimer actionRegistration = (callable, delay) -> () -> {};
private Clock clock = SystemClock.getInstance();
private boolean enableCheckpointsAfterTasksFinish = true;

@@ -48,7 +42,7 @@ public static TestBarrierHandlerFactory forTarget(AbstractInvokable target) {
}

public TestBarrierHandlerFactory withActionRegistration(
BiFunction<Callable<?>, Duration, Cancellable> actionRegistration) {
BarrierAlignmentUtil.DelayableTimer actionRegistration) {
this.actionRegistration = actionRegistration;
return this;
}

0 comments on commit 10b7afa

Please sign in to comment.