Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35528][task] Skip execution of interruptible mails when yielding #24904

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.operators;

import org.apache.flink.annotation.Internal;

/** Options to configure behaviour of executing mailbox mails. */
@Internal
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(true);

private final boolean deferrable;

private MailOptionsImpl(boolean deferrable) {
this.deferrable = deferrable;
}

public boolean isDeferrable() {
return deferrable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ public interface MailboxExecutor {
/** A constant for empty args to save on object allocation. */
Object[] EMPTY_ARGS = new Object[0];

/** Extra options to configure enqueued mails. */
@PublicEvolving
interface MailOptions {
rkhachatryan marked this conversation as resolved.
Show resolved Hide resolved
static MailOptions options() {
return MailOptionsImpl.DEFAULT;
}

/**
* Mark this mail as deferrable.
*
* <p>Runtime can decide to defer execution of deferrable mails. For example, to unblock
* subtask thread as quickly as possible, deferrable mails are not executed during {@link
* #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping
* execution of potentially long-running mails.
*/
static MailOptions deferrable() {
return MailOptionsImpl.DEFERRABLE;
}
}

/**
* Executes the given command at some time in the future in the mailbox thread.
*
Expand All @@ -110,6 +130,49 @@ default void execute(ThrowingRunnable<? extends Exception> command, String descr
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
* @param mailOptions additional options to configure behaviour of the {@code command}
* @param command the runnable task to add to the mailbox for execution.
* @param description the optional description for the command that is used for debugging and
* error-reporting.
* @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
* because the mailbox is quiesced or closed.
*/
default void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String description) {
execute(mailOptions, command, description, EMPTY_ARGS);
}

/**
* Executes the given command at some time in the future in the mailbox thread.
*
* <p>An optional description can (and should) be added to ease debugging and error-reporting.
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
* @param command the runnable task to add to the mailbox for execution.
* @param descriptionFormat the optional description for the command that is used for debugging
* and error-reporting.
* @param descriptionArgs the parameters used to format the final description string.
* @throws RejectedExecutionException if this task cannot be accepted for execution, e.g.
* because the mailbox is quiesced or closed.
*/
default void execute(
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
execute(MailOptions.options(), command, descriptionFormat, descriptionArgs);
}

/**
* Executes the given command at some time in the future in the mailbox thread.
*
* <p>An optional description can (and should) be added to ease debugging and error-reporting.
* The description may contain placeholder that refer to the provided description arguments
* using {@link java.util.Formatter} syntax. The actual description is only formatted on demand.
*
* @param mailOptions additional options to configure behaviour of the {@code command}
* @param command the runnable task to add to the mailbox for execution.
* @param descriptionFormat the optional description for the command that is used for debugging
* and error-reporting.
Expand All @@ -118,6 +181,7 @@ default void execute(ThrowingRunnable<? extends Exception> command, String descr
* because the mailbox is quiesced or closed.
*/
void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ public TestMailboxExecutor(boolean fail) {

@Override
public void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class SyncMailboxExecutor implements MailboxExecutor {
@Override
public void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private void emitWatermarkInsideMailbox() throws Exception {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
mailboxExecutor.execute(
MailboxExecutor.MailOptions.deferrable(),
() -> {
progressWatermarkScheduled = false;
emitWatermarkInsideMailbox();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.streaming.runtime.tasks.mailbox;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailOptionsImpl;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
Expand All @@ -30,6 +32,7 @@
*/
@Internal
public class Mail {
private final MailOptionsImpl mailOptions;
/** The action to execute. */
private final ThrowingRunnable<? extends Exception> runnable;
/**
Expand All @@ -50,6 +53,7 @@ public Mail(
String descriptionFormat,
Object... descriptionArgs) {
this(
MailboxExecutor.MailOptions.options(),
runnable,
priority,
StreamTaskActionExecutor.IMMEDIATE,
Expand All @@ -58,11 +62,13 @@ public Mail(
}

public Mail(
MailboxExecutor.MailOptions mailOptions,
ThrowingRunnable<? extends Exception> runnable,
int priority,
StreamTaskActionExecutor actionExecutor,
String descriptionFormat,
Object... descriptionArgs) {
this.mailOptions = (MailOptionsImpl) mailOptions;
pnowojski marked this conversation as resolved.
Show resolved Hide resolved
this.runnable = Preconditions.checkNotNull(runnable);
this.priority = priority;
this.descriptionFormat =
Expand All @@ -71,8 +77,13 @@ public Mail(
this.actionExecutor = actionExecutor;
}

public MailboxExecutor.MailOptions getMailOptions() {
return mailOptions;
}

public int getPriority() {
return priority;
/** See {@link MailboxExecutor.MailOptions#deferrable()} ()}. */
return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : priority;
}

public void tryCancel(boolean mayInterruptIfRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ public boolean isIdle() {

@Override
public void execute(
MailOptions mailOptions,
final ThrowingRunnable<? extends Exception> command,
final String descriptionFormat,
final Object... descriptionArgs) {
try {
mailbox.put(
new Mail(
command, priority, actionExecutor, descriptionFormat, descriptionArgs));
mailOptions,
command,
priority,
actionExecutor,
descriptionFormat,
descriptionArgs));
} catch (MailboxClosedException mbex) {
throw new RejectedExecutionException(mbex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ private static class DummyMailboxExecutor implements MailboxExecutor {

@Override
public void execute(
MailOptions mailOptions,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -43,14 +44,16 @@ class MailboxWatermarkProcessorTest {

@Test
void testEmitWatermarkInsideMailbox() throws Exception {
int priority = 42;
final List<StreamElement> emittedElements = new ArrayList<>();
final TaskMailboxImpl mailbox = new TaskMailboxImpl();
final InternalTimeServiceManager<?> timerService = new NoOpInternalTimeServiceManager();

final MailboxWatermarkProcessor<StreamRecord<String>> watermarkProcessor =
new MailboxWatermarkProcessor<>(
new CollectorOutput<>(emittedElements),
new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE),
new MailboxExecutorImpl(
mailbox, priority, StreamTaskActionExecutor.IMMEDIATE),
timerService);
final List<Watermark> expectedOutput = new ArrayList<>();
watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1));
Expand All @@ -69,6 +72,10 @@ void testEmitWatermarkInsideMailbox() throws Exception {

assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);

// FLINK-35528: do not allow yielding to continuation mails
assertThat(mailbox.tryTake(priority)).isEqualTo(Optional.empty());
assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);

while (mailbox.hasMail()) {
mailbox.take(TaskMailbox.MIN_PRIORITY).run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ void testOperations() throws Exception {
assertThat(wasExecuted).isTrue();
}

@Test
void testDeferrable() throws Exception {
int priority = 42;
MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority);

AtomicBoolean deferrableMailExecuted = new AtomicBoolean();

localExecutor.execute(
MailboxExecutor.MailOptions.deferrable(),
() -> deferrableMailExecuted.set(true),
"deferrable mail");
assertThat(localExecutor.tryYield()).isFalse();
assertThat(deferrableMailExecuted.get()).isFalse();
assertThat(mailboxExecutor.tryYield()).isFalse();
assertThat(deferrableMailExecuted.get()).isFalse();

assertThat(mailboxProcessor.runMailboxStep()).isTrue();
assertThat(deferrableMailExecuted.get()).isTrue();
}

@Test
void testClose() throws Exception {
final TestRunnable yieldRun = new TestRunnable();
Expand Down