From f3e6ef67be4e54471c0143d849e7b905f6e036ae Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 25 Sep 2017 16:16:34 +0200 Subject: [PATCH 1/5] [FLINK-6988] Make TwoPhaseCommitSinkFunction work with Context --- .../functions/sink/TwoPhaseCommitSinkFunction.java | 14 +++++++++++--- .../sink/TwoPhaseCommitSinkFunctionTest.java | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 60409793ed4d7..2dfa292b76251 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -107,7 +107,7 @@ protected Optional getUserContext() { /** * Write value within a transaction. */ - protected abstract void invoke(TXN transaction, IN value) throws Exception; + protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception; /** * Method that starts a new transaction. @@ -159,9 +159,17 @@ protected void finishRecoveringContext() { // ------ entry points for above methods implementing {@CheckPointedFunction} and {@CheckpointListener} ------ + + /** + * This should not be implemented by subclasses. + */ + @Override + public final void invoke(IN value) throws Exception {} + @Override - public final void invoke(IN value) throws Exception { - invoke(currentTransaction, value); + public final void invoke( + IN value, Context context) throws Exception { + invoke(currentTransaction, value, context); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 4715c39fe384f..30435126eb1f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -136,7 +136,7 @@ public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) { } @Override - protected void invoke(FileTransaction transaction, String value) throws Exception { + protected void invoke(FileTransaction transaction, String value, Context context) throws Exception { transaction.writer.write(value); } From 7bc6154ef8bdfe1181e404e1f7801f9fbd93543d Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 6 Sep 2017 16:42:59 +0200 Subject: [PATCH 2/5] [FLINK-6988] Add Kafka 0.11 connector maven module --- .../flink-connector-kafka-0.11/pom.xml | 213 ++++++++++++++++++ .../src/main/resources/log4j.properties | 28 +++ .../src/test/resources/log4j-test.properties | 30 +++ flink-connectors/pom.xml | 12 + tools/travis_mvn_watchdog.sh | 4 + 5 files changed, 287 insertions(+) create mode 100644 flink-connectors/flink-connector-kafka-0.11/pom.xml create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml new file mode 100644 index 0000000000000..c41f697cad9ec --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml @@ -0,0 +1,213 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.4-SNAPSHOT + .. + + + flink-connector-kafka-0.11_${scala.binary.version} + flink-connector-kafka-0.11 + + jar + + + + 0.11.0.0 + + + + + + + + org.apache.flink + flink-connector-kafka-0.10_${scala.binary.version} + ${project.version} + + + org.apache.kafka + kafka_${scala.binary.version} + + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + org.apache.flink + flink-table_${scala.binary.version} + ${project.version} + provided + + true + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test + test-jar + + + + org.apache.flink + flink-connector-kafka-0.9_${scala.binary.version} + ${project.version} + + + + org.apache.kafka + kafka_${scala.binary.version} + + + test-jar + test + + + + org.apache.flink + flink-connector-kafka-base_${scala.binary.version} + ${project.version} + + + + org.apache.kafka + kafka_${scala.binary.version} + + + test-jar + test + + + + + org.apache.kafka + kafka_${scala.binary.version} + ${kafka.version} + test + + + + org.apache.flink + flink-tests_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${project.version} + test + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.flink + flink-metrics-jmx + ${project.version} + test + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + **/KafkaTestEnvironmentImpl* + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-test-sources + + test-jar-no-fork + + + + **/KafkaTestEnvironmentImpl* + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + 1 + -Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit + + + + + + diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties new file mode 100644 index 0000000000000..6eef1747ddfe4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..fbeb110350f5f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index bc3f82f686c44..97c9f206c57c6 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -75,6 +75,18 @@ under the License. + + + scala-2.11 + + + !scala-2.10 + + + + flink-connector-kafka-0.11 + +