From b18c66b5f7c59ccd1c3fe0d87cec11df16594e43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Mon, 9 Mar 2015 19:03:11 +0100 Subject: [PATCH] [FLINK-1752][streaming] Rework and improve Kafka connectors --- docs/streaming_guide.md | 47 +++-- .../flink-streaming-connectors/pom.xml | 2 +- .../kafka/KafkaProducerExample.java | 2 +- .../kafka/KafkaSimpleConsumerExample.java | 11 +- .../connectors/kafka/api/KafkaSink.java | 52 ++--- .../connectors/kafka/api/KafkaSource.java | 2 + .../kafka/api/simple/KafkaTopicFactory.java | 70 ------- .../kafka/api/simple/KafkaTopicUtils.java | 123 +++++++++++ ...thOffset.java => MessageWithMetadata.java} | 17 +- .../api/simple/PersistentKafkaSource.java | 120 +++++++++-- .../kafka/api/simple/SimpleKafkaSource.java | 25 +-- .../iterator/KafkaConsumerIterator.java | 46 ++++ .../iterator/KafkaIdleConsumerIterator.java | 54 +++++ .../KafkaMultiplePartitionsIterator.java | 100 +++++++++ .../KafkaOnePartitionIterator.java} | 196 +++++++++--------- .../api/simple/offset/BeginningOffset.java | 30 +++ .../api/simple/offset/CurrentOffset.java | 30 +++ .../GivenOffset.java} | 22 +- .../kafka/api/simple/offset/KafkaOffset.java | 50 +++++ 19 files changed, 727 insertions(+), 272 deletions(-) delete mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java rename flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/{MessageWithOffset.java => MessageWithMetadata.java} (83%) create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java rename flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/{KafkaConsumerIterator.java => iterator/KafkaOnePartitionIterator.java} (62%) create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java rename flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/{KafkaDeserializingConsumerIterator.java => offset/GivenOffset.java} (52%) create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md index 49913fc76c5bb..302a83958b121 100644 --- a/docs/streaming_guide.md +++ b/docs/streaming_guide.md @@ -1166,12 +1166,9 @@ A class providing an interface for receiving data from Kafka. 
The following have to be provided for the `KafkaSource(…)` constructor in order:
 
-1. The hostname
-2. The group name
-3. The topic name
-4. The parallelism
-5. Deserialisation schema
-
+1. The Zookeeper hostname (with port number)
+2. The topic name
+3. Deserialisation schema
 
 Example:
 
@@ -1179,26 +1176,52 @@ Example:
{% highlight java %}
DataStream<String> stream = env
-	.addSource(new KafkaSource<String>("localhost:2181", "group", "test", new SimpleStringSchema()))
+	.addSource(new KafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()))
	.print();
{% endhighlight %}
{% highlight scala %}
stream = env
-	.addSource(new KafkaSource[String]("localhost:2181", "group", "test", new SimpleStringSchema))
+	.addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
	.print
{% endhighlight %}
+#### Persistent Kafka Source
+As Kafka persists all its data, a fault-tolerant Kafka source can be provided.
+
+The PersistentKafkaSource can read a topic, and if the job fails for some reason, the source will continue reading from where it left off after the restart. For example, if there are 3 partitions in the topic with offsets 31, 122 and 110 read at the time of job failure, then upon restart the source will continue reading from those offsets, regardless of whether these partitions have received new messages since.
+
+The following have to be provided for the `PersistentKafkaSource(…)` constructor in order:
+
+1. The hostname of the Zookeeper server (with port number)
+2. The topic name
+3. Deserialisation schema
+
+Example:
+
+{% highlight java %}
+env.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
+{% endhighlight %}
+{% highlight scala %}
+env.addSource(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
+{% endhighlight %}
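+
+The Zookeeper synchronization time and the wait time for retrying an empty fetch can also be configured. A sketch of the five-argument constructor, passing explicitly the defaults used above (5000 ms and 500 ms); the values shown are illustrative:
+
+{% highlight java %}
+env.addSource(new PersistentKafkaSource<String>("localhost:2181", "test",
+		new SimpleStringSchema(), 5000, 500));
+{% endhighlight %}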
 
 #### Kafka Sink
 A class providing an interface for sending data to Kafka.
 
-The followings have to be provided for the `KafkaSink()` constructor in order:
+The following have to be provided for the `KafkaSink(…)` constructor in order:
 
-1. The topic name
-2. The hostname
+1. The hostname of a Kafka broker (with port number)
+2. The topic name
 3. Serialisation schema
 
 Example:
 
@@ -1206,12 +1229,12 @@ Example:
{% highlight java %}
-stream.addSink(new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
+stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
{% endhighlight %}
{% highlight scala %}
-stream.addSink(new KafkaSink[String, String]("test", "localhost:9092", new SimpleStringSchema))
+stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
{% endhighlight %}
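+
+A custom `KafkaPartitioner` can be supplied as a fourth argument to `KafkaSink(…)`; when it is omitted, the `KafkaDistributePartitioner` is used. A sketch, assuming the partitioner is generic in the record type:
+
+{% highlight java %}
+stream.addSink(new KafkaSink<String>("localhost:9092", "test",
+		new SimpleStringSchema(), new KafkaDistributePartitioner<String>()));
+{% endhighlight %}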
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 76fef0b17ff5a..1a62234aa4b71 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -46,7 +46,7 @@ under the License.
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_2.10</artifactId>
-			<version>0.8.2.0</version>
+			<version>0.8.1</version>
 			<exclusions>
 				<exclusion>
 					<groupId>com.sun.jmx</groupId>
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index a17beb8274aeb..1fe759a7a5472 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -56,7 +56,7 @@ public void cancel() {
 				}).addSink(
-						new KafkaSink<String>(topic, host + ":" + port, new JavaDefaultStringSchema())
+						new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
 				)
 				.setParallelism(3);
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index b8d4d2cc24807..47c5a33577d53 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -27,8 +27,6 @@ public class KafkaSimpleConsumerExample {
 	private static String host;
 	private static int port;
 	private static String topic;
-	private static int partition;
-	private static long offset;
 
 	public static void main(String[] args) throws Exception {
 
@@ -37,9 +35,8 @@ public static void main(String[] args) throws Exception {
 		}
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
-
 		DataStream<String> kafkaStream = env
-				.addSource(new PersistentKafkaSource<String>(topic, host, port, partition, offset, new JavaDefaultStringSchema()));
+				.addSource(new PersistentKafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
 
 		kafkaStream.print();
 
@@ -47,15 +44,13 @@ public static void main(String[] args) throws Exception {
 	}
 
 	private static boolean parseParameters(String[] args) {
-		if (args.length == 4) {
+		if (args.length == 3) {
 			host = args[0];
 			port = Integer.parseInt(args[1]);
 			topic = args[2];
-			partition = Integer.parseInt(args[3]);
-			offset = Long.parseLong(args[4]);
 			return true;
 		} else {
-			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic> <partition> <offset>");
+			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
 			return false;
 		}
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index d14772bb438b0..0b30f6e015b50 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -19,11 +19,6 @@
 
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.config.EncoderWrapper;
 import org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper;
@@ -31,11 +26,16 @@
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.util.SerializationSchema;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.DefaultEncoder;
+
 /**
  * Sink that emits its inputs to a Kafka topic.
- * 
+ *
  * @param <IN>
- *            Type of the sink input
+ *            Type of the sink input
  */
 public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
@@ -51,33 +51,33 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	/**
 	 * Creates a KafkaSink for a given topic. The partitioner distributes the
 	 * messages between the partitions of the topics.
-	 *
-	 * @param topicId
-	 *            ID of the Kafka topic.
+	 *
 	 * @param brokerAddr
-	 *            Address of the Kafka broker (with port number).
+	 *            Address of the Kafka broker (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
 	 * @param serializationSchema
-	 *            User defined serialization schema.
+	 *            User defined serialization schema.
 	 */
-	public KafkaSink(String topicId, String brokerAddr,
+	public KafkaSink(String brokerAddr, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(topicId, brokerAddr, serializationSchema, new KafkaDistributePartitioner<IN>());
+		this(brokerAddr, topicId, serializationSchema, new KafkaDistributePartitioner<IN>());
 	}
 
 	/**
 	 * Creates a KafkaSink for a given topic. The sink produces its input into
 	 * the topic.
-	 *
-	 * @param topicId
-	 *            ID of the Kafka topic.
+	 *
 	 * @param brokerAddr
-	 *            Address of the Kafka broker (with port number).
+	 *            Address of the Kafka broker (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
 	 * @param serializationSchema
-	 *            User defined serialization schema.
+	 *            User defined serialization schema.
 	 * @param partitioner
-	 *            User defined partitioner.
+	 *            User defined partitioner.
 	 */
-	public KafkaSink(String topicId, String brokerAddr,
+	public KafkaSink(String brokerAddr, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
 		this.topicId = topicId;
 		this.brokerAddr = brokerAddr;
@@ -107,15 +107,19 @@ public void initialize() {
 
 		ProducerConfig config = new ProducerConfig(props);
 
-		producer = new Producer<IN, byte[]>(config);
+		try {
+			producer = new Producer<IN, byte[]>(config);
+		} catch (NullPointerException e) {
+			throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddr, e);
+		}
 		initDone = true;
 	}
 
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
- * 
+ *
 	 * @param next
-	 *            The incoming data
+	 *            The incoming data
 	 */
 	@Override
 	public void invoke(IN next) {
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 1baaba70ef047..5fae3b6ca33a1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -106,6 +106,8 @@ private void initializeConnection() {
 		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
 		KafkaStream<byte[], byte[]> stream = streams.get(0);
 
+		consumer.commitOffsets();
+
 		consumerIterator = stream.iterator();
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
deleted file mode 100644
index 9e6dea7fce079..0000000000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.simple;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import kafka.admin.AdminUtils;
-
-/**
- * Factory for creating custom Kafka partitions.
- */ -public class KafkaTopicFactory { - - public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) { - createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000); - } - - public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) { - ZkClient zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs, - new KafkaZKStringSerializer()); - - Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig); - } - - private static class KafkaZKStringSerializer implements ZkSerializer { - - @Override - public byte[] serialize(Object data) throws ZkMarshallingError { - try { - return ((String) data).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - @Override - public Object deserialize(byte[] bytes) throws ZkMarshallingError { - if (bytes == null) { - return null; - } else { - try { - return new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - } - } -} diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java new file mode 100644 index 0000000000000..616b2790e74bf --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.api.simple; + +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; + +import kafka.admin.AdminUtils; +import kafka.api.PartitionMetadata; +import kafka.api.TopicMetadata; +import kafka.cluster.Broker; +import scala.collection.JavaConversions; +import scala.collection.Seq; + +/** + * For retrieving Kafka topic information (e.g. number of partitions), + * or creating a topic. 
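+ *
+ * <p>Intended usage (a sketch; the topic name is illustrative):
+ *
+ * <pre>
+ * KafkaTopicUtils utils = new KafkaTopicUtils("localhost:2181");
+ * int numberOfPartitions = utils.getNumberOfPartitions("test");
+ * String leaderBroker = utils.getLeaderBrokerAddressForTopic("test");
+ * </pre>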
+ */
+public class KafkaTopicUtils {
+
+	private final ZkClient zkClient;
+
+	public KafkaTopicUtils(String zookeeperServer) {
+		this(zookeeperServer, 10000, 10000);
+	}
+
+	public KafkaTopicUtils(String zookeeperServer, int sessionTimeoutMs, int connectionTimeoutMs) {
+		zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs,
+				new KafkaZKStringSerializer());
+	}
+
+	public void createTopic(String topicName, int numOfPartitions, int replicationFactor) {
+		createTopic(topicName, numOfPartitions, replicationFactor, new Properties());
+	}
+
+	public void createTopic(String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties) {
+		AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicProperties);
+	}
+
+	public int getNumberOfPartitions(String topicName) {
+		Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
+		return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
+	}
+
+	public String getLeaderBrokerAddressForTopic(String topicName) {
+		TopicMetadata topicInfo = getTopicInfo(topicName);
+
+		Collection<PartitionMetadata> partitions = JavaConversions.asJavaCollection(topicInfo.partitionsMetadata());
+		PartitionMetadata partitionMetadata = partitions.iterator().next();
+
+		Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
+
+		// TODO for Kafka version 0.8.2.0
+		// return leader.connectionString();
+		return leader.getConnectionString();
+	}
+
+	public TopicMetadata getTopicInfo(String topicName) {
+		if (topicExists(topicName)) {
+			return AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
+		} else {
+			throw new RuntimeException("Topic does not exist: " + topicName);
+		}
+	}
+
+	public boolean topicExists(String topicName) {
+		return AdminUtils.topicExists(zkClient, topicName);
+	}
+
+	private static class KafkaZKStringSerializer implements ZkSerializer {
+
+		@Override
+		public byte[] serialize(Object data) throws ZkMarshallingError {
+			try {
+				return ((String) data).getBytes("UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+			if (bytes == null) {
+				return null;
+			} else {
+				try {
+					return new String(bytes, "UTF-8");
+				} catch (UnsupportedEncodingException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
similarity index 83%
rename from flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
rename to flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
index c5b8e3290e4b5..3985350c589dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithMetadata.java
@@ -18,13 +18,16 @@
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
 /**
- * POJO encapsulating records received from Kafka with their offset.
+ * POJO encapsulating records received from Kafka with their offset and partition.
  */
-public class MessageWithOffset {
+public class MessageWithMetadata {
+
+	private int partition;
 	private long offset;
 	private byte[] message;
 
-	public MessageWithOffset(long offset, byte[] message) {
+	public MessageWithMetadata(long offset, byte[] message, int partition) {
+		this.partition = partition;
 		this.offset = offset;
 		this.message = message;
 	}
@@ -33,15 +36,11 @@ public long getOffset() {
 		return offset;
 	}
 
-	public void setOffset(long offset) {
-		this.offset = offset;
-	}
-
 	public byte[] getMessage() {
 		return message;
 	}
 
-	public void setMessage(byte[] message) {
-		this.message = message;
+	public int getPartition() {
+		return partition;
 	}
 }
\ No newline at end of file
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 0f980e8592d3a..a841e11cda093 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -17,11 +17,22 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
+import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
+import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaMultiplePartitionsIterator;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Kafka source persisting its offset through the {@link OperatorState} interface.
@@ -29,51 +40,124 @@
  * by the whole execution graph.
  *
  * @param <OUT>
- *            Type of the messages on the topic.
+ *            Type of the messages on the topic.
 */
public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
	private static final long serialVersionUID = 1L;

-	private long initialOffset;
+	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+
+	protected transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
+	protected transient Map<Integer, KafkaOffset> partitions;
+
+	private int partition;

-	private transient OperatorState<Long> kafkaOffSet;
+	private int zookeeperSyncTimeMillis;
+	private int waitOnEmptyFetchMillis;

-	public PersistentKafkaSource(String topicId, String host, int port, int partition, long initialOffset,
+	/**
+	 * Creates a persistent Kafka source that consumes a topic.
+	 *
+	 * @param zookeeperHost
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 */
+	public PersistentKafkaSource(String zookeeperHost, String topicId,
			DeserializationSchema<OUT> deserializationSchema) {
-		super(topicId, host, port, partition, deserializationSchema);
-		this.initialOffset = initialOffset;
+		this(zookeeperHost, topicId, deserializationSchema, 5000, 500);
+	}
+
+	/**
+	 * Creates a persistent Kafka source that consumes a topic.
+	 *
+	 * @param zookeeperHost
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Zookeeper synchronization time (in milliseconds).
+	 * @param waitOnEmptyFetchMillis
+	 *            Time to wait before retrying an empty fetch (in milliseconds).
+	 */
+	public PersistentKafkaSource(String zookeeperHost, String topicId,
+			DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
+		super(topicId, zookeeperHost, deserializationSchema);
+		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
+		this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void open(Configuration parameters) throws InterruptedException {
		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-
-		if (context.containsState("kafka")) {
-			kafkaOffSet = (OperatorState<Long>) context.getState("kafka");
+		int indexOfSubtask = context.getIndexOfThisSubtask();
+		int numberOfSubtasks = context.getNumberOfParallelSubtasks();
+
+		KafkaTopicUtils kafkaTopicUtils =
+				new KafkaTopicUtils(zookeeperServerAddress, zookeeperSyncTimeMillis, zookeeperSyncTimeMillis);
+
+		int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
+
+		String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
+
+		if (indexOfSubtask >= numberOfPartitions) {
+			iterator = new KafkaIdleConsumerIterator();
		} else {
-			kafkaOffSet = new OperatorState<Long>(initialOffset);
-			context.registerState("kafka", kafkaOffSet);
+			if (context.containsState("kafka")) {
+				kafkaOffSet = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
+
+				partitions = kafkaOffSet.getState();
+			} else {
+				partitions = new HashMap<Integer, KafkaOffset>();
+
+				partition = indexOfSubtask;
+
+				// e.g. with 2 subtasks and 5 partitions, subtask 0 reads
+				// partitions 0, 2, 4 and subtask 1 reads partitions 1, 3
+				for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
+					partitions.put(partitionIndex, new CurrentOffset());
+				}
+
+				kafkaOffSet = new OperatorState<Map<Integer, KafkaOffset>>(partitions);
+
+				context.registerState("kafka", kafkaOffSet);
+			}
+
+			iterator = getMultiKafkaIterator(brokerAddress, topicId, partitions, waitOnEmptyFetchMillis);
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
+						indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
+			}
		}
-		super.open(parameters);
+
+		iterator.initialize();
	}

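+	/**
+	 * Creates the iterator that reads the given partitions of the topic from
+	 * the given broker, starting from the stored per-partition offsets.
+	 */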
+	protected KafkaConsumerIterator getMultiKafkaIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
+		return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch);
	}

-	@Override
-	protected void setInitialOffset(Configuration config) throws InterruptedException{
-		iterator.initializeFromOffset(kafkaOffSet.getState());
-	}
-
-	@Override
	public void run(Collector<OUT> collector) throws Exception {
-		MessageWithOffset msg;
+		MessageWithMetadata msg;
		while (iterator.hasNext()) {
			msg = iterator.nextWithOffset();
			OUT out = schema.deserialize(msg.getMessage());
+
+			if (schema.isEndOfStream(out)) {
+				break;
+			}
+
			collector.collect(out);
-			kafkaOffSet.update(msg.getOffset());
+
+			// TODO avoid object creation
+			partitions.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
+			kafkaOffSet.update(partitions);
		}
	}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index 61fd173b9dc13..f972e51a9fdb3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;

@@ -32,33 +33,25 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
	private static final long serialVersionUID = 1L;

-	private String topicId;
-	private final String hostName;
-	private final int port;
-	private final int partition;
+	protected String topicId;
+	protected final String zookeeperServerAddress;
	protected KafkaConsumerIterator iterator;

-	public SimpleKafkaSource(String topic, String hostName, int port, int partition,
-			DeserializationSchema<OUT> deserializationSchema) {
+	public SimpleKafkaSource(String topic, String zookeeperServerAddress,
+			DeserializationSchema<OUT> deserializationSchema) {
		super(deserializationSchema);
		this.topicId = topic;
-		this.hostName = hostName;
-		this.port = port;
-		this.partition = partition;
-	}
-
-	private void initializeConnection() {
-		iterator = new KafkaConsumerIterator(hostName, port, topicId, partition);
+		this.zookeeperServerAddress = zookeeperServerAddress;
	}

	protected void setInitialOffset(Configuration config) throws InterruptedException {
-		iterator.initializeFromCurrent();
+		iterator.initialize();
	}

	@Override
	public void run(Collector<OUT> collector) throws Exception {
		while (iterator.hasNext()) {
-			MessageWithOffset msg = iterator.nextWithOffset();
+			MessageWithMetadata msg = iterator.nextWithOffset();
			OUT out = schema.deserialize(msg.getMessage());
			collector.collect(out);
		}
@@ -71,7 +64,7 @@ public void cancel() {

	@Override
	public void open(Configuration config) throws InterruptedException {
-		initializeConnection();
setInitialOffset(config); } } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java new file mode 100644 index 0000000000000..ef87b3427df53 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.api.simple.iterator; + +import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata; + +/** + * Iterator interface for different types of Kafka consumers. + */ +public interface KafkaConsumerIterator { + + public void initialize() throws InterruptedException; + + public boolean hasNext(); + + /** + * Returns the next message received from Kafka as a + * byte array. + * + * @return next message as a byte array. + */ + public byte[] next() throws InterruptedException; + + /** + * Returns the next message and its offset received from + * Kafka encapsulated in a POJO. + * + * @return next message and its offset. + */ + public MessageWithMetadata nextWithOffset() throws InterruptedException; +} diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java new file mode 100644 index 0000000000000..af16ab5e64ad8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.api.simple.iterator; + +import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaIdleConsumerIterator implements KafkaConsumerIterator { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaIdleConsumerIterator.class); + + public KafkaIdleConsumerIterator() { + if (LOG.isWarnEnabled()) { + LOG.warn("Idle Kafka consumer created. The subtask does nothing."); + } + } + + + @Override + public void initialize() throws InterruptedException { + + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public byte[] next() { + throw new RuntimeException("Idle consumer has no input."); + } + + @Override + public MessageWithMetadata nextWithOffset() { + throw new RuntimeException("Idle consumer has no input."); + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java new file mode 100644 index 0000000000000..b1c258b18a2a0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
+
+	protected List<KafkaOnePartitionIterator> partitions;
+	protected final int waitOnEmptyFetch;
+
+	public KafkaMultiplePartitionsIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
+		partitions = new ArrayList<KafkaOnePartitionIterator>(partitionsWithOffset.size());
+
+		String[] hostAndPort = hostName.split(":");
+
+		String host = hostAndPort[0];
+		int port = Integer.parseInt(hostAndPort[1]);
+
+		this.waitOnEmptyFetch = waitOnEmptyFetch;
+
+		for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
+			partitions.add(new KafkaOnePartitionIterator(
+					host,
+					port,
+					topic,
+					partitionWithOffset.getKey(),
+					partitionWithOffset.getValue()));
+		}
+	}
+
+	@Override
+	public void initialize() throws InterruptedException {
+		for (KafkaOnePartitionIterator partition : partitions) {
+			partition.initialize();
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		return true;
+	}
+
+	@Override
+	public byte[] next() throws InterruptedException {
+		return nextWithOffset().getMessage();
+	}
+
+	protected int lastCheckedPartitionIndex = -1;
+
+	@Override
+	public MessageWithMetadata nextWithOffset() throws InterruptedException {
+		KafkaOnePartitionIterator partition;
+
+		while (true) {
+			// poll every partition once in round-robin order, starting after
+			// the partition that served the previous message
+			int i = lastCheckedPartitionIndex;
+			for (int checked = 0; checked < partitions.size(); checked++) {
+				i = nextPartition(i);
+				partition = partitions.get(i);
+
+				if (partition.fetchHasNext()) {
+					lastCheckedPartitionIndex = i;
+					return partition.nextWithOffset();
+				}
+			}
+
+			// all partitions were empty, back off before polling them again;
+			// the interrupt is propagated since the method throws InterruptedException
+			Thread.sleep(waitOnEmptyFetch);
+		}
+	}
+
+	protected int nextPartition(int currentPartition) {
+		return (currentPartition + 1) % partitions.size();
+	}
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java
similarity index 62%
rename from flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
rename to flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java
index 370b3f07175f8..ee6210a083a7e 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaOnePartitionIterator.java
@@ -15,23 +15,25 @@
 * limitations under the License.
 */

-package org.apache.flink.streaming.connectors.kafka.api.simple;
+package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+
+import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
+import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
@@ -41,22 +43,25 @@
/**
 * Iterates the records received from a partition of a Kafka topic as byte arrays.
 */
-public class KafkaConsumerIterator implements Serializable {
+public class KafkaOnePartitionIterator implements KafkaConsumerIterator, Serializable {

	private static final long serialVersionUID = 1L;

-	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L;
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaOnePartitionIterator.class);
+
+	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000L;

	private List<String> hosts;
	private String topic;
	private int port;
	private int partition;
	private long readOffset;
-	private long waitOnEmptyFetch;
	private transient SimpleConsumer consumer;
	private List<String> replicaBrokers;
	private String clientName;
+	private String leadBroker;
+
+	private KafkaOffset initialOffset;

	private transient Iterator<MessageAndOffset> iter;
	private transient FetchResponse fetchResponse;
@@ -68,33 +73,18 @@ public class KafkaConsumerIterator implements Serializable {
	 * @param port Port of the known Kafka broker
	 * @param topic Name of the topic to listen to
	 * @param partition Partition in the chosen topic
-	 * @param waitOnEmptyFetch wait time on empty fetch in millis
	 */
-	public KafkaConsumerIterator(String hostName, int port, String topic, int partition,
-			long waitOnEmptyFetch) {
-
+	public KafkaOnePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset) {
		this.hosts = new ArrayList<String>();
		hosts.add(hostName);
		this.port = port;

		this.topic = topic;
		this.partition = partition;
-		this.waitOnEmptyFetch = waitOnEmptyFetch;
-
-		replicaBrokers = new ArrayList<String>();
-	}
+		this.initialOffset = initialOffset;

-	/**
-	 * Constructor without configurable wait time on empty fetch. For connecting to the Kafka service
-	 * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers.
-	 *
-	 * @param hostName Hostname of a known Kafka broker
-	 * @param port Port of the known Kafka broker
-	 * @param topic Name of the topic to listen to
-	 * @param partition Partition in the chosen topic
-	 */
-	public KafkaConsumerIterator(String hostName, int port, String topic, int partition){
-		this(hostName, port, topic, partition, DEFAULT_WAIT_ON_EMPTY_FETCH);
+		replicaBrokers = new ArrayList<String>();
	}

	// --------------------------------------------------------------------------------------------
	// Connecting to the partition
	// --------------------------------------------------------------------------------------------

	/**
	 * Initializes the connection by detecting the leading broker of
	 * the topic and establishing a connection to it.
	 */
-	private void initialize() throws InterruptedException {
+	public void initialize() throws InterruptedException {
		PartitionMetadata metadata;
		do {
			metadata = findLeader(hosts, port, topic, partition);
			if (metadata == null) {
				try {
-					Thread.sleep(waitOnEmptyFetch);
+					Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
				} catch (InterruptedException e) {
					throw new InterruptedException("Establishing connection to Kafka failed");
				}
			}
		} while (metadata == null);

@@ -123,46 +111,26 @@ private void initialize() throws InterruptedException {
					+ ":" + port);
		}

-		String leadBroker = metadata.leader().host();
+		leadBroker = metadata.leader().host();
		clientName = "Client_" + topic + "_" + partition;

		consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
-	}

-	/**
-	 * Initializes a connection from the earliest available offset.
-	 */
-	public void initializeFromBeginning() throws InterruptedException {
-		initialize();
-		readOffset = getLastOffset(consumer, topic, partition,
-				kafka.api.OffsetRequest.EarliestTime(), clientName);
+		readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);

		resetFetchResponse(readOffset);
	}

	/**
-	 * Initializes a connection from the latest available offset.
-	 */
-	public void initializeFromCurrent() throws InterruptedException {
-		initialize();
-		readOffset = getLastOffset(consumer, topic, partition,
-				kafka.api.OffsetRequest.LatestTime(), clientName);
-
-		resetFetchResponse(readOffset);
-	}
-
-	/**
-	 * Initializes a connection from the specified offset.
+	 * Sets the partition to read from.
	 *
-	 * @param offset Desired Kafka offset
+	 * @param partition
+	 *            partition number
	 */
-	public void initializeFromOffset(long offset) throws InterruptedException {
-		initialize();
-		readOffset = offset;
-		resetFetchResponse(readOffset);
+	public void setPartition(int partition) {
+		this.partition = partition;
	}
-
	// --------------------------------------------------------------------------------------------
	// Iterator methods
	// --------------------------------------------------------------------------------------------
@@ -186,22 +154,29 @@ public byte[] next() throws InterruptedException {
		return nextWithOffset().getMessage();
	}

+	public boolean fetchHasNext() throws InterruptedException {
+		synchronized (fetchResponse) {
+			if (!iter.hasNext()) {
+				resetFetchResponse(readOffset);
+				return iter.hasNext();
+			} else {
+				return true;
+			}
+		}
+	}
+
	/**
	 * Returns the next message and its offset received from
	 * Kafka encapsulated in a POJO.
	 *
	 * @return next message and its offset.
	 */
-	public MessageWithOffset nextWithOffset() throws InterruptedException {
+	public MessageWithMetadata nextWithOffset() throws InterruptedException {
		synchronized (fetchResponse) {
-			while (!iter.hasNext()) {
-				resetFetchResponse(readOffset);
-				try {
-					Thread.sleep(waitOnEmptyFetch);
-				} catch (InterruptedException e) {
-					throw new InterruptedException("Fetching from Kafka was interrupted");
-				}
+			if (!iter.hasNext()) {
+				throw new RuntimeException(
+						"Trying to read when response is not fetched. Call fetchHasNext() first.");
			}

			MessageAndOffset messageAndOffset = iter.next();
@@ -218,19 +193,7 @@ public MessageWithOffset nextWithOffset() throws InterruptedException {
			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);

-			return new MessageWithOffset(messageAndOffset.offset(), bytes);
-		}
-	}
-
-	/**
-	 * Resets the iterator to a given offset.
-	 *
-	 * @param offset Desired Kafka offset.
-	 */
-	public void reset(long offset) {
-		synchronized (fetchResponse) {
-			readOffset = offset;
-			resetFetchResponse(offset);
+			return new MessageWithMetadata(messageAndOffset.offset(), bytes, partition);
		}
	}

@@ -238,12 +201,30 @@ public void reset(long offset) {
	// Internal utilities
	// --------------------------------------------------------------------------------------------

-	private void resetFetchResponse(long offset) {
+	private void resetFetchResponse(long offset) throws InterruptedException {
		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
				.addFetch(topic, partition, offset, 100000).build();
+
		fetchResponse = consumer.fetch(req);

-		//TODO deal with broker failures
+		if (fetchResponse.hasError()) {
+			short code = fetchResponse.errorCode(topic, partition);
+
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Error fetching data from the Broker: {} Reason: {}", leadBroker, code);
+			}
+
+			if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
+				}
+
+				readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
+			}
+
+			// reconnect to the new leader and retry the fetch, instead of
+			// iterating over the failed response with a closed consumer
+			consumer.close();
+			leadBroker = findNewLeader(leadBroker, topic, partition, port);
+			consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+			fetchResponse = consumer.fetch(new FetchRequestBuilder().clientId(clientName)
+					.addFetch(topic, partition, readOffset, 100000).build());
+		}

		iter = fetchResponse.messageSet(topic, partition).iterator();
	}

@@ -251,7 +232,8 @@ private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_
			int a_partition) {
		PartitionMetadata returnMetaData = null;
-		loop: for (String seed : a_hosts) {
+		loop:
+		for (String seed : a_hosts) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
@@ -270,8 +252,7 @@ private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_
			} catch (Exception e) {
				throw new RuntimeException("Error communicating with Broker [" + seed
-						+ "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: "
-						+ e);
+						+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
			} finally {
				if (consumer != null) {
					consumer.close();
				}
			}
		}
@@ -287,20 +268,33 @@ private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_
		return returnMetaData;
	}

-	private static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
-			long whichTime, String clientName) {
-		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
-				new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
-				kafka.api.OffsetRequest.CurrentVersion(), clientName);
-		OffsetResponse response = consumer.getOffsetsBefore(request);
-
-		if (response.hasError()) {
-			throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
-					+ response.errorCode(topic, partition));
-		}
-		long[] offsets = response.offsets(topic, partition);
-		return offsets[0];
-	}
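+	/**
+	 * Retries the leader lookup against the known replica brokers after a
+	 * broker failure, sleeping between attempts so that ZooKeeper has time
+	 * to elect a new leader.
+	 */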
+	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws InterruptedException {
+		for (int i = 0; i < 3; i++) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Trying to find a new leader after Broker failure.");
+			}
+			boolean goToSleep = false;
+			PartitionMetadata metadata = findLeader(replicaBrokers, a_port, a_topic, a_partition);
+			if (metadata == null) {
+				goToSleep = true;
+			} else if (metadata.leader() == null) {
+				goToSleep = true;
+			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+				// first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
+				// second time, assume the broker recovered before failover, or it was a non-broker issue
+				goToSleep = true;
+			} else {
+				return metadata.leader().host();
+			}
+			if (goToSleep) {
+				// the method throws InterruptedException, so the interrupt is
+				// propagated instead of being swallowed by an empty catch block
+				Thread.sleep(10000);
+			}
+		}
+		throw new InterruptedException("Unable to find new leader after Broker failure.");
+	}

}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
new file mode 100644
index 0000000000000..f7096ad80b24b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/BeginningOffset.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
+
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+/**
+ * Offset pointing to the earliest message available in the partition.
+ */
+public class BeginningOffset extends KafkaOffset {
+
+	@Override
+	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+		return getLastOffset(consumer, topic, partition, OffsetRequest.EarliestTime(), clientName);
+	}
+
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
new file mode 100644
index 0000000000000..3555ff9c83744
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/CurrentOffset.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
+
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+/**
+ * Offset pointing to the latest message available in the partition.
+ */
+public class CurrentOffset extends KafkaOffset {
+
+	@Override
+	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+		return getLastOffset(consumer, topic, partition, OffsetRequest.LatestTime(), clientName);
+	}
+
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
similarity index 52%
rename from flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
rename to flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
index f2af6ca703342..1282125aff6e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
@@ -15,23 +15,21 @@
 * limitations under the License.
 */

-package org.apache.flink.streaming.connectors.kafka.api.simple;
+package org.apache.flink.streaming.connectors.kafka.api.simple.offset;

-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import kafka.javaapi.consumer.SimpleConsumer;

-public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator {
+/**
+ * Offset pointing to a concrete position in the partition, e.g. one restored
+ * from the operator state after a failure.
+ */
+public class GivenOffset extends KafkaOffset {

-	private static final long serialVersionUID = 1L;
-	private DeserializationSchema<IN> deserializationSchema;
+	private final long offset;

-	public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,
-			DeserializationSchema<IN> deserializationSchema) {
-		super(host, port, topic, partition, waitOnEmptyFetch);
-		this.deserializationSchema = deserializationSchema;
+	public GivenOffset(long offset) {
+		this.offset = offset;
	}

-	public IN nextRecord() throws InterruptedException {
-		return deserializationSchema.deserialize(next());
+	@Override
+	public long getOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+		return offset;
	}
-}
\ No newline at end of file
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
new file mode 100644
index 0000000000000..cb795560c6875
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+/**
+ * Superclass of the initial offset strategies. Instances are stored in the
+ * operator state and held by the (serializable) partition iterators, so the
+ * class must be serializable.
+ */
+public abstract class KafkaOffset implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
+			String clientName);
+
+	protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
+			long whichTime, String clientName) {
+		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+				new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
+				kafka.api.OffsetRequest.CurrentVersion(), clientName);
+		OffsetResponse response = consumer.getOffsetsBefore(request);
+
+		if (response.hasError()) {
+			throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
+					+ response.errorCode(topic, partition));
+		}
+		long[] offsets = response.offsets(topic, partition);
+		return offsets[0];
+	}
+
+}