From 618f6b766355d37dd91700ae3c48785981cf5bd9 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Thu, 24 Jul 2014 23:07:30 +0800 Subject: [PATCH 01/10] add Kafka stream feature in according to specified starting offset position to fetch messages --- .../streaming/kafka/KafkaSimpleConsumer.scala | 150 ++++++++++++++++++ .../kafka/KafkaSimpleInputDStream.scala | 70 ++++++++ 2 files changed, 220 insertions(+) create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala new file mode 100644 index 0000000000000..8f861d8c95c48 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -0,0 +1,150 @@ +package org.apache.spark.streaming.kafka + +import kafka.api.FetchRequestBuilder +import kafka.api.TopicMetadataRequest +import kafka.common.ErrorMapping +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.message.ByteBufferMessageSet + +@serializable +class KafkaSimpleConsumer(brokers: Seq[String], topic: String, partition: Int, maxBatchByteSize: Int) { + private var leader: String = _ + private var clientName: String = _ + private var consumer: SimpleConsumer = null + private val soTimeout = 60000 + private var bufferSize = 1024 + private var replicaBrokers: Seq[(String)] = null; + + private def init(): Unit = { + val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers(brokers, topic, partition) + leader = data._1 + replicaBrokers = data._2 + val ipPort = leader.split(":") + val ip = ipPort(0) + val port = ipPort(1).toInt + clientName = "client-" + topic + "-" + partition; + consumer = new SimpleConsumer(ip, port, soTimeout, bufferSize, clientName); + } + + def getEarliestOffset(): Long = { + if (consumer == null) { + init(); + } + return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime, 1); + } + + def getLatestOffset(): Long = { + if (consumer == null) { + init(); + } + return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1); + } + + def findNewLeader(oldLeader: String, + replicaBrokers: Seq[String], topic: String, partition: Int): (String, Seq[String]) = { + for (i <- 0 until 3) { + var goToSleep = false; + try { + val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers( + replicaBrokers, topic, partition); + val newLeader = data._1 + if (oldLeader.equalsIgnoreCase(newLeader) && i == 0) { + goToSleep = true + } + return data + } catch { + case _: Throwable => goToSleep = true; + } + if (goToSleep) { + try { + Thread.sleep(1000 * (i + 1)); + } catch { + case _: Throwable => + } + } + } + throw new Exception( + "Unable to find new leader after Broker failure. 
Exiting"); + } + + def fetch(startPositionOffset: Long): ByteBufferMessageSet = { + if (consumer == null) { + init(); + } + val builder = new FetchRequestBuilder(); + val req = builder + .addFetch(topic, partition, startPositionOffset, maxBatchByteSize) + .clientId(clientName).build(); + val fetchResponse = consumer.fetch(req); + var numErrors = 0; + if (fetchResponse.hasError) { + numErrors = numErrors + 1 + val code = fetchResponse.errorCode(topic, partition); + if (numErrors > 5) { + throw new Exception("Error fetching data from the Broker:" + + leader + " Reason: " + code); + } + if (code == ErrorMapping.OffsetOutOfRangeCode) { + return fetch(getLatestOffset()); + } + close + val data = findNewLeader(leader, replicaBrokers, topic, partition) + leader = data._1 + replicaBrokers = data._2 + init() + return fetch(startPositionOffset) + } + val set = fetchResponse + .messageSet(topic, partition); + return set; + + } + + def close: Unit = { + if (consumer != null) { + consumer.close + } + } +} + +object KafkaSimpleConsumer { + def findLeaderAndReplicaBrokers(brokers: Seq[String], topic: String, partition: Int): (String, Seq[(String)]) = { + for (broker <- brokers) { + val tmp = broker.split(":") + val ip = tmp(0) + val port = tmp(1).toInt + var consumer: SimpleConsumer = null + try { + consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup") + val req = new TopicMetadataRequest(Seq(topic), 1) + val resp = consumer.send(req); + val metaData = resp.topicsMetadata + for ( + item <- metaData; + part <- item.partitionsMetadata if (part.partitionId == partition) + ) { + part.leader match { + case Some(leader) => { + return (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) + } + case None => + } + } + } catch { + case e: Throwable => throw new Exception("Error communicating with Broker [" + broker + + "] to find Leader for [" + topic + "] Reason: " + e); + } finally { + if (consumer != null) + consumer.close + } + } + throw new Exception("not found leader."); + } + + def getOffset(consumer: SimpleConsumer, topic: String, + partition: Int, whichTime: Long, clientId: Int): Long = { + val topicAndPartition = new TopicAndPartition(topic, partition); + consumer.earliestOrLatestOffset(topicAndPartition, whichTime, clientId) + } +} \ No newline at end of file diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala new file mode 100644 index 0000000000000..60da01bf18c1c --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -0,0 +1,70 @@ +package org.apache.spark.streaming.kafka + +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver + +import kafka.serializer.Decoder + +class KafkaSimpleInputDStream[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + @transient ssc_ : StreamingContext, + brokers: Seq[String], + topic: String, + partition: Integer, + startPositionOffset: Long, + maxBatchByteSize: Int = 1024 * 1024, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { + + def getReceiver(): Receiver[(Long, Array[Byte])] = 
{
+    new KafkaSimpleReceiver[U, T](brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel)
+      .asInstanceOf[Receiver[(Long, Array[Byte])]]
+  }
+}
+
+class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+    brokers: Seq[String],
+    topic: String,
+    partition: Integer,
+    startPositionOffset: Long,
+    maxBatchByteSize: Int = 1024 * 1024,
+    storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging {
+
+  var currentOffset = startPositionOffset
+  val kac = new KafkaSimpleConsumer(brokers, topic, partition, maxBatchByteSize);
+
+  def onStop() {
+    kac.close
+    logInfo("Kafka consumer closed.")
+  }
+
+  def onStart() {
+    logInfo("Starting Kafka Consumer Stream")
+    val firstOffset = kac.getEarliestOffset()
+    if (currentOffset < firstOffset) {
+      logWarning(s"at present, the first offset is ${firstOffset}, the messages which is from ${currentOffset} to ${firstOffset} might been pruned.")
+      currentOffset = firstOffset
+    }
+    while (true) {
+      val messageSet = kac.fetch(currentOffset)
+
+      val itr = messageSet.iterator
+      var hasMessage = false
+      while (itr.hasNext) {
+        val messageAndOffset = itr.next()
+        val payload = messageAndOffset.message.payload
+        val bytes = new Array[Byte](payload.limit);
+        payload.get(bytes);
+        currentOffset = messageAndOffset.offset
+        store((currentOffset, bytes))
+        hasMessage = true
+      }
+      if (hasMessage) {
+        currentOffset = currentOffset + 1
+      }
+      Thread.sleep(10)
+    }
+  }
+}
\ No newline at end of file

From a6312058bc3bc6ea0998861f23593f2b2f0450b3 Mon Sep 17 00:00:00 2001
From: pengyanhong
Date: Thu, 24 Jul 2014 23:08:42 +0800
Subject: [PATCH 02/10] add Kafka stream feature in according to specified starting offset position to fetch messages

---
 .../apache/spark/streaming/kafka/KafkaUtils.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 86bb91f362d29..6a05654a661bc 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -151,4 +151,19 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /**
+   * Create an input stream that pulls messages form a Kafka Broker in according to the specified starting position.
+   * @param ssc StreamingContext object
+   * @param borkers Kafka brokers list of host:port
+   * @param topic a topic to consume
+   * @param partition partition of this topic
+   * @param startPositionOffset beginning to consume from this offset position
+   * @param maxBatchByteSize max buffer size for a fetch request
+   * @param storageLevel RDD storage level.
+ */ + def createStream(ssc: StreamingContext, brokers: Seq[String], topic: String, partition: Int, startPositionOffset: Long, + maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel); + } } From 555038fd91ac9f92c0df0519d1407b948aa8737e Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Sat, 26 Jul 2014 22:58:24 +0800 Subject: [PATCH 03/10] refactor code --- .../streaming/kafka/KafkaSimpleConsumer.scala | 220 ++++++++++++------ .../kafka/KafkaSimpleInputDStream.scala | 38 ++- .../spark/streaming/kafka/KafkaUtils.scala | 9 +- 3 files changed, 188 insertions(+), 79 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala index 8f861d8c95c48..0ea0511e99c58 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.streaming.kafka import kafka.api.FetchRequestBuilder @@ -6,99 +23,146 @@ import kafka.common.ErrorMapping import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer import kafka.message.ByteBufferMessageSet +import org.apache.spark.Logging +import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZKStringSerializer +import scala.collection.mutable.ArrayBuffer +import org.apache.zookeeper.CreateMode +import org.I0Itec.zkclient.DataUpdater @serializable -class KafkaSimpleConsumer(brokers: Seq[String], topic: String, partition: Int, maxBatchByteSize: Int) { +class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, partition: Int, maxBatchByteSize: Int) extends Logging { + private var brokers: Seq[String] = _ private var leader: String = _ - private var clientName: String = _ private var consumer: SimpleConsumer = null private val soTimeout = 60000 private var bufferSize = 1024 - private var replicaBrokers: Seq[(String)] = null; + private var replicaBrokers: Seq[(String)] = null private def init(): Unit = { - val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers(brokers, topic, partition) + brokers = KafkaSimpleConsumer.getBrokers(zkQuorum) + val data = findLeaderAndReplicaBrokers(brokers) leader = data._1 replicaBrokers = data._2 val ipPort = leader.split(":") val ip = ipPort(0) val port = ipPort(1).toInt - clientName = "client-" + topic + "-" + partition; - consumer = new SimpleConsumer(ip, port, soTimeout, bufferSize, clientName); + consumer = new SimpleConsumer(ip, port, soTimeout, bufferSize, groupId) + } + + private def getOffset(consumer: SimpleConsumer, topic: String, + partition: Int, whichTime: Long, clientId: Int): Long = { + val topicAndPartition = new TopicAndPartition(topic, partition); + consumer.earliestOrLatestOffset(topicAndPartition, whichTime, clientId) } def getEarliestOffset(): Long = { if (consumer == null) { - init(); + init() } - return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime, 1); + return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime, 1) } def getLatestOffset(): Long = { if (consumer == null) { - init(); + init() } - return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1); + return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1) } - def findNewLeader(oldLeader: String, - replicaBrokers: Seq[String], topic: String, partition: Int): (String, Seq[String]) = { + private def findNewLeader(oldLeader: String, replicaBrokers: Seq[String]): (String, Seq[String]) = { for (i <- 0 until 3) { - var goToSleep = false; + var goToSleep = false try { - val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers( - replicaBrokers, topic, partition); + val data = findLeaderAndReplicaBrokers(replicaBrokers) val newLeader = data._1 if (oldLeader.equalsIgnoreCase(newLeader) && i == 0) { goToSleep = true } return data } catch { - case _: Throwable => goToSleep = true; + case _: Throwable => goToSleep = true } if (goToSleep) { try { - Thread.sleep(1000 * (i + 1)); + Thread.sleep(1000 * (i + 1)) } catch { case _: Throwable => } } } - throw new Exception( - "Unable to find new leader after Broker failure. Exiting"); + throw new Exception("Unable to find new leader after Broker failure. 
Exiting") } def fetch(startPositionOffset: Long): ByteBufferMessageSet = { if (consumer == null) { - init(); + init() } - val builder = new FetchRequestBuilder(); - val req = builder - .addFetch(topic, partition, startPositionOffset, maxBatchByteSize) - .clientId(clientName).build(); - val fetchResponse = consumer.fetch(req); - var numErrors = 0; + val builder = new FetchRequestBuilder() + val req = builder.addFetch(topic, partition, startPositionOffset, maxBatchByteSize) + .clientId(groupId).build() + val fetchResponse = consumer.fetch(req) + var numErrors = 0 if (fetchResponse.hasError) { numErrors = numErrors + 1 - val code = fetchResponse.errorCode(topic, partition); + val code = fetchResponse.errorCode(topic, partition) if (numErrors > 5) { - throw new Exception("Error fetching data from the Broker:" - + leader + " Reason: " + code); + throw new Exception("Error fetching data from the Broker:" + leader + " Reason: " + code) } if (code == ErrorMapping.OffsetOutOfRangeCode) { - return fetch(getLatestOffset()); + return fetch(getLatestOffset()) } close - val data = findNewLeader(leader, replicaBrokers, topic, partition) + val data = findNewLeader(leader, replicaBrokers) leader = data._1 replicaBrokers = data._2 init() return fetch(startPositionOffset) } - val set = fetchResponse - .messageSet(topic, partition); - return set; + fetchResponse.messageSet(topic, partition) + } + private def findLeaderAndReplicaBrokers(broker: String): (String, Seq[(String)]) = { + var result: (String, Seq[String]) = null + val tmp = broker.split(":") + val ip = tmp(0) + val port = tmp(1).toInt + var consumer: SimpleConsumer = null + try { + consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup") + val req = new TopicMetadataRequest(Seq(topic), 1) + val resp = consumer.send(req); + val metaData = resp.topicsMetadata + for ( + item <- metaData if (result == null); + part <- item.partitionsMetadata if (part.partitionId == partition) + ) { + part.leader match { + case Some(leader) => { + result = (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) + } + case None => + } + } + result + } catch { + case e: Throwable => throw new Exception("Error communicating with Broker [" + broker + + "] to find Leader for [" + topic + "] Reason: " + e) + } finally { + if (consumer != null) + consumer.close + } + } + + private def findLeaderAndReplicaBrokers(brokers: Seq[String]): (String, Seq[(String)]) = { + var result: (String, Seq[String]) = null + for (broker <- brokers if (result == null)) { + result = findLeaderAndReplicaBrokers(broker) + } + if (result == null) + throw new Exception("not found leader.") + else + result } def close: Unit = { @@ -106,45 +170,67 @@ class KafkaSimpleConsumer(brokers: Seq[String], topic: String, partition: Int, m consumer.close } } + + def commitOffsetToZookeeper(offset: Long) { + val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir) == false) + zk.createPersistent(dir,true) + zk.writeData(dir, offset.toString) + } catch { + case e: Throwable => logWarning("Error saving Kafka offset to Zookeeper dir: " + dir, e) + } finally { + zk.close() + } + } } -object KafkaSimpleConsumer { - def findLeaderAndReplicaBrokers(brokers: Seq[String], topic: String, partition: Int): (String, Seq[(String)]) = { - for (broker <- brokers) { - val tmp = broker.split(":") - val ip = tmp(0) - val port = tmp(1).toInt - var consumer: 
SimpleConsumer = null - try { - consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup") - val req = new TopicMetadataRequest(Seq(topic), 1) - val resp = consumer.send(req); - val metaData = resp.topicsMetadata - for ( - item <- metaData; - part <- item.partitionsMetadata if (part.partitionId == partition) - ) { - part.leader match { - case Some(leader) => { - return (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) - } - case None => - } +object KafkaSimpleConsumer extends Logging { + private def getBrokerFromJson(json: String): String = { + import scala.util.parsing.json.JSON + val broker = JSON.parseFull(json) + broker match { + case Some(m: Map[String, Any]) => + m("host") + ":" + (m("port").asInstanceOf[Double].toInt).toString + case _ => throw new Exception("incorrect broker info in zookeeper") + } + } + + def getBrokers(zkQuorum: String): Seq[String] = { + val list = new ArrayBuffer[String]() + val dir = "/brokers/ids" + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir)) { + val ids = zk.getChildren(dir) + import scala.collection.JavaConversions._ + for (id <- ids) { + val json = zk.readData[String](dir + "/" + id) + list.append(getBrokerFromJson(json)) } - } catch { - case e: Throwable => throw new Exception("Error communicating with Broker [" + broker - + "] to find Leader for [" + topic + "] Reason: " + e); - } finally { - if (consumer != null) - consumer.close } + } catch { + case e: Throwable => logWarning("Error reading Kafka brokers Zookeeper data", e) + } finally { + zk.close() } - throw new Exception("not found leader."); + list.toSeq } - def getOffset(consumer: SimpleConsumer, topic: String, - partition: Int, whichTime: Long, clientId: Int): Long = { - val topicAndPartition = new TopicAndPartition(topic, partition); - consumer.earliestOrLatestOffset(topicAndPartition, whichTime, clientId) + def getEndOffsetPositionFromZookeeper(groupId: String, zkQuorum: String, topic: String, partition: Int): Long = { + val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir)) { + val offset = zk.readData[String](dir) + return offset.toInt + } + } catch { + case e: Throwable => logWarning("Error reading Kafka brokers Zookeeper data", e) + } finally { + zk.close() + } + 0L } } \ No newline at end of file diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala index 60da01bf18c1c..6695f29babc7b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.kafka import org.apache.spark.Logging @@ -6,34 +23,37 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver - import kafka.serializer.Decoder class KafkaSimpleInputDStream[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, - brokers: Seq[String], + zkQuorum: String, + groupId: String, topic: String, partition: Integer, startPositionOffset: Long, + autoCommitOffset:Boolean, maxBatchByteSize: Int = 1024 * 1024, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { def getReceiver(): Receiver[(Long, Array[Byte])] = { - new KafkaSimpleReceiver[U, T](brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel) + new KafkaSimpleReceiver[U, T](zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) .asInstanceOf[Receiver[(Long, Array[Byte])]] } } class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( - brokers: Seq[String], + zkQuorum: String, + groupId: String, topic: String, partition: Integer, startPositionOffset: Long, + autoCommitOffset: Boolean, maxBatchByteSize: Int = 1024 * 1024, storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging { var currentOffset = startPositionOffset - val kac = new KafkaSimpleConsumer(brokers, topic, partition, maxBatchByteSize); + val kac = new KafkaSimpleConsumer(zkQuorum, groupId, topic, partition, maxBatchByteSize) def onStop() { kac.close @@ -55,14 +75,16 @@ class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( while (itr.hasNext) { val messageAndOffset = itr.next() val payload = messageAndOffset.message.payload - val bytes = new Array[Byte](payload.limit); - payload.get(bytes); + val bytes = new Array[Byte](payload.limit) + payload.get(bytes) currentOffset = messageAndOffset.offset store((currentOffset, bytes)) hasMessage = true + if (autoCommitOffset) + kac.commitOffsetToZookeeper(currentOffset) } if (hasMessage) { - currentOffset = currentOffset + 1 + currentOffset += 1 } Thread.sleep(10) } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 6a05654a661bc..2afab7bed345e 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -155,15 +155,16 @@ object KafkaUtils { /** * Create an input stream that pulls messages form a Kafka Broker in according to the specified starting position. * @param ssc StreamingContext object - * @param borkers Kafka brokers list of host:port + * @param groupId The group id for this consumer + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). 
* @param topic a topic to consume * @param partition partition of this topic * @param startPositionOffset beginning to consume from this offset position * @param maxBatchByteSize max buffer size for a fetch request * @param storageLevel RDD storage level. */ - def createStream(ssc: StreamingContext, brokers: Seq[String], topic: String, partition: Int, startPositionOffset: Long, - maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { - new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel); + def createStream(ssc: StreamingContext, zkQuorum: String, groupId: String, topic: String, partition: Int, startPositionOffset: Long, + autoCommitOffset: Boolean, maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) } } From b7fa86c3b55f4910236963c646fa31776e19fdc6 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Sat, 2 Aug 2014 22:58:16 +0800 Subject: [PATCH 04/10] [SPARK-2803]: add Kafka stream feature for fetch messages from specified starting offset position --- .../streaming/kafka/KafkaSimpleConsumer.scala | 50 +++++++++++++--- .../kafka/KafkaSimpleInputDStream.scala | 42 ++++++++------ .../spark/streaming/kafka/KafkaUtils.scala | 58 +++++++++++++++++-- 3 files changed, 117 insertions(+), 33 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala index 0ea0511e99c58..7e39080578518 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -31,7 +31,13 @@ import org.apache.zookeeper.CreateMode import org.I0Itec.zkclient.DataUpdater @serializable -class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, partition: Int, maxBatchByteSize: Int) extends Logging { +class KafkaSimpleConsumer( + zkQuorum: String, + groupId: String, + topic: String, + partition: Int, + maxBatchByteSize: Int + ) extends Logging { private var brokers: Seq[String] = _ private var leader: String = _ private var consumer: SimpleConsumer = null @@ -70,7 +76,10 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1) } - private def findNewLeader(oldLeader: String, replicaBrokers: Seq[String]): (String, Seq[String]) = { + private def findNewLeader( + oldLeader: String, + replicaBrokers: Seq[String] + ): (String, Seq[String]) = { for (i <- 0 until 3) { var goToSleep = false try { @@ -139,7 +148,8 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part ) { part.leader match { case Some(leader) => { - result = (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) + result = (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + + brk.port))) } case None => } @@ -149,8 +159,9 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part case e: Throwable => throw new Exception("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + "] Reason: " + e) 
} finally { - if (consumer != null) + if (consumer != null) { consumer.close + } } } @@ -159,10 +170,11 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part for (broker <- brokers if (result == null)) { result = findLeaderAndReplicaBrokers(broker) } - if (result == null) + if (result == null) { throw new Exception("not found leader.") - else + } else { result + } } def close: Unit = { @@ -175,8 +187,9 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) try { - if (zk.exists(dir) == false) + if (zk.exists(dir) == false) { zk.createPersistent(dir,true) + } zk.writeData(dir, offset.toString) } catch { case e: Throwable => logWarning("Error saving Kafka offset to Zookeeper dir: " + dir, e) @@ -218,7 +231,8 @@ object KafkaSimpleConsumer extends Logging { list.toSeq } - def getEndOffsetPositionFromZookeeper(groupId: String, zkQuorum: String, topic: String, partition: Int): Long = { + def getEndOffsetPositionFromZookeeper(groupId: String, zkQuorum: String, topic: String, + partition: Int): Long = { val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) try { @@ -233,4 +247,24 @@ object KafkaSimpleConsumer extends Logging { } 0L } + + def getTopicPartitionList(zkQuorum: String, topic: String): Seq[Int] = { + val list = new ArrayBuffer[Int]() + val dir = "/brokers/topics/" + topic+"/partitions" + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir)) { + val ids = zk.getChildren(dir) + import scala.collection.JavaConversions._ + for (id <- ids) { + list.append(id.toInt) + } + } + } catch { + case e: Throwable => logWarning("Error reading Kafka partitions list Zookeeper data", e) + } finally { + zk.close() + } + list.toSeq + } } \ No newline at end of file diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala index 6695f29babc7b..fb370564fe1cf 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -26,31 +26,34 @@ import org.apache.spark.streaming.receiver.Receiver import kafka.serializer.Decoder class KafkaSimpleInputDStream[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( - @transient ssc_ : StreamingContext, - zkQuorum: String, - groupId: String, - topic: String, - partition: Integer, - startPositionOffset: Long, - autoCommitOffset:Boolean, - maxBatchByteSize: Int = 1024 * 1024, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { + @transient ssc_ : StreamingContext, + zkQuorum: String, + groupId: String, + topic: String, + partition: Integer, + startPositionOffset: Long, + autoCommitOffset: Boolean, + maxBatchByteSize: Int = 1024 * 1024, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { def getReceiver(): Receiver[(Long, Array[Byte])] = { - new KafkaSimpleReceiver[U, T](zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) 
+    new KafkaSimpleReceiver[U, T](zkQuorum, groupId, topic, partition, startPositionOffset,
+      autoCommitOffset, maxBatchByteSize, storageLevel)
       .asInstanceOf[Receiver[(Long, Array[Byte])]]
   }
 }
 
 class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
-    zkQuorum: String,
-    groupId: String,
-    topic: String,
-    partition: Integer,
-    startPositionOffset: Long,
-    autoCommitOffset: Boolean,
-    maxBatchByteSize: Int = 1024 * 1024,
-    storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging {
+    zkQuorum: String,
+    groupId: String,
+    topic: String,
+    partition: Integer,
+    startPositionOffset: Long,
+    autoCommitOffset: Boolean,
+    maxBatchByteSize: Int = 1024 * 1024,
+    storageLevel: StorageLevel
+  ) extends Receiver[Any](storageLevel) with Logging {
 
   var currentOffset = startPositionOffset
   val kac = new KafkaSimpleConsumer(zkQuorum, groupId, topic, partition, maxBatchByteSize)
@@ -64,7 +67,8 @@ class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
     logInfo("Starting Kafka Consumer Stream")
     val firstOffset = kac.getEarliestOffset()
     if (currentOffset < firstOffset) {
-      logWarning(s"at present, the first offset is ${firstOffset}, the messages which is from ${currentOffset} to ${firstOffset} might been pruned.")
+      logWarning(s"""the earliest available offset is now ${firstOffset}; messages from
+        |${currentOffset} to ${firstOffset} might have been pruned.""".stripMargin)
       currentOffset = firstOffset
     }
     while (true) {
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 2afab7bed345e..f2cde6ee25902 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -153,18 +153,64 @@ object KafkaUtils {
   }
 
   /**
-   * Create an input stream that pulls messages form a Kafka Broker in according to the specified starting position.
+   * Create an input stream that pulls messages from a Kafka Broker,
+   * starting from the specified offset position.
    * @param ssc StreamingContext object
-   * @param groupId The group id for this consumer
    * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer
    * @param topic a topic to consume
    * @param partition partition of this topic
    * @param startPositionOffset beginning to consume from this offset position
-   * @param maxBatchByteSize max buffer size for a fetch request
+   * @param autoCommitOffset whether to automatically commit the latest offset of each
+   *        batch of messages to Zookeeper, so that consumption can resume from that
+   *        offset position the next time this stream is started
+   * @param maxBatchByteSize max buffer size for a fetch request; it should be at least
+   *        as large as the largest single message, and may hold several messages
+   * @param storageLevel RDD storage level.
   */
+  def createStream(
+      ssc: StreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topic: String,
+      partition: Int,
+      startPositionOffset: Long,
+      autoCommitOffset: Boolean,
+      maxBatchByteSize: Int,
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[(Long, Array[Byte])] = {
+    new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, zkQuorum, groupId, topic,
+      partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker,
+   * starting from the specified offset position.
+   * @param jssc JavaStreamingContext object
+   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId The group id for this consumer
+   * @param topic a topic to consume
+   * @param partition partition of this topic
+   * @param startPositionOffset beginning to consume from this offset position
+   * @param autoCommitOffset whether to automatically commit the latest offset of each
+   *        batch of messages to Zookeeper, so that consumption can resume from that
+   *        offset position the next time this stream is started
+   * @param maxBatchByteSize max buffer size for a fetch request; it should be at least
+   *        as large as the largest single message, and may hold several messages
+   * @param storageLevel RDD storage level.
+ */ + def createStream( + jssc: JavaStreamingContext, + zkQuorum: String, + groupId: String, + topic: String, + partition: Int, + startPositionOffset: Long, + autoCommitOffset: Boolean, + maxBatchByteSize: Int, + storageLevel: StorageLevel + ): JavaPairReceiverInputDStream[Long, Array[Byte]] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](jssc.ssc, zkQuorum, groupId, topic, + partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) } } From e963c0fae28b58703afa7c4019e5ec9d2312c472 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Thu, 24 Jul 2014 23:07:30 +0800 Subject: [PATCH 05/10] add Kafka stream feature in according to specified starting offset position to fetch messages --- .../streaming/kafka/KafkaSimpleConsumer.scala | 150 ++++++++++++++++++ .../kafka/KafkaSimpleInputDStream.scala | 70 ++++++++ 2 files changed, 220 insertions(+) create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala new file mode 100644 index 0000000000000..8f861d8c95c48 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -0,0 +1,150 @@ +package org.apache.spark.streaming.kafka + +import kafka.api.FetchRequestBuilder +import kafka.api.TopicMetadataRequest +import kafka.common.ErrorMapping +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.message.ByteBufferMessageSet + +@serializable +class KafkaSimpleConsumer(brokers: Seq[String], topic: String, partition: Int, maxBatchByteSize: Int) { + private var leader: String = _ + private var clientName: String = _ + private var consumer: SimpleConsumer = null + private val soTimeout = 60000 + private var bufferSize = 1024 + private var replicaBrokers: Seq[(String)] = null; + + private def init(): Unit = { + val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers(brokers, topic, partition) + leader = data._1 + replicaBrokers = data._2 + val ipPort = leader.split(":") + val ip = ipPort(0) + val port = ipPort(1).toInt + clientName = "client-" + topic + "-" + partition; + consumer = new SimpleConsumer(ip, port, soTimeout, bufferSize, clientName); + } + + def getEarliestOffset(): Long = { + if (consumer == null) { + init(); + } + return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime, 1); + } + + def getLatestOffset(): Long = { + if (consumer == null) { + init(); + } + return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1); + } + + def findNewLeader(oldLeader: String, + replicaBrokers: Seq[String], topic: String, partition: Int): (String, Seq[String]) = { + for (i <- 0 until 3) { + var goToSleep = false; + try { + val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers( + replicaBrokers, topic, partition); + val newLeader = data._1 + if (oldLeader.equalsIgnoreCase(newLeader) && i == 0) { + goToSleep = true + } + return data + } catch { + case _: Throwable => goToSleep = true; + } + if (goToSleep) { + try { + Thread.sleep(1000 * (i + 1)); + } catch { + case _: Throwable => + } + } + } + throw new Exception( + "Unable to find new leader after Broker failure. 
Exiting"); + } + + def fetch(startPositionOffset: Long): ByteBufferMessageSet = { + if (consumer == null) { + init(); + } + val builder = new FetchRequestBuilder(); + val req = builder + .addFetch(topic, partition, startPositionOffset, maxBatchByteSize) + .clientId(clientName).build(); + val fetchResponse = consumer.fetch(req); + var numErrors = 0; + if (fetchResponse.hasError) { + numErrors = numErrors + 1 + val code = fetchResponse.errorCode(topic, partition); + if (numErrors > 5) { + throw new Exception("Error fetching data from the Broker:" + + leader + " Reason: " + code); + } + if (code == ErrorMapping.OffsetOutOfRangeCode) { + return fetch(getLatestOffset()); + } + close + val data = findNewLeader(leader, replicaBrokers, topic, partition) + leader = data._1 + replicaBrokers = data._2 + init() + return fetch(startPositionOffset) + } + val set = fetchResponse + .messageSet(topic, partition); + return set; + + } + + def close: Unit = { + if (consumer != null) { + consumer.close + } + } +} + +object KafkaSimpleConsumer { + def findLeaderAndReplicaBrokers(brokers: Seq[String], topic: String, partition: Int): (String, Seq[(String)]) = { + for (broker <- brokers) { + val tmp = broker.split(":") + val ip = tmp(0) + val port = tmp(1).toInt + var consumer: SimpleConsumer = null + try { + consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup") + val req = new TopicMetadataRequest(Seq(topic), 1) + val resp = consumer.send(req); + val metaData = resp.topicsMetadata + for ( + item <- metaData; + part <- item.partitionsMetadata if (part.partitionId == partition) + ) { + part.leader match { + case Some(leader) => { + return (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) + } + case None => + } + } + } catch { + case e: Throwable => throw new Exception("Error communicating with Broker [" + broker + + "] to find Leader for [" + topic + "] Reason: " + e); + } finally { + if (consumer != null) + consumer.close + } + } + throw new Exception("not found leader."); + } + + def getOffset(consumer: SimpleConsumer, topic: String, + partition: Int, whichTime: Long, clientId: Int): Long = { + val topicAndPartition = new TopicAndPartition(topic, partition); + consumer.earliestOrLatestOffset(topicAndPartition, whichTime, clientId) + } +} \ No newline at end of file diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala new file mode 100644 index 0000000000000..60da01bf18c1c --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -0,0 +1,70 @@ +package org.apache.spark.streaming.kafka + +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver + +import kafka.serializer.Decoder + +class KafkaSimpleInputDStream[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + @transient ssc_ : StreamingContext, + brokers: Seq[String], + topic: String, + partition: Integer, + startPositionOffset: Long, + maxBatchByteSize: Int = 1024 * 1024, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { + + def getReceiver(): Receiver[(Long, Array[Byte])] = 
{ + new KafkaSimpleReceiver[U, T](brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel) + .asInstanceOf[Receiver[(Long, Array[Byte])]] + } +} + +class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + brokers: Seq[String], + topic: String, + partition: Integer, + startPositionOffset: Long, + maxBatchByteSize: Int = 1024 * 1024, + storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging { + + var currentOffset = startPositionOffset + val kac = new KafkaSimpleConsumer(brokers, topic, partition, maxBatchByteSize); + + def onStop() { + kac.close + logInfo("Kafka consumer closed.") + } + + def onStart() { + logInfo("Starting Kafka Consumer Stream") + val firstOffset = kac.getEarliestOffset() + if (currentOffset < firstOffset) { + logWarning(s"at present, the first offset is ${firstOffset}, the messages which is from ${currentOffset} to ${firstOffset} might been pruned.") + currentOffset = firstOffset + } + while (true) { + val messageSet = kac.fetch(currentOffset) + + val itr = messageSet.iterator + var hasMessage = false + while (itr.hasNext) { + val messageAndOffset = itr.next() + val payload = messageAndOffset.message.payload + val bytes = new Array[Byte](payload.limit); + payload.get(bytes); + currentOffset = messageAndOffset.offset + store((currentOffset, bytes)) + hasMessage = true + } + if (hasMessage) { + currentOffset = currentOffset + 1 + } + Thread.sleep(10) + } + } +} \ No newline at end of file From 39c3493374b646fa8d02e2c2824ba0eabc46995c Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Thu, 24 Jul 2014 23:08:42 +0800 Subject: [PATCH 06/10] add Kafka stream feature in according to specified starting offset position to fetch messages --- .../apache/spark/streaming/kafka/KafkaUtils.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 48668f763e41e..efec64201a218 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -145,4 +145,19 @@ object KafkaUtils { createStream[K, V, U, T]( jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } + + /** + * Create an input stream that pulls messages form a Kafka Broker in according to the specified starting position. + * @param ssc StreamingContext object + * @param borkers Kafka brokers list of host:port + * @param topic a topic to consume + * @param partition partition of this topic + * @param startPositionOffset beginning to consume from this offset position + * @param maxBatchByteSize max buffer size for a fetch request + * @param storageLevel RDD storage level. 
+ */ + def createStream(ssc: StreamingContext, brokers: Seq[String], topic: String, partition: Int, startPositionOffset: Long, + maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel); + } } From 32b010b935bb9c4a5ddc341480cd0dfb6a722f69 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Sat, 26 Jul 2014 22:58:24 +0800 Subject: [PATCH 07/10] refactor code --- .../streaming/kafka/KafkaSimpleConsumer.scala | 220 ++++++++++++------ .../kafka/KafkaSimpleInputDStream.scala | 38 ++- .../spark/streaming/kafka/KafkaUtils.scala | 9 +- 3 files changed, 188 insertions(+), 79 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala index 8f861d8c95c48..0ea0511e99c58 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.streaming.kafka import kafka.api.FetchRequestBuilder @@ -6,99 +23,146 @@ import kafka.common.ErrorMapping import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer import kafka.message.ByteBufferMessageSet +import org.apache.spark.Logging +import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZKStringSerializer +import scala.collection.mutable.ArrayBuffer +import org.apache.zookeeper.CreateMode +import org.I0Itec.zkclient.DataUpdater @serializable -class KafkaSimpleConsumer(brokers: Seq[String], topic: String, partition: Int, maxBatchByteSize: Int) { +class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, partition: Int, maxBatchByteSize: Int) extends Logging { + private var brokers: Seq[String] = _ private var leader: String = _ - private var clientName: String = _ private var consumer: SimpleConsumer = null private val soTimeout = 60000 private var bufferSize = 1024 - private var replicaBrokers: Seq[(String)] = null; + private var replicaBrokers: Seq[(String)] = null private def init(): Unit = { - val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers(brokers, topic, partition) + brokers = KafkaSimpleConsumer.getBrokers(zkQuorum) + val data = findLeaderAndReplicaBrokers(brokers) leader = data._1 replicaBrokers = data._2 val ipPort = leader.split(":") val ip = ipPort(0) val port = ipPort(1).toInt - clientName = "client-" + topic + "-" + partition; - consumer = new SimpleConsumer(ip, port, soTimeout, bufferSize, clientName); + consumer = new SimpleConsumer(ip, port, soTimeout, bufferSize, groupId) + } + + private def getOffset(consumer: SimpleConsumer, topic: String, + partition: Int, whichTime: Long, clientId: Int): Long = { + val topicAndPartition = new TopicAndPartition(topic, partition); + consumer.earliestOrLatestOffset(topicAndPartition, whichTime, clientId) } def getEarliestOffset(): Long = { if (consumer == null) { - init(); + init() } - return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime, 1); + return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime, 1) } def getLatestOffset(): Long = { if (consumer == null) { - init(); + init() } - return KafkaSimpleConsumer.getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1); + return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1) } - def findNewLeader(oldLeader: String, - replicaBrokers: Seq[String], topic: String, partition: Int): (String, Seq[String]) = { + private def findNewLeader(oldLeader: String, replicaBrokers: Seq[String]): (String, Seq[String]) = { for (i <- 0 until 3) { - var goToSleep = false; + var goToSleep = false try { - val data = KafkaSimpleConsumer.findLeaderAndReplicaBrokers( - replicaBrokers, topic, partition); + val data = findLeaderAndReplicaBrokers(replicaBrokers) val newLeader = data._1 if (oldLeader.equalsIgnoreCase(newLeader) && i == 0) { goToSleep = true } return data } catch { - case _: Throwable => goToSleep = true; + case _: Throwable => goToSleep = true } if (goToSleep) { try { - Thread.sleep(1000 * (i + 1)); + Thread.sleep(1000 * (i + 1)) } catch { case _: Throwable => } } } - throw new Exception( - "Unable to find new leader after Broker failure. Exiting"); + throw new Exception("Unable to find new leader after Broker failure. 
Exiting") } def fetch(startPositionOffset: Long): ByteBufferMessageSet = { if (consumer == null) { - init(); + init() } - val builder = new FetchRequestBuilder(); - val req = builder - .addFetch(topic, partition, startPositionOffset, maxBatchByteSize) - .clientId(clientName).build(); - val fetchResponse = consumer.fetch(req); - var numErrors = 0; + val builder = new FetchRequestBuilder() + val req = builder.addFetch(topic, partition, startPositionOffset, maxBatchByteSize) + .clientId(groupId).build() + val fetchResponse = consumer.fetch(req) + var numErrors = 0 if (fetchResponse.hasError) { numErrors = numErrors + 1 - val code = fetchResponse.errorCode(topic, partition); + val code = fetchResponse.errorCode(topic, partition) if (numErrors > 5) { - throw new Exception("Error fetching data from the Broker:" - + leader + " Reason: " + code); + throw new Exception("Error fetching data from the Broker:" + leader + " Reason: " + code) } if (code == ErrorMapping.OffsetOutOfRangeCode) { - return fetch(getLatestOffset()); + return fetch(getLatestOffset()) } close - val data = findNewLeader(leader, replicaBrokers, topic, partition) + val data = findNewLeader(leader, replicaBrokers) leader = data._1 replicaBrokers = data._2 init() return fetch(startPositionOffset) } - val set = fetchResponse - .messageSet(topic, partition); - return set; + fetchResponse.messageSet(topic, partition) + } + private def findLeaderAndReplicaBrokers(broker: String): (String, Seq[(String)]) = { + var result: (String, Seq[String]) = null + val tmp = broker.split(":") + val ip = tmp(0) + val port = tmp(1).toInt + var consumer: SimpleConsumer = null + try { + consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup") + val req = new TopicMetadataRequest(Seq(topic), 1) + val resp = consumer.send(req); + val metaData = resp.topicsMetadata + for ( + item <- metaData if (result == null); + part <- item.partitionsMetadata if (part.partitionId == partition) + ) { + part.leader match { + case Some(leader) => { + result = (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) + } + case None => + } + } + result + } catch { + case e: Throwable => throw new Exception("Error communicating with Broker [" + broker + + "] to find Leader for [" + topic + "] Reason: " + e) + } finally { + if (consumer != null) + consumer.close + } + } + + private def findLeaderAndReplicaBrokers(brokers: Seq[String]): (String, Seq[(String)]) = { + var result: (String, Seq[String]) = null + for (broker <- brokers if (result == null)) { + result = findLeaderAndReplicaBrokers(broker) + } + if (result == null) + throw new Exception("not found leader.") + else + result } def close: Unit = { @@ -106,45 +170,67 @@ class KafkaSimpleConsumer(brokers: Seq[String], topic: String, partition: Int, m consumer.close } } + + def commitOffsetToZookeeper(offset: Long) { + val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir) == false) + zk.createPersistent(dir,true) + zk.writeData(dir, offset.toString) + } catch { + case e: Throwable => logWarning("Error saving Kafka offset to Zookeeper dir: " + dir, e) + } finally { + zk.close() + } + } } -object KafkaSimpleConsumer { - def findLeaderAndReplicaBrokers(brokers: Seq[String], topic: String, partition: Int): (String, Seq[(String)]) = { - for (broker <- brokers) { - val tmp = broker.split(":") - val ip = tmp(0) - val port = tmp(1).toInt - var consumer: 
SimpleConsumer = null - try { - consumer = new SimpleConsumer(ip, port, 100000, 64 * 1024, "leaderLookup") - val req = new TopicMetadataRequest(Seq(topic), 1) - val resp = consumer.send(req); - val metaData = resp.topicsMetadata - for ( - item <- metaData; - part <- item.partitionsMetadata if (part.partitionId == partition) - ) { - part.leader match { - case Some(leader) => { - return (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) - } - case None => - } +object KafkaSimpleConsumer extends Logging { + private def getBrokerFromJson(json: String): String = { + import scala.util.parsing.json.JSON + val broker = JSON.parseFull(json) + broker match { + case Some(m: Map[String, Any]) => + m("host") + ":" + (m("port").asInstanceOf[Double].toInt).toString + case _ => throw new Exception("incorrect broker info in zookeeper") + } + } + + def getBrokers(zkQuorum: String): Seq[String] = { + val list = new ArrayBuffer[String]() + val dir = "/brokers/ids" + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir)) { + val ids = zk.getChildren(dir) + import scala.collection.JavaConversions._ + for (id <- ids) { + val json = zk.readData[String](dir + "/" + id) + list.append(getBrokerFromJson(json)) } - } catch { - case e: Throwable => throw new Exception("Error communicating with Broker [" + broker - + "] to find Leader for [" + topic + "] Reason: " + e); - } finally { - if (consumer != null) - consumer.close } + } catch { + case e: Throwable => logWarning("Error reading Kafka brokers Zookeeper data", e) + } finally { + zk.close() } - throw new Exception("not found leader."); + list.toSeq } - def getOffset(consumer: SimpleConsumer, topic: String, - partition: Int, whichTime: Long, clientId: Int): Long = { - val topicAndPartition = new TopicAndPartition(topic, partition); - consumer.earliestOrLatestOffset(topicAndPartition, whichTime, clientId) + def getEndOffsetPositionFromZookeeper(groupId: String, zkQuorum: String, topic: String, partition: Int): Long = { + val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir)) { + val offset = zk.readData[String](dir) + return offset.toInt + } + } catch { + case e: Throwable => logWarning("Error reading Kafka brokers Zookeeper data", e) + } finally { + zk.close() + } + 0L } } \ No newline at end of file diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala index 60da01bf18c1c..6695f29babc7b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.kafka import org.apache.spark.Logging @@ -6,34 +23,37 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver - import kafka.serializer.Decoder class KafkaSimpleInputDStream[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, - brokers: Seq[String], + zkQuorum: String, + groupId: String, topic: String, partition: Integer, startPositionOffset: Long, + autoCommitOffset:Boolean, maxBatchByteSize: Int = 1024 * 1024, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { def getReceiver(): Receiver[(Long, Array[Byte])] = { - new KafkaSimpleReceiver[U, T](brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel) + new KafkaSimpleReceiver[U, T](zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) .asInstanceOf[Receiver[(Long, Array[Byte])]] } } class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( - brokers: Seq[String], + zkQuorum: String, + groupId: String, topic: String, partition: Integer, startPositionOffset: Long, + autoCommitOffset: Boolean, maxBatchByteSize: Int = 1024 * 1024, storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging { var currentOffset = startPositionOffset - val kac = new KafkaSimpleConsumer(brokers, topic, partition, maxBatchByteSize); + val kac = new KafkaSimpleConsumer(zkQuorum, groupId, topic, partition, maxBatchByteSize) def onStop() { kac.close @@ -55,14 +75,16 @@ class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( while (itr.hasNext) { val messageAndOffset = itr.next() val payload = messageAndOffset.message.payload - val bytes = new Array[Byte](payload.limit); - payload.get(bytes); + val bytes = new Array[Byte](payload.limit) + payload.get(bytes) currentOffset = messageAndOffset.offset store((currentOffset, bytes)) hasMessage = true + if (autoCommitOffset) + kac.commitOffsetToZookeeper(currentOffset) } if (hasMessage) { - currentOffset = currentOffset + 1 + currentOffset += 1 } Thread.sleep(10) } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index efec64201a218..d82aab34e0d2d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -149,15 +149,16 @@ object KafkaUtils { /** * Create an input stream that pulls messages form a Kafka Broker in according to the specified starting position. * @param ssc StreamingContext object - * @param borkers Kafka brokers list of host:port + * @param groupId The group id for this consumer + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). 
* @param topic a topic to consume * @param partition partition of this topic * @param startPositionOffset beginning to consume from this offset position * @param maxBatchByteSize max buffer size for a fetch request * @param storageLevel RDD storage level. */ - def createStream(ssc: StreamingContext, brokers: Seq[String], topic: String, partition: Int, startPositionOffset: Long, - maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { - new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, brokers, topic, partition, startPositionOffset, maxBatchByteSize, storageLevel); + def createStream(ssc: StreamingContext, zkQuorum: String, groupId: String, topic: String, partition: Int, startPositionOffset: Long, + autoCommitOffset: Boolean, maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) } } From dd7b2d757b86e94c42320d5d75857cdb1e6f2f20 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Sat, 2 Aug 2014 22:58:16 +0800 Subject: [PATCH 08/10] [SPARK-2803]: add Kafka stream feature for fetch messages from specified starting offset position --- .../streaming/kafka/KafkaSimpleConsumer.scala | 50 +++++++++++++--- .../kafka/KafkaSimpleInputDStream.scala | 42 ++++++++------ .../spark/streaming/kafka/KafkaUtils.scala | 58 +++++++++++++++++-- 3 files changed, 117 insertions(+), 33 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala index 0ea0511e99c58..7e39080578518 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -31,7 +31,13 @@ import org.apache.zookeeper.CreateMode import org.I0Itec.zkclient.DataUpdater @serializable -class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, partition: Int, maxBatchByteSize: Int) extends Logging { +class KafkaSimpleConsumer( + zkQuorum: String, + groupId: String, + topic: String, + partition: Int, + maxBatchByteSize: Int + ) extends Logging { private var brokers: Seq[String] = _ private var leader: String = _ private var consumer: SimpleConsumer = null @@ -70,7 +76,10 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime, 1) } - private def findNewLeader(oldLeader: String, replicaBrokers: Seq[String]): (String, Seq[String]) = { + private def findNewLeader( + oldLeader: String, + replicaBrokers: Seq[String] + ): (String, Seq[String]) = { for (i <- 0 until 3) { var goToSleep = false try { @@ -139,7 +148,8 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part ) { part.leader match { case Some(leader) => { - result = (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + brk.port))) + result = (leader.host + ":" + leader.port, part.replicas.map(brk => (brk.host + ":" + + brk.port))) } case None => } @@ -149,8 +159,9 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part case e: Throwable => throw new Exception("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + "] Reason: " + e) 
} finally { - if (consumer != null) + if (consumer != null) { consumer.close + } } } @@ -159,10 +170,11 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part for (broker <- brokers if (result == null)) { result = findLeaderAndReplicaBrokers(broker) } - if (result == null) + if (result == null) { throw new Exception("not found leader.") - else + } else { result + } } def close: Unit = { @@ -175,8 +187,9 @@ class KafkaSimpleConsumer(zkQuorum: String, groupId: String, topic: String, part val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) try { - if (zk.exists(dir) == false) + if (zk.exists(dir) == false) { zk.createPersistent(dir,true) + } zk.writeData(dir, offset.toString) } catch { case e: Throwable => logWarning("Error saving Kafka offset to Zookeeper dir: " + dir, e) @@ -218,7 +231,8 @@ object KafkaSimpleConsumer extends Logging { list.toSeq } - def getEndOffsetPositionFromZookeeper(groupId: String, zkQuorum: String, topic: String, partition: Int): Long = { + def getEndOffsetPositionFromZookeeper(groupId: String, zkQuorum: String, topic: String, + partition: Int): Long = { val dir = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) try { @@ -233,4 +247,24 @@ object KafkaSimpleConsumer extends Logging { } 0L } + + def getTopicPartitionList(zkQuorum: String, topic: String): Seq[Int] = { + val list = new ArrayBuffer[Int]() + val dir = "/brokers/topics/" + topic+"/partitions" + val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) + try { + if (zk.exists(dir)) { + val ids = zk.getChildren(dir) + import scala.collection.JavaConversions._ + for (id <- ids) { + list.append(id.toInt) + } + } + } catch { + case e: Throwable => logWarning("Error reading Kafka partitions list Zookeeper data", e) + } finally { + zk.close() + } + list.toSeq + } } \ No newline at end of file diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala index 6695f29babc7b..fb370564fe1cf 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -26,31 +26,34 @@ import org.apache.spark.streaming.receiver.Receiver import kafka.serializer.Decoder class KafkaSimpleInputDStream[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( - @transient ssc_ : StreamingContext, - zkQuorum: String, - groupId: String, - topic: String, - partition: Integer, - startPositionOffset: Long, - autoCommitOffset:Boolean, - maxBatchByteSize: Int = 1024 * 1024, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { + @transient ssc_ : StreamingContext, + zkQuorum: String, + groupId: String, + topic: String, + partition: Integer, + startPositionOffset: Long, + autoCommitOffset: Boolean, + maxBatchByteSize: Int = 1024 * 1024, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ) extends ReceiverInputDStream[(Long, Array[Byte])](ssc_) with Logging { def getReceiver(): Receiver[(Long, Array[Byte])] = { - new KafkaSimpleReceiver[U, T](zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) 
+ new KafkaSimpleReceiver[U, T](zkQuorum, groupId, topic, partition, startPositionOffset, + autoCommitOffset, maxBatchByteSize, storageLevel) .asInstanceOf[Receiver[(Long, Array[Byte])]] } } class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( - zkQuorum: String, - groupId: String, - topic: String, - partition: Integer, - startPositionOffset: Long, - autoCommitOffset: Boolean, - maxBatchByteSize: Int = 1024 * 1024, - storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging { + zkQuorum: String, + groupId: String, + topic: String, + partition: Integer, + startPositionOffset: Long, + autoCommitOffset: Boolean, + maxBatchByteSize: Int = 1024 * 1024, + storageLevel: StorageLevel + ) extends Receiver[Any](storageLevel) with Logging { var currentOffset = startPositionOffset val kac = new KafkaSimpleConsumer(zkQuorum, groupId, topic, partition, maxBatchByteSize) @@ -64,7 +67,8 @@ class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( logInfo("Starting Kafka Consumer Stream") val firstOffset = kac.getEarliestOffset() if (currentOffset < firstOffset) { - logWarning(s"at present, the first offset is ${firstOffset}, the messages which is from ${currentOffset} to ${firstOffset} might been pruned.") + logWarning(s"""at present, the first offset is ${firstOffset}, the messages which is + |from ${currentOffset} to ${firstOffset} might been pruned.""".stripMargin) currentOffset = firstOffset } while (true) { diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index d82aab34e0d2d..780799cd4e02a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -147,18 +147,64 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages form a Kafka Broker in according to the specified starting position. + * Create an input stream that pulls messages from a Kafka Broker, + * in according to the specified starting position. * @param ssc StreamingContext object - * @param groupId The group id for this consumer * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer * @param topic a topic to consume * @param partition partition of this topic * @param startPositionOffset beginning to consume from this offset position - * @param maxBatchByteSize max buffer size for a fetch request + * @param autoCommitOffset whether need to commit automatically the latest offset + * in this batch messages into zookeeper, in order to recover pulling message + * from this offset position when consuming this streaming again + * @param maxBatchByteSize max buffer size for a fetch request, at least it's longer + * than a max-length message, it will contain more message if possible * @param storageLevel RDD storage level. 
*/ - def createStream(ssc: StreamingContext, zkQuorum: String, groupId: String, topic: String, partition: Int, startPositionOffset: Long, - autoCommitOffset: Boolean, maxBatchByteSize: Int, storageLevel: StorageLevel): ReceiverInputDStream[(Long, Array[Byte])] = { - new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, zkQuorum, groupId, topic, partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) + def createStream( + ssc: StreamingContext, + zkQuorum: String, + groupId: String, + topic: String, + partition: Int, + startPositionOffset: Long, + autoCommitOffset: Boolean, + maxBatchByteSize: Int, + storageLevel: StorageLevel + ): ReceiverInputDStream[(Long, Array[Byte])] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](ssc, zkQuorum, groupId, topic, + partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker, + * in according to the specified starting position. + * @param jssc JavaStreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer + * @param topic a topic to consume + * @param partition partition of this topic + * @param startPositionOffset beginning to consume from this offset position + * @param autoCommitOffset whether need to commit automatically the latest offset + * in this batch messages into zookeeper, in order to recover pulling message + * from this offset position when consuming this streaming again + * @param maxBatchByteSize max buffer size for a fetch request, at least it's longer + * than a max-length message, it will contain more message if possible + * @param storageLevel RDD storage level. 
+ */ + def createStream( + jssc: JavaStreamingContext, + zkQuorum: String, + groupId: String, + topic: String, + partition: Int, + startPositionOffset: Long, + autoCommitOffset: Boolean, + maxBatchByteSize: Int, + storageLevel: StorageLevel + ): JavaPairReceiverInputDStream[Long, Array[Byte]] = { + new KafkaSimpleInputDStream[StringDecoder, StringDecoder](jssc.ssc, zkQuorum, groupId, topic, + partition, startPositionOffset, autoCommitOffset, maxBatchByteSize, storageLevel) } } From 738bd0fae58cd6c4147842b5a996ad504466e121 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Sun, 10 Aug 2014 22:54:19 +0800 Subject: [PATCH 09/10] refactor --- .../streaming/kafka/KafkaSimpleConsumer.scala | 8 ++--- .../kafka/KafkaSimpleInputDStream.scala | 9 +++--- .../spark/streaming/kafka/KafkaUtils.scala | 32 +++++++++---------- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala index 7e39080578518..4edab7786b763 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -42,7 +42,7 @@ class KafkaSimpleConsumer( private var leader: String = _ private var consumer: SimpleConsumer = null private val soTimeout = 60000 - private var bufferSize = 1024 + private val bufferSize = maxBatchByteSize private var replicaBrokers: Seq[(String)] = null private def init(): Unit = { @@ -171,7 +171,7 @@ class KafkaSimpleConsumer( result = findLeaderAndReplicaBrokers(broker) } if (result == null) { - throw new Exception("not found leader.") + throw new Exception("not found leader.") } else { result } @@ -250,7 +250,7 @@ object KafkaSimpleConsumer extends Logging { def getTopicPartitionList(zkQuorum: String, topic: String): Seq[Int] = { val list = new ArrayBuffer[Int]() - val dir = "/brokers/topics/" + topic+"/partitions" + val dir = "/brokers/topics/" + topic + "/partitions" val zk = new ZkClient(zkQuorum, 30 * 1000, 30 * 1000, ZKStringSerializer) try { if (zk.exists(dir)) { @@ -267,4 +267,4 @@ object KafkaSimpleConsumer extends Logging { } list.toSeq } -} \ No newline at end of file +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala index fb370564fe1cf..62d6183fb4b64 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleInputDStream.scala @@ -68,7 +68,7 @@ class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( val firstOffset = kac.getEarliestOffset() if (currentOffset < firstOffset) { logWarning(s"""at present, the first offset is ${firstOffset}, the messages which is - |from ${currentOffset} to ${firstOffset} might been pruned.""".stripMargin) + |from ${currentOffset} to ${firstOffset} might been pruned.""".stripMargin) currentOffset = firstOffset } while (true) { @@ -84,13 +84,14 @@ class KafkaSimpleReceiver[U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( currentOffset = messageAndOffset.offset store((currentOffset, bytes)) hasMessage = true - if (autoCommitOffset) + if (autoCommitOffset) { kac.commitOffsetToZookeeper(currentOffset) + } } if (hasMessage) { currentOffset += 1 } - 
Thread.sleep(10) + Thread.sleep(1000) } } -} \ No newline at end of file +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 780799cd4e02a..24d976abee714 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -149,16 +149,16 @@ object KafkaUtils { /** * Create an input stream that pulls messages from a Kafka Broker, * in according to the specified starting position. - * @param ssc StreamingContext object - * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer - * @param topic a topic to consume - * @param partition partition of this topic - * @param startPositionOffset beginning to consume from this offset position - * @param autoCommitOffset whether need to commit automatically the latest offset + * @param ssc StreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer + * @param topic a topic to consume + * @param partition partition of this topic + * @param startPositionOffset beginning to consume from this offset position + * @param autoCommitOffset whether need to commit automatically the latest offset * in this batch messages into zookeeper, in order to recover pulling message * from this offset position when consuming this streaming again - * @param maxBatchByteSize max buffer size for a fetch request, at least it's longer + * @param maxBatchByteSize max buffer size for a fetch request, at least it's longer * than a max-length message, it will contain more message if possible * @param storageLevel RDD storage level. */ @@ -180,16 +180,16 @@ object KafkaUtils { /** * Create an input stream that pulls messages from a Kafka Broker, * in according to the specified starting position. - * @param jssc JavaStreamingContext object - * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer - * @param topic a topic to consume - * @param partition partition of this topic - * @param startPositionOffset beginning to consume from this offset position - * @param autoCommitOffset whether need to commit automatically the latest offset + * @param jssc JavaStreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer + * @param topic a topic to consume + * @param partition partition of this topic + * @param startPositionOffset beginning to consume from this offset position + * @param autoCommitOffset whether need to commit automatically the latest offset * in this batch messages into zookeeper, in order to recover pulling message * from this offset position when consuming this streaming again - * @param maxBatchByteSize max buffer size for a fetch request, at least it's longer + * @param maxBatchByteSize max buffer size for a fetch request, at least it's longer * than a max-length message, it will contain more message if possible * @param storageLevel RDD storage level. 
*/ From 67f242862fa59991894485a8939cd652cd3d8f48 Mon Sep 17 00:00:00 2001 From: pengyanhong Date: Tue, 12 Aug 2014 22:32:30 +0800 Subject: [PATCH 10/10] fix bug --- .../apache/spark/streaming/kafka/KafkaSimpleConsumer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala index 407372631905e..783f8215e1d41 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaSimpleConsumer.scala @@ -118,9 +118,9 @@ class KafkaSimpleConsumer( if (numErrors > 5) { throw new Exception("Error fetching data from the Broker:" + leader + " Reason: " + code) } - if (code == ErrorMapping.OffsetOutOfRangeCode) { - return fetch(getLatestOffset()) - } +// if (code == ErrorMapping.OffsetOutOfRangeCode) { +// return fetch(getLatestOffset()) +// } close val data = findNewLeader(leader, replicaBrokers) leader = data._1
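Usage sketch for the API introduced by these patches (illustrative only, not itself part of the patch series): one receiver is created per topic partition, each resuming from the offset previously committed to ZooKeeper under /consumers/<groupId>/offsets/<topic>/<partition>. The application name, ZooKeeper quorum, group id, topic, batch interval, and fetch size below are placeholder values, and the per-partition/union wiring is an assumed pattern rather than something the patches prescribe.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{KafkaSimpleConsumer, KafkaUtils}

    object KafkaSimpleStreamSketch {
      def main(args: Array[String]): Unit = {
        val zkQuorum = "zk1:2181,zk2:2181"   // placeholder ZooKeeper quorum
        val groupId  = "my-consumer-group"   // placeholder consumer group id
        val topic    = "events"              // placeholder topic

        val conf = new SparkConf().setAppName("KafkaSimpleStreamSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        // One receiver per partition. Each resumes from the offset previously written to
        // ZooKeeper for this group/topic/partition, or 0L if nothing has been stored yet.
        val streams = KafkaSimpleConsumer.getTopicPartitionList(zkQuorum, topic).map { partition =>
          val startOffset = KafkaSimpleConsumer.getEndOffsetPositionFromZookeeper(
            groupId, zkQuorum, topic, partition)
          KafkaUtils.createStream(ssc, zkQuorum, groupId, topic, partition, startOffset,
            true,            // autoCommitOffset: write each consumed offset back to ZooKeeper
            1024 * 1024,     // maxBatchByteSize: fetch buffer; must exceed the largest message
            StorageLevel.MEMORY_AND_DISK_SER_2)
        }

        // Each stream element is (offset, message bytes); union the per-partition streams
        // and report how many messages arrive in each batch.
        ssc.union(streams).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Pairing getEndOffsetPositionFromZookeeper with autoCommitOffset = true gives the resume behaviour described in the createStream Scaladoc: the receiver commits each offset it stores, and a restarted job reads that offset back as its starting position.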