
KAFKA-763 follow up changes; reviewed by Neha Narkhede

Commit 0a9283530418781941b7c232a8b09e4ec071b0e9 (1 parent: c546286). Authored by Swapnil Ghike, committed with nehanarkhede, Mar 13, 2013.
@@ -136,22 +136,12 @@ class SimpleConsumer(val host: String,
   * Get the earliest or latest offset of a given topic, partition.
   * @param topicAndPartition Topic and partition of which the offset is needed.
   * @param earliestOrLatest A value to indicate earliest or latest offset.
-  * @param consumerId Id of the consumer which could be a client or a follower broker.
-  * @param isFromOrdinaryConsumer Boolean to specify ordinary consumer or debugging consumer.
+  * @param consumerId Id of the consumer which could be a consumer client, SimpleConsumerShell or a follower broker.
   * @return Requested offset.
   */
-  def earliestOrLatestOffset(topicAndPartition: TopicAndPartition,
-                             earliestOrLatest: Long,
-                             consumerId: Int = Request.OrdinaryConsumerId,
-                             isFromOrdinaryConsumer: Boolean = true): Long = {
-    val request =
-      if(isFromOrdinaryConsumer)
-        OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                      replicaId = consumerId)
-      else
-        OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                      replicaId = Request.DebuggingConsumerId)
-
+  def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
+    val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+                                replicaId = consumerId)
    val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
    val offset = partitionErrorAndOffset.error match {
      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
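
With the isFromOrdinaryConsumer flag gone, every caller now states its identity through the single consumerId argument. A minimal sketch of an ordinary client resolving the latest offset through the simplified signature (package names follow the 0.8-era layout; the host, port, timeouts and topic name are made-up values for illustration):

    import kafka.api.{OffsetRequest, Request}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    // Constructor arguments mirror the SimpleConsumerShell usage further down:
    // host, port, socket timeout (ms), socket buffer size (bytes), client id.
    val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "offset-probe")
    try {
      // An ordinary client passes Request.OrdinaryConsumerId; a follower broker
      // passes its own broker id and the debugging shell passes
      // Request.DebuggingConsumerId (see the other two hunks in this commit).
      val latest = consumer.earliestOrLatestOffset(TopicAndPartition("my-topic", 0),
                                                   OffsetRequest.LatestTime,
                                                   Request.OrdinaryConsumerId)
      println("latest offset of my-topic/0: " + latest)
    } finally {
      consumer.close()
    }
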
@@ -83,18 +83,18 @@ class ReplicaFetcherThread(name:String,
    val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
    if (leaderEndOffset < log.logEndOffset) {
      log.truncateTo(leaderEndOffset)
-      return leaderEndOffset
+      leaderEndOffset
+    } else {
+      /**
+       * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+       * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+       *
+       * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
+       */
+      val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
+      log.truncateAndStartWithNewOffset(leaderStartOffset)
+      leaderStartOffset
    }
-
-    /**
-     * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
-     * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
-     *
-     * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
-     */
-    val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-    log.truncateAndStartWithNewOffset(leaderStartOffset)
-    leaderStartOffset
  }
  // any logic for partitions whose leader has changed
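
Restated outside the broker for readability: the change above turns the early return into a single if/else expression whose value becomes the new fetch offset, and both offset lookups (leader end offset and leader start offset) now identify the caller with brokerConfig.brokerId. The helper below is purely illustrative and not part of the patch; the log operations are passed in as plain functions so the branch logic can be read in isolation:

    // followerEndOffset stands in for log.logEndOffset on the fetching replica;
    // truncateTo and rollToNewStart stand in for log.truncateTo and
    // log.truncateAndStartWithNewOffset respectively.
    def resolveOutOfRangeOffset(followerEndOffset: Long,
                                leaderEndOffset: Long,
                                leaderStartOffset: Long,
                                truncateTo: Long => Unit,
                                rollToNewStart: Long => Unit): Long = {
      if (leaderEndOffset < followerEndOffset) {
        // The follower is ahead of the leader: drop the extra messages and
        // resume fetching from the leader's end offset.
        truncateTo(leaderEndOffset)
        leaderEndOffset
      } else {
        // The follower fell behind the leader's retention window (its end offset
        // is below the leader's start offset): start a fresh log at the leader's
        // start offset and continue fetching from there.
        rollToNewStart(leaderStartOffset)
        leaderStartOffset
      }
    }
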
@@ -165,9 +165,8 @@ object SimpleConsumerShell extends Logging {
    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
                                            ConsumerConfig.SocketBufferSize, clientId)
    try {
-      startingOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition = TopicAndPartition(topic, partitionId),
-                                                             earliestOrLatest = startingOffset,
-                                                             isFromOrdinaryConsumer = false)
+      startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
+                                                             Request.DebuggingConsumerId)
    } catch {
      case t: Throwable =>
        System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
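
For completeness, the shell's lookup amounts to turning a sentinel time into a concrete offset while identifying itself as a debugging consumer. A small sketch of that step (the helper name and the fromBeginning flag are invented for illustration; only the earliestOrLatestOffset call itself comes from the patch):

    import kafka.api.{OffsetRequest, Request}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    // Resolve OffsetRequest.EarliestTime / OffsetRequest.LatestTime into an actual
    // log offset, tagging the request with the debugging consumer id so the broker
    // can tell it apart from an ordinary client or a fetching follower.
    def resolveStartingOffset(consumer: SimpleConsumer, topic: String, partition: Int, fromBeginning: Boolean): Long = {
      val sentinelTime = if (fromBeginning) OffsetRequest.EarliestTime else OffsetRequest.LatestTime
      consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), sentinelTime, Request.DebuggingConsumerId)
    }
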
