From 601da3310c9921d108b4037843bba53ec0cd7966 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 21 Jul 2017 13:38:43 +0100 Subject: [PATCH] Improve log warning to include the log name --- .../main/scala/kafka/log/LogCleanerManager.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 4a4a59f993b38..ed0cb6992940a 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -287,7 +287,7 @@ private[log] object LogCleanerManager extends Logging { if (offset < logStartOffset) { // don't bother with the warning if compact and delete are enabled. if (!isCompactAndDelete(log)) - warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.") + warn(s"Resetting first dirty offset of ${log.name} to log start offset $logStartOffset since the checkpointed offset $offset is invalid.") logStartOffset } else { offset @@ -302,7 +302,7 @@ private[log] object LogCleanerManager extends Logging { // find first segment that cannot be cleaned // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time // may be cleaned - val firstUncleanableDirtyOffset: Long = Seq ( + val firstUncleanableDirtyOffset: Long = Seq( // we do not clean beyond the first unstable offset log.firstUnstableOffset.map(_.messageOffset), @@ -312,12 +312,11 @@ private[log] object LogCleanerManager extends Logging { // the first segment whose largest message timestamp is within a minimum time lag from now if (compactionLagMs > 0) { - dirtyNonActiveSegments.find { - s => - val isUncleanable = s.largestTimestamp > now - compactionLagMs - debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable") - isUncleanable - } map(_.baseOffset) + dirtyNonActiveSegments.find { s => + val isUncleanable = s.largestTimestamp > now - compactionLagMs + debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable") + isUncleanable + }.map(_.baseOffset) } else None ).flatten.min