From d379ef6944e7dbed930f97ff51f6c9f98e9f9376 Mon Sep 17 00:00:00 2001
From: Vahid Hashemian
Date: Fri, 4 Mar 2016 16:47:05 -0800
Subject: [PATCH] KAFKA-3312: Add utility offset methods to ZkUtils

Add getOffset(...) and updateOffset(...) utility methods to ZkUtils and use
them in place of the readData(...), readDataMaybeNull(...) and
updatePersistentPath(...) calls that read or write consumer offsets.
getOffset(...) returns Option[Long], so callers handle a missing offset
znode explicitly instead of parsing a raw string.
---
 .../main/scala/kafka/admin/ConsumerGroupCommand.scala |  5 +++--
 .../kafka/consumer/ZookeeperConsumerConnector.scala   |  8 ++++----
 core/src/main/scala/kafka/server/KafkaApis.scala      | 10 +++++-----
 .../scala/kafka/tools/ConsumerOffsetChecker.scala     |  5 +++--
 core/src/main/scala/kafka/tools/ExportZkOffsets.scala |  2 +-
 core/src/main/scala/kafka/tools/ImportZkOffsets.scala |  2 +-
 .../main/scala/kafka/tools/UpdateOffsetsInZK.scala    |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala         |  6 ++++++
 core/src/test/scala/unit/kafka/utils/TestUtils.scala  |  2 +-
 9 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 414e7baee449..674c138fc837 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -216,8 +216,9 @@ object ConsumerGroupCommand {
       // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
       // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
       try {
-        val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
-        offsetMap.put(topicAndPartition, offset)
+        val offsetPath = topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition
+        val offset = zkUtils.getOffset(offsetPath).getOrElse(throw new ZkNoNodeException(offsetPath))
+        offsetMap.put(topicAndPartition, offset)
       } catch {
         case z: ZkNoNodeException =>
           println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper."
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578f6d49..bd8be4a2c058 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -308,7 +308,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
     if (checkpointedZkOffsets.get(topicPartition) != offset) {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+      zkUtils.updateOffset(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
       checkpointedZkOffsets.put(topicPartition, offset)
       zkCommitMeter.mark()
     }
@@ -416,9 +416,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,

   private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-    val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
-    offsetString match {
-      case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong))
+    val offset = zkUtils.getOffset(dirs.consumerOffsetDir + "/" + topicPartition.partition)
+    offset match {
+      case Some(offsetVal) => (topicPartition, OffsetMetadataAndError(offsetVal))
       case None => (topicPartition, OffsetMetadataAndError.NoOffset)
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 086bd4b893db..b8402595ca18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
             (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
           else {
-            zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
+            zkUtils.updateOffset(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
             (topicPartition, Errors.NONE.code)
           }
         } catch {
@@ -774,10 +774,10 @@ class KafkaApis(val requestChannel: RequestChannel,
        if (!metadataCache.hasTopicMetadata(topicPartition.topic))
          (topicPartition, unknownTopicPartitionResponse)
        else {
-         val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
-         payloadOpt match {
-           case Some(payload) =>
-             (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
+         val payload = zkUtils.getOffset(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")
+         payload match {
+           case Some(offsetVal) =>
+             (topicPartition, new OffsetFetchResponse.PartitionData(offsetVal, "", Errors.NONE.code))
            case None =>
              (topicPartition, unknownTopicPartitionResponse)
        }
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 5c01f34a1e26..bbf9149fe3fb 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -177,8 +177,9 @@ object ConsumerOffsetChecker extends Logging {
      // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
      // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
      try {
-       val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
-       offsetMap.put(topicAndPartition, offset)
+       val offsetPath = topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition)
+       val offset = zkUtils.getOffset(offsetPath).getOrElse(throw new ZkNoNodeException(offsetPath))
+       offsetMap.put(topicAndPartition, offset)
      } catch {
        case z: ZkNoNodeException =>
          if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index ccccae579232..a2b21a3867bf 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -101,7 +101,7 @@ object ExportZkOffsets extends Logging {
       for (bidPid <- bidPidList) {
         val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
         val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
-        zkUtils.readDataMaybeNull(offsetPath)._1 match {
+        zkUtils.getOffset(offsetPath) match {
           case Some(offsetVal) =>
             fileWriter.write(offsetPath + ":" + offsetVal + "\n")
             debug(offsetPath + " => " + offsetVal)
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 60d48fa326c4..a3d1b1159d77 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -98,7 +98,7 @@ object ImportZkOffsets extends Logging {
       debug("updating [" + partition + "] with offset [" + offset + "]")

       try {
-        zkUtils.updatePersistentPath(partition, offset.toString)
+        zkUtils.updateOffset(partition, offset)
       } catch {
         case e: Throwable => e.printStackTrace()
       }
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 96a33b17de3c..b27a4c38ee9d 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -76,7 +76,7 @@ object UpdateOffsetsInZK {
         val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)

         println("updating partition " + partition + " with new offset: " + offset)
-        zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
+        zkUtils.updateOffset(topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
         numParts += 1
       case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
     }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 81eb24ad105c..29c5e512e421 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -867,6 +867,12 @@ class ZkUtils(val zkClient: ZkClient,
       zkClient.close()
     }
   }
+
+  def getOffset(path: String): Option[Long] =
+    readDataMaybeNull(path)._1.map(_.toLong)
+
+  def updateOffset(path: String, offset: String): Unit =
+    updatePersistentPath(path, offset)
 }

 private object ZKStringSerializer extends ZkSerializer {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7df87fc31c76..b1460e5cd10d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -590,7 +590,7 @@ object TestUtils extends Logging {

   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
-    zkUtils.updatePersistentPath(path, offset.toString)
+    zkUtils.updateOffset(path, offset.toString)
     zkUtils.close()
   }
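
Usage sketch for the new helpers, assuming a ZooKeeper ensemble reachable at
localhost:2181 and the session/connection timeouts shown; the group, topic,
partition, and object name below are hypothetical, while the ZkUtils(...)
factory call follows the same form as in the TestUtils hunk above:

    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

    object OffsetUtilsSketch {
      def main(args: Array[String]): Unit = {
        // Assumed connection settings, for illustration only.
        val zkUtils = ZkUtils("localhost:2181", 30000, 30000, false)
        val dirs = new ZKGroupTopicDirs("example-group", "example-topic")
        val offsetPath = dirs.consumerOffsetDir + "/0"

        // updateOffset writes the offset as the string payload of a persistent znode.
        zkUtils.updateOffset(offsetPath, 42L.toString)

        // getOffset returns Some(offset) if the znode exists and None otherwise,
        // so callers no longer parse the raw string from readDataMaybeNull(...).
        zkUtils.getOffset(offsetPath) match {
          case Some(offset) => println(s"committed offset: $offset")
          case None         => println("no offset committed yet")
        }

        zkUtils.close()
      }
    }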