Log.scala
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.{File, IOException}
import java.nio.file.Files
import java.text.NumberFormat
import java.util.Optional
import java.util.concurrent.atomic._
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
import kafka.common.{LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.AppendOrigin.RaftLeader
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, immutable, mutable}
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
offsetsMonotonic = false, -1L)
/**
* In ProduceResponse V8+, we add two new fields record_errors and error_message (see KIP-467).
* For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one
* in unknownLogAppendInfoWithLogStartOffset, but with the additional fields recordErrors and errorMessage.
*/
def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, recordErrors: Seq[RecordError], errorMessage: String): LogAppendInfo =
LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
offsetsMonotonic = false, -1L, recordErrors, errorMessage)
}
sealed trait LeaderHwChange
object LeaderHwChange {
case object Increased extends LeaderHwChange
case object Same extends LeaderHwChange
case object None extends LeaderHwChange
}
/**
* Struct to hold various quantities we compute about each message set before appending to the log
*
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower. If the append is a duplicate of the last appended entry, the segment base offset
* and relative position in the segment will be unknown.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
* @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
* @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCodec The source codec used in the message set (sent by the producer)
* @param targetCodec The target codec of the message set (after applying the broker compression configuration, if any)
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
* @param lastOffsetOfFirstBatch The last offset of the first batch
* @param leaderHwChange Increased if the high watermark needs to be increased after appending the records.
* Same if the high watermark is not changed. None is the default value and means the append failed.
*
*/
case class LogAppendInfo(var firstOffset: Option[LogOffsetMetadata],
var lastOffset: Long,
var lastLeaderEpoch: Option[Int],
var maxTimestamp: Long,
var offsetOfMaxTimestamp: Long,
var logAppendTime: Long,
var logStartOffset: Long,
var recordConversionStats: RecordConversionStats,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
shallowCount: Int,
validBytes: Int,
offsetsMonotonic: Boolean,
lastOffsetOfFirstBatch: Long,
recordErrors: Seq[RecordError] = List(),
errorMessage: String = null,
leaderHwChange: LeaderHwChange = LeaderHwChange.None) {
/**
* Get the first offset if it exists, else get the last offset of the first batch
* For magic versions 2 and newer, this method will return first offset. For magic versions
* older than 2, we use the last offset of the first batch as an approximation of the first
* offset to avoid decompressing the data.
*/
def firstOrLastOffsetOfFirstBatch: Long = firstOffset.map(_.messageOffset).getOrElse(lastOffsetOfFirstBatch)
/**
* Get the (maximum) number of messages described by LogAppendInfo
* @return Maximum possible number of messages described by LogAppendInfo
*/
def numMessages: Long = {
firstOffset match {
case Some(firstOffsetVal) if (firstOffsetVal.messageOffset >= 0 && lastOffset >= 0) =>
(lastOffset - firstOffsetVal.messageOffset + 1)
case _ => 0
}
}
}
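/*
* Illustrative sketch (not part of the original file; values assumed): counting the
* messages described by an append via `numMessages`.
*
* val info = LogAppendInfo.UnknownLogAppendInfo.copy(
* firstOffset = Some(LogOffsetMetadata(100L)),
* lastOffset = 104L)
* info.numMessages // 104 - 100 + 1 = 5; unknown (negative) offsets yield 0
*/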
/**
* Container class which represents a snapshot of the significant offsets for a partition. This allows fetching
* of these offsets atomically without the possibility of a leader change affecting their consistency relative
* to each other. See [[Log.fetchOffsetSnapshot()]].
*/
case class LogOffsetSnapshot(logStartOffset: Long,
logEndOffset: LogOffsetMetadata,
highWatermark: LogOffsetMetadata,
lastStableOffset: LogOffsetMetadata)
/**
* Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]].
*/
case class LogReadInfo(fetchedData: FetchDataInfo,
divergingEpoch: Option[FetchResponseData.EpochEndOffset],
highWatermark: Long,
logStartOffset: Long,
logEndOffset: Long,
lastStableOffset: Long)
/**
* A class used to hold useful metadata about a completed transaction. This is used to build
* the transaction index after appending to the log.
*
* @param producerId The ID of the producer
* @param firstOffset The first offset (inclusive) of the transaction
* @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
* COMMIT/ABORT control record which indicates the transaction's completion.
* @param isAborted Whether or not the transaction was aborted
*/
case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
override def toString: String = {
"CompletedTxn(" +
s"producerId=$producerId, " +
s"firstOffset=$firstOffset, " +
s"lastOffset=$lastOffset, " +
s"isAborted=$isAborted)"
}
}
/**
* A class used to hold the parameters required to decide whether to roll a log segment.
*/
case class RollParams(maxSegmentMs: Long,
maxSegmentBytes: Int,
maxTimestampInMessages: Long,
maxOffsetInMessages: Long,
messagesSize: Int,
now: Long)
object RollParams {
def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = {
new RollParams(config.maxSegmentMs,
config.segmentSize,
appendInfo.maxTimestamp,
appendInfo.lastOffset,
messagesSize, now)
}
}
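/*
* Illustrative sketch (values assumed): the roll inputs are derived from the log config
* and the append just performed, then compared against the active segment in maybeRoll.
*
* val params = RollParams(config, appendInfo, messagesSize = records.sizeInBytes, now = time.milliseconds)
* // a roll is triggered when, e.g., the active segment cannot fit params.messagesSize more
* // bytes within maxSegmentBytes, or maxSegmentMs has elapsed since the first timestamp
*/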
sealed trait LogStartOffsetIncrementReason
case object ClientRecordDeletion extends LogStartOffsetIncrementReason {
override def toString: String = "client delete records request"
}
case object LeaderOffsetIncremented extends LogStartOffsetIncrementReason {
override def toString: String = "leader offset increment"
}
case object SegmentDeletion extends LogStartOffsetIncrementReason {
override def toString: String = "segment deletion"
}
case object SnapshotGenerated extends LogStartOffsetIncrementReason {
override def toString: String = "snapshot generated"
}
/**
* An append-only log for storing messages.
*
* The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
*
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
*
* @param _dir The directory in which log segments are created.
* @param config The log configuration settings
* @param segments The non-empty log segments recovered from disk
* @param logStartOffset The earliest offset allowed to be exposed to kafka client.
* The logStartOffset can be updated by:
* - user's DeleteRecordsRequest
* - broker's log retention
* - broker's log truncation
* - broker's log recovery
* The logStartOffset is used to decide the following:
* - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
* It may trigger log rolling if the active segment is deleted.
* - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
* we make sure that logStartOffset <= log's highWatermark
* Other activities such as log cleaning are not affected by logStartOffset.
* @param recoveryPoint The offset at which to begin the next recovery i.e. the first offset which has not been flushed to disk
* @param nextOffsetMetadata The offset where the next message could be appended
* @param scheduler The thread pool scheduler used for background actions
* @param brokerTopicStats Container for Broker Topic Yammer Metrics
* @param time The time instance used for checking the clock
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
* @param topicPartition The topic partition associated with this Log instance
* @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated
* with the provided logStartOffset and nextOffsetMetadata
* @param producerStateManager The ProducerStateManager instance containing state associated with the provided segments
* @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle log directory failure
* @param topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
* first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
* this field will be populated by reading the topic ID value from partition.metadata if it exists.
* @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
* log directory. A partition.metadata file is only created when the raft controller is used
* or the ZK controller's inter-broker protocol version is at least 2.8.
* This file will persist the topic ID on the broker. If inter-broker protocol for a ZK controller
* is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade.
* If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
* will be deleted to avoid ID conflicts upon re-upgrade.
*/
@threadsafe
class Log(@volatile private var _dir: File,
@volatile var config: LogConfig,
val segments: LogSegments,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
@volatile var nextOffsetMetadata: LogOffsetMetadata,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
/* A lock that guards all modifications to the log */
private val lock = new Object
// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
// After memory mapped buffer is closed, no disk IO operation should be performed for this log
@volatile private var isMemoryMappedBufferClosed = false
// Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
@volatile private var _parentDir: String = dir.getParent
/* last time it was flushed */
private val lastFlushedTime = new AtomicLong(time.milliseconds)
/* The earliest offset which is part of an incomplete transaction. This is used to compute the
* last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
* gets removed from the log (through record or segment deletion). In this case, the first unstable offset
* will point to the log start offset, which may actually be either part of a completed transaction or not
* part of a transaction at all. However, since we only use the LSO for the purpose of restricting the
* read_committed consumer to fetching decided data (i.e. committed, aborted, or non-transactional), this
* temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets
* of each ongoing transaction in order to compute a new first unstable offset. It is possible, however,
* that this could result in disagreement between replicas depending on when they began replicating the log.
* In the worst case, the LSO could be seen by a consumer to go backwards.
*/
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
/* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
* not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
* equals the log end offset (which may never happen for a partition under consistent load). This is needed to
* prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
*/
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
@volatile var partitionMetadataFile: PartitionMetadataFile = null
locally {
initializePartitionMetadata()
updateLogStartOffset(logStartOffset)
maybeIncrementFirstUnstableOffset()
// Delete the partition metadata file if the version does not support topic IDs.
// Recover the topic ID if present and topic IDs are supported.
// If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist,
// write to the partition metadata file.
// Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
if (partitionMetadataFile.exists()) {
if (keepPartitionMetadataFile) {
val fileTopicId = partitionMetadataFile.read().topicId
if (_topicId.isDefined && !_topicId.contains(fileTopicId))
throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
s"but log already contained topic ID $fileTopicId")
_topicId = Some(fileTopicId)
} else {
try partitionMetadataFile.delete()
catch {
case e: IOException =>
error(s"Error while trying to delete partition metadata file ${partitionMetadataFile}", e)
}
}
} else if (keepPartitionMetadataFile) {
_topicId.foreach(partitionMetadataFile.write)
}
}
def topicId: Option[Uuid] = _topicId
def dir: File = _dir
def parentDir: String = _parentDir
def parentDirFile: File = new File(_parentDir)
def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}
private def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}
def highWatermark: Long = highWatermarkMetadata.messageOffset
/**
* Update the high watermark to a new offset. The new high watermark will be lower
* bounded by the log start offset and upper bounded by the log end offset.
*
* This is intended to be called when initializing the high watermark or when updating
* it on a follower after receiving a Fetch response from the leader.
*
* @param hw the suggested new value for the high watermark
* @return the updated high watermark offset
*/
def updateHighWatermark(hw: Long): Long = {
updateHighWatermark(LogOffsetMetadata(hw))
}
/**
* Update high watermark with offset metadata. The new high watermark will be lower
* bounded by the log start offset and upper bounded by the log end offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset metadata
* @return the updated high watermark offset
*/
def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
val endOffsetMetadata = logEndOffsetMetadata
val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
LogOffsetMetadata(logStartOffset)
} else if (highWatermarkMetadata.messageOffset >= endOffsetMetadata.messageOffset) {
endOffsetMetadata
} else {
highWatermarkMetadata
}
updateHighWatermarkMetadata(newHighWatermarkMetadata)
newHighWatermarkMetadata.messageOffset
}
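/*
* Usage sketch (illustrative; assumes an open `log` with logStartOffset = 0 and
* logEndOffset = 10):
*
* log.updateHighWatermark(15L) // returns 10: clamped to the log end offset
* log.updateHighWatermark(5L) // returns 5: already within bounds
* log.updateHighWatermark(-1L) // returns 0: bounded below by the log start offset
*/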
/**
* Update the high watermark to a new value if and only if it is larger than the old value. It is
* an error to update to a value which is larger than the log end offset.
*
* This method is intended to be used by the leader to update the high watermark after follower
* fetch offsets have been updated.
*
* @return the old high watermark, if updated by the new value
*/
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
if (newHighWatermark.messageOffset > logEndOffset)
throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
s"log end offset $logEndOffsetMetadata")
lock.synchronized {
val oldHighWatermark = fetchHighWatermarkMetadata
// Ensure that the high watermark increases monotonically. We also update the high watermark when the new
// offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
(oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark)
Some(oldHighWatermark)
} else {
None
}
}
}
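/*
* Usage sketch (illustrative; offsets assumed): on the leader, after follower fetch
* offsets advance, a candidate high watermark is offered and only a strictly larger
* offset (or the same offset on a newer segment) is accepted.
*
* log.maybeIncrementHighWatermark(LogOffsetMetadata(42L)) match {
* case Some(oldHw) => // advanced: oldHw is the previous high watermark
* case None => // rejected: 42 did not move the high watermark forward
* }
*/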
/**
* Get the offset and metadata for the current high watermark. If offset metadata is not
* known, this will do a lookup in the index and cache the result.
*/
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed()
val offsetMetadata = highWatermarkMetadata
if (offsetMetadata.messageOffsetOnly) {
lock.synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
updateHighWatermarkMetadata(fullOffset)
fullOffset
}
} else {
offsetMetadata
}
}
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset < 0)
throw new IllegalArgumentException("High watermark offset should be non-negative")
lock synchronized {
if (newHighWatermark.messageOffset < highWatermarkMetadata.messageOffset) {
warn(s"Non-monotonic update of high watermark from $highWatermarkMetadata to $newHighWatermark")
}
highWatermarkMetadata = newHighWatermark
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
maybeIncrementFirstUnstableOffset()
}
trace(s"Setting high watermark $newHighWatermark")
}
/**
* Get the first unstable offset. Unlike the last stable offset, which is always defined,
* the first unstable offset only exists if there are transactions in progress.
*
* @return the first unstable offset, if it exists
*/
private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed()
// cache the current high watermark to avoid a concurrent update invalidating the range check
val highWatermarkMetadata = fetchHighWatermarkMetadata
firstUnstableOffsetMetadata match {
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
if (offsetMetadata.messageOffsetOnly) {
lock synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
if (firstUnstableOffsetMetadata.contains(offsetMetadata))
firstUnstableOffsetMetadata = Some(fullOffset)
fullOffset
}
} else {
offsetMetadata
}
case _ => highWatermarkMetadata
}
}
/**
* The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
* Non-transactional messages are considered decided immediately, but transactional messages are only decided when
* the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
* to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
* beyond the high watermark.
*/
def lastStableOffset: Long = {
firstUnstableOffsetMetadata match {
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark => offsetMetadata.messageOffset
case _ => highWatermark
}
}
def lastStableOffsetLag: Long = highWatermark - lastStableOffset
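/*
* Illustrative timeline (offsets assumed): with an open transaction whose first record
* sits at offset 5 and highWatermark = 8,
*
* log.lastStableOffset // 5: offsets >= 5 are still undecided
* log.lastStableOffsetLag // 8 - 5 = 3
*
* once the transaction's COMMIT/ABORT marker is written and replicated, the LSO catches
* back up to the high watermark.
*/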
/**
* Fully materialize and return an offset snapshot including segment position info. This method will update
* the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an
* offset out of range error if the segment info cannot be loaded.
*/
def fetchOffsetSnapshot: LogOffsetSnapshot = {
val lastStable = fetchLastStableOffsetMetadata
val highWatermark = fetchHighWatermarkMetadata
LogOffsetSnapshot(
logStartOffset,
logEndOffsetMetadata,
highWatermark,
lastStable
)
}
private val tags = {
val maybeFutureTag = if (isFuture) Map("is-future" -> "true") else Map.empty[String, String]
Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag
}
newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
newGauge(LogMetricNames.Size, () => size, tags)
val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
lock synchronized {
producerStateManager.removeExpiredProducers(time.milliseconds)
}
}, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
/** The name of this log */
def name = dir.getName()
private def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion
private def initializePartitionMetadata(): Unit = lock synchronized {
val partitionMetadata = PartitionMetadataFile.newFile(dir)
partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
}
/** Only used for ZK clusters when we update and start using topic IDs on existing topics */
def assignTopicId(topicId: Uuid): Unit = {
partitionMetadataFile.write(topicId)
_topicId = Some(topicId)
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
}
private def updateLogEndOffset(offset: Long): Unit = {
nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
// Update the high watermark in case it has gotten ahead of the log end offset following a truncation
// or if a new segment has been rolled and the offset metadata needs to be updated.
if (highWatermark >= offset) {
updateHighWatermarkMetadata(nextOffsetMetadata)
}
if (this.recoveryPoint > offset) {
this.recoveryPoint = offset
}
}
private def updateLogStartOffset(offset: Long): Unit = {
logStartOffset = offset
if (highWatermark < offset) {
updateHighWatermark(offset)
}
if (this.recoveryPoint < offset) {
this.recoveryPoint = offset
}
}
// Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
// free of all side-effects, i.e. it must not update any log-specific state.
private def rebuildProducerState(lastOffset: Long,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
Log.rebuildProducerState(producerStateManager, segments, logStartOffset, lastOffset, recordVersion, time,
reloadFromCleanShutdown = false, logIdent)
}
def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
lock synchronized {
producerStateManager.activeProducers.map { case (producerId, state) =>
new DescribeProducersResponseData.ProducerState()
.setProducerId(producerId)
.setProducerEpoch(state.producerEpoch)
.setLastSequence(state.lastSeq)
.setLastTimestamp(state.lastTimestamp)
.setCoordinatorEpoch(state.coordinatorEpoch)
.setCurrentTxnStartOffset(state.currentTxnFirstOffset.getOrElse(-1L))
}
}.toSeq
}
private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized {
producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
(producerId, producerIdEntry.lastSeq)
}
}
private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized {
producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0) Some(producerIdEntry.lastDataOffset) else None
val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch)
producerId -> lastRecord
}
}
/**
* The number of segments in the log.
* Take care! This is an O(n) operation.
*/
def numberOfSegments: Int = segments.numberOfSegments
/**
* Close this log.
* The memory mapped buffer for index files of this log will be left open until the log is deleted.
*/
def close(): Unit = {
debug("Closing log")
lock synchronized {
checkIfMemoryMappedBufferClosed()
producerExpireCheck.cancel(true)
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
// We take a snapshot at the last written offset to hopefully avoid the need to scan the log
// after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
// (the clean shutdown file is written after the logs are all closed).
producerStateManager.takeSnapshot()
segments.close()
}
}
}
/**
* Rename the directory of the log
*
* @throws KafkaStorageException if rename fails
*/
def renameDir(name: String): Unit = {
lock synchronized {
maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
val renamedDir = new File(dir.getParent, name)
Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
if (renamedDir != dir) {
_dir = renamedDir
_parentDir = renamedDir.getParent
segments.updateParentDir(renamedDir)
producerStateManager.updateParentDir(dir)
// re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
// the checkpoint file in renamed log directory
initializeLeaderEpochCache()
initializePartitionMetadata()
}
}
}
}
/**
* Close file handlers used by log but don't write to disk. This is called if the log directory is offline
*/
def closeHandlers(): Unit = {
debug("Closing handlers")
lock synchronized {
segments.closeHandlers()
isMemoryMappedBufferClosed = true
}
}
/**
* Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
*
* @param records The records to append
* @param origin Declares the origin of the append which affects required validations
* @param interBrokerProtocolVersion Inter-broker message protocol version
* @param requestLocal request local instance
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
def appendAsLeader(records: MemoryRecords,
leaderEpoch: Int,
origin: AppendOrigin = AppendOrigin.Client,
interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
requestLocal: RequestLocal = RequestLocal.NoCaching): LogAppendInfo = {
val validateAndAssignOffsets = origin != AppendOrigin.RaftLeader
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), ignoreRecordSize = false)
}
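/*
* Usage sketch (illustrative; record contents assumed): a leader append validates the
* batch, assigns offsets, and stamps the given leader epoch.
*
* val records = MemoryRecords.withRecords(CompressionType.NONE,
* new SimpleRecord("key".getBytes, "value".getBytes))
* val info = log.appendAsLeader(records, leaderEpoch = 0)
* // info.firstOffset and info.lastOffset now carry the assigned offsets
*/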
/**
* Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs
*
* @param records The records to append
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
append(records,
origin = AppendOrigin.Replication,
interBrokerProtocolVersion = ApiVersion.latestVersion,
validateAndAssignOffsets = false,
leaderEpoch = -1,
None,
// skip the record size validation since the records were already accepted by the leader.
ignoreRecordSize = true)
}
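/*
* Usage sketch (illustrative): a follower appends the leader's bytes verbatim, keeping
* the offsets and epochs already stamped by the leader; `fetched` is assumed to come
* from a Fetch response.
*
* log.appendAsFollower(fetched) // no offset assignment, no record size re-validation
*/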
/**
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
*
* This method will generally be responsible for assigning offsets to the messages,
* however if validateAndAssignOffsets is false we will only check that the existing offsets are valid.
*
* @param records The log records to append
* @param origin Declares the origin of the append which affects required validations
* @param interBrokerProtocolVersion Inter-broker message protocol version
* @param validateAndAssignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
* @param requestLocal The request local instance if validateAndAssignOffsets is true
* @param ignoreRecordSize true to skip validation of record size.
* @throws KafkaStorageException If the append fails due to an I/O error.
* @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
* @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
* @return Information about the appended messages including the first and last offset.
*/
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
// return if we have no valid messages or if this is a duplicate of the last appended entry
if (appendInfo.shallowCount == 0) appendInfo
else {
// trim any invalid bytes or partial messages before appending them to the on-disk log
var validRecords = trimInvalidBytes(records, appendInfo)
// they are valid, insert them in the log
lock synchronized {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
checkIfMemoryMappedBufferClosed()
if (validateAndAssignOffsets) {
// assign offsets to the message set
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = Some(LogOffsetMetadata(offset.value))
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
topicPartition,
offset,
time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion,
brokerTopicStats,
requestLocal.getOrElse(throw new IllegalArgumentException(
"requestLocal should be defined if assignOffsets is true")))
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
validRecords.batches.forEach { batch =>
if (batch.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
s"partition $topicPartition, which exceeds the maximum configured size of ${config.maxMessageSize}.")
}
}
}
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic)
throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
records.records.asScala.map(_.offset))
if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
// we may still be able to recover if the log is empty
// one example: fetching from log start offset on the leader which is not batch aligned,
// which may happen as a result of AdminClient#deleteRecords()
val firstOffset = appendInfo.firstOffset match {
case Some(offsetMetadata) => offsetMetadata.messageOffset
case None => records.batches.asScala.head.baseOffset()
}
val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
throw new UnexpectedAppendOffsetException(
s"Unexpected offset in append to $topicPartition. $firstOrLast " +
s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
firstOffset, appendInfo.lastOffset)
}
}
// update the epoch cache with the epoch stamped onto the message by the leader
validRecords.batches.forEach { batch =>
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
} else {
// In partial upgrade scenarios, we may get a temporary regression to the message format. In
// order to ensure the safety of leader election, we clear the epoch cache so that we revert
// to truncation by high watermark after the next leader election.
leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
cache.clearAndFlush()
}
}
}
// check that the message set size does not exceed config.segmentSize
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
}
// maybe roll the log if this segment is full
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = LogOffsetMetadata(
messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
segmentBaseOffset = segment.baseOffset,
relativePositionInSegment = segment.size)
// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, origin)
maybeDuplicate match {
case Some(duplicate) =>
appendInfo.firstOffset = Some(LogOffsetMetadata(duplicate.firstOffset))
appendInfo.lastOffset = duplicate.lastOffset
appendInfo.logAppendTime = duplicate.timestamp
appendInfo.logStartOffset = logStartOffset
case None =>
// Before appending update the first offset metadata to include segment information
appendInfo.firstOffset = appendInfo.firstOffset.map { offsetMetadata =>
offsetMetadata.copy(segmentBaseOffset = segment.baseOffset, relativePositionInSegment = segment.size)
}
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// Increment the log end offset. We do this immediately after the append because a
// write to the transaction index below may fail and we want to ensure that the offsets
// of future appends still grow monotonically. The resulting transaction index inconsistency
// will be cleaned up after the log directory is recovered. Note that the end offset of the
// ProducerStateManager will not be updated and the last stable offset will not advance
// if the append to the transaction index fails.
updateLogEndOffset(appendInfo.lastOffset + 1)
// update the producer state
updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
// update the transaction index with the true last stable offset. The last offset visible
// to consumers using READ_COMMITTED will be limited by this value and the high watermark.
completedTxns.foreach { completedTxn =>
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
segment.updateTxnIndex(completedTxn, lastStableOffset)
producerStateManager.completeTxn(completedTxn)
}
// always update the last producer id map offset so that the snapshot reflects the current offset
// even if there isn't any idempotent data being written
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
// update the first unstable offset (which is used to compute LSO)
maybeIncrementFirstUnstableOffset()
trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
s"first offset: ${appendInfo.firstOffset}, " +
s"next offset: ${nextOffsetMetadata.messageOffset}, " +
s"and messages: $validRecords")
if (unflushedMessages >= config.flushInterval) flush()
}
appendInfo
}
}
}
}
def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = {
leaderEpochCache.foreach { cache =>
cache.assign(leaderEpoch, startOffset)
}
}
def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch)
def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
leaderEpochCache.flatMap { cache =>
val (foundEpoch, foundOffset) = cache.endOffsetFor(leaderEpoch, logEndOffset)
if (foundOffset == UNDEFINED_EPOCH_OFFSET)
None
else
Some(OffsetAndEpoch(foundOffset, foundEpoch))
}
}
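/*
* Usage sketch (illustrative): used when answering OffsetsForLeaderEpoch requests to
* locate where a given epoch's records end.
*
* log.endOffsetForEpoch(leaderEpoch = 3) match {
* case Some(OffsetAndEpoch(offset, epoch)) => // end offset for the largest known epoch <= 3
* case None => // no epoch information available (e.g. pre-v2 message format)
* }
*/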
private def maybeIncrementFirstUnstableOffset(): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
Some(convertToOffsetMetadataOrThrow(offset))
case other => other
}
if (updatedFirstStableOffset != this.firstUnstableOffsetMetadata) {
debug(s"First unstable offset updated to $updatedFirstStableOffset")
this.firstUnstableOffsetMetadata = updatedFirstStableOffset
}
}
/**
* Increment the log start offset if the provided offset is larger.
*
* If the log start offset changed, then this method also updates a few key offsets such that
* `logStartOffset <= lastStableOffset <= highWatermark`. The leader epoch cache is also updated
* such that all offsets referenced in that component point to valid offsets in this log.
*
* @throws OffsetOutOfRangeException if the log start offset is greater than the high watermark
* @return true if the log start offset was updated; otherwise false
*/
def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Boolean = {
// We don't have to write the log start offset to log-start-offset-checkpoint immediately.
// The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
// in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
var updatedLogStartOffset = false
maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
lock synchronized {
if (newLogStartOffset > highWatermark)
throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
s"since it is larger than the high watermark $highWatermark")
checkIfMemoryMappedBufferClosed()
if (newLogStartOffset > logStartOffset) {
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
}
}
updatedLogStartOffset
}
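/*
* Usage sketch (illustrative): advancing the log start offset for a client DeleteRecords
* request; throws OffsetOutOfRangeException if the target exceeds the high watermark.
*
* val advanced = log.maybeIncrementLogStartOffset(newLogStartOffset = 100L, reason = ClientRecordDeletion)
* // advanced == false if the log start offset was already >= 100
*/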
private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,