Skip to content
Permalink
Browse files

[FLINK-13248] [runtime] Adding processing of downstream messages in A…

…syncWaitOperator's wait loops
  • Loading branch information...
Arvid Heise
Arvid Heise committed Aug 27, 2019
1 parent ddcfc5d commit 355736bc75b13d3e9caeea1fb03c510019985bf8
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

import java.util.concurrent.TimeUnit;

@@ -78,13 +79,13 @@
true);

// create transform
AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
AsyncWaitOperatorFactory<IN, OUT> factory = new AsyncWaitOperatorFactory<>(
in.getExecutionEnvironment().clean(func),
timeout,
bufSize,
mode);

return in.transform("async wait operator", outTypeInfo, operator);
return in.transform("async wait operator", outTypeInfo, factory);
}

/**
@@ -46,9 +46,12 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -113,11 +116,15 @@
/** Thread running the emitter. */
private transient Thread emitterThread;

/** Mailbox executor used to yield while waiting for buffers to empty. */
private final transient MailboxExecutor mailboxExecutor;

public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
@Nonnull AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
@Nonnull AsyncDataStream.OutputMode outputMode,
@Nonnull MailboxExecutor mailboxExecutor) {
super(asyncFunction);

// TODO this is a temporary fix for the problems described under FLINK-13063 at the cost of breaking chains for
@@ -130,6 +137,8 @@ public AsyncWaitOperator(
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

this.timeout = timeout;

this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor, "mailboxExecutor");
}

@Override
@@ -167,7 +176,12 @@ public void open() throws Exception {
super.open();

// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
this.emitter = new Emitter<>(
checkpointingLock,
this.mailboxExecutor,
output,
queue,
this);

// start the emitter thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
@@ -397,25 +411,31 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
* @throws InterruptedException if the current thread has been interrupted
*/
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));

pendingStreamElementQueueEntry = streamElementQueueEntry;

while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
// remove when processor timers are migrated.
if (Thread.holdsLock(this.checkpointingLock)) {
while (!queue.tryPut(streamElementQueueEntry)) {
if (!mailboxExecutor.tryYield()) {
this.checkpointingLock.wait(1);
}
}
} else {
while (!queue.tryPut(streamElementQueueEntry)) {
mailboxExecutor.yield();
}
}

pendingStreamElementQueueEntry = null;
}

private void waitInFlightInputsFinished() throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));
assert (Thread.holdsLock(this.checkpointingLock));

while (!queue.isEmpty()) {
// wait for the emitter thread to output the remaining elements
// for that he needs the checkpointing lock and thus we have to free it
checkpointingLock.wait();
if (!mailboxExecutor.tryYield()) {
this.checkpointingLock.wait(1);
}
}
}

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators.async;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.Preconditions;

/**
* The factory of {@link AsyncWaitOperator}.
*
* @param <OUT> The output type of the operator
*/
public class AsyncWaitOperatorFactory<IN, OUT> implements OneInputStreamOperatorFactory<IN, OUT>,
YieldingOperatorFactory<OUT> {
private final AsyncFunction<IN, OUT> asyncFunction;
private final long timeout;
private final int capacity;
private final AsyncDataStream.OutputMode outputMode;
private MailboxExecutor mailboxExecutor;
private ChainingStrategy strategy = ChainingStrategy.HEAD;

public AsyncWaitOperatorFactory(AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
this.asyncFunction = asyncFunction;
this.timeout = timeout;
this.capacity = capacity;
this.outputMode = outputMode;
}

@Override public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
Preconditions.checkNotNull(mailboxExecutor, "mailboxExecutor");
this.mailboxExecutor = mailboxExecutor;
}

@Override public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config,
Output output) {
Preconditions.checkState(mailboxExecutor != null, "#setMailboxExecutor() not called before");
AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(asyncFunction,
timeout,
capacity,
outputMode,
mailboxExecutor);
asyncWaitOperator.setup(containingTask, config, output);
return asyncWaitOperator;
}

@Override public void setChainingStrategy(ChainingStrategy strategy) {
this.strategy = strategy;
}

@Override public ChainingStrategy getChainingStrategy() {
return strategy;
}

@Override public boolean isOperatorSelectiveReading(ClassLoader classLoader) {
return false;
}

@Override public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return AsyncWaitOperator.class;
}
}
@@ -26,11 +26,14 @@
import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.Collection;

/**
@@ -50,6 +53,9 @@
/** Output for the watermark elements. */
private final Output<StreamRecord<OUT>> output;

/** Executor for mailbox. */
private final MailboxExecutor executor;

/** Queue to consume the async results from. */
private final StreamElementQueue streamElementQueue;

@@ -62,11 +68,13 @@

public Emitter(
final Object checkpointLock,
final Output<StreamRecord<OUT>> output,
final StreamElementQueue streamElementQueue,
final OperatorActions operatorActions) {
final @Nonnull MailboxExecutor executor,
final @Nonnull Output<StreamRecord<OUT>> output,
final @Nonnull StreamElementQueue streamElementQueue,
final @Nonnull OperatorActions operatorActions) {

this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
@@ -80,9 +88,14 @@ public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

output(streamElementEntry);
AsyncResult asyncResult = streamElementQueue.peekBlockingly();
executor.submit(() -> {
try {
output(asyncResult);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).get();
}
} catch (InterruptedException e) {
if (running) {
@@ -97,21 +110,22 @@ public void run() {
}
}

/**
* Executed as a mail in the mailbox thread. Output needs to be guarded with checkpoint lock (for the time being).
*
* @param asyncResult the result to output.
*/
private void output(AsyncResult asyncResult) throws InterruptedException {
if (asyncResult.isWatermark()) {
synchronized (checkpointLock) {
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

LOG.debug("Output async watermark.");
output.emitWatermark(asyncWatermarkResult.getWatermark());
LOG.debug("Output async watermark.");

synchronized (checkpointLock) {
output.emitWatermark(asyncWatermarkResult.getWatermark());
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
} else {
AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
@@ -122,9 +136,9 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
timestampedCollector.eraseTimestamp();
}

synchronized (checkpointLock) {
LOG.debug("Output async stream element collection result.");
LOG.debug("Output async stream element collection result.");

synchronized (checkpointLock) {
try {
Collection<OUT> resultCollection = streamRecordResult.get();

@@ -142,10 +156,6 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
// remove the peeked element from the async collector buffer so that it is no longer
// checkpointed
streamElementQueue.poll();

// notify the main thread that there is again space left in the async collector
// buffer
checkpointLock.notifyAll();
}
}
}

0 comments on commit 355736b

Please sign in to comment.
You can’t perform that action at this time.