-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6530: Use actual first offset of message set when rolling log segment #4660
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch! Left a couple comments.
@@ -83,7 +84,8 @@ case class LogAppendInfo(var firstOffset: Long, | |||
targetCodec: CompressionCodec, | |||
shallowCount: Int, | |||
validBytes: Int, | |||
offsetsMonotonic: Boolean) | |||
offsetsMonotonic: Boolean, | |||
hasAccurateFirstOffset: Boolean) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is reasonable, but it feels a bit odd to have one parameter indicating whether or not we can trust another parameter, right? The fact that we abuse firstOffset
in the first place is a big source of confusion in the code, so I'm wondering if it would be better to replace it with an Option
to clearly express the fact that we may or may not have it. Then in places where we need to use an offset, we can write firstOffset.getOrElse(lastOffset)
, which is more explicit and less likely to cause confusion.
It may even be useful to have separate objects, LeaderLogAppendInfo
which is guaranteed to have the first offset, and ReplicaLogAppendInfo
, which may or may not have it.
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { | ||
firstOffset = batch.baseOffset | ||
hasAccurateFirstOffset = true | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the magic is v1 and below, then whether or not we have an accurate first offset depends on whether this is a leader append or a replica append. For a leader append, we have the first offset because we are the one who assigns it. This logic happens in Log.append
after we have validated the data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. Left a few more comments.
offsetsMonotonic: Boolean) { | ||
def firstOffset_= (firstOffset: Long) {_firstOffset = Some(firstOffset)} | ||
def firstOffset: Long = _firstOffset.get | ||
def hasAccurateFirstOffset: Boolean = _firstOffset.isDefined |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could this just be hasFirstOffset
? Or perhaps we could just expose the option as maybeFirstOffset
or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to hasFirstOffset
var lastOffset = -1L | ||
var sourceCodec: CompressionCodec = NoCompressionCodec | ||
var monotonic = true | ||
var maxTimestamp = RecordBatch.NO_TIMESTAMP | ||
var offsetOfMaxTimestamp = -1L | ||
var hasAccurateFirstOffset = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we don't need this anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
if (firstOffset < 0) | ||
firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset else batch.lastOffset | ||
// Also indicate whether we have the accurate first offset or not | ||
if (firstOffset == (Some(-1L))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can do firstOffset.contains(-1L)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -748,7 +748,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
} | |||
|
|||
val numAppendedMessages = | |||
if (info.firstOffset == -1L || info.lastOffset == -1L) | |||
if (!info.hasAccurateFirstOffset || info.firstOffset == -1L || info.lastOffset == -1L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a little better encapsulation if we move this logic into a method in LogAppendInfo
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -760,7 +760,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages) | |||
|
|||
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" | |||
.format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset)) | |||
.format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOrLastOffset, info.lastOffset)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the leader path, so I think we should have a first offset. Are we trying to guard the case where there were no messages appended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case there are no messages appended, I think firstOffset would be -1. Changed this to firstOffset instead of firstOrLast.
segmentBaseOffset = segment.baseOffset, | ||
relativePositionInSegment = segment.size) | ||
|
||
segment.append(firstOffset = appendInfo.firstOffset, | ||
segment.append(firstOffset = appendInfo.firstOrLastOffset, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell, the first offset here is used only for insertion into the offset index and for logging. I cannot think of a good reason why we should prefer to use the first offset over the last offset. Since the replicas have to use the last offset for the old message format anyway, I wonder if we should try to be consistent. It would also make the logging less confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -762,7 +766,7 @@ class Log(@volatile var dir: File, | |||
updateFirstUnstableOffset() | |||
|
|||
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should just use the accurate last offset instead of a potentially inaccurate first offset? Or perhaps we could print both of them, but use the Option
for the first offset?
Also nit: can we change this log message to use string interpolation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -859,12 +863,13 @@ class Log(@volatile var dir: File, | |||
private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = { | |||
var shallowMessageCount = 0 | |||
var validBytesCount = 0 | |||
var firstOffset = -1L | |||
var firstOffset: Option[Long] = Some(-1L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... I would have expected we'd use None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really a placeholder for "firstOffset has not been initialized yet, so go initialize when you see the first message".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Would it be a little more natural to let firstOffset
be a Long
here, and convert it to Option
when we construct LogAppendInfo
. I guess I'm a little concerned about Some(-1)
somehow leaking into the rest of the code.
@@ -1086,7 +1086,7 @@ class LogCleanerTest extends JUnitSuite { | |||
val end = 2 | |||
val offsetSeq = Seq(0L, 7206178L) | |||
writeToLog(log, (start until end) zip (start until end), offsetSeq) | |||
cleaner.buildOffsetMap(log, start, end, map, new CleanerStats()) | |||
cleaner.buildOffsetMap(log, start, 7206178L + 1L, map, new CleanerStats()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we can choose better names for start and end to avoid the confusion. Maybe keyStart
and keyEnd
or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@hachikuji I think I addressed all review comments. Please take a look when you get a chance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. This is looking good, just a few more small comments.
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" | ||
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords)) | ||
trace(s"Appended message set to log ${this.name} with last offset: ${appendInfo.lastOffset}, " + | ||
s"first offset: ${if (appendInfo.hasFirstOffset) Some(appendInfo.firstOffset) else None}, " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we could just use firstOffset
? Then we wouldn't need hasFirstOffset
any longer.
largestTimestamp: Long, | ||
shallowOffsetOfMaxTimestamp: Long, | ||
records: MemoryRecords): Unit = { | ||
if (records.sizeInBytes > 0) { | ||
trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d" | ||
.format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp)) | ||
trace(s"Inserting ${records.sizeInBytes} bytes at end_offset $largestOffset at position ${log.sizeInBytes} " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we need the underscore in _
? Maybe a space would work? Same below.
@@ -47,19 +47,19 @@ import java.lang.{Long => JLong} | |||
import java.util.regex.Pattern | |||
|
|||
object LogAppendInfo { | |||
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, | |||
val UnknownLogAppendInfo = LogAppendInfo(Some(-1L), -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use None
instead so that we can always guarantee that the first offset, if present, is positive? As far as I can tell, outside of test cases, we just have a couple calls to firstOffset.get
inside ReplicaManager
that we would need to replace with getOrElse(-1)
.
I would also like to remove this UnknownLogAppendInfo
since this sentinel pattern has proven error-prone, but we can leave that for future work.
@@ -92,7 +92,7 @@ object StressTestLog { | |||
@volatile var offset = 0 | |||
override def work() { | |||
val logAppendInfo = log.appendAsFollower(TestUtils.singletonRecords(offset.toString.getBytes)) | |||
require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset) | |||
require(logAppendInfo.firstOrLastOffset == offset && logAppendInfo.lastOffset == offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting that this only worked because we were writing single-message batches. I think the expectation might be a little clearer if instead of logAppendInfo.firstOrLastOffset == offset
, we wrote logAppendInfo.firstOffset.forall(_ == offset)
. Then the check is valid even if we used batches with multiple messages.
maxTimestampInMessages = appendInfo.maxTimestamp, | ||
maxOffsetInMessages = appendInfo.lastOffset) | ||
val segment = maybeRoll(validRecords.sizeInBytes, | ||
appendInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: enough room on the previous line for this?
@@ -474,7 +474,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
topicPartition -> | |||
ProducePartitionStatus( | |||
result.info.lastOffset + 1, // required offset | |||
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status | |||
new PartitionResponse(result.error, result.info.firstOffset.get, result.info.logAppendTime, result.info.logStartOffset)) // response status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to replace this get
with getOrElse(-1)
since we could be getting the UnknownLogAppendInfo
here? Same thing below.
Note that the failing builds are likely a result of the bug mentioned in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the patch!
…writer thread crashed.
…ome(-1) in previous upload.
The two failing tests seem unrelated. I tried to reproduce locally after rebasing the PR, but was unable to do so. Note I rebased and removed a couple unneeded printlns. I will merge after the new builds complete. |
Use the exact first offset of message set when rolling log segment. This is possible to do for message format V2 and beyond without any performance penalty, because we have the first offset stored in the header. This augments the fix made in KAFKA-4451 to avoid using the heuristic for V2 and beyond messages.
Added unit tests to simulate cases where segment needs to roll because of overflow in index offsets. Verified that the new segment created in these cases uses the first offset, instead of the heuristic in use previously.
Committer Checklist (excluded from commit message)