KAFKA-5435: Ensure producer snapshot retained after truncation #3306
Conversation
@hachikuji : Thanks for the patch. Looks good overall. A couple of comments.
ensureSnapshotRetained(logStartOffset, lastOffset)
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

// Only do the potentially expensive reloading of the last snapshot offset is lower than the
reloading of => reloading if?
@@ -432,7 +432,7 @@ class Log(@volatile var dir: File,
    info(s"Loading producer state from offset $lastOffset for partition $topicPartition")

    if (producerStateManager.latestSnapshotOffset.isEmpty) {
-     // if there are no snapshots to load producer state from, we assume that the brokers are
+     // If there are no snapshots to load producer state from, we assume that the brokers are
The general approach of creating an empty snapshot for the starting offset of the log seems fine. Not sure if it's 100% bulletproof though. Consider a log with 5 log segments before 0.11.0. When upgrading to 0.11.0, we will first create an empty snapshot on the last 2 segments during startup. Then, let's say we truncate to an offset within the 3rd log segment. After this, there will only be a snapshot on the base offset of the first log segment. Now, we start to append some messages into the 3rd log segment. Suppose that flush by message (flush.num.messages) has been configured and Log.flush() is called, which only preserves snapshots on the last two log segments. At this time, the snapshot for the first segment is lost. If the broker crashes now and restarts, we will avoid scanning the log to rebuild the PidMap, but in fact there could be some EOS messages in the 3rd log segment.
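To make the failure mode concrete, here is a minimal sketch of the retention rule being described (the helper name and offsets are hypothetical, not the actual Kafka code):

// Sketch: on flush, only snapshots at or above the base offset of the
// second-to-last segment survive; anything older is deleted.
def retainRecentSnapshots(snapshotOffsets: Seq[Long], segmentBaseOffsets: Seq[Long]): Seq[Long] = {
  val cutoff = segmentBaseOffsets.takeRight(2).headOption.getOrElse(0L)
  snapshotOffsets.filter(_ >= cutoff)
}

// The scenario above: after truncation only the first segment's snapshot (offset 0) is left,
// and the next flush deletes it: retainRecentSnapshots(Seq(0L), Seq(0L, 100L, 200L)) == Seq.empty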
@junrao Thanks for the comments. I've pushed a change to try and tighten this up. Essentially the code attempts to ensure that we always have an empty snapshot file at the log start offset. I am working on additional test cases.
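Roughly, the invariant amounts to the following sketch (takeEmptySnapshot is the method name used in this PR; the body and the snapshotFile/writeSnapshot helpers are assumptions for illustration):

// Sketch: pin an empty (zero-entry) snapshot file at the log start offset so
// recovery never falls back to scanning the log from the beginning.
def takeEmptySnapshot(logStartOffset: Long): Unit = {
  val file = snapshotFile(logStartOffset) // assumed helper: snapshot file named by offset
  if (!file.exists())
    writeSnapshot(file, Map.empty)        // assumed helper: valid header, no producer entries
}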
LGTM overall. The idea of always having an empty snapshot at the log start offset as a sentinel is a decent approach. As long as we have covered all the cases when the log start offset changes, this seems good. Regardless, it is an improvement on the current behavior.
I did leave some questions about the expectations in the tests though.
@@ -319,61 +359,96 @@ class LogTest {

    log.truncateFullyAndStartAt(29)
    assertEquals(1, log.logSegments.size)
-   assertEquals(None, log.latestProducerSnapshotOffset)
+   assertEquals(Some(29), log.latestProducerSnapshotOffset)
Perhaps we should also assert that log.oldestProducerSnapshotOffset is the same value?
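I.e., something like this extra assertion (sketch):

// With a single snapshot retained after truncateFullyAndStartAt(29),
// oldest and latest snapshot offsets should coincide.
assertEquals(Some(29), log.oldestProducerSnapshotOffset)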
assertEquals(Some(2), stateManager.latestSnapshotOffset)

stateManager.truncateAndReload(0L, 1L, time.milliseconds())
assertEquals(None, stateManager.latestSnapshotOffset)
Should oldestSnapshotOffset be 0 here? Is that worth asserting?
This is related to Ismael's suggestion. We could push the log start offset snapshot invariant into ProducerStateManager, but at the moment it is maintained in Log. So in fact, the oldest snapshot would also be None. For now I will add an assertion while I consider the potential refactor.
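The assertion being added would look something like this (sketch):

// The sentinel snapshot is maintained by Log, not ProducerStateManager, so after
// truncateAndReload on the bare state manager no snapshot remains at all.
assertEquals(None, stateManager.oldestSnapshotOffset)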
@hachikuji : Thanks for the updated patch. Looks good overall. A few more minor comments.
@@ -444,7 +444,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
        Files.deleteIfExists(file.toPath)
      }
    case None =>
-     lastSnapOffset = logStartOffset
+     lastSnapOffset = -1
-1 => -1L ?
  offset >= startOffset && offset <= endOffset
}

private def inRangeExclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = {
Perhaps inRangeExclusive and inRangeInclusive could just be outOfRange and inRange?
@@ -1050,7 +1055,11 @@ class Log(@volatile var dir: File,
    deletable.foreach(deleteSegment)
    logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
    leaderEpochCache.clearAndFlushEarliest(logStartOffset)
-   producerStateManager.evictUnretainedProducers(logStartOffset)
+
+   // Update the producer state with the new log start offset, which we cause any non-retained producers to
It seems that we delete a segment first and then delete the snapshot with the start offset of that segment later in producerStateManager.truncateHead(). I am wondering if we need to tighten up ProducerStateManager.truncateAndReload() a bit. In that method, if the latest snapshot's offset doesn't have a corresponding log segment, using that snapshot could lead to incorrect producer state after recovery.
Hmm... In truncateAndReload, we first delete any snapshots which are out of range. Is there a case where the start and end offsets passed to truncateAndReload would be out of sync with the log segments?
Ok, missed that. So this should work then.
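For reference, the ordering under discussion, as a sketch (helper names assumed; the real method does more):

// Out-of-range snapshots are deleted before any snapshot is consulted, so a
// snapshot can never outlive its segment within this method.
def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
  deleteSnapshotFiles(file => !inRange(file, logStartOffset, logEndOffset)) // assumed helper
  loadFromSnapshot(logStartOffset, logEndOffset, currentTimeMs)             // assumed helper
}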
"which we cause" -> "which would cause"?
@hachikuji Thanks for the patch. LGTM. One thing not directly related to the patch: it seems that when we delete the old segments, we don't change ProducerIdEntry.currentTxnFirstOffset. This means that ProducerIdEntry.currentTxnFirstOffset may no longer be in the log. Not sure if this matters, but it probably means that ProducerIdEntry.currentTxnFirstOffset may be recovered inconsistently depending on whether the snapshot exists or not.
Thanks for the PR. Apart from some minor comments, I wonder if there's something we could do in ProducerStateManager so that it would do the job of maintaining its invariants instead of relying on Log to call takeEmptySnapshot in a few different places.
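A rough sketch of that idea (hypothetical method; truncateHead and takeEmptySnapshot are names from this PR):

// Sketch: ProducerStateManager re-establishes its own sentinel whenever the log
// start offset advances, so Log need not call takeEmptySnapshot at each call site.
def onLogStartOffsetIncremented(newLogStartOffset: Long): Unit = {
  truncateHead(newLogStartOffset)      // drop state and snapshots below the new start offset
  takeEmptySnapshot(newLogStartOffset) // invariant maintained internally
}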
-   // only do the potentially expensive reloading of the last snapshot offset is lower than the
+   // Only do the potentially expensive reloading if the last snapshot offset is lower than the
    // log end offset (which would be the case on first startup) and there are active producers.
This description says that we do the reloading if both conditions are true. However, the if statement check succeeds if either of the checks is true.
Thanks for the comment. Looking at this check, it seems a little wrong. The idea was to skip loading if there were no producers at the end of the log, but that is only useful if you consider the state of the producers prior to truncation. Let me try to sort this out.
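For concreteness, a hedged illustration of the mismatch (the actual condition is not shown in this hunk; names are assumptions):

// The comment reads as a conjunction, but a check shaped like this passes when
// either side holds (illustrative only, not the code in the patch):
if (lastSnapOffset < logEndOffset || producerStateManager.activeProducers.nonEmpty)
  loadProducerState(logEndOffset)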
@@ -478,7 +478,7 @@ class Log(@volatile var dir: File,
      completedTxns.foreach(producerStateManager.completeTxn)
    }

-   private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized {
+   private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized {
Not from this PR, but it seems like updateProducers should just return the result from appendInfo.append(batch) instead of taking a list to append to (from a simplicity perspective).
Yes, that's fair.
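The suggested shape, as a sketch (the signature is an assumption, not the actual patch):

// Sketch: return the completed transaction (if this batch ended one) directly,
// rather than appending it to a caller-supplied list.
private def updateProducers(batch: RecordBatch,
                            appendInfo: ProducerAppendInfo): Option[CompletedTxn] =
  appendInfo.append(batch)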
// Enforce the invariant that we have an empty snapshot at the log start offset to ensure
// proper loading of producer state upon recovery.
producerStateManager.takeEmptySnapshot(logStartOffset)
Just realized another potential issue with this. If we delete a log up to an offset in the middle of a segment, we create an empty snapshot on logStartOffset. In loadProducerState(), if we need to rebuild producer state from this snapshot, there currently seem to be two issues: (1) logSegments() doesn't seem to take logStartOffset into consideration, so we may miss the first log segment. (2) If the segment is selected, we should be processing messages from logStartOffset and up.
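The two fixes implied above might look roughly like this (sketch; the rebuild helper is assumed):

// (1) select segments starting from the one containing logStartOffset, and
// (2) begin replaying records at logStartOffset, not the segment base offset.
logSegments(logStartOffset, lastOffset).foreach { segment =>
  val fetchFrom = math.max(segment.baseOffset, logStartOffset)
  rebuildProducerStateFrom(segment, fetchFrom) // assumed helper for illustration
}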
So I've been procrastinating on this patch this evening and I think it's because I'm not too satisfied with the solution. I've been thinking of an alternative. The current problem is that we sort of stupidly assume that no snapshot implies upgrade and we're now trying to clear several hurdles to make that assumption work. Perhaps we can just strengthen the upgrade check. In addition to having no snapshot file, maybe we can look at the message format version of the last batch appended to the log. If the version is old, we can skip rebuilding the producer state. This of course depends on the assumption that users won't downgrade the message format version, but we assume that in the fetch handler anyway, so it seems fine. Thoughts?
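Concretely, the proposed check might look like this (sketch; lastBatchMagic is an assumed helper that reads the magic byte of the final batch in the log):

// Skip rebuilding only when there is no snapshot AND the last appended batch
// predates the v2 (KIP-98) message format.
val looksLikeUpgrade =
  producerStateManager.latestSnapshotOffset.isEmpty &&
    lastBatchMagic.exists(_ < RecordBatch.MAGIC_VALUE_V2)
if (!looksLikeUpgrade)
  loadProducerState(logEndOffset)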
Hmm, in the common case, the broker will be shut down cleanly before the upgrade. In this case, ideally, we don't want to do too much I/O during a clean restart. Reading the last batch may require some I/O per partition. If the number of partitions is large, the overhead could be significant.
@junrao To be a little more specific, what I am proposing is only intended to handle the case when we find no snapshot files. This should only happen when we upgrade. After we have confirmed that the last entry in the log is on an older message format, then we would take an empty snapshot at the end of the log (and the base offset of the final two segments), as is currently done.
Hmm... Perhaps alternatively, we could save the reading of the last entry for when there was no clean shutdown?
Sorry for the delay. Discussed this offline with @junrao. Instead of looking at the last entry in the log, we can use the message format version and whether the broker was shut down cleanly:

One catch is that we need to do a better job of ensuring that we still have snapshots remaining after truncation, to reduce the chance of needing to do a full scan if we hit a hard failure.
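In sketch form, the combined check could look like this (names assumed for illustration):

// Rebuild producer state only when the message format can carry idempotent or
// transactional data, and either the shutdown was unclean or no snapshot exists.
val rebuildRequired =
  messageFormatVersion >= KAFKA_0_11_0_IV0 &&            // format supports producer state
    (!hadCleanShutdown || producerStateManager.latestSnapshotOffset.isEmpty)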
I ended up creating a separate branch, so I'm going to close this and open a new PR. |