Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8047] [http connector] Add HTTP Sink to allow sending output to HTTP server #5866

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
172 changes: 172 additions & 0 deletions flink-connectors/flink-connector-http/pom.xml
@@ -0,0 +1,172 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
<version>1.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-connector-http_${scala.binary.version}</artifactId>
<name>flink-connector-http</name>

<packaging>jar</packaging>

<dependencies>

<!-- core dependencies -->


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>

<!-- streaming-java dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<configuration>
<includes>
<include>**/KafkaTestEnvironmentImpl*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-test-sources</id>
<goals>
<goal>test-jar-no-fork</goal>
</goals>
<configuration>
<includes>
<include>**/KafkaTestEnvironmentImpl*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</plugin>
</plugins>
</build>

</project>
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.http;

import org.apache.http.impl.client.CloseableHttpClient;

import java.io.Serializable;

/**
* Builder interface for creating HttpClient classes.
*/
public interface HttpBuilder extends Serializable {

CloseableHttpClient buildClient();

}
@@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.http;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.net.URI;


/**
* Http sink that initiates an HTTP POST to the configured HTTP server
* for each element
*
* <p>The sink uses {@link org.apache.http.client.HttpClient} internally to perform HTTP requests.
* The sink will fail if it can't connect to the URI specified in the constructor.
*
*<p>The constructor receives an HttpBuilder and an ResponseHandlerBuilder so that Http client can be fully configured</p>
*
* @param <T> Type of the elements handled by this sink
*/
public class HttpSink<T> extends RichSinkFunction<T> {

private static final Logger LOG = LoggerFactory.getLogger(HttpSink.class);

private transient CloseableHttpClient httpClient;
private transient ResponseHandler responseHandler;

private final HttpBuilder httpBuilder;
private ResponseHandlerBuilder responseHandlerBuilder;

private URI destURI;
private SerializationSchema<T> serializationSchema;

/**
* Creates a new {@code HttpSink} that connects to a URL using an http client.
*
* @param destURI HTTP URI to which client will send data
* @param serializationSchema Serialiation schema that will be used to serialize values
*/
public HttpSink(URI destURI, SerializationSchema<T> serializationSchema) {
this(destURI,
serializationSchema,
new DefaultHttpBuilder(),
new DefaultResponseHandlerBuilder());
}

/**
* Creates a new {@code HttpSink} that connects to a URL using an http client.
*
* @param destURI HTTP URI to which client will send data
* @param serializationSchema Serialization schema that will be used to serialize values
* @param httpBuilder Builder used to customize the HttpClient
* @param responseHandlerBuilder Builder used to customize HttpClient response handler
*/
public <V> HttpSink(URI destURI,
SerializationSchema<T> serializationSchema,
HttpBuilder httpBuilder,
ResponseHandlerBuilder<V> responseHandlerBuilder) {
this.destURI = destURI;
this.serializationSchema = serializationSchema;
ClosureCleaner.ensureSerializable(serializationSchema);

this.httpBuilder = httpBuilder;
ClosureCleaner.clean(httpBuilder, true);

this.responseHandlerBuilder = responseHandlerBuilder;
ClosureCleaner.clean(responseHandlerBuilder, true);
}

@Override
public void open(Configuration parameters) {
httpClient = httpBuilder.buildClient();
responseHandler = responseHandlerBuilder.buildHandler();
}

@Override
public void close() throws Exception {
if (httpClient != null) {
httpClient.close();
}
}

@Override
public void invoke(T value, Context context) throws Exception {
byte[] serializedValue = serializationSchema.serialize(value);
HttpPost httpPost = new HttpPost(destURI);
ByteArrayEntity httpEntity = new ByteArrayEntity(serializedValue);

httpPost.setEntity(httpEntity);

try {
LOG.debug("Executing HTTP POST with data " + value);
httpClient.execute(httpPost, responseHandler);
}
catch (HttpResponseException e1) {
LOG.error("HTTP Response exception", e1);
throw e1;
}
catch (ConnectException e2) {
LOG.error("HTTP Connection exception", e2);
throw e2;
}
catch (Exception e3) {
LOG.error("HTTP generic exception", e3);
throw e3;
}
}

/**
* A default HttpBuilder that creates a default Http client.
*/
public static class DefaultHttpBuilder implements HttpBuilder {
@Override
public CloseableHttpClient buildClient() {
return HttpClients.createDefault();
}
}

/**
* A default response handler that creates a {@link BasicResponseHandler}.
*/
public static class DefaultResponseHandlerBuilder implements ResponseHandlerBuilder {
@Override
public ResponseHandler buildHandler() {
return new BasicResponseHandler();
}
}

}
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.http;

import org.apache.http.client.ResponseHandler;

import java.io.Serializable;

/**
* Response handler interface for creating ResponseHandler builders.
*/
public interface ResponseHandlerBuilder<T> extends Serializable {

ResponseHandler<T> buildHandler();

}