Permalink
Browse files

KAFKA-759 Commit/FetchOffset APIs should not return versionId; review…

…ed by Neha Narkhede
  • Loading branch information...
1 parent 218e6a5 commit 82b11aa0d4bc32c5a351ace3a67cd2d57c9d1e8d @nehanarkhede nehanarkhede committed Mar 1, 2013
@@ -29,7 +29,6 @@ object OffsetCommitResponse extends Logging {
def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
// Read values from the envelope
- val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
@@ -44,12 +43,11 @@ object OffsetCommitResponse extends Logging {
(TopicAndPartition(topic, partitionId), error)
})
})
- OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId)
+ OffsetCommitResponse(Map(pairs:_*), correlationId, clientId)
}
}
case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
- versionId: Short = OffsetCommitResponse.CurrentVersion,
correlationId: Int = 0,
clientId: String = OffsetCommitResponse.DefaultClientId)
extends RequestOrResponse {
@@ -58,7 +56,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
def writeTo(buffer: ByteBuffer) {
// Write envelope
- buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
@@ -75,7 +72,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
}
override def sizeInBytes =
- 2 + /* versionId */
4 + /* correlationId */
shortStringLength(clientId) +
4 + /* topic count */
@@ -29,7 +29,6 @@ object OffsetFetchResponse extends Logging {
def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
// Read values from the envelope
- val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
@@ -46,12 +45,11 @@ object OffsetFetchResponse extends Logging {
(TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
})
})
- OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId)
+ OffsetFetchResponse(Map(pairs:_*), correlationId, clientId)
}
}
case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
- versionId: Short = OffsetFetchResponse.CurrentVersion,
correlationId: Int = 0,
clientId: String = OffsetFetchResponse.DefaultClientId)
extends RequestOrResponse {
@@ -60,7 +58,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
def writeTo(buffer: ByteBuffer) {
// Write envelope
- buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
@@ -79,7 +76,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
}
override def sizeInBytes =
- 2 + /* versionId */
4 + /* correlationId */
shortStringLength(clientId) +
4 + /* topic count */
@@ -489,7 +489,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
val response = new OffsetCommitResponse(responseInfo,
- offsetCommitRequest.versionId,
offsetCommitRequest.correlationId,
offsetCommitRequest.clientId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -521,7 +520,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*),
- offsetFetchRequest.versionId,
offsetFetchRequest.correlationId,
offsetFetchRequest.clientId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))

0 comments on commit 82b11aa

Please sign in to comment.