Skip to content

Commit

Permalink
kafka-763 delta; Add an option to replica from the largest offset dur…
Browse files Browse the repository at this point in the history
…ing unclean leader election; patched by Swapnil Ghike; reviewed by Jun Rao
  • Loading branch information
Swapnil Ghike authored and junrao committed Mar 13, 2013
1 parent 0a92835 commit 3b3fb7f
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 6 deletions.
Expand Up @@ -58,12 +58,7 @@ class ConsumerFetcherThread(name: String,
case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
case _ => startTimestamp = OffsetRequest.LatestTime
}
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
val newOffset = partitionErrorAndOffset.error match {
case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
}
val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
val pti = partitionMap(topicAndPartition)
pti.resetFetchOffset(newOffset)
pti.resetConsumeOffset(newOffset)
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Expand Up @@ -141,6 +141,7 @@ class SimpleConsumer(val host: String,
*/
def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
clientId = clientId,
replicaId = consumerId)
val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
val offset = partitionErrorAndOffset.error match {
Expand Down

0 comments on commit 3b3fb7f

Please sign in to comment.