Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: LogLoader: Add log identifier in few missing areas #10819

Merged
merged 1 commit into from Jun 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/log/LogLoader.scala
Expand Up @@ -373,7 +373,7 @@ object LogLoader extends Logging {
params.config,
time = params.time,
fileSuffix = Log.SwapFileSuffix)
info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
recoverSegment(swapSegment, params)

// We create swap files for two cases:
Expand Down Expand Up @@ -425,8 +425,9 @@ object LogLoader extends Logging {
if (logEndOffset >= params.logStartOffsetCheckpoint)
Some(logEndOffset)
else {
warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ${params.logStartOffsetCheckpoint}. " +
"This could happen if segment files were deleted from the file system.")
warn(s"${params.logIdentifier}Deleting all segments because logEndOffset ($logEndOffset) " +
s" smaller than logStartOffset ${params.logStartOffsetCheckpoint}." +
" This could happen if segment files were deleted from the file system.")
removeAndDeleteSegmentsAsync(params.segments.values, params)
params.leaderEpochCache.foreach(_.clearAndFlush())
params.producerStateManager.truncateFullyAndStartAt(params.logStartOffsetCheckpoint)
Expand Down Expand Up @@ -514,7 +515,7 @@ object LogLoader extends Logging {
// materialization of the iterator here, so that results of the iteration remain valid and
// deterministic.
val toDelete = segmentsToDelete.toList
info(s"Deleting segments as part of log recovery: ${toDelete.mkString(",")}")
info(s"${params.logIdentifier}Deleting segments as part of log recovery: ${toDelete.mkString(",")}")
toDelete.foreach { segment =>
params.segments.remove(segment.baseOffset)
}
Expand Down