From c62daf8d1228b2e0ea0bec233dc57dfb89d5f592 Mon Sep 17 00:00:00 2001 From: Michael Gendelman Date: Sat, 7 Apr 2018 14:10:48 +0300 Subject: [PATCH 1/2] [FLINK-8047] [http connector] Add HTTP Sink to allow sending output to HTTP server --- flink-connectors/flink-connector-http/pom.xml | 183 ++++++++++++ .../connectors/http/HttpBuilder.java | 32 +++ .../streaming/connectors/http/HttpSink.java | 159 +++++++++++ .../http/ResponseHandlerBuilder.java | 32 +++ .../src/main/resources/log4j-test.properties | 30 ++ .../connectors/http/HttpSinkTest.java | 269 ++++++++++++++++++ flink-connectors/pom.xml | 1 + 7 files changed, 706 insertions(+) create mode 100644 flink-connectors/flink-connector-http/pom.xml create mode 100644 flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpBuilder.java create mode 100644 flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpSink.java create mode 100644 flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/ResponseHandlerBuilder.java create mode 100644 flink-connectors/flink-connector-http/src/main/resources/log4j-test.properties create mode 100644 flink-connectors/flink-connector-http/src/test/java/org/apache/flink/streaming/connectors/http/HttpSinkTest.java diff --git a/flink-connectors/flink-connector-http/pom.xml b/flink-connectors/flink-connector-http/pom.xml new file mode 100644 index 0000000000000..b37a1db375bd9 --- /dev/null +++ b/flink-connectors/flink-connector-http/pom.xml @@ -0,0 +1,183 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.6-SNAPSHOT + .. 
+ + + flink-connector-http_${scala.binary.version} + flink-connector-http + + jar + + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + org.apache.httpcomponents + httpclient + 4.5.3 + + + + org.apache.httpcomponents + httpcore-nio + 4.4.5 + + + + + + org.apache.flink + flink-avro + ${project.version} + + true + + + + + + org.apache.flink + flink-core + ${project.version} + test + test-jar + + + + + org.apache.flink + flink-avro + ${project.version} + test + test-jar + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test + test-jar + + + + org.apache.flink + flink-tests_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${project.version} + test + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-metrics-jmx + ${project.version} + test + + + org.apache.httpcomponents + httpasyncclient + RELEASE + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + **/KafkaTestEnvironmentImpl* + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-test-sources + + test-jar-no-fork + + + + **/KafkaTestEnvironmentImpl* + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + 1 + -Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit + + + + + + diff --git a/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpBuilder.java b/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpBuilder.java new file mode 100644 index 0000000000000..926d0745ccc07 --- /dev/null +++ b/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpBuilder.java @@ -0,0 +1,32 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.http; + +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.Serializable; + +/** + * Serializable builder interface for creating the {@link CloseableHttpClient} instance that {@link HttpSink} sends its requests with. + */ +public interface HttpBuilder extends Serializable { + + CloseableHttpClient buildClient(); + +} diff --git a/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpSink.java b/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpSink.java new file mode 100644 index 0000000000000..81ed849f188e0 --- /dev/null +++ b/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/HttpSink.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.http; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import org.apache.http.client.HttpResponseException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.BasicResponseHandler; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.URI; + + +/** + * Http sink that initiates an HTTP POST to the configured HTTP server + * for each element + * + *

The sink uses {@link org.apache.http.client.HttpClient} internally to perform HTTP requests. + * The sink will fail if it can't connect to the URI specified in the constructor. + * + *

The constructor receives an HttpBuilder and a ResponseHandlerBuilder so that the HTTP client can be fully configured.

+ * + * @param Type of the elements handled by this sink + */ +public class HttpSink extends RichSinkFunction { + + private static final Logger LOG = LoggerFactory.getLogger(HttpSink.class); + + private transient CloseableHttpClient httpClient; + private transient ResponseHandler responseHandler; + + private final HttpBuilder httpBuilder; + private ResponseHandlerBuilder responseHandlerBuilder; + + private URI destURI; + private SerializationSchema serializationSchema; + + /** + * Creates a new {@code HttpSink} that builds its HTTP client and response handler with {@link DefaultHttpBuilder} and {@link DefaultResponseHandlerBuilder}. + * + * @param destURI HTTP URI to which client will send data + * @param serializationSchema Serialization schema that will be used to serialize values + */ + public HttpSink(URI destURI, SerializationSchema serializationSchema) { + this(destURI, + serializationSchema, + new DefaultHttpBuilder(), + new DefaultResponseHandlerBuilder()); + } + + /** + * Creates a new {@code HttpSink} whose HTTP client and response handler are created by the given builders when the sink is opened. + * + * @param destURI HTTP URI to which client will send data + * @param serializationSchema Serialization schema that will be used to serialize values + * @param httpBuilder Builder used to customize the HttpClient + * @param responseHandlerBuilder Builder used to customize the HttpClient response handler + */ + public HttpSink(URI destURI, + SerializationSchema serializationSchema, + HttpBuilder httpBuilder, + ResponseHandlerBuilder responseHandlerBuilder) { + this.destURI = destURI; + this.serializationSchema = serializationSchema; + ClosureCleaner.ensureSerializable(serializationSchema); + + this.httpBuilder = httpBuilder; + ClosureCleaner.clean(httpBuilder, true); + + this.responseHandlerBuilder = responseHandlerBuilder; + ClosureCleaner.clean(responseHandlerBuilder, true); + } + + @Override + public void open(Configuration parameters) { + httpClient = httpBuilder.buildClient(); + responseHandler = responseHandlerBuilder.buildHandler(); + } + + @Override + public void close() throws 
Exception { + if (httpClient != null) { + httpClient.close(); + } + } + + @Override + public void invoke(T value, Context context) throws Exception { + byte[] serializedValue = serializationSchema.serialize(value); + HttpPost httpPost = new HttpPost(destURI); + ByteArrayEntity httpEntity = new ByteArrayEntity(serializedValue); + + httpPost.setEntity(httpEntity); + + try { + LOG.debug("Executing HTTP POST with data " + value); + httpClient.execute(httpPost, responseHandler); + } + catch (HttpResponseException e1) { + LOG.error("HTTP Response exception", e1); + throw e1; + } + catch (ConnectException e2) { + LOG.error("HTTP Connection exception", e2); + throw e2; + } + catch (Exception e3) { + LOG.error("HTTP generic exception", e3); + throw e3; + } + } + + /** + * Default {@link HttpBuilder} that returns the client created by {@link HttpClients#createDefault()}. + */ + public static class DefaultHttpBuilder implements HttpBuilder { + @Override + public CloseableHttpClient buildClient() { + return HttpClients.createDefault(); + } + } + + /** + * Default {@link ResponseHandlerBuilder} that returns a new {@link BasicResponseHandler}. + */ + public static class DefaultResponseHandlerBuilder implements ResponseHandlerBuilder { + @Override + public ResponseHandler buildHandler() { + return new BasicResponseHandler(); + } + } + +} diff --git a/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/ResponseHandlerBuilder.java b/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/ResponseHandlerBuilder.java new file mode 100644 index 0000000000000..64e8c8aecd60a --- /dev/null +++ b/flink-connectors/flink-connector-http/src/main/java/org/apache/flink/streaming/connectors/http/ResponseHandlerBuilder.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.http; + +import org.apache.http.client.ResponseHandler; + +import java.io.Serializable; + +/** + * Serializable builder interface for creating the {@link ResponseHandler} that {@link HttpSink} passes to the HTTP client with every request. + */ +public interface ResponseHandlerBuilder extends Serializable { + + ResponseHandler buildHandler(); + +} diff --git a/flink-connectors/flink-connector-http/src/main/resources/log4j-test.properties b/flink-connectors/flink-connector-http/src/main/resources/log4j-test.properties new file mode 100644 index 0000000000000..94d5a19001bbc --- /dev/null +++ b/flink-connectors/flink-connector-http/src/main/resources/log4j-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + +#log4j.logger.org.apache.flink.streaming.connectors.http.HttpSinkTest=TRACE, testlogger + diff --git a/flink-connectors/flink-connector-http/src/test/java/org/apache/flink/streaming/connectors/http/HttpSinkTest.java b/flink-connectors/flink-connector-http/src/test/java/org/apache/flink/streaming/connectors/http/HttpSinkTest.java new file mode 100644 index 0000000000000..9cab37b8d7e7a --- /dev/null +++ b/flink-connectors/flink-connector-http/src/test/java/org/apache/flink/streaming/connectors/http/HttpSinkTest.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.http; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.MethodNotSupportedException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.config.SocketConfig; +import org.apache.http.conn.HttpConnectionFactory; +import org.apache.http.conn.ManagedHttpClientConnection; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.impl.bootstrap.HttpServer; +import org.apache.http.impl.bootstrap.ServerBootstrap; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpRequestHandler; +import org.apache.http.util.EntityUtils; +import org.junit.AfterClass; +import 
org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.URI; +import java.util.Collections; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link HttpSink} that run Flink jobs against a local {@link HttpServer} bound to the loopback address on {@code HTTP_PORT}, which collects the body of every POST it receives. + */ +public class HttpSinkTest { + + private static final int HTTP_PORT = 30445; + private static HttpServer httpServer; + + private static CollectingHttpRequestHandler requestHandler = + new CollectingHttpRequestHandler<>(new SimpleStringSchema()); + + public static HttpServer createHttpServer(int listenPort, HttpRequestHandler requestHandler) + throws IOException { + + SocketConfig socketConfig = SocketConfig.custom() + .setSoTimeout(15) + .setTcpNoDelay(true) + .setSoReuseAddress(true) + .build(); + + final HttpServer server = ServerBootstrap.bootstrap() + .setListenerPort(listenPort) + .setLocalAddress(InetAddress.getLoopbackAddress()) + //.setHttpProcessor(httpProcessor) + .setSocketConfig(socketConfig) + .registerHandler("*", requestHandler) + //.setResponseFactory(new DefaultHttpResponseFactory()) + .create(); + + server.start(); + + return server; + } + + @BeforeClass + public static void before() throws Exception { + httpServer = createHttpServer(HTTP_PORT, requestHandler); + } + + @AfterClass + public static void after() { + httpServer.shutdown(10, TimeUnit.MILLISECONDS); + } + + @Test + public void testSimpleHttpSinkBasicCtor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + Set stringData = getStringData(); + DataStream input = env.fromCollection(stringData); + + input.addSink( + new HttpSink<>(new URI("http://localhost:" + HTTP_PORT), + new 
SimpleStringSchema() + )) + .name("SimpleHTTP") + .setParallelism(1); + + env.execute("[HttpSink] testHttpSinkBasic"); + + assertEquals(stringData, requestHandler.getCollectedData()); + } + + @Test + public void testSimpleHttpSinkExtendedCtor() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + Set stringData = getStringData(); + DataStream input = env.fromCollection(stringData); + + executeHttpSinkJob(env, input, HTTP_PORT); + + assertEquals(stringData, requestHandler.getCollectedData()); + } + + @Test(expected = JobExecutionException.class) + public void testSimpleHttpSinkExceptionOnConnectionError() throws Exception { + final int notListenPort = 44004; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + Set stringData = getStringData(); + DataStream input = env.fromCollection(stringData); + + try { + executeHttpSinkJob(env, input, notListenPort); + } catch (Exception e) { + assertTrue(e.getCause() instanceof ConnectException); + throw e; + } + } + + // ---- Utils + + private void executeHttpSinkJob(StreamExecutionEnvironment env, DataStream input, int listenPort) + throws Exception { + input.addSink( + new HttpSink<>(new URI("http://localhost:" + listenPort), + new SimpleStringSchema(), + new MyHttpBuilder(), + new MyResponseHandlerBuilder() + )) + .name("SimpleHTTP") + .setParallelism(1); + + env.execute("[HttpSink] testHttpSinkBasic"); + } + + private Set getStringData() { + Set dataSet = new HashSet<>(); + for (int i = 0; i < 10; i++) { + dataSet.add("Testing entry " + i); + } + + return dataSet; + } + + private DataStream getStringDataStream(StreamExecutionEnvironment env) { + + Set stringData = getStringData(); + return env.fromCollection(stringData); + } + + private static class MyHttpBuilder implements HttpBuilder { + private static final long serialVersionUID = 
4622936756L; + + @Override + public CloseableHttpClient buildClient() { + HttpClientBuilder client = HttpClients.custom(); + // Create a connection manager with custom configuration. + HttpConnectionFactory httpConnectionFactory = + new ManagedHttpClientConnectionFactory(); + + PoolingHttpClientConnectionManager connManager = + new PoolingHttpClientConnectionManager(httpConnectionFactory); + + return client + .setConnectionManager(connManager) + .build(); + + } + } + + private static class MyResponseHandlerBuilder implements ResponseHandlerBuilder { + @Override + public ResponseHandler buildHandler() { + return new ResponseHandler() { + @Override + public String handleResponse( + final HttpResponse response) throws IOException { + int status = response.getStatusLine().getStatusCode(); + if (status >= 200 && status < 300) { + HttpEntity entity = response.getEntity(); + return entity != null ? EntityUtils.toString(entity) : null; + } else { + throw new ClientProtocolException("Unexpected response status: " + status); + } + } + }; + } + } + + private static class CollectingHttpRequestHandler implements HttpRequestHandler { + private static final Logger LOG = LoggerFactory.getLogger(CollectingHttpRequestHandler.class); + + private Set collectedData = new HashSet<>(); + private DeserializationSchema deserializationSchema; + + public CollectingHttpRequestHandler(DeserializationSchema deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + + @Override + public void handle(HttpRequest request, HttpResponse response, HttpContext context) + throws HttpException, IOException { + + String method = request.getRequestLine().getMethod().toUpperCase(Locale.ROOT); + if (!method.equals("POST")) { + throw new MethodNotSupportedException(method + " method not supported"); + } + + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); + byte[] entityContent = 
EntityUtils.toByteArray(entity); + + T data = deserializationSchema.deserialize(entityContent); + + LOG.debug("Incoming entity content (bytes): " + entityContent.length); + LOG.debug(data.toString()); + + collectedData.add(data); + } + } + + public Set getCollectedData() { + return Collections.unmodifiableSet(collectedData); + } + } +} diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index bb98403ff2d6f..c5162881aa367 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -55,6 +55,7 @@ under the License. flink-connector-nifi flink-connector-cassandra flink-connector-filesystem + flink-connector-http @@ -125,11 +119,6 @@ under the License. ${project.version} test - - org.apache.httpcomponents - httpasyncclient - RELEASE -