diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java new file mode 100644 index 00000000000000..4551a4f7e60783 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.Internal; + +/** Options to configure behaviour of executing mailbox mails. */ +@Internal +public class MailOptionsImpl implements MailboxExecutor.MailOptions { + private boolean interruptible; + + @Override + public MailboxExecutor.MailOptions setInterruptible() { + this.interruptible = true; + return this; + } + + public boolean isInterruptible() { + return interruptible; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java index 639ed18b6733e4..a5271b1dc5831a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java @@ -17,6 +17,7 @@ package org.apache.flink.api.common.operators; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FutureTaskWithException; @@ -86,6 +87,25 @@ public interface MailboxExecutor { /** A constant for empty args to save on object allocation. */ Object[] EMPTY_ARGS = new Object[0]; + /** Extra options to configure enqueued mails. */ + @Experimental + interface MailOptions { + static MailOptions options() { + return new MailOptionsImpl(); + } + + /** + * Mark this mail as interruptible. + * + *

Interruptible mails, are those that respect {@link MailboxExecutor#shouldInterrupt()} + * flag. Marking mail as interruptible allows {@link MailboxExecutor} to optimize execution + * order. For example interruptible mails are not executed during {@link #yield()} or {@link + * #tryYield()}, to avoid having to immediately interrupt them. This is done to speed up + * checkpointing, by skipping execution of potentially long running mails. + */ + MailOptions setInterruptible(); + } + /** * Executes the given command at some time in the future in the mailbox thread. * @@ -110,6 +130,49 @@ default void execute(ThrowingRunnable command, String descr * The description may contain placeholder that refer to the provided description arguments * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand. * + * @param mailOptions additional options to configure behaviour of the {@code command} + * @param command the runnable task to add to the mailbox for execution. + * @param description the optional description for the command that is used for debugging and + * error-reporting. + * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. + * because the mailbox is quiesced or closed. + */ + default void execute( + MailOptions mailOptions, + ThrowingRunnable command, + String description) { + execute(mailOptions, command, description, EMPTY_ARGS); + } + + /** + * Executes the given command at some time in the future in the mailbox thread. + * + *

An optional description can (and should) be added to ease debugging and error-reporting. + * The description may contain placeholder that refer to the provided description arguments + * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand. + * + * @param command the runnable task to add to the mailbox for execution. + * @param descriptionFormat the optional description for the command that is used for debugging + * and error-reporting. + * @param descriptionArgs the parameters used to format the final description string. + * @throws RejectedExecutionException if this task cannot be accepted for execution, e.g. + * because the mailbox is quiesced or closed. + */ + default void execute( + ThrowingRunnable command, + String descriptionFormat, + Object... descriptionArgs) { + execute(MailOptions.options(), command, descriptionFormat, descriptionArgs); + } + + /** + * Executes the given command at some time in the future in the mailbox thread. + * + *

An optional description can (and should) be added to ease debugging and error-reporting. + * The description may contain placeholder that refer to the provided description arguments + * using {@link java.util.Formatter} syntax. The actual description is only formatted on demand. + * + * @param mailOptions additional options to configure behaviour of the {@code command} * @param command the runnable task to add to the mailbox for execution. * @param descriptionFormat the optional description for the command that is used for debugging * and error-reporting. @@ -118,6 +181,7 @@ default void execute(ThrowingRunnable command, String descr * because the mailbox is quiesced or closed. */ void execute( + MailOptions mailOptions, ThrowingRunnable command, String descriptionFormat, Object... descriptionArgs); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index f99dd877cf783c..e98a66b82a02de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -824,6 +824,7 @@ public TestMailboxExecutor(boolean fail) { @Override public void execute( + MailOptions mailOptions, ThrowingRunnable command, String descriptionFormat, Object... descriptionArgs) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java index 13ab803a385281..5b8628499388fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java @@ -25,6 +25,7 @@ public class SyncMailboxExecutor implements MailboxExecutor { @Override public void execute( + MailOptions mailOptions, ThrowingRunnable command, String descriptionFormat, Object... descriptionArgs) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java index 02cfd966c1cf24..77caea78dbfe42 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java @@ -77,6 +77,7 @@ private void emitWatermarkInsideMailbox() throws Exception { progressWatermarkScheduled = true; // We still have work to do, but we need to let other mails to be processed first. mailboxExecutor.execute( + MailboxExecutor.MailOptions.options().setInterruptible(), () -> { progressWatermarkScheduled = false; emitWatermarkInsideMailbox(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java index bed96ae559a0b9..2dae3e2412e290 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailOptionsImpl; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingRunnable; @@ -30,6 +32,7 @@ */ @Internal public class Mail { + private final MailOptionsImpl mailOptions; /** The action to execute. */ private final ThrowingRunnable runnable; /** @@ -50,6 +53,7 @@ public Mail( String descriptionFormat, Object... descriptionArgs) { this( + MailboxExecutor.MailOptions.options(), runnable, priority, StreamTaskActionExecutor.IMMEDIATE, @@ -58,11 +62,13 @@ public Mail( } public Mail( + MailboxExecutor.MailOptions mailOptions, ThrowingRunnable runnable, int priority, StreamTaskActionExecutor actionExecutor, String descriptionFormat, Object... descriptionArgs) { + this.mailOptions = (MailOptionsImpl) mailOptions; this.runnable = Preconditions.checkNotNull(runnable); this.priority = priority; this.descriptionFormat = @@ -71,8 +77,13 @@ public Mail( this.actionExecutor = actionExecutor; } + public MailboxExecutor.MailOptions getMailOptions() { + return mailOptions; + } + public int getPriority() { - return priority; + /** See {@link MailboxExecutor.MailOptions#setInterruptible()}. */ + return mailOptions.isInterruptible() ? TaskMailbox.MIN_PRIORITY : priority; } public void tryCancel(boolean mayInterruptIfRunning) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java index b2c8eac935a35c..2987ee21cca626 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java @@ -67,13 +67,19 @@ public boolean isIdle() { @Override public void execute( + MailOptions mailOptions, final ThrowingRunnable command, final String descriptionFormat, final Object... descriptionArgs) { try { mailbox.put( new Mail( - command, priority, actionExecutor, descriptionFormat, descriptionArgs)); + mailOptions, + command, + priority, + actionExecutor, + descriptionFormat, + descriptionArgs)); } catch (MailboxClosedException mbex) { throw new RejectedExecutionException(mbex); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java index 88e0bf3bf46e39..bc92ac151841f4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java @@ -235,6 +235,7 @@ private static class DummyMailboxExecutor implements MailboxExecutor { @Override public void execute( + MailOptions mailOptions, ThrowingRunnable command, String descriptionFormat, Object... descriptionArgs) {} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java index 9fb9d348525d82..87f19f4032d2cc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; @@ -43,6 +44,7 @@ class MailboxWatermarkProcessorTest { @Test void testEmitWatermarkInsideMailbox() throws Exception { + int priority = 42; final List emittedElements = new ArrayList<>(); final TaskMailboxImpl mailbox = new TaskMailboxImpl(); final InternalTimeServiceManager timerService = new NoOpInternalTimeServiceManager(); @@ -50,7 +52,8 @@ void testEmitWatermarkInsideMailbox() throws Exception { final MailboxWatermarkProcessor> watermarkProcessor = new MailboxWatermarkProcessor<>( new CollectorOutput<>(emittedElements), - new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE), + new MailboxExecutorImpl( + mailbox, priority, StreamTaskActionExecutor.IMMEDIATE), timerService); final List expectedOutput = new ArrayList<>(); watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1)); @@ -69,6 +72,10 @@ void testEmitWatermarkInsideMailbox() throws Exception { assertThat(emittedElements).containsExactlyElementsOf(expectedOutput); + // FLINK-35528: do not allow yielding to continuation mails + assertThat(mailbox.tryTake(priority)).isEqualTo(Optional.empty()); + assertThat(emittedElements).containsExactlyElementsOf(expectedOutput); + while (mailbox.hasMail()) { mailbox.take(TaskMailbox.MIN_PRIORITY).run(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java index cfeb92b2aa1707..6b2bc3128dea60 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java @@ -110,6 +110,26 @@ void testOperations() throws Exception { assertThat(wasExecuted).isTrue(); } + @Test + void testInterruptible() throws Exception { + int priority = 42; + MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority); + + AtomicBoolean interruptibleExecuted = new AtomicBoolean(); + + localExecutor.execute( + MailboxExecutor.MailOptions.options().setInterruptible(), + () -> interruptibleExecuted.set(true), + "interruptible mail"); + assertThat(localExecutor.tryYield()).isFalse(); + assertThat(interruptibleExecuted.get()).isFalse(); + assertThat(mailboxExecutor.tryYield()).isFalse(); + assertThat(interruptibleExecuted.get()).isFalse(); + + assertThat(mailboxProcessor.runMailboxStep()).isTrue(); + assertThat(interruptibleExecuted.get()).isTrue(); + } + @Test void testClose() throws Exception { final TestRunnable yieldRun = new TestRunnable();