Skip to content

Commit

Permalink
[FLINK-10791] Provide end-to-end test for Kafka 0.11 connector
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Nov 7, 2018
1 parent 8106e72 commit deae50b
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 1 deletion.
14 changes: 14 additions & 0 deletions flink-dist/src/main/assemblies/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,20 @@ under the License.
</excludes>
</fileSet>

<!-- copy jar files of the streaming kafka 0.11 examples -->
<fileSet>
<directory>../flink-examples/flink-examples-streaming-kafka-0.11/target</directory>
<outputDirectory>examples/streaming</outputDirectory>
<fileMode>0644</fileMode>
<includes>
<include>*.jar</include>
</includes>
<excludes>
<exclude>flink-examples-streaming-kafka*.jar</exclude>
<exclude>original-*.jar</exclude>
</excludes>
</fileSet>

<!-- copy jar files of the gelly examples -->
<fileSet>
<directory>../flink-libraries/flink-gelly-examples/target</directory>
Expand Down
3 changes: 2 additions & 1 deletion flink-end-to-end-tests/run-pre-commit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ run_test "State Evolution end-to-end test" "$END_TO_END_DIR/test-scripts/test_st
run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh"
run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh"
run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh"
run_test "Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
run_test "Kafka 0.10 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
run_test "Kafka 0.11 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka011.sh"
run_test "Modern Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka.sh"
run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
Expand Down
24 changes: 24 additions & 0 deletions flink-end-to-end-tests/test-scripts/test_streaming_kafka011.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/kafka-common.sh 0.11.0.2 3.2.0 3.2

source "$(dirname "$0")"/test_streaming_kafka_common.sh $FLINK_DIR/examples/streaming/Kafka011Example.jar

72 changes: 72 additions & 0 deletions flink-examples/flink-examples-streaming-kafka-0.11/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-examples</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.8-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-examples-streaming-kafka-0.11</artifactId>
<name>flink-examples-streaming-kafka-0.11</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming-kafka-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Use the shade plugin to build a fat jar for the kafka connector test -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>fat-jar-kafka-example</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.examples.kafka.Kafka011Example</mainClass>
</transformer>
</transformers>
<finalName>Kafka011Example</finalName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.kafka;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.examples.kafka.base.CustomWatermarkExtractor;
import org.apache.flink.streaming.examples.kafka.base.KafkaEvent;
import org.apache.flink.streaming.examples.kafka.base.KafkaEventSchema;
import org.apache.flink.streaming.examples.kafka.base.KafkaExampleUtil;
import org.apache.flink.streaming.examples.kafka.base.RollingAdditionMapper;

/**
* A simple example that shows how to read from and write to Kafka. This will read String messages
* from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
* perform a rolling addition on each key for which the results are written back to another topic.
*
* <p>This example also demonstrates using a watermark assigner to generate per-partition
* watermarks directly in the Flink Kafka consumer. For demonstration purposes, it is assumed that
* the String messages are of formatted as a (word,frequency,timestamp) tuple.
*
* <p>Example usage:
* --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer
*/
public class Kafka011Example {

public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

DataStream<KafkaEvent> input = env
.addSource(
new FlinkKafkaConsumer011<>(
parameterTool.getRequired("input-topic"),
new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
.keyBy("word")
.map(new RollingAdditionMapper());

input.addSink(
new FlinkKafkaProducer011<>(
parameterTool.getRequired("output-topic"),
new KafkaEventSchema(),
parameterTool.getProperties()));

env.execute("Kafka 0.11 Example");
}

}
1 change: 1 addition & 0 deletions flink-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ under the License.
<module>flink-examples-streaming-kafka</module>
<module>flink-examples-streaming-kafka-0.10</module>
<module>flink-examples-table</module>
<module>flink-examples-streaming-kafka-0.11</module>
</modules>

<dependencies>
Expand Down

0 comments on commit deae50b

Please sign in to comment.