Skip to content

Commit

Permalink
[FLINK-5851] [streaming API] Rename AsyncCollector into ResultFuture
Browse files Browse the repository at this point in the history
Complete renaming AsyncCollector -> ResultFuture

This closes #4243.
  • Loading branch information
zhangminglei authored and tillrohrmann committed Aug 22, 2017
1 parent 9077e51 commit 40cec17
Show file tree
Hide file tree
Showing 20 changed files with 118 additions and 121 deletions.
20 changes: 10 additions & 10 deletions docs/dev/stream/operators/asyncio.md
Expand Up @@ -74,7 +74,7 @@ Assuming one has an asynchronous client for the target database, three parts are
with asynchronous I/O against the database: with asynchronous I/O against the database:


- An implementation of `AsyncFunction` that dispatches the requests - An implementation of `AsyncFunction` that dispatches the requests
- A *callback* that takes the result of the operation and hands it to the `AsyncCollector` - A *callback* that takes the result of the operation and hands it to the `ResultFuture`
- Applying the async I/O operation on a DataStream as a transformation - Applying the async I/O operation on a DataStream as a transformation


The following code example illustrates the basic pattern: The following code example illustrates the basic pattern:
Expand Down Expand Up @@ -104,16 +104,16 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
} }


@Override @Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception { public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {


// issue the asynchronous request, receive a future for result // issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str); Future<String> resultFuture = client.query(str);


// set the callback to be executed once the request by the client is complete // set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector // the callback simply forwards the result to the result future
resultFuture.thenAccept( (String result) -> { resultFuture.thenAccept( (String result) -> {


asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result))); resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
}); });
} }
Expand Down Expand Up @@ -142,15 +142,15 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())




override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = { override def asyncInvoke(str: String, resultFutre: ResultFuture[(String, String)]): Unit = {


// issue the asynchronous request, receive a future for the result // issue the asynchronous request, receive a future for the result
val resultFuture: Future[String] = client.query(str) val resultFuture: Future[String] = client.query(str)


// set the callback to be executed once the request by the client is complete // set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector // the callback simply forwards the result to the result future
resultFuture.onSuccess { resultFuture.onSuccess {
case result: String => asyncCollector.collect(Iterable((str, result))); case result: String => resultFuture.complete(Iterable((str, result)));
} }
} }
} }
Expand All @@ -166,8 +166,8 @@ val resultStream: DataStream[(String, String)] =
</div> </div>
</div> </div>


**Important note**: The `AsyncCollector` is completed with the first call of `AsyncCollector.collect`. **Important note**: The `ResultFuture` is completed with the first call of `ResultFuture.complete`.
All subsequent `collect` calls will be ignored. All subsequent `complete` calls will be ignored.


The following two parameters control the asynchronous operations: The following two parameters control the asynchronous operations:


Expand Down Expand Up @@ -229,7 +229,7 @@ asynchronous requests in checkpoints and restores/re-triggers the requests when


For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggets to use a `DirectExecutor`, because the For implementations with *Futures* that have an *Executor* (or *ExecutionContext* in Scala) for callbacks, we suggets to use a `DirectExecutor`, because the
callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands callback typically does minimal work, and a `DirectExecutor` avoids an additional thread-to-thread handover overhead. The callback typically only hands
the result to the `AsyncCollector`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction the result to the `ResultFuture`, which adds it to the output buffer. From there, the heavy logic that includes record emission and interaction
with the checkpoint bookkeepting happens in a dedicated thread-pool anyways. with the checkpoint bookkeepting happens in a dedicated thread-pool anyways.


A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or A `DirectExecutor` can be obtained via `org.apache.flink.runtime.concurrent.Executors.directExecutor()` or
Expand Down
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;


Expand Down Expand Up @@ -178,7 +178,7 @@ public void close() throws Exception {
} }


@Override @Override
public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) throws Exception { public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
this.executorService.submit(new Runnable() { this.executorService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
Expand All @@ -188,13 +188,13 @@ public void run() {
Thread.sleep(sleep); Thread.sleep(sleep);


if (random.nextFloat() < failRatio) { if (random.nextFloat() < failRatio) {
collector.collect(new Exception("wahahahaha...")); resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else { } else {
collector.collect( resultFuture.complete(
Collections.singletonList("key-" + (input % 10))); Collections.singletonList("key-" + (input % 10)));
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
collector.collect(new ArrayList<String>(0)); resultFuture.complete(new ArrayList<String>(0));
} }
} }
}); });
Expand Down
Expand Up @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncCollector import org.apache.flink.streaming.api.scala.async.ResultFuture


import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}


Expand All @@ -38,9 +38,9 @@ object AsyncIOExample {
val input = env.addSource(new SimpleSource()) val input = env.addSource(new SimpleSource())


val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) { val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
(input, collector: AsyncCollector[Int]) => (input, collector: ResultFuture[Int]) =>
Future { Future {
collector.collect(Seq(input)) collector.complete(Seq(input))
} (ExecutionContext.global) } (ExecutionContext.global)
} }


Expand Down
Expand Up @@ -20,29 +20,28 @@


import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;


import java.io.Serializable; import java.io.Serializable;


/** /**
* A function to trigger Async I/O operation. * A function to trigger Async I/O operation.
* *
* <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done, * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
* the result can be collected by calling {@link AsyncCollector#collect}. For each async * the result can be collected by calling {@link ResultFuture#complete}. For each async
* operation, its context is stored in the operator immediately after invoking * operation, its context is stored in the operator immediately after invoking
* #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full. * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
* *
* <p>{@link AsyncCollector} can be passed into callbacks or futures to collect the result data. * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
* An error can also be propagate to the async IO operator by * An error can also be propagate to the async IO operator by
* {@link AsyncCollector#collect(Throwable)}. * {@link ResultFuture#completeExceptionally(Throwable)}.
* *
* <p>Callback example usage: * <p>Callback example usage:
* *
* <pre>{@code * <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> { * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
* *
* public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception { * public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
* HBaseCallback cb = new HBaseCallback(collector); * HBaseCallback cb = new HBaseCallback(result);
* Get get = new Get(Bytes.toBytes(row)); * Get get = new Get(Bytes.toBytes(row));
* hbase.asyncGet(get, cb); * hbase.asyncGet(get, cb);
* } * }
Expand All @@ -54,16 +53,16 @@
* <pre>{@code * <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> { * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
* *
* public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception { * public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
* Get get = new Get(Bytes.toBytes(row)); * Get get = new Get(Bytes.toBytes(row));
* ListenableFuture<Result> future = hbase.asyncGet(get); * ListenableFuture<Result> future = hbase.asyncGet(get);
* Futures.addCallback(future, new FutureCallback<Result>() { * Futures.addCallback(future, new FutureCallback<Result>() {
* public void onSuccess(Result result) { * public void onSuccess(Result result) {
* List<String> ret = process(result); * List<String> ret = process(result);
* collector.collect(ret); * result.complete(ret);
* } * }
* public void onFailure(Throwable thrown) { * public void onFailure(Throwable thrown) {
* collector.collect(thrown); * result.completeExceptionally(thrown);
* } * }
* }); * });
* } * }
Expand All @@ -80,9 +79,9 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
* Trigger async operation for each stream input. * Trigger async operation for each stream input.
* *
* @param input element coming from an upstream task * @param input element coming from an upstream task
* @param collector to collect the result data * @param resultFuture to be completed with the result data
* @exception Exception in case of a user code error. An exception will make the task fail and * @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process. * trigger fail-over process.
*/ */
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception; void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
} }
Expand Up @@ -16,21 +16,21 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.flink.streaming.api.functions.async.collector; package org.apache.flink.streaming.api.functions.async;


import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;


import java.util.Collection; import java.util.Collection;


/** /**
* {@link AsyncCollector} collects data / error in user codes while processing async i/o. * {@link ResultFuture} collects data / error in user codes while processing async i/o.
* *
* @param <OUT> Output type * @param <OUT> Output type
*/ */
@PublicEvolving @PublicEvolving
public interface AsyncCollector<OUT> { public interface ResultFuture<OUT> {
/** /**
* Set result. * Completes the result future with a collection of result objects.
* *
* <p>Note that it should be called for exactly one time in the user code. * <p>Note that it should be called for exactly one time in the user code.
* Calling this function for multiple times will cause data lose. * Calling this function for multiple times will cause data lose.
Expand All @@ -39,12 +39,12 @@ public interface AsyncCollector<OUT> {
* *
* @param result A list of results. * @param result A list of results.
*/ */
void collect(Collection<OUT> result); void complete(Collection<OUT> result);


/** /**
* Set error. * Completes the result future exceptionally with an exception.
* *
* @param error A Throwable object. * @param error A Throwable object.
*/ */
void collect(Throwable error); void completeExceptionally(Throwable error);
} }
Expand Up @@ -43,7 +43,6 @@
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Value; import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


Expand Down Expand Up @@ -85,7 +84,7 @@ public void setRuntimeContext(RuntimeContext runtimeContext) {
} }


@Override @Override
public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception; public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;


// ----------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------
// Wrapper classes // Wrapper classes
Expand Down
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode; import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.ChainingStrategy;
Expand Down Expand Up @@ -57,7 +57,7 @@


/** /**
* The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that * The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
* the operator creates an {@link AsyncCollector} which is passed to an {@link AsyncFunction}. * the operator creates an {@link ResultFuture} which is passed to an {@link AsyncFunction}.
* Within the async function, the user can complete the async collector arbitrarily. Once the async * Within the async function, the user can complete the async collector arbitrarily. Once the async
* collector has been completed, the result is emitted by the operator's emitter to downstream * collector has been completed, the result is emitted by the operator's emitter to downstream
* operators. * operators.
Expand Down Expand Up @@ -209,7 +209,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
new ProcessingTimeCallback() { new ProcessingTimeCallback() {
@Override @Override
public void onProcessingTime(long timestamp) throws Exception { public void onProcessingTime(long timestamp) throws Exception {
streamRecordBufferEntry.collect( streamRecordBufferEntry.completeExceptionally(
new TimeoutException("Async function call has timed out.")); new TimeoutException("Async function call has timed out."));
} }
}); });
Expand Down
Expand Up @@ -68,7 +68,7 @@ public Emitter(


this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock"); this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
this.output = Preconditions.checkNotNull(output, "output"); this.output = Preconditions.checkNotNull(output, "output");
this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer"); this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");


this.timestampedCollector = new TimestampedCollector<>(this.output); this.timestampedCollector = new TimestampedCollector<>(this.output);
Expand Down
Expand Up @@ -20,22 +20,22 @@


import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;


import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


/** /**
* {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
* as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The * as the {@link ResultFuture} implementation which is given to the {@link AsyncFunction}. The
* async function completes this class with a collection of results. * async function completes this class with a collection of results.
* *
* @param <OUT> Type of the asynchronous collection result * @param <OUT> Type of the asynchronous collection result
*/ */
@Internal @Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>> public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> { implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {


/** Timestamp information. */ /** Timestamp information. */
private final boolean hasTimestamp; private final boolean hasTimestamp;
Expand Down Expand Up @@ -74,12 +74,12 @@ protected CompletableFuture<Collection<OUT>> getFuture() {
} }


@Override @Override
public void collect(Collection<OUT> result) { public void complete(Collection<OUT> result) {
resultFuture.complete(result); resultFuture.complete(result);
} }


@Override @Override
public void collect(Throwable error) { public void completeExceptionally(Throwable error) {
resultFuture.completeExceptionally(error); resultFuture.completeExceptionally(error);
} }
} }
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;


import org.junit.Test; import org.junit.Test;


Expand All @@ -55,7 +54,7 @@ public void testIterationRuntimeContext() throws Exception {
private static final long serialVersionUID = -2023923961609455894L; private static final long serialVersionUID = -2023923961609455894L;


@Override @Override
public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception { public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
// no op // no op
} }
}; };
Expand Down Expand Up @@ -94,7 +93,7 @@ public void testRuntimeContext() throws Exception {
private static final long serialVersionUID = 1707630162838967972L; private static final long serialVersionUID = 1707630162838967972L;


@Override @Override
public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception { public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
// no op // no op
} }
}; };
Expand Down

0 comments on commit 40cec17

Please sign in to comment.