Skip to content

Commit

Permalink
[FLINK-13248] [runtime] Using yieldToDownstream in AsyncWaitOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
Arvid Heise committed Aug 19, 2019
1 parent 5231277 commit 681ac33
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -102,9 +103,6 @@ public class AsyncWaitOperator<IN, OUT>
/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue;

/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

private transient ExecutorService executor;

/** Emitter for the completed stream element queue entries. */
Expand All @@ -113,6 +111,9 @@ public class AsyncWaitOperator<IN, OUT>
/** Thread running the emitter. */
private transient Thread emitterThread;

/** Mailbox executor used to yield while waiting for buffers to empty. */
private transient MailboxExecutor mailboxExecutor;

public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
long timeout,
Expand All @@ -133,8 +134,12 @@ public AsyncWaitOperator(
}

@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output,
MailboxExecutor mailboxExecutor) {
super.setup(containingTask, config, output, mailboxExecutor);

this.checkpointingLock = getContainingTask().getCheckpointLock();

Expand All @@ -160,14 +165,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}

this.mailboxExecutor = mailboxExecutor;
}

@Override
public void open() throws Exception {
super.open();

// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
this.emitter = new Emitter<>(checkpointingLock,
this.mailboxExecutor,
output,
queue,
this);

// start the emitter thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
Expand Down Expand Up @@ -251,11 +262,6 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}

// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();

Expand Down Expand Up @@ -387,35 +393,22 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
* Add the given stream element queue entry to the operator's stream element queue. This
* operation blocks until the element has been added.
*
* <p>For that it tries to put the element into the queue and if not successful then it waits on
* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
* elements. The emitter is also responsible for notifying this method if the queue has capacity
* left again, by calling notifyAll on the checkpointing lock.
*
* @param streamElementQueueEntry to add to the operator's queue
* @param <T> Type of the stream element queue entry's result
* @throws InterruptedException if the current thread has been interrupted
*/
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

pendingStreamElementQueueEntry = streamElementQueueEntry;

while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
mailboxExecutor.yield();
}

pendingStreamElementQueueEntry = null;
}

private void waitInFlightInputsFinished() throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

while (!queue.isEmpty()) {
// wait for the emitter thread to output the remaining elements
// for that he needs the checkpointing lock and thus we have to free it
checkpointingLock.wait();
// yield() may never unblock, so we have a busy-wait with sleep.
if (!mailboxExecutor.tryYield()) {
Thread.sleep(1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.Collection;

/**
Expand All @@ -50,6 +53,9 @@ public class Emitter<OUT> implements Runnable {
/** Output for the watermark elements. */
private final Output<StreamRecord<OUT>> output;

/** Executor for mailbox. */
private final MailboxExecutor executor;

/** Queue to consume the async results from. */
private final StreamElementQueue streamElementQueue;

Expand All @@ -62,11 +68,13 @@ public class Emitter<OUT> implements Runnable {

public Emitter(
final Object checkpointLock,
final Output<StreamRecord<OUT>> output,
final StreamElementQueue streamElementQueue,
final OperatorActions operatorActions) {
final @Nonnull MailboxExecutor executor,
final @Nonnull Output<StreamRecord<OUT>> output,
final @Nonnull StreamElementQueue streamElementQueue,
final @Nonnull OperatorActions operatorActions) {

this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
Expand All @@ -80,9 +88,11 @@ public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

output(streamElementEntry);
AsyncResult asyncResult = streamElementQueue.peekBlockingly();
executor.submit(() -> output(asyncResult)).get();
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();
}
} catch (InterruptedException e) {
if (running) {
Expand All @@ -97,21 +107,19 @@ public void run() {
}
}

private void output(AsyncResult asyncResult) throws InterruptedException {
/**
* Executed as a mail in the mailbox thread. Output needs to be guarded with checkpoint lock (for the time being).
*
* @param asyncResult the result to output.
*/
private void output(AsyncResult asyncResult) {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();
LOG.debug("Output async watermark.");

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
synchronized (checkpointLock) {
output.emitWatermark(asyncWatermarkResult.getWatermark());
}
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
Expand All @@ -122,30 +130,22 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
timestampedCollector.eraseTimestamp();
}

synchronized (checkpointLock) {
LOG.debug("Output async stream element collection result.");
LOG.debug("Output async stream element collection result.");

try {
Collection<OUT> resultCollection = streamRecordResult.get();
try {
Collection<OUT> resultCollection = streamRecordResult.get();

if (resultCollection != null) {
if (resultCollection != null) {
synchronized (checkpointLock) {
for (OUT result : resultCollection) {
timestampedCollector.collect(result);
}
}
} catch (Exception e) {
operatorActions.failOperator(
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
} catch (Exception e) {
operatorActions.failOperator(
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}
}
}
Expand Down
Loading

0 comments on commit 681ac33

Please sign in to comment.