Skip to content
Permalink
Browse files

[FLINK-13248] [runtime] Using yieldToDownstream in AsyncWaitOperator

  • Loading branch information...
Arvid Heise
Arvid Heise committed Aug 15, 2019
1 parent 999db7d commit 05e27c097851c65bd9a405b4aae376e2ef6c2b50
@@ -91,8 +91,6 @@
/** Timeout for the async collectors. */
private final long timeout;

protected transient Object checkpointingLock;

/** {@link TypeSerializer} for inputs while making snapshots. */
private transient StreamElementSerializer<IN> inStreamElementSerializer;

@@ -102,9 +100,6 @@
/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue;

/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

private transient ExecutorService executor;

/** Emitter for the completed stream element queue entries. */
@@ -136,8 +131,6 @@ public AsyncWaitOperator(
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);

this.checkpointingLock = getContainingTask().getCheckpointLock();

this.inStreamElementSerializer = new StreamElementSerializer<>(
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

@@ -167,7 +160,7 @@ public void open() throws Exception {
super.open();

// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
this.emitter = new Emitter<>(getMailboxExecutor(), output, queue, this);

// start the emitter thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
@@ -251,11 +244,6 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}

// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();

@@ -367,16 +355,6 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
Thread.currentThread().interrupt();
}

/*
* FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
* that the emitter thread can complete/react to the interrupt signal.
*/
if (Thread.holdsLock(checkpointingLock)) {
while (emitterThread.isAlive()) {
checkpointingLock.wait(100L);
}
}

emitterThread.join();
} else {
executor.shutdownNow();
@@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
* Add the given stream element queue entry to the operator's stream element queue. This
* operation blocks until the element has been added.
*
* <p>For that it tries to put the element into the queue and if not successful then it waits on
* the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
* elements. The emitter is also responsible for notifying this method if the queue has capacity
* left again, by calling notifyAll on the checkpointing lock.
*
* @param streamElementQueueEntry to add to the operator's queue
* @param <T> Type of the stream element queue entry's result
* @throws InterruptedException if the current thread has been interrupted
*/
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

pendingStreamElementQueueEntry = streamElementQueueEntry;

while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
yieldToDownstream();
}

pendingStreamElementQueueEntry = null;
}

private void waitInFlightInputsFinished() throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

private void waitInFlightInputsFinished() {
while (!queue.isEmpty()) {
// wait for the emitter thread to output the remaining elements
// for that he needs the checkpointing lock and thus we have to free it
checkpointingLock.wait();
yieldToDownstream();
}
}

@@ -26,11 +26,14 @@
import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.Collection;

/**
@@ -44,12 +47,12 @@

private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

/** Lock to hold before outputting. */
private final Object checkpointLock;

/** Output for the watermark elements. */
private final Output<StreamRecord<OUT>> output;

/** Executor for mailbox. */
private final MailboxExecutor executor;

/** Queue to consume the async results from. */
private final StreamElementQueue streamElementQueue;

@@ -61,12 +64,12 @@
private volatile boolean running;

public Emitter(
final Object checkpointLock,
final Output<StreamRecord<OUT>> output,
final StreamElementQueue streamElementQueue,
final OperatorActions operatorActions) {
final @Nonnull MailboxExecutor executor,
final @Nonnull Output<StreamRecord<OUT>> output,
final @Nonnull StreamElementQueue streamElementQueue,
final @Nonnull OperatorActions operatorActions) {

this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
@@ -80,9 +83,11 @@ public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

output(streamElementEntry);
AsyncResult asyncResult = streamElementQueue.peekBlockingly();
executor.submit(() -> output(asyncResult)).get();
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();
}
} catch (InterruptedException e) {
if (running) {
@@ -97,22 +102,18 @@ public void run() {
}
}

private void output(AsyncResult asyncResult) throws InterruptedException {
/**
* Executed as a mail in the mailbox thread. Output needs to be guarded with checkpoint lock (for the time being).
*
* @param asyncResult the result to output.
*/
private void output(AsyncResult asyncResult) {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());
LOG.debug("Output async watermark.");

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
output.emitWatermark(asyncWatermarkResult.getWatermark());
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

@@ -122,30 +123,20 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
timestampedCollector.eraseTimestamp();
}

synchronized (checkpointLock) {
LOG.debug("Output async stream element collection result.");
LOG.debug("Output async stream element collection result.");

try {
Collection<OUT> resultCollection = streamRecordResult.get();
try {
Collection<OUT> resultCollection = streamRecordResult.get();

if (resultCollection != null) {
for (OUT result : resultCollection) {
timestampedCollector.collect(result);
}
if (resultCollection != null) {
for (OUT result : resultCollection) {
timestampedCollector.collect(result);
}
} catch (Exception e) {
operatorActions.failOperator(
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}

// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
} catch (Exception e) {
operatorActions.failOperator(
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}
}
}

0 comments on commit 05e27c0

Please sign in to comment.
You can’t perform that action at this time.