Skip to content

Commit

Permalink
fixup! [FLINK-35528][task] Skip execution of interruptible mails when…
Browse files Browse the repository at this point in the history
… yielding
  • Loading branch information
pnowojski committed Jun 10, 2024
1 parent 24fff6a commit dd831d2
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
/** Options to configure behaviour of executing mailbox mails. */
@Internal
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl();
static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl().setDeferrable();

private boolean deferrable;

@Override
public MailboxExecutor.MailOptions setDeferrable() {
this.deferrable = true;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,23 @@ public interface MailboxExecutor {
Object[] EMPTY_ARGS = new Object[0];

/** Extra options to configure enqueued mails. */
@Experimental
@PublicEvolving
interface MailOptions {
static MailOptions options() {
return new MailOptionsImpl();
return MailOptionsImpl.DEFAULT;
}

/**
* Mark this mail as deferrable.
*
* <p>Runtime can decide to defer execution of deferrable mails. For example, to unblock
* subtask thread as quickly as possible, deferrable mails are not executed during
* {@link #yield()} or {@link #tryYield()}. This is done to speed up checkpointing,
* by skipping execution of potentially long-running mails.
* subtask thread as quickly as possible, deferrable mails are not executed during {@link
* #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping
* execution of potentially long-running mails.
*/
MailOptions setDeferrable();
static MailOptions deferrable() {
return MailOptionsImpl.DEFERRABLE;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ public interface InternalTimeServiceManager<K> {
@FunctionalInterface
interface ShouldStopAdvancingFn {

/**
* @return {@code true} if firing timers should be interrupted.
*/
/** @return {@code true} if firing timers should be interrupted. */
boolean test();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void emitWatermarkInsideMailbox() throws Exception {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
mailboxExecutor.execute(
MailboxExecutor.MailOptions.options().setDeferrable(),
MailboxExecutor.MailOptions.deferrable(),
() -> {
progressWatermarkScheduled = false;
emitWatermarkInsideMailbox();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public MailboxExecutor.MailOptions getMailOptions() {
}

public int getPriority() {
/** See {@link MailboxExecutor.MailOptions#setDeferrable()}. */
/** See {@link MailboxExecutor.MailOptions#deferrable()} ()}. */
return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : priority;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void testDeferrable() throws Exception {
AtomicBoolean deferrableMailExecuted = new AtomicBoolean();

localExecutor.execute(
MailboxExecutor.MailOptions.options().setDeferrable(),
MailboxExecutor.MailOptions.deferrable(),
() -> deferrableMailExecuted.set(true),
"deferrable mail");
assertThat(localExecutor.tryYield()).isFalse();
Expand Down

0 comments on commit dd831d2

Please sign in to comment.