KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini
jjkoshy committed Mar 14, 2014
1 parent 84a3a9a commit a670537
Showing 82 changed files with 3,609 additions and 1,644 deletions.
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.consumer.Whitelist
import kafka.server.OffsetManager


object TopicCommand {

@@ -70,7 +72,7 @@ object TopicCommand {
if (opts.options.has(opts.topicOpt)) {
val topicsSpec = opts.options.valueOf(opts.topicOpt)
val topicsFilter = new Whitelist(topicsSpec)
allTopics.filter(topicsFilter.isTopicAllowed)
allTopics.filter(topicsFilter.isTopicAllowed(_, excludeInternalTopics = false))
} else
allTopics
}
@@ -104,6 +106,9 @@
println("Updated config for topic \"%s\".".format(topic))
}
if(opts.options.has(opts.partitionsOpt)) {
if (topic == OffsetManager.OffsetsTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
}
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
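For context, here is a minimal standalone sketch of why the new excludeInternalTopics = false argument matters for this tool. Nothing below is part of the patch: the topic name "__consumer_offsets" and the simplified filter body are assumptions based on the hunks above.

object WhitelistSketch {
  // Assumed: OffsetManager.OffsetsTopicName resolves to this internal topic name.
  val internalTopics = Set("__consumer_offsets")

  // Simplified stand-in for Whitelist.isTopicAllowed: a regex match plus an
  // optional internal-topic filter.
  def isTopicAllowed(topic: String, pattern: String, excludeInternalTopics: Boolean): Boolean =
    topic.matches(pattern) && !(excludeInternalTopics && internalTopics(topic))

  def main(args: Array[String]): Unit = {
    val allTopics = Seq("clicks", "__consumer_offsets")
    // With excludeInternalTopics = false the tool now sees the offsets topic too;
    // prints List(clicks, __consumer_offsets). The new guard above still rejects
    // partition changes to it.
    println(allTopics.filter(isTopicAllowed(_, ".*", excludeInternalTopics = false)))
  }
}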
79 changes: 79 additions & 0 deletions core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.api

import java.nio.ByteBuffer
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.common.ErrorMapping

object ConsumerMetadataRequest {
val CurrentVersion = 0.shortValue
val DefaultClientId = ""

def readFrom(buffer: ByteBuffer) = {
// envelope
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = ApiUtils.readShortString(buffer)

// request
val group = ApiUtils.readShortString(buffer)
ConsumerMetadataRequest(group, versionId, correlationId, clientId)
}

}

case class ConsumerMetadataRequest(group: String,
versionId: Short = ConsumerMetadataRequest.CurrentVersion,
override val correlationId: Int = 0,
clientId: String = ConsumerMetadataRequest.DefaultClientId)
extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) {

def sizeInBytes =
2 + /* versionId */
4 + /* correlationId */
ApiUtils.shortStringLength(clientId) +
ApiUtils.shortStringLength(group)

def writeTo(buffer: ByteBuffer) {
// envelope
buffer.putShort(versionId)
buffer.putInt(correlationId)
ApiUtils.writeShortString(buffer, clientId)

// consumer metadata request
ApiUtils.writeShortString(buffer, group)
}

override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
// return ConsumerCoordinatorNotAvailable for all uncaught errors
val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}

def describe(details: Boolean) = {
val consumerMetadataRequest = new StringBuilder
consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
consumerMetadataRequest.append("; Version: " + versionId)
consumerMetadataRequest.append("; CorrelationId: " + correlationId)
consumerMetadataRequest.append("; ClientId: " + clientId)
consumerMetadataRequest.append("; Group: " + group)
consumerMetadataRequest.toString()
}
}
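Assuming the classes in this file are on the classpath, a short round-trip sketch (not from the patch) of the wire layout that readFrom and writeTo implement: envelope first, then the group.

import java.nio.ByteBuffer
import kafka.api.ConsumerMetadataRequest

object ConsumerMetadataRequestRoundTrip {
  def main(args: Array[String]): Unit = {
    val request = ConsumerMetadataRequest(group = "my-group", correlationId = 42, clientId = "offset-checker")
    // sizeInBytes = 2 (version) + 4 (correlationId) + the two short strings
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val parsed = ConsumerMetadataRequest.readFrom(buffer)
    assert(parsed == request) // case-class equality over all four fields
    println(parsed.describe(details = true))
  }
}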
57 changes: 57 additions & 0 deletions core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.api

import java.nio.ByteBuffer
import kafka.cluster.Broker
import kafka.common.ErrorMapping

object ConsumerMetadataResponse {
val CurrentVersion = 0

def readFrom(buffer: ByteBuffer) = {
val correlationId = buffer.getInt
val errorCode = buffer.getShort
val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
Some(Broker.readFrom(buffer))
else
None

ConsumerMetadataResponse(coordinatorOpt, errorCode, correlationId)
}

}

case class ConsumerMetadataResponse (coordinator: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
extends RequestOrResponse(correlationId = correlationId) {

def sizeInBytes =
4 + /* correlationId */
2 + /* error code */
coordinator.map(_.sizeInBytes).getOrElse(0)

def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putShort(errorCode)
if (errorCode == ErrorMapping.NoError) {
coordinator.get.writeTo(buffer)
}
}

def describe(details: Boolean) = toString
}
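A companion sketch for the response side (again illustrative only): an error response carries no coordinator on the wire, so callers must branch on errorCode before touching coordinator.

import java.nio.ByteBuffer
import kafka.api.ConsumerMetadataResponse
import kafka.common.ErrorMapping

object ConsumerMetadataResponseSketch {
  def main(args: Array[String]): Unit = {
    // With a non-zero error code, only correlationId and errorCode are written.
    val error = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId = 42)
    val buffer = ByteBuffer.allocate(error.sizeInBytes) // 4 + 2 + 0 in this case
    error.writeTo(buffer)
    buffer.rewind()
    ConsumerMetadataResponse.readFrom(buffer).coordinator match {
      case Some(broker) => println("coordinator: " + broker)
      case None         => println("no coordinator available; back off and retry")
    }
  }
}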
57 changes: 41 additions & 16 deletions core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -18,17 +18,20 @@
package kafka.api

import java.nio.ByteBuffer

import kafka.api.ApiUtils._
import kafka.utils.Logging
import kafka.utils.{SystemTime, Logging}
import kafka.network.{RequestChannel, BoundedByteBufferSend}
import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
import kafka.common.{OffsetAndMetadata, ErrorMapping, TopicAndPartition}
import kafka.network.RequestChannel.Response
import scala.collection._

object OffsetCommitRequest extends Logging {
val CurrentVersion: Short = 0
val DefaultClientId = ""

def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
val now = SystemTime.milliseconds

// Read values from the envelope
val versionId = buffer.getShort
val correlationId = buffer.getInt
@@ -43,23 +46,45 @@ object OffsetCommitRequest extends Logging {
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val timestamp = {
val given = buffer.getLong
if (given == -1L) now else given
}
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
(TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
})
})
OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
OffsetCommitRequest(consumerGroupId, mutable.Map(pairs:_*), versionId, correlationId, clientId)
}
}

case class OffsetCommitRequest(groupId: String,
requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
requestInfo: mutable.Map[TopicAndPartition, OffsetAndMetadata],
versionId: Short = OffsetCommitRequest.CurrentVersion,
override val correlationId: Int = 0,
clientId: String = OffsetCommitRequest.DefaultClientId)
extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) {

lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)


def filterLargeMetadata(maxMetadataSize: Int) =
requestInfo.filter(info => info._2.metadata == null || info._2.metadata.length <= maxMetadataSize)

def responseFor(errorCode: Short, offsetMetadataMaxSize: Int) = {
val commitStatus = requestInfo.map {info =>
(info._1, if (info._2.metadata != null && info._2.metadata.length > offsetMetadataMaxSize)
ErrorMapping.OffsetMetadataTooLargeCode
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
ErrorMapping.ConsumerCoordinatorNotAvailableCode
else if (errorCode == ErrorMapping.NotLeaderForPartitionCode)
ErrorMapping.NotCoordinatorForConsumerCode
else
errorCode)
}.toMap
OffsetCommitResponse(commitStatus, correlationId)
}


def writeTo(buffer: ByteBuffer) {
// Write envelope
buffer.putShort(versionId)
Expand All @@ -73,9 +98,10 @@ case class OffsetCommitRequest(groupId: String,
writeShortString(buffer, t1._1) // topic
buffer.putInt(t1._2.size) // number of partitions for this topic
t1._2.foreach( t2 => {
buffer.putInt(t2._1.partition) // partition
buffer.putLong(t2._2.offset) // offset
writeShortString(buffer, t2._2.metadata) // metadata
buffer.putInt(t2._1.partition)
buffer.putLong(t2._2.offset)
buffer.putLong(t2._2.timestamp)
writeShortString(buffer, t2._2.metadata)
})
})
}
@@ -95,15 +121,14 @@ case class OffsetCommitRequest(groupId: String,
innerCount +
4 /* partition */ +
8 /* offset */ +
8 /* timestamp */ +
shortStringLength(offsetAndMetadata._2.metadata)
})
})

override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val responseMap = requestInfo.map {
case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}.toMap
val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
val errorResponse = responseFor(errorCode, Int.MaxValue)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}

@@ -119,7 +144,7 @@ case class OffsetCommitRequest(groupId: String,
offsetCommitRequest.toString()
}

override def toString(): String = {
describe(true)
override def toString = {
describe(details = true)
}
}
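A round-trip sketch (illustrative, not from the patch) of the extended request format. The timestamp field is new; a value of -1 is a sentinel that readFrom replaces with the broker's current clock, as shown above.

import java.nio.ByteBuffer
import scala.collection.mutable
import kafka.api.OffsetCommitRequest
import kafka.common.{OffsetAndMetadata, TopicAndPartition}

object OffsetCommitRequestRoundTrip {
  def main(args: Array[String]): Unit = {
    // -1L asks the broker to stamp the commit with its own time on arrival.
    val offsets = mutable.Map(
      TopicAndPartition("clicks", 0) -> OffsetAndMetadata(1042L, "checkpoint", -1L))
    val request = OffsetCommitRequest("my-group", offsets, correlationId = 7)
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val parsed = OffsetCommitRequest.readFrom(buffer)
    // The sentinel is gone: readFrom substituted SystemTime.milliseconds.
    println(parsed.requestInfo(TopicAndPartition("clicks", 0)).timestamp)
  }
}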
40 changes: 18 additions & 22 deletions core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -19,9 +19,8 @@ package kafka.api

import java.nio.ByteBuffer

import kafka.api.ApiUtils._
import kafka.common.TopicAndPartition
import kafka.utils.Logging
import kafka.common.TopicAndPartition

object OffsetCommitResponse extends Logging {
val CurrentVersion: Short = 0
@@ -30,7 +29,7 @@ object OffsetCommitResponse extends Logging {
val correlationId = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
val topic = readShortString(buffer)
val topic = ApiUtils.readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
@@ -42,37 +41,34 @@
}
}

case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
override val correlationId: Int = 0)
extends RequestOrResponse(correlationId=correlationId) {

lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)

def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
writeShortString(buffer, t1._1) // topic
buffer.putInt(t1._2.size) // number of partitions for this topic
t1._2.foreach( t2 => { // TopicAndPartition -> Short
buffer.putInt(t2._1.partition)
buffer.putShort(t2._2) //error
})
})
buffer.putInt(commitStatusGroupedByTopic.size)
commitStatusGroupedByTopic.foreach { case(topic, statusMap) =>
ApiUtils.writeShortString(buffer, topic)
buffer.putInt(statusMap.size) // partition count
statusMap.foreach { case(topicAndPartition, errorCode) =>
buffer.putInt(topicAndPartition.partition)
buffer.putShort(errorCode)
}
}
}

override def sizeInBytes =
4 + /* correlationId */
4 + /* topic count */
requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
val (topic, offsets) = topicAndOffsets
commitStatusGroupedByTopic.foldLeft(0)((count, partitionStatusMap) => {
val (topic, partitionStatus) = partitionStatusMap
count +
shortStringLength(topic) + /* topic */
4 + /* number of partitions */
offsets.size * (
4 + /* partition */
2 /* error */
)
ApiUtils.shortStringLength(topic) +
4 + /* partition count */
partitionStatus.size * ( 4 /* partition */ + 2 /* error code */)
})

override def describe(details: Boolean):String = { toString }
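Finally, an illustrative sketch (not from the patch) of how the new responseFor helper folds the per-partition metadata-size check and the error-code translation into a single OffsetCommitResponse.

import scala.collection.mutable
import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}

object ResponseForSketch {
  def main(args: Array[String]): Unit = {
    val request = OffsetCommitRequest("my-group", mutable.Map(
      TopicAndPartition("clicks", 0) -> OffsetAndMetadata(10L, "ok", -1L),
      TopicAndPartition("clicks", 1) -> OffsetAndMetadata(20L, "x" * 4096, -1L)))
    // Suppose the append to the offsets topic failed with UnknownTopicOrPartition:
    val response = request.responseFor(ErrorMapping.UnknownTopicOrPartitionCode, offsetMetadataMaxSize = 1024)
    // clicks-0 maps to ConsumerCoordinatorNotAvailableCode (translated), while
    // clicks-1 maps to OffsetMetadataTooLargeCode (metadata over the 1024 cap).
    response.commitStatus.foreach(println)
  }
}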
