From e9b1560475d17c52d7f8a13a8570e51a442aad30 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 10 Jun 2024 10:46:08 +0200 Subject: [PATCH] fixup! [FLINK-35528][task] Skip execution of interruptible mails when yielding --- .../api/common/operators/MailOptionsImpl.java | 15 ++++++++------- .../api/common/operators/MailboxExecutor.java | 19 ++++++++++--------- .../operators/InternalTimeServiceManager.java | 4 +--- .../operators/MailboxWatermarkProcessor.java | 2 +- .../streaming/runtime/tasks/mailbox/Mail.java | 4 ++-- .../mailbox/MailboxExecutorImplTest.java | 16 ++++++++-------- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java index 4551a4f7e60783..80d6b6cb6c5945 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java @@ -22,15 +22,16 @@ /** Options to configure behaviour of executing mailbox mails. */ @Internal public class MailOptionsImpl implements MailboxExecutor.MailOptions { - private boolean interruptible; + static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false); + static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(false); - @Override - public MailboxExecutor.MailOptions setInterruptible() { - this.interruptible = true; - return this; + private final boolean deferrable; + + private MailOptionsImpl(boolean deferrable) { + this.deferrable = deferrable; } - public boolean isInterruptible() { - return interruptible; + public boolean isDeferrable() { + return deferrable; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java index a5271b1dc5831a..0a88cd1b00ecd5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java @@ -88,22 +88,23 @@ public interface MailboxExecutor { Object[] EMPTY_ARGS = new Object[0]; /** Extra options to configure enqueued mails. */ - @Experimental + @PublicEvolving interface MailOptions { static MailOptions options() { - return new MailOptionsImpl(); + return MailOptionsImpl.DEFAULT; } /** - * Mark this mail as interruptible. + * Mark this mail as deferrable. * - *

Interruptible mails, are those that respect {@link MailboxExecutor#shouldInterrupt()} - * flag. Marking mail as interruptible allows {@link MailboxExecutor} to optimize execution - * order. For example interruptible mails are not executed during {@link #yield()} or {@link - * #tryYield()}, to avoid having to immediately interrupt them. This is done to speed up - * checkpointing, by skipping execution of potentially long running mails. + *

Runtime can decide to defer execution of deferrable mails. For example, to unblock + * subtask thread as quickly as possible, deferrable mails are not executed during {@link + * #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping + * execution of potentially long-running mails. */ - MailOptions setInterruptible(); + static MailOptions deferrable() { + return MailOptionsImpl.DEFERRABLE; + } } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 4f8c74e2fe0965..a1b89031f5e4ae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -46,9 +46,7 @@ public interface InternalTimeServiceManager { @FunctionalInterface interface ShouldStopAdvancingFn { - /** - * @return {@code true} if firing timers should be interrupted. - */ + /** @return {@code true} if firing timers should be interrupted. */ boolean test(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java index 77caea78dbfe42..76eabe6dec1006 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java @@ -77,7 +77,7 @@ private void emitWatermarkInsideMailbox() throws Exception { progressWatermarkScheduled = true; // We still have work to do, but we need to let other mails to be processed first. mailboxExecutor.execute( - MailboxExecutor.MailOptions.options().setInterruptible(), + MailboxExecutor.MailOptions.deferrable(), () -> { progressWatermarkScheduled = false; emitWatermarkInsideMailbox(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java index 2dae3e2412e290..6afd8918e3de52 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java @@ -82,8 +82,8 @@ public MailboxExecutor.MailOptions getMailOptions() { } public int getPriority() { - /** See {@link MailboxExecutor.MailOptions#setInterruptible()}. */ - return mailOptions.isInterruptible() ? TaskMailbox.MIN_PRIORITY : priority; + /** See {@link MailboxExecutor.MailOptions#deferrable()} ()}. */ + return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : priority; } public void tryCancel(boolean mayInterruptIfRunning) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java index 6b2bc3128dea60..1b6fddc7d91600 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java @@ -111,23 +111,23 @@ void testOperations() throws Exception { } @Test - void testInterruptible() throws Exception { + void testDeferrable() throws Exception { int priority = 42; MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority); - AtomicBoolean interruptibleExecuted = new AtomicBoolean(); + AtomicBoolean deferrableMailExecuted = new AtomicBoolean(); localExecutor.execute( - MailboxExecutor.MailOptions.options().setInterruptible(), - () -> interruptibleExecuted.set(true), - "interruptible mail"); + MailboxExecutor.MailOptions.deferrable(), + () -> deferrableMailExecuted.set(true), + "deferrable mail"); assertThat(localExecutor.tryYield()).isFalse(); - assertThat(interruptibleExecuted.get()).isFalse(); + assertThat(deferrableMailExecuted.get()).isFalse(); assertThat(mailboxExecutor.tryYield()).isFalse(); - assertThat(interruptibleExecuted.get()).isFalse(); + assertThat(deferrableMailExecuted.get()).isFalse(); assertThat(mailboxProcessor.runMailboxStep()).isTrue(); - assertThat(interruptibleExecuted.get()).isTrue(); + assertThat(deferrableMailExecuted.get()).isTrue(); } @Test