/
LogSegment.scala
694 lines (615 loc) · 29.5 KB
/
LogSegment.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import com.yammer.metrics.core.Timer
import kafka.common.LogSegmentOffsetOverflowException
import kafka.utils._
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CompletedTxn, FetchDataInfo, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, ProducerStateManager, RollParams, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
import java.io.{File, IOException}
import java.nio.file.attribute.FileTime
import java.nio.file.{Files, NoSuchFileException}
import java.util.Optional
import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.math._
/**
 * A segment of the log. Each segment is backed by a log and three indexes. The log is a FileRecords containing the
 * actual messages. The offset index is an OffsetIndex that maps from logical offsets to physical file positions, the
 * time index maps timestamps to offsets, and the transaction index records aborted transactions. Each segment has a
 * base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous
 * segment.
 *
 * The log file and each index file are named after [base_offset] (see `LogSegment.open`), e.g. a segment with a base
 * offset of [base_offset] has a [base_offset].log file and a [base_offset].index file alongside its other index files.
 *
 * @param log The file records containing log entries
 * @param lazyOffsetIndex The offset index
 * @param lazyTimeIndex The timestamp index
 * @param txnIndex The transaction index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
 * @param time The time instance
 */
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging {

  /** The offset index, obtained through `lazyOffsetIndex.get`. */
  def offsetIndex: OffsetIndex = lazyOffsetIndex.get

  /** The time index, obtained through `lazyTimeIndex.get`. */
  def timeIndex: TimeIndex = lazyTimeIndex.get

  /**
   * Decide whether this segment must be rolled before the batch described by `rollParams` is appended.
   *
   * @return true if appending would push the segment past the configured max size, if the segment is non-empty and
   *         the (jittered) roll time has elapsed, if either index is full, or if the largest offset of the incoming
   *         batch cannot be stored relative to `baseOffset`
   */
  def shouldRoll(rollParams: RollParams): Boolean = {
    val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
    size > rollParams.maxSegmentBytes - rollParams.messagesSize ||
      (size > 0 && reachedRollMs) ||
      offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
  }

  /** Resize both the offset index and the time index to `size`. */
  def resizeIndexes(size: Int): Unit = {
    offsetIndex.resize(size)
    timeIndex.resize(size)
  }

  /**
   * Validate the on-disk state of this segment's indexes.
   *
   * @param timeIndexFileNewlyCreated whether the time index file was just created, in which case it is resized to 0
   * @throws NoSuchFileException if the offset index file does not exist
   */
  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
    if (lazyOffsetIndex.file.exists) {
      // Resize the time index file to 0 if it is newly created.
      if (timeIndexFileNewlyCreated)
        timeIndex.resize(0)
      // Sanity checks for time index and offset index are skipped because
      // we will recover the segments above the recovery point in recoverLog()
      // in any case so sanity checking them here is redundant.
      txnIndex.sanityCheck()
    }
    else throw new NoSuchFileException(s"Offset index file ${lazyOffsetIndex.file.getAbsolutePath} does not exist")
  }

  // Wall-clock creation time; used for time-based rolling when the first batch carries no timestamp.
  private var created = time.milliseconds

  /* the number of bytes since we last added an entry in the offset index */
  private var bytesSinceLastIndexEntry = 0

  // The timestamp we used for time based log rolling and for ensuring max compaction delay
  // volatile for LogCleaner to see the update
  @volatile private var rollingBasedTimestamp: Option[Long] = None

  /* The maximum timestamp and offset we see so far */
  @volatile private var _maxTimestampAndOffsetSoFar: TimestampOffset = TimestampOffset.UNKNOWN

  def maxTimestampAndOffsetSoFar_=(timestampOffset: TimestampOffset): Unit = _maxTimestampAndOffsetSoFar = timestampOffset

  /**
   * The largest timestamp seen so far and its offset. If nothing has been recorded yet, this is initialized from the
   * last entry of the time index.
   */
  def maxTimestampAndOffsetSoFar: TimestampOffset = {
    if (_maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
      _maxTimestampAndOffsetSoFar = timeIndex.lastEntry
    _maxTimestampAndOffsetSoFar
  }

  /* The maximum timestamp we see so far */
  def maxTimestampSoFar: Long = {
    maxTimestampAndOffsetSoFar.timestamp
  }

  /* The offset of the message carrying the maximum timestamp seen so far */
  def offsetOfMaxTimestampSoFar: Long = {
    maxTimestampAndOffsetSoFar.offset
  }

  /* Return the size in bytes of this log segment */
  def size: Int = log.sizeInBytes()

  /**
   * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
   */
  def canConvertToRelativeOffset(offset: Long): Boolean = {
    offsetIndex.canAppendOffset(offset)
  }

  /**
   * Append the given messages starting with the given offset. An entry pointing at the start of these messages is
   * added to the offset and time indexes when more than `indexIntervalBytes` bytes were appended since the last
   * index entry (the bytes of this append are counted afterwards).
   *
   * It is assumed this method is being called from within a lock.
   *
   * @param largestOffset The last offset in the message set
   * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
   * @param records The log entries to append.
   * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
   */
  @nonthreadsafe
  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      val physicalPosition = log.sizeInBytes()
      // The first batch written to an empty segment drives time-based rolling.
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      ensureOffsetInRange(largestOffset)

      // append the messages
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp)
      }
      // append an entry to the index (if needed)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

  /** @throws LogSegmentOffsetOverflowException if `offset` cannot be stored relative to `baseOffset` */
  private def ensureOffsetInRange(offset: Long): Unit = {
    if (!canConvertToRelativeOffset(offset))
      throw new LogSegmentOffsetOverflowException(this, offset)
  }

  /**
   * Copy one chunk of batches from `records`, starting at `position`, into this segment. A chunk holds as many
   * consecutive batches as fit in the read buffer (at least one, growing the buffer if needed) and whose last
   * offsets can still be stored relative to `baseOffset`.
   *
   * @return the number of bytes appended; 0 if the next batch cannot be appended to this segment
   */
  private def appendChunkFromFile(records: FileRecords, position: Int, bufferSupplier: BufferSupplier): Int = {
    var bytesToAppend = 0
    var maxTimestamp = Long.MinValue
    var offsetOfMaxTimestamp = Long.MinValue
    var maxOffset = Long.MinValue
    var readBuffer = bufferSupplier.get(1024 * 1024)

    // The first batch is always accepted size-wise; later ones only while the chunk still fits in the buffer.
    def canAppend(batch: RecordBatch) =
      canConvertToRelativeOffset(batch.lastOffset) &&
        (bytesToAppend == 0 || bytesToAppend + batch.sizeInBytes < readBuffer.capacity)

    // Release the buffer even if reading or appending throws.
    try {
      // find all batches that are valid to be appended to the current log segment and
      // determine the maximum offset and timestamp
      val nextBatches = records.batchesFrom(position).asScala.iterator
      for (batch <- nextBatches.takeWhile(canAppend)) {
        if (batch.maxTimestamp > maxTimestamp) {
          maxTimestamp = batch.maxTimestamp
          offsetOfMaxTimestamp = batch.lastOffset
        }
        maxOffset = batch.lastOffset
        bytesToAppend += batch.sizeInBytes
      }

      if (bytesToAppend > 0) {
        // Grow buffer if needed to ensure we copy at least one batch
        if (readBuffer.capacity < bytesToAppend)
          readBuffer = bufferSupplier.get(bytesToAppend)

        readBuffer.limit(bytesToAppend)
        records.readInto(readBuffer, position)

        append(maxOffset, maxTimestamp, offsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer))
      }

      bytesToAppend
    } finally {
      bufferSupplier.release(readBuffer)
    }
  }

  /**
   * Append records from a file beginning at the given position until either the end of the file
   * is reached or an offset is found which is too large to convert to a relative offset for the indexes.
   *
   * @return the number of bytes appended to the log (may be less than the size of the input if an
   *         offset is encountered which would overflow this segment)
   */
  def appendFromFile(records: FileRecords, start: Int): Int = {
    var position = start
    val bufferSupplier: BufferSupplier = new BufferSupplier.GrowableBufferSupplier
    while (position < start + records.sizeInBytes) {
      val bytesAppended = appendChunkFromFile(records, position, bufferSupplier)
      if (bytesAppended == 0)
        return position - start
      position += bytesAppended
    }
    position - start
  }

  /**
   * Record `completedTxn` in the transaction index if it was aborted; committed transactions are ignored.
   */
  @nonthreadsafe
  def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long): Unit = {
    if (completedTxn.isAborted) {
      trace(s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset")
      txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset))
    }
  }

  /**
   * Replay `batch` into `producerStateManager` (as a replication append) and record any transaction it completes.
   * The map end offset is always advanced past the batch.
   */
  private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
    if (batch.hasProducerId) {
      val producerId = batch.producerId
      val appendInfo = producerStateManager.prepareUpdate(producerId, AppendOrigin.REPLICATION)
      val maybeCompletedTxn = appendInfo.append(batch, Optional.empty())
      producerStateManager.update(appendInfo)
      maybeCompletedTxn.ifPresent(completedTxn => {
        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
        updateTxnIndex(completedTxn, lastStableOffset)
        producerStateManager.completeTxn(completedTxn)
      })
    }
    producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
  }

  /**
   * Find the physical file position for the first message with offset >= the requested offset.
   *
   * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
   * in the file higher than the greatest-lower-bound from the index.
   *
   * @param offset The offset we want to translate
   * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
   * when omitted, the search will begin at the position in the offset index.
   * @return The position in the log storing the message with the least offset >= the requested offset and the size of the
   *        message or null if no message meets this criteria.
   */
  @threadsafe
  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
    val mapping = offsetIndex.lookup(offset)
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
  }

  /**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes (unless minOneMessage lets the first message exceed it) and will not extend past
   * maxPosition.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxPosition The maximum position in the log segment that should be exposed for read
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
   *
   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
   *         or null if the startOffset is larger than the largest offset in this log
   * @throws IllegalArgumentException if maxSize is negative
   */
  @threadsafe
  def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // bound the read by both maxPosition and the (possibly adjusted) max size
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

    new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
  }

  /** Delegates to `offsetIndex.fetchUpperBoundOffset` and returns the offset of the entry it finds, if any. */
  def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] =
    offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)

  /**
   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
   * from the end of the log and index.
   *
   * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
   *                             the transaction index.
   * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
   * @return The number of bytes truncated from the log
   * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
   */
  @nonthreadsafe
  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
    offsetIndex.reset()
    timeIndex.reset()
    txnIndex.reset()
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN
    try {
      for (batch <- log.batches.asScala) {
        batch.ensureValid()
        ensureOffsetInRange(batch.lastOffset)

        // The max timestamp is exposed at the batch level, so no need to iterate the records
        if (batch.maxTimestamp > maxTimestampSoFar) {
          maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp, batch.lastOffset)
        }

        // Build offset index
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          offsetIndex.append(batch.lastOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
          lastIndexEntry = validBytes
        }
        validBytes += batch.sizeInBytes()

        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          leaderEpochCache.foreach { cache =>
            if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.asScala.forall(batch.partitionLeaderEpoch > _))
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          }
          updateProducerState(producerStateManager, batch)
        }
      }
    } catch {
      case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
        warn(s"Found invalid messages in log segment ${log.file.getAbsolutePath} at byte offset $validBytes: " +
          s"${e.getMessage}. ${e.getCause}")
    }
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")

    log.truncateTo(validBytes)
    offsetIndex.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true)
    timeIndex.trimToValidSize()
    truncated
  }

  /**
   * Reload the max timestamp (and its offset) from the last time index entry, then scan the log from that entry's
   * position for any larger timestamp.
   */
  private def loadLargestTimestamp(): Unit = {
    // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
    val lastTimeIndexEntry = timeIndex.lastEntry
    maxTimestampAndOffsetSoFar = lastTimeIndexEntry

    val offsetPosition = offsetIndex.lookup(lastTimeIndexEntry.offset)
    // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
    val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
    if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
      maxTimestampAndOffsetSoFar = new TimestampOffset(maxTimestampOffsetAfterLastEntry.timestamp, maxTimestampOffsetAfterLastEntry.offset)
    }
  }

  /**
   * Check whether the last offset of the last batch in this segment overflows the indexes.
   */
  def hasOverflow: Boolean = {
    val nextOffset = readNextOffset
    nextOffset > baseOffset && !canConvertToRelativeOffset(nextOffset - 1)
  }

  /** Delegates to `txnIndex.collectAbortedTxns` for the range [fetchOffset, upperBoundOffset). */
  def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult =
    txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset)

  override def toString: String =
    s"LogSegment(baseOffset=$baseOffset, size=$size, lastModifiedTime=$lastModified, " +
      s"largestRecordTimestamp=$largestRecordTimestamp)"

  /**
   * Truncate off all index and log entries with offsets >= the given offset.
   * If the given offset is larger than the largest message in this segment, do nothing.
   *
   * @param offset The offset to truncate to
   * @return The number of log bytes truncated
   */
  @nonthreadsafe
  def truncateTo(offset: Long): Int = {
    // Do offset translation before truncating the index to avoid needless scanning
    // in case we truncate the full index
    val mapping = translateOffset(offset)
    offsetIndex.truncateTo(offset)
    timeIndex.truncateTo(offset)
    txnIndex.truncateTo(offset)

    // After truncation, reset and allocate more space for the (new currently active) index
    offsetIndex.resize(offsetIndex.maxIndexSize)
    timeIndex.resize(timeIndex.maxIndexSize)

    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
    if (log.sizeInBytes == 0) {
      created = time.milliseconds
      rollingBasedTimestamp = None
    }

    bytesSinceLastIndexEntry = 0
    if (maxTimestampSoFar >= 0)
      loadLargestTimestamp()
    bytesTruncated
  }

  /**
   * Calculate the offset that would be used for the next message to be appended to this segment.
   * Note that this is expensive.
   */
  @threadsafe
  def readNextOffset: Long = {
    val fetchData = read(offsetIndex.lastOffset, log.sizeInBytes)
    if (fetchData == null)
      baseOffset
    else
      fetchData.records.batches.asScala.lastOption
        .map(_.nextOffset)
        .getOrElse(baseOffset)
  }

  /**
   * Flush this log segment to disk
   */
  @threadsafe
  def flush(): Unit = {
    LogFlushStats.logFlushTimer.time { () =>
      log.flush()
      offsetIndex.flush()
      timeIndex.flush()
      txnIndex.flush()
    }
  }

  /**
   * Update the directory reference for the log and indices in this segment. This would typically be called after a
   * directory is renamed.
   */
  def updateParentDir(dir: File): Unit = {
    log.updateParentDir(dir)
    lazyOffsetIndex.updateParentDir(dir)
    lazyTimeIndex.updateParentDir(dir)
    txnIndex.updateParentDir(dir)
  }

  /**
   * Change the suffix for the index and log files for this log segment
   * IOException from this method should be handled by the caller
   */
  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
    log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    lazyOffsetIndex.renameTo(new File(Utils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
    lazyTimeIndex.renameTo(new File(Utils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
    txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
  }

  /** True only if the log file and all three index files end with `suffix`. */
  def hasSuffix(suffix: String): Boolean = {
    log.file.getName.endsWith(suffix) &&
      lazyOffsetIndex.file.getName.endsWith(suffix) &&
      lazyTimeIndex.file.getName.endsWith(suffix) &&
      txnIndex.file.getName.endsWith(suffix)
  }

  /**
   * Append the largest time index entry to the time index and trim the log and indexes.
   *
   * The time index entry appended will be used to decide when to delete the segment.
   */
  def onBecomeInactiveSegment(): Unit = {
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true)
    offsetIndex.trimToValidSize()
    timeIndex.trimToValidSize()
    log.trim()
  }

  /**
   * If not previously loaded,
   * load the timestamp of the first message into memory.
   */
  private def loadFirstBatchTimestamp(): Unit = {
    if (rollingBasedTimestamp.isEmpty) {
      val iter = log.batches.iterator()
      if (iter.hasNext)
        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
    }
  }

  /**
   * The time this segment has waited to be rolled.
   * If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
   * is rolled if the difference between the new batch's timestamp and the first batch's timestamp exceeds the
   * segment rolling time.
   * If the first batch does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
   * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
   * segment rolling time.
   */
  def timeWaitedForRoll(now: Long, messageTimestamp: Long): Long = {
    // Load the timestamp of the first message into memory
    loadFirstBatchTimestamp()
    rollingBasedTimestamp match {
      case Some(t) if t >= 0 => messageTimestamp - t
      case _ => now - created
    }
  }

  /**
   * @return the first batch timestamp if the timestamp is available. Otherwise return Long.MaxValue
   */
  def getFirstBatchTimestamp(): Long = {
    loadFirstBatchTimestamp()
    rollingBasedTimestamp match {
      case Some(t) if t >= 0 => t
      case _ => Long.MaxValue
    }
  }

  /**
   * Search the message offset based on timestamp and offset.
   *
   * This method returns an Option of TimestampAndOffset. The returned value is determined using the following ordered
   * list of rules:
   *
   * - If all the messages in the segment have smaller offsets, return None
   * - If all the messages in the segment have smaller timestamps, return None
   * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
   *   the returned offset will be max(the base offset of the segment, startingOffset) and the returned timestamp
   *   will be the "no timestamp" value.
   * - Otherwise, return an Option of TimestampAndOffset. The offset is the offset of the first message whose timestamp
   *   is greater than or equal to the target timestamp and whose offset is greater than or equal to the startingOffset.
   *
   * This method only returns None when 1) all messages' offsets < startingOffset or 2) the log is not empty but we did
   * not see any message when scanning the log from the indexed position. The latter could happen if the log is
   * truncated after we get the indexed position but before we scan the log from there. In this case we simply return
   * None and the caller will need to check on the truncated log and maybe retry or even do the search on another log
   * segment.
   *
   * @param timestamp The timestamp to search for.
   * @param startingOffset The starting offset to search.
   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
   */
  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampAndOffset] = {
    // Get the index entry with a timestamp less than or equal to the target timestamp
    val timestampOffset = timeIndex.lookup(timestamp)
    val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position

    // Search the timestamp
    Option(log.searchForTimestamp(timestamp, position, startingOffset))
  }

  /**
   * Close this log segment. If a max timestamp was recorded, it is first appended to the time index. Each close
   * step's exception is swallowed so that the remaining resources are still closed.
   */
  def close(): Unit = {
    if (_maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
      CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, true), this)
    CoreUtils.swallow(lazyOffsetIndex.close(), this)
    CoreUtils.swallow(lazyTimeIndex.close(), this)
    CoreUtils.swallow(log.close(), this)
    CoreUtils.swallow(txnIndex.close(), this)
  }

  /**
   * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed
   */
  def closeHandlers(): Unit = {
    CoreUtils.swallow(lazyOffsetIndex.closeHandler(), this)
    CoreUtils.swallow(lazyTimeIndex.closeHandler(), this)
    CoreUtils.swallow(log.closeHandlers(), this)
    CoreUtils.swallow(txnIndex.close(), this)
  }

  /**
   * Delete this log segment from the filesystem.
   *
   * Every file is attempted, via `CoreUtils.tryAll`. A missing transaction index is not logged, because
   * `logIfMissing = false` is passed for it.
   *
   * @throws IOException wrapping the underlying failure, naming the file that could not be deleted
   */
  def deleteIfExists(): Unit = {
    def delete(deleteFn: () => Boolean, fileType: String, file: File, logIfMissing: Boolean): Unit = {
      try {
        if (deleteFn())
          info(s"Deleted $fileType ${file.getAbsolutePath}.")
        else if (logIfMissing)
          info(s"Failed to delete $fileType ${file.getAbsolutePath} because it does not exist.")
      }
      catch {
        case e: IOException => throw new IOException(s"Delete of $fileType ${file.getAbsolutePath} failed.", e)
      }
    }

    CoreUtils.tryAll(Seq(
      () => delete(log.deleteIfExists _, "log", log.file, logIfMissing = true),
      () => delete(lazyOffsetIndex.deleteIfExists _, "offset index", lazyOffsetIndex.file, logIfMissing = true),
      () => delete(lazyTimeIndex.deleteIfExists _, "time index", lazyTimeIndex.file, logIfMissing = true),
      () => delete(txnIndex.deleteIfExists _, "transaction index", txnIndex.file, logIfMissing = false)
    ))
  }

  /** True when none of the log file or the three index files exist on disk. */
  def deleted(): Boolean = {
    !log.file.exists() && !lazyOffsetIndex.file.exists() && !lazyTimeIndex.file.exists() && !txnIndex.file.exists()
  }

  /**
   * The last modified time of this log segment as a unix time stamp
   */
  def lastModified: Long = log.file.lastModified

  /**
   * The largest timestamp this segment contains, if maxTimestampSoFar >= 0, otherwise None.
   */
  def largestRecordTimestamp: Option[Long] = if (maxTimestampSoFar >= 0) Some(maxTimestampSoFar) else None

  /**
   * The largest timestamp this segment contains, falling back to the log file's last modified time when no
   * record timestamp is known.
   */
  def largestTimestamp: Long = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified

  /**
   * Change the last modified time of the log file and of the offset and time index files
   * (the transaction index is left untouched).
   */
  def lastModified_=(ms: Long) = {
    val fileTime = FileTime.fromMillis(ms)
    Files.setLastModifiedTime(log.file.toPath, fileTime)
    Files.setLastModifiedTime(lazyOffsetIndex.file.toPath, fileTime)
    Files.setLastModifiedTime(lazyTimeIndex.file.toPath, fileTime)
  }

}
object LogSegment {

  /**
   * Build a [[LogSegment]] for `baseOffset` inside `dir`, opening its log file together with its offset, time and
   * transaction index files (all named through `UnifiedLog` helpers with the given `fileSuffix`).
   *
   * @param dir               directory holding the segment files
   * @param baseOffset        base offset of the segment
   * @param config            source of the max index size, index interval and roll jitter
   * @param time              clock handed to the segment
   * @param fileAlreadyExists passed through to `FileRecords.open`
   * @param initFileSize      passed through to `FileRecords.open`
   * @param preallocate       passed through to `FileRecords.open`
   * @param fileSuffix        suffix appended to every segment file name
   */
  def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
    val indexSizeLimit = config.maxIndexSize

    val records = FileRecords.open(UnifiedLog.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate)
    val offsetIdx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset, indexSizeLimit)
    val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset, indexSizeLimit)
    val txnIdx = new TransactionIndex(baseOffset, UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix))
    val intervalBytes = config.indexInterval
    val jitterMs = config.randomSegmentJitter

    new LogSegment(records, offsetIdx, timeIdx, txnIdx, baseOffset, intervalBytes, jitterMs, time)
  }

  /**
   * Delete, if present, the offset index, time index, transaction index and log files of the segment starting at
   * `baseOffset` in `dir` (in that order).
   */
  def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
    val segmentFiles = Seq(
      UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix),
      UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix),
      UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix),
      UnifiedLog.logFile(dir, baseOffset, fileSuffix)
    )
    segmentFiles.foreach(file => UnifiedLog.deleteFileIfExists(file))
  }
}
/**
 * Metrics for log segment flushes: a single timer, registered in this object's metrics group, that
 * `LogSegment.flush()` wraps around each flush.
 */
object LogFlushStats {
  private val metricsGroup: KafkaMetricsGroup = new KafkaMetricsGroup(this.getClass)

  /** Timer recording flush latency (milliseconds) and rate (per second). */
  val logFlushTimer: Timer = metricsGroup.newTimer(
    "LogFlushRateAndTimeMs",
    TimeUnit.MILLISECONDS,
    TimeUnit.SECONDS
  )
}