Permalink
Browse files

Support deep iteration in DumpLogSegments tool; patched by Jun Rao; r…

…eviewed by Neha Narkhede; kafka-812
  • Loading branch information...
1 parent 46ebdc1 commit a376f922149330995f91d427d76ff1595fbd26ce @junrao junrao committed Mar 19, 2013
Showing with 71 additions and 27 deletions.
  1. +71 −27 core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -35,6 +35,12 @@ object DumpLogSegments {
.withRequiredArg
.describedAs("file1, file2, ...")
.ofType(classOf[String])
+ val maxMessageSizeOpt = parser.accepts("max-message-size", "Size of largest message.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(5 * 1024 * 1024)
+ val deepIterationOpt = parser.accepts("deep-iteration", "if set, uses deep instead of shallow iteration")
val options = parser.parse(args : _*)
if(!options.has(filesOpt)) {
@@ -46,6 +52,8 @@ object DumpLogSegments {
val print = if(options.has(printOpt)) true else false
val verifyOnly = if(options.has(verifyOpt)) true else false
val files = options.valueOf(filesOpt).split(",")
+ val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
+ val isDeepIteration = if(options.has(deepIterationOpt)) true else false
val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
@@ -54,17 +62,17 @@ object DumpLogSegments {
val file = new File(arg)
if(file.getName.endsWith(Log.LogFileSuffix)) {
println("Dumping " + file)
- dumpLog(file, print, nonConsecutivePairsForLogFilesMap)
+ dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration)
} else if(file.getName.endsWith(Log.IndexFileSuffix)) {
println("Dumping " + file)
- dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap)
+ dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
}
}
misMatchesForIndexFilesMap.foreach {
case (fileName, listOfMismatches) => {
System.err.println("Mismatches in :" + fileName)
listOfMismatches.foreach(m => {
- System.err.println(" Index position: %d, log position: %d".format(m._1, m._2))
+ System.err.println(" Index offset: %d, log offset: %d".format(m._1, m._2))
})
}
}
@@ -79,16 +87,19 @@ object DumpLogSegments {
}
/* print out the contents of the index */
- private def dumpIndex(file: File, verifyOnly: Boolean, misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]]) {
+ private def dumpIndex(file: File,
+ verifyOnly: Boolean,
+ misMatchesForIndexFilesMap: mutable.HashMap[String, List[(Long, Long)]],
+ maxMessageSize: Int) {
val startOffset = file.getName().split("\\.")(0).toLong
val logFileName = file.getAbsolutePath.split("\\.")(0) + Log.LogFileSuffix
val logFile = new File(logFileName)
val messageSet = new FileMessageSet(logFile)
val index = new OffsetIndex(file = file, baseOffset = startOffset)
for(i <- 0 until index.entries) {
val entry = index.entry(i)
- val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, messageSet.sizeInBytes())
- val messageAndOffset = partialFileMessageSet.head
+ val partialFileMessageSet: FileMessageSet = messageSet.read(entry.position, maxMessageSize)
+ val messageAndOffset = getIterator(partialFileMessageSet.head, isDeepIteration = true).next()
if(messageAndOffset.offset != entry.offset + index.baseOffset) {
var misMatchesSeq = misMatchesForIndexFilesMap.getOrElse(file.getName, List[(Long, Long)]())
misMatchesSeq ::=(entry.offset + index.baseOffset, messageAndOffset.offset)
@@ -103,40 +114,73 @@ object DumpLogSegments {
}
/* print out the contents of the log */
- private def dumpLog(file: File, printContents: Boolean, nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]]) {
+ private def dumpLog(file: File,
+ printContents: Boolean,
+ nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
+ isDeepIteration: Boolean) {
val startOffset = file.getName().split("\\.")(0).toLong
println("Starting offset: " + startOffset)
val messageSet = new FileMessageSet(file)
var validBytes = 0L
var lastOffset = -1l
- for(messageAndOffset <- messageSet) {
- val msg = messageAndOffset.message
+ for(shallowMessageAndOffset <- messageSet) { // this only does shallow iteration
+ val itr = getIterator(shallowMessageAndOffset, isDeepIteration)
+ for (messageAndOffset <- itr) {
+ val msg = messageAndOffset.message
- if(lastOffset == -1)
+ if(lastOffset == -1)
+ lastOffset = messageAndOffset.offset
+ // If we are iterating uncompressed messages, offsets must be consecutive
+ else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
+ var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
+ nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
+ nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
+ }
lastOffset = messageAndOffset.offset
- // If it's uncompressed message, its offset must be lastOffset + 1 no matter last message is compressed or uncompressed
- else if (msg.compressionCodec == NoCompressionCodec && messageAndOffset.offset != lastOffset +1) {
- var nonConsecutivePairsSeq = nonConsecutivePairsForLogFilesMap.getOrElse(file.getName, List[(Long, Long)]())
- nonConsecutivePairsSeq ::=(lastOffset, messageAndOffset.offset)
- nonConsecutivePairsForLogFilesMap.put(file.getName, nonConsecutivePairsSeq)
- }
- lastOffset = messageAndOffset.offset
- print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
- " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
- " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
- validBytes += MessageSet.entrySize(msg)
- if(msg.hasKey)
- print(" keysize: " + msg.keySize)
- if(printContents) {
+ print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+ " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
+ " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
if(msg.hasKey)
- print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
- print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+ print(" keysize: " + msg.keySize)
+ if(printContents) {
+ if(msg.hasKey)
+ print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+ print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+ }
+ println()
}
- println()
+ validBytes += MessageSet.entrySize(shallowMessageAndOffset.message)
}
val trailingBytes = messageSet.sizeInBytes - validBytes
if(trailingBytes > 0)
println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
}
+
+ private def getIterator(messageAndOffset: MessageAndOffset, isDeepIteration: Boolean) = {
+ if (isDeepIteration) {
+ val message = messageAndOffset.message
+ message.compressionCodec match {
+ case NoCompressionCodec =>
+ getSingleMessageIterator(messageAndOffset)
+ case _ =>
+ ByteBufferMessageSet.decompress(message).iterator
+ }
+ } else
+ getSingleMessageIterator(messageAndOffset)
+ }
+
+ private def getSingleMessageIterator(messageAndOffset: MessageAndOffset) = {
+ new IteratorTemplate[MessageAndOffset] {
+ var messageIterated = false
+
+ override def makeNext(): MessageAndOffset = {
+ if (!messageIterated) {
+ messageIterated = true
+ messageAndOffset
+ } else
+ allDone()
+ }
+ }
+ }
}

0 comments on commit a376f92

Please sign in to comment.