Permalink
Browse files

KAFKA-802 Flush message interval is based on compressed message count…

…; reviewed by Jun Rao
  • Loading branch information...
1 parent dd96761 commit c5462864aab05b98158bcbe623123db083b8e136 @nehanarkhede nehanarkhede committed Mar 13, 2013
Showing with 10 additions and 7 deletions.
  1. +9 −6 core/src/main/scala/kafka/log/Log.scala
  2. +1 −1 core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -260,7 +260,7 @@ private[kafka] class Log(val dir: File,
*/
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
val messageSetInfo = analyzeAndValidateMessageSet(messages)
-
+
// if we have any valid messages, append them to the log
if(messageSetInfo.count == 0) {
(-1L, -1L)
@@ -270,7 +270,9 @@ private[kafka] class Log(val dir: File,
try {
// they are valid, insert them in the log
- val offsets = lock synchronized {
+ val offsetsAndNumAppendedMessages = lock synchronized {
+ val firstOffset = nextOffset.get
+
// maybe roll the log if this segment is full
val segment = maybeRoll(segments.view.last)
@@ -312,16 +314,17 @@ private[kafka] class Log(val dir: File,
// advance the log end offset
nextOffset.set(offsets._2 + 1)
-
+ val numAppendedMessages = (nextOffset.get - firstOffset).toInt
+
// return the offset at which the messages were appended
- offsets
+ (offsets._1, offsets._2, numAppendedMessages)
}
// maybe flush the log and index
- maybeFlush(messageSetInfo.count)
+ maybeFlush(offsetsAndNumAppendedMessages._3)
// return the first and last offset
- offsets
+ (offsetsAndNumAppendedMessages._1, offsetsAndNumAppendedMessages._2)
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
@@ -110,7 +110,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
- val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
+ val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)

0 comments on commit c546286

Please sign in to comment.