From ccf1e55e09e94c3f7749b937594425a435f81184 Mon Sep 17 00:00:00 2001 From: blueszheng Date: Thu, 10 May 2018 14:41:34 +0800 Subject: [PATCH 1/4] [FLINK-7789] Add handler for Async IO operator timeouts --- docs/dev/stream/operators/asyncio.md | 6 ++ .../api/functions/async/AsyncFunction.java | 14 +++++ .../operators/async/AsyncWaitOperator.java | 4 +- .../async/AsyncWaitOperatorTest.java | 60 +++++++++++++++++++ .../streaming/api/scala/AsyncDataStream.scala | 6 ++ .../api/scala/async/AsyncFunction.scala | 14 +++++ 6 files changed, 101 insertions(+), 3 deletions(-) diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md index 64570740531d6..17e8442aab425 100644 --- a/docs/dev/stream/operators/asyncio.md +++ b/docs/dev/stream/operators/asyncio.md @@ -190,6 +190,12 @@ The following two parameters control the asynchronous operations: is exhausted. +### Timeout Handling + +When a async I/O request times out, an exception is thrown and job is restarted. +If you want to handle timeouts, please override the `AsyncFunction#timeout` method. + + ### Order of Results The concurrent requests issued by the `AsyncFunction` frequently complete in some undefined order, based on which request finished first. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java index 2ac218dc4e3ed..99bba16d98444 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function; import java.io.Serializable; +import java.util.concurrent.TimeoutException; /** * A function to trigger Async I/O operation. @@ -84,4 +85,17 @@ public interface AsyncFunction extends Function, Serializable { * trigger fail-over process. */ void asyncInvoke(IN input, ResultFuture resultFuture) throws Exception; + + /** + * {@link AsyncFunction#asyncInvoke} timeout occurred. + * By default, the result future is exceptionally completed with timeout exception. + * + * @param input element coming from an upstream task + * @param resultFuture to be completed with the result data + */ + default void timeout(IN input, ResultFuture resultFuture) throws Exception { + resultFuture.completeExceptionally( + new TimeoutException("Async function call has timed out.")); + } + } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index a7b94386eb574..2555c3b46ee2b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -53,7 +53,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that @@ -209,8 +208,7 @@ public void processElement(StreamRecord element) throws Exception { new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { - streamRecordBufferEntry.completeExceptionally( - new TimeoutException("Async function call has timed out.")); + userFunction.timeout(element.getValue(), streamRecordBufferEntry); } }); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 35e2fbde0aa18..9d3706098a60c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -212,6 +212,20 @@ public static void countDown() { } } + private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction { + private static final long serialVersionUID = 1428714561365346128L; + + @Override + public void timeout(Integer input, ResultFuture resultFuture) throws Exception { + if (input != null && input % 2 == 0) { + resultFuture.complete(Collections.singletonList(input * 3)); + } else { + // ignore odd input number when it timeouts + resultFuture.complete(Collections.emptyList()); + } + } + } + /** * A {@link Comparator} to compare {@link StreamRecord} while sorting them. */ @@ -648,6 +662,52 @@ public void testAsyncTimeout() throws Exception { ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); } + @Test + public void testAsyncTimeoutAware() throws Exception { + final long timeout = 10L; + + final AsyncWaitOperator operator = new AsyncWaitOperator<>( + new TimeoutAwareLazyAsyncFunction(), + timeout, + 4, + AsyncDataStream.OutputMode.ORDERED); + + final OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE); + + final long initialTime = 0L; + final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + testHarness.setProcessingTime(initialTime); + + synchronized (testHarness.getCheckpointLock()) { + testHarness.processElement(new StreamRecord<>(1, initialTime)); + testHarness.processElement(new StreamRecord<>(2, initialTime)); + testHarness.processElement(new StreamRecord<>(3, initialTime)); + testHarness.setProcessingTime(initialTime + 5L); + testHarness.processElement(new StreamRecord<>(4, initialTime + 5L)); + } + + // trigger the timeouts of the first three stream records + testHarness.setProcessingTime(initialTime + timeout + 1L); + + // allow the 4th async stream record to be processed + TimeoutAwareLazyAsyncFunction.countDown(); + + // wait until all async collectors in the buffer have been emitted out. + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + // output of the 2nd and the 4th stream records + expectedOutput.add(new StreamRecord<>(6 , initialTime)); + expectedOutput.add(new StreamRecord<>(4, initialTime + 5L)); + + TestHarnessUtil.assertOutputEquals("Output for stream records does not match.", + expectedOutput, testHarness.getOutput()); + } + @Nonnull private MockEnvironment createMockEnvironment() { return new MockEnvironment( diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala index e91922a9e00b4..a1568c29b61b5 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala @@ -71,6 +71,9 @@ object AsyncDataStream { override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture)) } + override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { + asyncFunction.timeout(input, new JavaResultFutureWrapper(resultFuture)) + } } val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]] @@ -198,6 +201,9 @@ object AsyncDataStream { override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture)) } + override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { + asyncFunction.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture)) + } } val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]] diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala index d5e9e280e9828..864c5bc7ca595 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala @@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala.async import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.functions.Function +import java.util.concurrent.TimeoutException + /** * A function to trigger async I/O operations. * @@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function { * @param resultFuture to be completed with the result data */ def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit + + /** + * [[AsyncFunction.asyncInvoke]] timeout occurred. + * By default, the result future is exceptionally completed with timeout exception. + * + * @param input element coming from an upstream task + * @param resultFuture to be completed with the result data + */ + def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = { + resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out.")) + } + } From 1b3cf0ce156cf5e9fa30d37822b3fa94d0be4158 Mon Sep 17 00:00:00 2001 From: blueszheng Date: Thu, 31 May 2018 13:05:47 +0800 Subject: [PATCH 2/4] [FLINK-7789] Add handler for Async IO operator timeouts --- .../api/functions/async/AsyncFunction.java | 2 +- .../async/AsyncWaitOperatorTest.java | 95 +++++++------------ .../api/scala/async/AsyncFunction.scala | 2 +- 3 files changed, 37 insertions(+), 62 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java index 99bba16d98444..14a7a8469b28c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java @@ -88,7 +88,7 @@ public interface AsyncFunction extends Function, Serializable { /** * {@link AsyncFunction#asyncInvoke} timeout occurred. - * By default, the result future is exceptionally completed with timeout exception. + * By default, the result future is exceptionally completed with a timeout exception. * * @param input element coming from an upstream task * @param resultFuture to be completed with the result data diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 9d3706098a60c..fc71e74880668 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -74,9 +74,11 @@ import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -212,17 +214,16 @@ public static void countDown() { } } - private static class TimeoutAwareLazyAsyncFunction extends LazyAsyncFunction { + /** + * A special {@link LazyAsyncFunction} for timeout handling. + * Complete the result future with 3 times the input when the timeout occurred. + */ + private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction { private static final long serialVersionUID = 1428714561365346128L; @Override public void timeout(Integer input, ResultFuture resultFuture) throws Exception { - if (input != null && input % 2 == 0) { - resultFuture.complete(Collections.singletonList(input * 3)); - } else { - // ignore odd input number when it timeouts - resultFuture.complete(Collections.emptyList()); - } + resultFuture.complete(Collections.singletonList(input * 3)); } } @@ -613,11 +614,29 @@ public void testStateSnapshotAndRestore() throws Exception { } @Test - public void testAsyncTimeout() throws Exception { + public void testAsyncTimeoutFailure() throws Exception { + testAsyncTimeout( + new LazyAsyncFunction(), + Optional.of(TimeoutException.class), + new StreamRecord<>(2, 5L)); + } + + @Test + public void testAsyncTimeoutIgnore() throws Exception { + testAsyncTimeout( + new IgnoreTimeoutLazyAsyncFunction(), + Optional.empty(), + new StreamRecord<>(3, 0L), + new StreamRecord<>(2, 5L)); + } + + private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction, + Optional> expectedException, + StreamRecord... expectedRecords) throws Exception { final long timeout = 10L; final AsyncWaitOperator operator = new AsyncWaitOperator<>( - new LazyAsyncFunction(), + lazyAsyncFunction, timeout, 2, AsyncDataStream.OutputMode.ORDERED); @@ -645,67 +664,23 @@ public void testAsyncTimeout() throws Exception { testHarness.setProcessingTime(initialTime + timeout + 1L); // allow the second async stream record to be processed - LazyAsyncFunction.countDown(); + lazyAsyncFunction.countDown(); // wait until all async collectors in the buffer have been emitted out. synchronized (testHarness.getCheckpointLock()) { testHarness.close(); } - expectedOutput.add(new StreamRecord<>(2, initialTime + 5L)); + expectedOutput.addAll(Arrays.asList(expectedRecords)); TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput()); - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Throwable.class); - - assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent()); - ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class); - } - - @Test - public void testAsyncTimeoutAware() throws Exception { - final long timeout = 10L; - - final AsyncWaitOperator operator = new AsyncWaitOperator<>( - new TimeoutAwareLazyAsyncFunction(), - timeout, - 4, - AsyncDataStream.OutputMode.ORDERED); - - final OneInputStreamOperatorTestHarness testHarness = - new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE); - - final long initialTime = 0L; - final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); - - testHarness.open(); - testHarness.setProcessingTime(initialTime); - - synchronized (testHarness.getCheckpointLock()) { - testHarness.processElement(new StreamRecord<>(1, initialTime)); - testHarness.processElement(new StreamRecord<>(2, initialTime)); - testHarness.processElement(new StreamRecord<>(3, initialTime)); - testHarness.setProcessingTime(initialTime + 5L); - testHarness.processElement(new StreamRecord<>(4, initialTime + 5L)); + if (expectedException.isPresent()) { + assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent()); + assertTrue(ExceptionUtils.findThrowable( + mockEnvironment.getActualExternalFailureCause().get(), + expectedException.get()).isPresent()); } - - // trigger the timeouts of the first three stream records - testHarness.setProcessingTime(initialTime + timeout + 1L); - - // allow the 4th async stream record to be processed - TimeoutAwareLazyAsyncFunction.countDown(); - - // wait until all async collectors in the buffer have been emitted out. - synchronized (testHarness.getCheckpointLock()) { - testHarness.close(); - } - - // output of the 2nd and the 4th stream records - expectedOutput.add(new StreamRecord<>(6 , initialTime)); - expectedOutput.add(new StreamRecord<>(4, initialTime + 5L)); - - TestHarnessUtil.assertOutputEquals("Output for stream records does not match.", - expectedOutput, testHarness.getOutput()); } @Nonnull diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala index 864c5bc7ca595..d6965b7361b63 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala @@ -51,7 +51,7 @@ trait AsyncFunction[IN, OUT] extends Function { /** * [[AsyncFunction.asyncInvoke]] timeout occurred. - * By default, the result future is exceptionally completed with timeout exception. + * By default, the result future is exceptionally completed with a timeout exception. * * @param input element coming from an upstream task * @param resultFuture to be completed with the result data From b22263c0bedec51203566b0728f6c319cadbeb9e Mon Sep 17 00:00:00 2001 From: blueszheng Date: Fri, 1 Jun 2018 00:58:24 +0800 Subject: [PATCH 3/4] [FLINK-7789] Add handler for Async IO operator timeouts --- .../api/scala/AsyncDataStreamITCase.scala | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala new file mode 100644 index 0000000000000..150168ab75fa1 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import java.util.concurrent.TimeUnit + +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._ +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.flink.test.util.AbstractTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +object AsyncDataStreamITCase { + val timeout = 1000L + private var testResult: mutable.ArrayBuffer[Int] = _ +} + +class AsyncDataStreamITCase extends AbstractTestBase { + + @Test + def testOrderedWait(): Unit = { + testAsyncWait(true) + } + + @Test + def testUnorderedWait(): Unit = { + testAsyncWait(false) + } + + private def testAsyncWait(ordered: Boolean): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + + val source = env.addSource(new SourceFunction[Int]() { + override def run(ctx: SourceFunction.SourceContext[Int]) { + ctx.collect(1) + ctx.collect(2) + } + override def cancel() {} + }) + + val asyncMapped = if (ordered) { + AsyncDataStream.orderedWait( + source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS) + } else { + AsyncDataStream.unorderedWait( + source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS) + } + + testResult = mutable.ArrayBuffer[Int]() + asyncMapped.addSink(new SinkFunction[Int]() { + override def invoke(value: Int) { + testResult += value + } + }) + + env.execute("testAsyncWait") + + val expectedResult = mutable.ArrayBuffer[Int](2, 6) + if (ordered) { + assertEquals(expectedResult, testResult) + } else { + assertEquals(expectedResult, testResult.sorted) + } + } + + @Test + def testOrderedWaitUsingAnonymousFunction(): Unit = { + testAsyncWaitUsingAnonymousFunction(true) + } + + @Test + def testUnorderedWaitUsingAnonymousFunction(): Unit = { + testAsyncWaitUsingAnonymousFunction(false) + } + + private def testAsyncWaitUsingAnonymousFunction(ordered: Boolean): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + + val source = env.addSource(new SourceFunction[Int]() { + override def run(ctx: SourceFunction.SourceContext[Int]) { + ctx.collect(1) + ctx.collect(2) + } + override def cancel() {} + }) + + val asyncFunction: (Int, ResultFuture[Int]) => Unit = + (input, collector: ResultFuture[Int]) => Future { + collector.complete(Seq(input * 2)) + }(ExecutionContext.global) + val asyncMapped = if (ordered) { + AsyncDataStream.orderedWait(source, timeout, TimeUnit.MILLISECONDS) { + asyncFunction + } + } else { + AsyncDataStream.unorderedWait(source, timeout, TimeUnit.MILLISECONDS) { + asyncFunction + } + } + + testResult = mutable.ArrayBuffer[Int]() + asyncMapped.addSink(new SinkFunction[Int]() { + override def invoke(value: Int) { + testResult += value + } + }) + + env.execute("testAsyncWaitUsingAnonymousFunction") + + val expectedResult = mutable.ArrayBuffer[Int](2, 4) + if (ordered) { + assertEquals(expectedResult, testResult) + } else { + assertEquals(expectedResult, testResult.sorted) + } + } + +} + +class MyAsyncFunction extends AsyncFunction[Int, Int] { + override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = { + Future { + // trigger the timeout of the even input number + if (input % 2 == 0) { + Thread.sleep(AsyncDataStreamITCase.timeout + 1000) + } + + resultFuture.complete(Seq(input * 2)) + } (ExecutionContext.global) + } + override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = { + resultFuture.complete(Seq(input * 3)) + } +} From 072d10349b3fcfd90b34977ac3f0ab53b458e53f Mon Sep 17 00:00:00 2001 From: blueszheng Date: Fri, 1 Jun 2018 16:11:58 +0800 Subject: [PATCH 4/4] [FLINK-7789] Add handler for Async IO operator timeouts --- .../api/scala/AsyncDataStreamITCase.scala | 46 ++++++------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala index 150168ab75fa1..d0a2cec9a4f3a 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala @@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._ import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} import org.apache.flink.test.util.AbstractTestBase @@ -52,13 +51,7 @@ class AsyncDataStreamITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - val source = env.addSource(new SourceFunction[Int]() { - override def run(ctx: SourceFunction.SourceContext[Int]) { - ctx.collect(1) - ctx.collect(2) - } - override def cancel() {} - }) + val source = env.fromElements(1, 2) val asyncMapped = if (ordered) { AsyncDataStream.orderedWait( @@ -68,16 +61,23 @@ class AsyncDataStreamITCase extends AbstractTestBase { source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS) } + executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 6)) + } + + private def executeAndValidate(ordered: Boolean, + env: StreamExecutionEnvironment, + dataStream: DataStream[Int], + expectedResult: mutable.ArrayBuffer[Int]): Unit = { + testResult = mutable.ArrayBuffer[Int]() - asyncMapped.addSink(new SinkFunction[Int]() { + dataStream.addSink(new SinkFunction[Int]() { override def invoke(value: Int) { testResult += value } }) - env.execute("testAsyncWait") + env.execute("testAsyncDataStream") - val expectedResult = mutable.ArrayBuffer[Int](2, 6) if (ordered) { assertEquals(expectedResult, testResult) } else { @@ -99,13 +99,7 @@ class AsyncDataStreamITCase extends AbstractTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) - val source = env.addSource(new SourceFunction[Int]() { - override def run(ctx: SourceFunction.SourceContext[Int]) { - ctx.collect(1) - ctx.collect(2) - } - override def cancel() {} - }) + val source = env.fromElements(1, 2) val asyncFunction: (Int, ResultFuture[Int]) => Unit = (input, collector: ResultFuture[Int]) => Future { @@ -121,21 +115,7 @@ class AsyncDataStreamITCase extends AbstractTestBase { } } - testResult = mutable.ArrayBuffer[Int]() - asyncMapped.addSink(new SinkFunction[Int]() { - override def invoke(value: Int) { - testResult += value - } - }) - - env.execute("testAsyncWaitUsingAnonymousFunction") - - val expectedResult = mutable.ArrayBuffer[Int](2, 4) - if (ordered) { - assertEquals(expectedResult, testResult) - } else { - assertEquals(expectedResult, testResult.sorted) - } + executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4)) } }