Skip to content

Commit

Permalink
fixup! [FLINK-35528][task] Skip execution of interruptible mails when…
Browse files Browse the repository at this point in the history
… yielding
  • Loading branch information
pnowojski committed Jun 10, 2024
1 parent fb9c4b5 commit e9b1560
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
/** Options to configure behaviour of executing mailbox mails. */
@Internal
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
private boolean interruptible;
static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(false);

@Override
public MailboxExecutor.MailOptions setInterruptible() {
this.interruptible = true;
return this;
private final boolean deferrable;

private MailOptionsImpl(boolean deferrable) {
this.deferrable = deferrable;
}

public boolean isInterruptible() {
return interruptible;
public boolean isDeferrable() {
return deferrable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,23 @@ public interface MailboxExecutor {
Object[] EMPTY_ARGS = new Object[0];

/** Extra options to configure enqueued mails. */
@Experimental
@PublicEvolving
interface MailOptions {
static MailOptions options() {
return new MailOptionsImpl();
return MailOptionsImpl.DEFAULT;
}

/**
* Mark this mail as interruptible.
* Mark this mail as deferrable.
*
* <p>Interruptible mails, are those that respect {@link MailboxExecutor#shouldInterrupt()}
* flag. Marking mail as interruptible allows {@link MailboxExecutor} to optimize execution
* order. For example interruptible mails are not executed during {@link #yield()} or {@link
* #tryYield()}, to avoid having to immediately interrupt them. This is done to speed up
* checkpointing, by skipping execution of potentially long running mails.
* <p>Runtime can decide to defer execution of deferrable mails. For example, to unblock
* subtask thread as quickly as possible, deferrable mails are not executed during {@link
* #yield()} or {@link #tryYield()}. This is done to speed up checkpointing, by skipping
* execution of potentially long-running mails.
*/
MailOptions setInterruptible();
static MailOptions deferrable() {
return MailOptionsImpl.DEFERRABLE;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ public interface InternalTimeServiceManager<K> {
@FunctionalInterface
interface ShouldStopAdvancingFn {

/**
* @return {@code true} if firing timers should be interrupted.
*/
/** @return {@code true} if firing timers should be interrupted. */
boolean test();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void emitWatermarkInsideMailbox() throws Exception {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
mailboxExecutor.execute(
MailboxExecutor.MailOptions.options().setInterruptible(),
MailboxExecutor.MailOptions.deferrable(),
() -> {
progressWatermarkScheduled = false;
emitWatermarkInsideMailbox();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public MailboxExecutor.MailOptions getMailOptions() {
}

public int getPriority() {
/** See {@link MailboxExecutor.MailOptions#setInterruptible()}. */
return mailOptions.isInterruptible() ? TaskMailbox.MIN_PRIORITY : priority;
/** See {@link MailboxExecutor.MailOptions#deferrable()} ()}. */
return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : priority;
}

public void tryCancel(boolean mayInterruptIfRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,23 @@ void testOperations() throws Exception {
}

@Test
void testInterruptible() throws Exception {
void testDeferrable() throws Exception {
int priority = 42;
MailboxExecutor localExecutor = mailboxProcessor.getMailboxExecutor(priority);

AtomicBoolean interruptibleExecuted = new AtomicBoolean();
AtomicBoolean deferrableMailExecuted = new AtomicBoolean();

localExecutor.execute(
MailboxExecutor.MailOptions.options().setInterruptible(),
() -> interruptibleExecuted.set(true),
"interruptible mail");
MailboxExecutor.MailOptions.deferrable(),
() -> deferrableMailExecuted.set(true),
"deferrable mail");
assertThat(localExecutor.tryYield()).isFalse();
assertThat(interruptibleExecuted.get()).isFalse();
assertThat(deferrableMailExecuted.get()).isFalse();
assertThat(mailboxExecutor.tryYield()).isFalse();
assertThat(interruptibleExecuted.get()).isFalse();
assertThat(deferrableMailExecuted.get()).isFalse();

assertThat(mailboxProcessor.runMailboxStep()).isTrue();
assertThat(interruptibleExecuted.get()).isTrue();
assertThat(deferrableMailExecuted.get()).isTrue();
}

@Test
Expand Down

0 comments on commit e9b1560

Please sign in to comment.