From 4a15c079edd5efe7048d4a92e13c23cb6b528df4 Mon Sep 17 00:00:00 2001 From: "Yazdan.JS" Date: Sat, 12 May 2018 01:41:07 -0400 Subject: [PATCH] [FLINK-9343] [Example] Add Async Example with External Rest API call --- .../examples/async/AsyncAPIExample.scala | 93 +++++++++++++++++++ .../scala/examples/async/AsyncIOExample.scala | 22 ----- .../scala/examples/async/SimpleSource.scala | 43 +++++++++ 3 files changed, 136 insertions(+), 22 deletions(-) create mode 100644 flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala create mode 100644 flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/SimpleSource.scala diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala new file mode 100644 index 0000000000000..f3de742d49f32 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.async + + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import org.apache.flink.runtime.concurrent.Executors +import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _} +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClientBuilder + +import scala.concurrent.{ExecutionContext, Future} + +object AsyncAPIExample { + + def main(args: Array[String]) { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val input : DataStream[Int] = env.addSource(new SimpleSource()) + + val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait( + input, + new AsyncQuoteRequest, + 1000, + TimeUnit.MILLISECONDS, + 10) + + quoteStream.print() + + env.execute("Async API job") + } +} + +class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] { + + /** The API specific client that can issue concurrent requests with callbacks */ + + /** The context used for the future callbacks */ + implicit lazy val executor: ExecutionContext = ExecutionContext.global + + lazy val client = new Quote() + + override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, String)]): Unit = { + + + // issue the asynchronous request, receive a future for the result + val resultFutureRequested: Future[String] = Future { + client.getQuote(input.toString) + } + + // set the callback to be executed once the request by the client is complete + // the callback simply forwards the result to the result future + resultFutureRequested.onSuccess { + case result: String => { + resultFuture.complete(Iterable((input, result))) + } + } + } + + +} + +class Quote { + @throws[Exception] + def getQuote(number: String) : String = { + val url = "http://gturnquist-quoters.cfapps.io/api/" + number + val client = HttpClientBuilder.create.build + val request = new HttpGet(url) + val response = client.execute(request) + val rd = new BufferedReader(new InputStreamReader(response.getEntity.getContent)) + rd.readLine + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala index 5808aaaf8d7ad..02de3365ca9bb 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala @@ -21,8 +21,6 @@ package org.apache.flink.streaming.scala.examples.async import java.util.concurrent.TimeUnit -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.async.ResultFuture @@ -49,23 +47,3 @@ object AsyncIOExample { env.execute("Async I/O job") } } - -class SimpleSource extends ParallelSourceFunction[Int] { - var running = true - var counter = 0 - - override def run(ctx: SourceContext[Int]): Unit = { - while (running) { - ctx.getCheckpointLock.synchronized { - ctx.collect(counter) - } - counter += 1 - - Thread.sleep(10L) - } - } - - override def cancel(): Unit = { - running = false - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/SimpleSource.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/SimpleSource.scala new file mode 100644 index 0000000000000..573fa66e1cb0f --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/SimpleSource.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.async + +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext + +class SimpleSource extends ParallelSourceFunction[Int] { + var running = true + var counter = 0 + + override def run(ctx: SourceContext[Int]): Unit = { + while (running) { + ctx.getCheckpointLock.synchronized { + ctx.collect(counter) + } + counter += 1 + + Thread.sleep(10L) + } + } + + override def cancel(): Unit = { + running = false + } +} +