Permalink
Browse files

KAFKA-815 Improve SimpleConsumerShell to take in a max messages confi…

…g option;reviewed by Jun Rao
  • Loading branch information...
1 parent a376f92 commit a737986e54ea53a2b93f3d08f5eb7fd155095f3c @nehanarkhede nehanarkhede committed Mar 20, 2013
@@ -290,6 +290,11 @@ class DefaultMessageFormatter extends MessageFormatter {
}
}
+class NoOpMessageFormatter extends MessageFormatter {
+ override def init(props: Properties) {}
+ def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
+}
+
class ChecksumMessageFormatter extends MessageFormatter {
private var topicStr: String = _
@@ -84,6 +84,11 @@ object SimpleConsumerShell extends Logging {
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)
+ val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume")
+ .withRequiredArg
+ .describedAs("max-messages")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(Integer.MAX_VALUE)
val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
"skip it instead of halt.")
val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
@@ -105,6 +110,7 @@ object SimpleConsumerShell extends Logging {
val fetchSize = options.valueOf(fetchSizeOpt).intValue
val clientId = options.valueOf(clientIdOpt).toString
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
+ val maxMessages = options.valueOf(maxMessagesOpt).intValue
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
val printOffsets = if(options.has(printOffsetOpt)) true else false
@@ -181,14 +187,16 @@ object SimpleConsumerShell extends Logging {
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
formatter.init(formatterArgs)
- info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
- .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+ val replicaString = if(replicaId > 0) "leader" else "replica"
+ info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
+ .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
def run() {
var offset = startingOffset
+ var numMessagesConsumed = 0
try {
- while(true) {
+ while(numMessagesConsumed < maxMessages) {
val fetchRequest = fetchRequestBuilder
.addFetch(topic, partitionId, offset, fetchSize)
.build()
@@ -199,14 +207,15 @@ object SimpleConsumerShell extends Logging {
return
}
debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
- for(messageAndOffset <- messageSet) {
+ for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
try {
offset = messageAndOffset.nextOffset
if(printOffsets)
System.out.println("next offset = " + offset)
val message = messageAndOffset.message
val key = if(message.hasKey) Utils.readBytes(message.key) else null
formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
+ numMessagesConsumed += 1
} catch {
case e =>
if (skipMessageOnError)
@@ -226,6 +235,8 @@ object SimpleConsumerShell extends Logging {
} catch {
case e: Throwable =>
error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
+ }finally {
+ info("Consumed " + numMessagesConsumed + " messages")
}
}
}, false)

0 comments on commit a737986

Please sign in to comment.