Skip to content

Commit

Permalink
[FLINK-35528][task] Skip execution of interruptible mails when yielding
Browse files Browse the repository at this point in the history
When operators are yielding, for example waiting for async state access to complete before a checkpoint, it would be beneficial to not execute interruptible mails. Otherwise continuation mail for firing timers would be continuously re-enqeueed. To achieve that MailboxExecutor must be aware which mails are interruptible.

The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails.
  • Loading branch information
pnowojski committed Jun 10, 2024
1 parent 10d91a9 commit fb9c4b5
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.operators;

import org.apache.flink.annotation.Internal;

/** Options to configure behaviour of executing mailbox mails. */
@Internal
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
private boolean interruptible;

@Override
public MailboxExecutor.MailOptions setInterruptible() {
this.interruptible = true;
return this;
}

public boolean isInterruptible() {
return interruptible;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.api.common.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.FutureTaskWithException;
Expand Down Expand Up @@ -86,6 +87,25 @@ public interface MailboxExecutor {
/** A constant for empty args to save on object allocation. */
Object[] EMPTY_ARGS = new Object[0];

/** Extra options to configure enqueued mails. */
@Experimental
interface MailOptions {
static MailOptions options() {
return new MailOptionsImpl();
}

/**
* Mark this mail as interruptible.
*
* <p>Interruptible mails, are those that respect {@link MailboxExecutor#shouldInterrupt()}
* flag. Marking mail as interruptible allows {@link MailboxExecutor} to optimize execution
* order. For example interruptible mails are not executed during {@link #yield()} or {@link
* #tryYield()}, to avoid having to immediately interrupt them. This is done to speed up
* checkpointing, by skipping execution of potentially long running mails.
*/
MailOptions setInterruptible();
}

/**
* Executes the given command at some time in the future in the mailbox thread.
*
Expand All @@ -110,6 +130,49 @@ default void execute(ThrowingRunnable<? extends Exception> command, String descr
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
* @param mailOptions additional options to configure behaviour of the {@code command}
* @param command the runnable task to add to the mailbox for execution.
* @param description the optional description for the command that is used for debugging and
* error-reporting.
* @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
* because the mailbox is quiesced or closed.
*/
default void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String description) {
execute(mailOptions, command, description, EMPTY_ARGS);
}

/**
* Executes the given command at some time in the future in the mailbox thread.
*
* <p>An optional description can (and should) be added to ease debugging and error-reporting.
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
* @param command the runnable task to add to the mailbox for execution.
* @param descriptionFormat the optional description for the command that is used for debugging
* and error-reporting.
* @param descriptionArgs the parameters used to format the final description string.
* @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
* because the mailbox is quiesced or closed.
*/
default void execute(
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
execute(MailOptions.options(), command, descriptionFormat, descriptionArgs);
}

/**
* Executes the given command at some time in the future in the mailbox thread.
*
* <p>An optional description can (and should) be added to ease debugging and error-reporting.
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
* @param mailOptions additional options to configure behaviour of the {@code command}
* @param command the runnable task to add to the mailbox for execution.
* @param descriptionFormat the optional description for the command that is used for debugging
* and error-reporting.
Expand All @@ -118,6 +181,7 @@ default void execute(ThrowingRunnable<? extends Exception> command, String descr
* because the mailbox is quiesced or closed.
*/
void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ public TestMailboxExecutor(boolean fail) {

@Override
public void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class SyncMailboxExecutor implements MailboxExecutor {
@Override
public void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private void emitWatermarkInsideMailbox() throws Exception {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
mailboxExecutor.execute(
MailboxExecutor.MailOptions.options().setInterruptible(),
() -> {
progressWatermarkScheduled = false;
emitWatermarkInsideMailbox();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.streaming.runtime.tasks.mailbox;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailOptionsImpl;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
Expand All @@ -30,6 +32,7 @@
*/
@Internal
public class Mail {
private final MailOptionsImpl mailOptions;
/** The action to execute. */
private final ThrowingRunnable<? extends Exception> runnable;
/**
Expand All @@ -50,6 +53,7 @@ public Mail(
String descriptionFormat,
Object... descriptionArgs) {
this(
MailboxExecutor.MailOptions.options(),
runnable,
priority,
StreamTaskActionExecutor.IMMEDIATE,
Expand All @@ -58,11 +62,13 @@ public Mail(
}

public Mail(
MailboxExecutor.MailOptions mailOptions,
ThrowingRunnable<? extends Exception> runnable,
int priority,
StreamTaskActionExecutor actionExecutor,
String descriptionFormat,
Object... descriptionArgs) {
this.mailOptions = (MailOptionsImpl) mailOptions;
this.runnable = Preconditions.checkNotNull(runnable);
this.priority = priority;
this.descriptionFormat =
Expand All @@ -71,8 +77,13 @@ public Mail(
this.actionExecutor = actionExecutor;
}

public MailboxExecutor.MailOptions getMailOptions() {
return mailOptions;
}

public int getPriority() {
return priority;
/** See {@link MailboxExecutor.MailOptions#setInterruptible()}. */
return mailOptions.isInterruptible() ? TaskMailbox.MIN_PRIORITY : priority;
}

public void tryCancel(boolean mayInterruptIfRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ public boolean isIdle() {

@Override
public void execute(
MailOptions mailOptions,
final ThrowingRunnable<? extends Exception> command,
final String descriptionFormat,
final Object... descriptionArgs) {
try {
mailbox.put(
new Mail(
command, priority, actionExecutor, descriptionFormat, descriptionArgs));
mailOptions,
command,
priority,
actionExecutor,
descriptionFormat,
descriptionArgs));
} catch (MailboxClosedException mbex) {
throw new RejectedExecutionException(mbex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ private static class DummyMailboxExecutor implements MailboxExecutor {

@Override
public void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -43,14 +44,16 @@ class MailboxWatermarkProcessorTest {

@Test
void testEmitWatermarkInsideMailbox() throws Exception {
int priority = 42;
final List<StreamElement> emittedElements = new ArrayList<>();
final TaskMailboxImpl mailbox = new TaskMailboxImpl();
final InternalTimeServiceManager<?> timerService = new NoOpInternalTimeServiceManager();

final MailboxWatermarkProcessor<StreamRecord<String>> watermarkProcessor =
new MailboxWatermarkProcessor<>(
new CollectorOutput<>(emittedElements),
new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE),
new MailboxExecutorImpl(
mailbox, priority, StreamTaskActionExecutor.IMMEDIATE),
timerService);
final List<Watermark> expectedOutput = new ArrayList<>();
watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1));
Expand All @@ -69,6 +72,10 @@ void testEmitWatermarkInsideMailbox() throws Exception {

assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);

// FLINK-35528: do not allow yielding to continuation mails
assertThat(mailbox.tryTake(priority)).isEqualTo(Optional.empty());
assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);

while (mailbox.hasMail()) {
mailbox.take(TaskMailbox.MIN_PRIORITY).run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ void testOperations() throws Exception {
assertThat(wasExecuted).isTrue();
}

@Test
void testInterruptible() throws Exception {
int priority = 42;
MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority);

AtomicBoolean interruptibleExecuted = new AtomicBoolean();

localExecutor.execute(
MailboxExecutor.MailOptions.options().setInterruptible(),
() -> interruptibleExecuted.set(true),
"interruptible mail");
assertThat(localExecutor.tryYield()).isFalse();
assertThat(interruptibleExecuted.get()).isFalse();
assertThat(mailboxExecutor.tryYield()).isFalse();
assertThat(interruptibleExecuted.get()).isFalse();

assertThat(mailboxProcessor.runMailboxStep()).isTrue();
assertThat(interruptibleExecuted.get()).isTrue();
}

@Test
void testClose() throws Exception {
final TestRunnable yieldRun = new TestRunnable();
Expand Down

0 comments on commit fb9c4b5

Please sign in to comment.