From 75cfb7c8d63bed8b367fd9aa7d09ccf1b51b4bd5 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 21 Jan 2016 11:42:12 +0100 Subject: [PATCH] [FLINK-3270] Add Kafka example --- .../flink-examples-streaming/pom.xml | 53 +++++++++++++++++++ .../examples/kafka}/ReadFromKafka.java | 7 ++- .../examples/kafka}/WriteIntoKafka.java | 7 ++- 3 files changed, 59 insertions(+), 8 deletions(-) rename {flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples => flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka}/ReadFromKafka.java (93%) rename {flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples => flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka}/WriteIntoKafka.java (93%) diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index 30f23438b12ffa..ba19886e5062a7 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -59,6 +59,12 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-kafka-0.8 + ${project.version} + + org.apache.flink flink-streaming-java @@ -522,6 +528,53 @@ under the License. + + + + org.apache.maven.plugins + maven-shade-plugin + + + fat-jar-kafka-example + package + + shade + + + false + false + false + + + org.apache.flink.streaming.examples.kafka.ReadFromKafka + + + Kafka + + + + + * + + org/apache/flink/streaming/examples/kafka/** + org/apache/flink/streaming/** + org/apache/kafka/** + org/apache/curator/** + org/apache/zookeeper/** + org/apache/jute/** + org/I0Itec/** + jline/** + com/yammer/** + kafka/** + + + + + + + + + diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java similarity index 93% rename from flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java rename to flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java index 643da66b112949..40b00efa0406dc 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.kafka.examples; +package org.apache.flink.streaming.examples.kafka; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -39,12 +39,11 @@ public static void main(String[] args) throws Exception { env.getConfig().disableSysoutLogging(); env.setNumberOfExecutionRetries(4); env.enableCheckpointing(5000); - env.setParallelism(2); ParameterTool parameterTool = ParameterTool.fromArgs(args); DataStream messageStream = env - .addSource(new FlinkKafkaConsumer09<>( + .addSource(new FlinkKafkaConsumer08<>( parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java similarity index 93% rename from flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java rename to flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java index fbe53fae16f403..4870152fa7bc12 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.kafka.examples; +package org.apache.flink.streaming.examples.kafka; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; @@ -39,7 +39,6 @@ public static void main(String[] args) throws Exception { StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); env.setNumberOfExecutionRetries(4); - env.setParallelism(2); ParameterTool parameterTool = ParameterTool.fromArgs(args); @@ -63,7 +62,7 @@ public void cancel() { }); // write data into Kafka - messageStream.addSink(new FlinkKafkaProducer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); + messageStream.addSink(new FlinkKafkaProducer08<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); env.execute("Write into Kafka example"); }