Skip to content
Permalink
Browse files

[FLINK-13248] [runtime] Using mailbox executor for yielding in AsyncW…

…aitOperator
  • Loading branch information...
Arvid Heise
Arvid Heise committed Aug 19, 2019
1 parent 6d17d10 commit 0507aa67d2f7183c3a7e4556fbf7732414647ac7
@@ -46,6 +46,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

@@ -113,6 +114,9 @@
/** Thread running the emitter. */
private transient Thread emitterThread;

/** Mailbox executor used to yield while waiting for buffers to empty. */
private transient MailboxExecutor mailboxExecutor;

public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
long timeout,
@@ -133,8 +137,12 @@ public AsyncWaitOperator(
}

@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output,
MailboxExecutor mailboxExecutor) {
super.setup(containingTask, config, output, mailboxExecutor);

this.checkpointingLock = getContainingTask().getCheckpointLock();

@@ -160,14 +168,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}

this.mailboxExecutor = mailboxExecutor;
}

@Override
public void open() throws Exception {
super.open();

// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
this.emitter = new Emitter<>(checkpointingLock,
this.mailboxExecutor,
output,
queue,
this);

// start the emitter thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
@@ -387,35 +401,36 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
* Add the given stream element queue entry to the operator's stream element queue. This
* operation blocks until the element has been added.
*
* <p>For that it tries to put the element into the queue and if not successful then it waits on
* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
* elements. The emitter is also responsible for notifying this method if the queue has capacity
* left again, by calling notifyAll on the checkpointing lock.
*
* @param streamElementQueueEntry to add to the operator's queue
* @param <T> Type of the stream element queue entry's result
* @throws InterruptedException if the current thread has been interrupted
*/
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

pendingStreamElementQueueEntry = streamElementQueueEntry;

while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
// remove when processor timers are migrated.
if (Thread.holdsLock(this.checkpointingLock)) {
while (!queue.tryPut(streamElementQueueEntry)) {
if (!mailboxExecutor.tryYield()) {
this.checkpointingLock.wait(1);
}
}
} else {
while (!queue.tryPut(streamElementQueueEntry)) {
mailboxExecutor.yield();
}
}

pendingStreamElementQueueEntry = null;
}

private void waitInFlightInputsFinished() throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));
assert (Thread.holdsLock(this.checkpointingLock));

while (!queue.isEmpty()) {
// wait for the emitter thread to output the remaining elements
// for that he needs the checkpointing lock and thus we have to free it
checkpointingLock.wait();
if (!mailboxExecutor.tryYield()) {
this.checkpointingLock.wait(1);
}
}
}

@@ -26,11 +26,14 @@
import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.Collection;

/**
@@ -50,6 +53,9 @@
/** Output for the watermark elements. */
private final Output<StreamRecord<OUT>> output;

/** Executor for mailbox. */
private final MailboxExecutor executor;

/** Queue to consume the async results from. */
private final StreamElementQueue streamElementQueue;

@@ -62,11 +68,13 @@

public Emitter(
final Object checkpointLock,
final Output<StreamRecord<OUT>> output,
final StreamElementQueue streamElementQueue,
final OperatorActions operatorActions) {
final @Nonnull MailboxExecutor executor,
final @Nonnull Output<StreamRecord<OUT>> output,
final @Nonnull StreamElementQueue streamElementQueue,
final @Nonnull OperatorActions operatorActions) {

this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
@@ -80,9 +88,14 @@ public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

output(streamElementEntry);
AsyncResult asyncResult = streamElementQueue.peekBlockingly();
executor.submit(() -> {
try {
output(asyncResult);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).get();
}
} catch (InterruptedException e) {
if (running) {
@@ -97,21 +110,22 @@ public void run() {
}
}

/**
* Executed as a mail in the mailbox thread. Output needs to be guarded with checkpoint lock (for the time being).
*
* @param asyncResult the result to output.
*/
private void output(AsyncResult asyncResult) throws InterruptedException {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());
LOG.debug("Output async watermark.");

synchronized (checkpointLock) {
output.emitWatermark(asyncWatermarkResult.getWatermark());
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
@@ -122,9 +136,9 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
timestampedCollector.eraseTimestamp();
}

synchronized (checkpointLock) {
LOG.debug("Output async stream element collection result.");
LOG.debug("Output async stream element collection result.");

synchronized (checkpointLock) {
try {
Collection<OUT> resultCollection = streamRecordResult.get();

@@ -142,10 +156,6 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
}
}

0 comments on commit 0507aa6

Please sign in to comment.
You can’t perform that action at this time.