From b4039038de8e5d02ed1054ee995cabd050e3b218 Mon Sep 17 00:00:00 2001 From: Joao Boto Date: Wed, 22 Aug 2018 12:10:32 +0200 Subject: [PATCH] BAHIR-114: update flume to 1.8 and add some tests --- .travis.yml | 42 ++++--- flink-connector-flume/README.md | 4 +- flink-connector-flume/dockers/conf/sink.conf | 34 +++++ .../dockers/conf/source.conf | 33 +++++ .../dockers/docker-compose.yml | 57 +++++++++ flink-connector-flume/pom.xml | 95 +++----------- .../connectors/flume/FlumeRpcClient.java | 118 ++++++++++++++++++ .../streaming/connectors/flume/FlumeSink.java | 100 ++------------- .../connectors/flume/FlumeRpcClientTest.java | 68 ++++++++++ .../connectors/flume/FlumeSinkTest.java | 39 ++++++ .../src/test/resources/log4j.properties | 27 ++++ pom.xml | 5 +- 12 files changed, 433 insertions(+), 189 deletions(-) create mode 100644 flink-connector-flume/dockers/conf/sink.conf create mode 100644 flink-connector-flume/dockers/conf/source.conf create mode 100644 flink-connector-flume/dockers/docker-compose.yml create mode 100644 flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java create mode 100644 flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java create mode 100644 flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java create mode 100644 flink-connector-flume/src/test/resources/log4j.properties diff --git a/.travis.yml b/.travis.yml index 083e75d1..6f80e412 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,31 +17,41 @@ sudo: required dist: trusty - -cache: - directories: - - $HOME/.m2 +language: java # do not cache our own artifacts before_cache: - rm -rf $HOME/.m2/repository/org/apache/flink/ -language: java +cache: + directories: + - $HOME/.m2 -matrix: - include: - - jdk: oraclejdk8 - env: - - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - - CACHE_NAME=JDK8_F130_A - - jdk: openjdk8 - env: - - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - - CACHE_NAME=JDK8_F130_C +jdk: + - oraclejdk8 + - openjdk8 + +env: + - | + FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="false" + PROJECTS="flink-connector-activemq,flink-connector-akka,flink-connector-influxdb,flink-connector-netty,flink-connector-redis,flink-library-siddhi" + - | + FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true" + PROJECTS="flink-connector-flume" before_install: - ./dev/change-scala-version.sh $SCALA_VERSION install: true -script: mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION +before_script: + - if [[ $DOCKER == "true" ]]; then + docker-compose -f "$PROJECTS/dockers/docker-compose.yml" up -d; + fi + +script: mvn clean verify -pl $PROJECTS -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION + +after_script: + - if [[ $DOCKER == "true" ]]; then + docker-compose -f "$PROJECTS/dockers/docker-compose.yml" down; + fi diff --git a/flink-connector-flume/README.md b/flink-connector-flume/README.md index 69468bae..e31c1756 100644 --- a/flink-connector-flume/README.md +++ b/flink-connector-flume/README.md @@ -6,10 +6,10 @@ following dependency to your project: org.apache.bahir flink-connector-flume_2.11 - 1.0-SNAPSHOT + 1.1-SNAPSHOT -*Version Compatibility*: This module is compatible with Flume 1.5.0. +*Version Compatibility*: This module is compatible with Flume 1.8.0. Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html). diff --git a/flink-connector-flume/dockers/conf/sink.conf b/flink-connector-flume/dockers/conf/sink.conf new file mode 100644 index 00000000..81c246ff --- /dev/null +++ b/flink-connector-flume/dockers/conf/sink.conf @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +docker.sinks = fileSink +docker.sources = avroSource +docker.channels = inMemoryChannel + +docker.sources.avroSource.type = avro +docker.sources.avroSource.channels = c1 +docker.sources.avroSource.bind = 0.0.0.0 +docker.sources.avroSource.port = 4545 +docker.sources.avroSource.channels = inMemoryChannel + +docker.channels.inMemoryChannel.type = memory +docker.channels.inMemoryChannel.capacity = 1000 +docker.channels.inMemoryChannel.transactionCapacity = 100 + +docker.sinks.fileSink.type = file_roll +docker.sinks.fileSink.channel = inMemoryChannel +docker.sinks.fileSink.sink.directory = /var/tmp/output +docker.sinks.fileSink.rollInterval = 0 \ No newline at end of file diff --git a/flink-connector-flume/dockers/conf/source.conf b/flink-connector-flume/dockers/conf/source.conf new file mode 100644 index 00000000..f883f412 --- /dev/null +++ b/flink-connector-flume/dockers/conf/source.conf @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +docker.sinks = avroSink +docker.sources = netcatSource +docker.channels = inMemoryChannel + +docker.sources.netcatSource.type = avro +docker.sources.netcatSource.bind = 0.0.0.0 +docker.sources.netcatSource.port = 44444 +docker.sources.netcatSource.channels = inMemoryChannel + +docker.channels.inMemoryChannel.type = memory +docker.channels.inMemoryChannel.capacity = 1000 +docker.channels.inMemoryChannel.transactionCapacity = 100 + +docker.sinks.avroSink.type = avro +docker.sinks.avroSink.channel = inMemoryChannel +docker.sinks.avroSink.hostname = sink +docker.sinks.avroSink.port = 4545 diff --git a/flink-connector-flume/dockers/docker-compose.yml b/flink-connector-flume/dockers/docker-compose.yml new file mode 100644 index 00000000..042bd5ee --- /dev/null +++ b/flink-connector-flume/dockers/docker-compose.yml @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: '2' + +services: + + source: + image: eskabetxe/flume + container_name: flume-source + hostname: 172.25.0.3 + ports: + - "44444:44444" + volumes: + - ./conf/source.conf:/opt/flume-config/flume.conf + environment: + - FLUME_AGENT_NAME=docker + links: + - "sink:sink" + networks: + mynet: + ipv4_address: 172.25.0.3 + + sink: + image: eskabetxe/flume + container_name: flume-sink + hostname: 172.25.0.4 + volumes: + - ./conf/sink.conf:/opt/flume-config/flume.conf + - ./output:/var/tmp/output + environment: + - FLUME_AGENT_NAME=docker + networks: + mynet: + ipv4_address: 172.25.0.4 + +networks: + mynet: + driver: bridge + ipam: + config: + - subnet: 172.25.0.0/24 + IPRange: 172.25.0.2/24, + gateway: 172.25.0.1 \ No newline at end of file diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml index 1f4cf6d9..c202c6d2 100644 --- a/flink-connector-flume/pom.xml +++ b/flink-connector-flume/pom.xml @@ -35,7 +35,7 @@ under the License. - 1.5.0 + 1.8.0 @@ -50,86 +50,23 @@ under the License. org.apache.flume flume-ng-core ${flume-ng.version} - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - commons-io - commons-io - - - commons-codec - commons-codec - - - commons-cli - commons-cli - - - commons-lang - commons-lang - - - org.apache.avro - avro - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - com.thoughtworks.paranamer - paranamer - - - org.xerial.snappy - snappy-java - - - org.tukaani - xz - - - org.apache.velocity - velocity - - - commons-collections - commons-collections - - - org.mortbay.jetty - servlet-api - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - jetty - - - com.google.code.gson - gson - - - org.apache.thrift - libthrift - - + + org.junit.jupiter + junit-jupiter-api + 5.2.0 + test + + + + org.apache.flink + flink-tests_${scala.binary.version} + ${flink.version} + test-jar + test + + diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java new file mode 100644 index 00000000..e918f567 --- /dev/null +++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.flume; + +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +class FlumeRpcClient implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(FlumeRpcClient.class); + + protected RpcClient client; + private String hostname; + private int port; + + + FlumeRpcClient(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + /** + * Initializes the connection to Apache Flume. + */ + public boolean init() { + // Setup the RPC connection + int initCounter = 0; + while (true) { + verifyCounter(initCounter, "Cannot establish connection"); + + try { + this.client = RpcClientFactory.getDefaultInstance(hostname, port); + } catch (FlumeException e) { + // Wait one second if the connection failed before the next + // try + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + if (LOG.isErrorEnabled()) { + LOG.error("Interrupted while trying to connect {} on {}", hostname, port); + } + } + } + if (client != null) { + break; + } + initCounter++; + } + return client.isActive(); + } + + + public boolean sendData(String data) { + Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); + return sendData(event); + } + public boolean sendData(byte[] data) { + Event event = EventBuilder.withBody(data); + return sendData(event); + } + + private boolean sendData(Event event) { + return sendData(event, 0); + } + private boolean sendData(Event event, int retryCount) { + verifyCounter(retryCount, "Cannot send message"); + try { + client.append(event); + return true; + } catch (EventDeliveryException e) { + // clean up and recreate the client + reconnect(); + return sendData(event, ++retryCount); + } + } + + + private void verifyCounter(int counter, String messaje) { + if (counter >= 10) { + throw new RuntimeException(messaje + " on " + hostname + " on " + port); + } + } + + private void reconnect() { + close(); + client = null; + init(); + } + + @Override + public void close() { + if (this.client == null) return; + + this.client.close(); + } +} diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java index 41b1b25b..7a80fd22 100644 --- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -14,32 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.flink.streaming.connectors.flume; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.api.RpcClient; -import org.apache.flume.api.RpcClientFactory; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class FlumeSink extends RichSinkFunction { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); + private transient FlumeRpcClient client; - private transient FlinkRpcClientFacade client; - boolean initDone = false; - String host; - int port; - SerializationSchema schema; + private String host; + private int port; + private SerializationSchema schema; public FlumeSink(String host, int port, SerializationSchema schema) { this.host = host; @@ -57,84 +45,20 @@ public FlumeSink(String host, int port, SerializationSchema schema) { @Override public void invoke(IN value, Context context) throws Exception { byte[] data = schema.serialize(value); - client.sendDataToFlume(data); - + client.sendData(data); } - private class FlinkRpcClientFacade { - private RpcClient client; - private String hostname; - private int port; - - /** - * Initializes the connection to Apache Flume. - * - * @param hostname - * The host - * @param port - * The port. - */ - public void init(String hostname, int port) { - // Setup the RPC connection - this.hostname = hostname; - this.port = port; - int initCounter = 0; - while (true) { - if (initCounter >= 90) { - throw new RuntimeException("Cannot establish connection with" + port + " at " - + host); - } - try { - this.client = RpcClientFactory.getDefaultInstance(hostname, port); - } catch (FlumeException e) { - // Wait one second if the connection failed before the next - // try - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - if (LOG.isErrorEnabled()) { - LOG.error("Interrupted while trying to connect {} at {}", port, host); - } - } - } - if (client != null) { - break; - } - initCounter++; - } - initDone = true; - } - - /** - * Sends byte arrays as {@link Event} series to Apache Flume. - * - * @param data - * The byte array to send to Apache FLume - */ - public void sendDataToFlume(byte[] data) { - Event event = EventBuilder.withBody(data); - - try { - client.append(event); - - } catch (EventDeliveryException e) { - // clean up and recreate the client - client.close(); - client = null; - client = RpcClientFactory.getDefaultInstance(hostname, port); - } - } - + @Override + public void open(Configuration config) { + client = new FlumeRpcClient(host, port); + client.init(); } @Override public void close() { - client.client.close(); + if (client == null) return; + client.close(); } - @Override - public void open(Configuration config) { - client = new FlinkRpcClientFacade(); - client.init(host, port); - } + } diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java new file mode 100644 index 00000000..7bab6665 --- /dev/null +++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.flume; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class FlumeRpcClientTest { + + public FlumeRpcClient createGoodClient() { + return new FlumeRpcClient("172.25.0.3", 44444); + } + + + @Test + public void testInitClientMustFail() { + FlumeRpcClient client = new FlumeRpcClient("172.25.0.3", 44445); + Assertions.assertThrows(RuntimeException.class, () -> client.init(), "client start"); + } + + @Test + public void testSendStringData() { + FlumeRpcClient client = createGoodClient(); + boolean init = client.init(); + Assertions.assertTrue(init, "client not start"); + + boolean send = client.sendData("xpto"); + Assertions.assertTrue(send, "data not send"); + + } + + @Test + public void testSendBytesData() { + FlumeRpcClient client = createGoodClient(); + boolean init = client.init(); + Assertions.assertTrue(init, "client not start"); + + boolean send = client.sendData("xpto".getBytes()); + Assertions.assertTrue(send, "data not send"); + + } + + @Test + public void testSendDataWhenConnectionClosed() { + FlumeRpcClient client = createGoodClient(); + boolean init = client.init(); + Assertions.assertTrue(init, "client not start"); + client.close(); + + boolean send = client.sendData("xpto"); + Assertions.assertTrue(send, "data not send"); + + } +} diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java new file mode 100644 index 00000000..9d87642a --- /dev/null +++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.flume; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.test.util.TestUtils.tryExecute; + +public class FlumeSinkTest { + + + @Test + public void testSink() throws Exception { + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + environment.fromElements("string1", "string2") + .addSink(new FlumeSink<>("172.25.0.3", 44444, new SimpleStringSchema())); + + tryExecute(environment, "FlumeTest"); + } + + +} diff --git a/flink-connector-flume/src/test/resources/log4j.properties b/flink-connector-flume/src/test/resources/log4j.properties new file mode 100644 index 00000000..15efe08c --- /dev/null +++ b/flink-connector-flume/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=WARN, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/pom.xml b/pom.xml index d2d435c4..2ecc7423 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ UTF-8 - 1.7 + 1.8 2.11.8 2.11 @@ -696,10 +696,8 @@ - distribution - distribution @@ -739,7 +737,6 @@ - test-java-home