KAFKA-5435: Improve producer state loading after failure #3361

Closed
hachikuji wants to merge 8 commits into trunk from hachikuji/KAFKA-5435-ALT

Conversation

hachikuji commented:

No description provided.

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5427/
Test FAILed (JDK 7 and Scala 2.11).

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5412/
Test FAILed (JDK 8 and Scala 2.12).

// There are two common cases where we can skip loading and write a new snapshot at the log end offset:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
Contributor:
This is true because we should always have a snapshot file on a clean shutdown with the new message format, correct? In other words, are we assuming that if there isn't a snapshot file and the message format is new and we had a clean shutdown, that means this is an upgrade?

Author:
Yes, that should be the common case for a direct upgrade to the new message format.

@@ -445,17 +448,20 @@ class Log(@volatile var dir: File,
producerStateManager.takeSnapshot()
}
} else {
val currentTimeMs = time.milliseconds
producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
Contributor:
Will this always be false for lastOffset > 0 because the producer state is actually loaded on the next line?

Author:
That's only the case when doing the initial loading. Truncation after initial load would hit this case.

Contributor:
ah. right. I keep forgetting that leader failover can result in truncation at any time.

Author:
I'll add a comment about this because I forgot several times as well.

updateFirstUnstableOffset()
}
} else {
lock synchronized {
Contributor:
This is a question you can ignore for now since it's for my understanding: but how could the logStartOffset be in the middle of a segment if numToDelete == 0 ?

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5429/
Test FAILed (JDK 7 and Scala 2.11).

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5414/
Test FAILed (JDK 8 and Scala 2.12).

@junrao left a comment:
@hachikuji : Thanks for the patch. LGTM. Just a couple of minor comments.

// There are two common cases where we can skip loading and write a new snapshot at the log end offset:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
Contributor:
Perhaps make it clear that both 1 and 2 are when there is no snapshot.


if (producerStateManager.latestSnapshotOffset.isEmpty && (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown)) {
// To avoid an expensive scan through all of the segments, we take empty snapshots from the start of
// the last two segments and the last offset. This avoid the full scan in the case that the log needs
Contributor:
avoid => avoids
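
For reference, a rough Scala sketch of the optimization described in the comment on that hunk (the segment accessors and loop below are illustrative, not necessarily the exact code in this patch): empty snapshots are written at the base offsets of the last two segments and at the log end offset, so that a later truncateAndReload() can reload from a nearby snapshot instead of rescanning every segment.

    // Sketch only: take empty snapshots at a few strategic offsets so that a later
    // truncation can find a snapshot at or below the truncation point without a full scan.
    val lastTwoSegmentBaseOffsets: Seq[Long] =
      logSegments.toSeq.takeRight(2).map(_.baseOffset)          // base offsets of the last two segments
    val offsetsToSnapshot: Seq[Long] = lastTwoSegmentBaseOffsets :+ logEndOffset

    offsetsToSnapshot.foreach { offset =>
      producerStateManager.updateMapEndOffset(offset)           // mark the (still empty) state as valid up to `offset`
      producerStateManager.takeSnapshot()                       // write an empty snapshot file at that offset
    }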

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5417/
Test PASSed (JDK 8 and Scala 2.12).

hachikuji commented:
By the way, I ran the transaction system tests here: https://jenkins.confluent.io/view/All/job/system-test-kafka-branch-builder/917/console. I'll run a few more times.

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5432/
Test PASSed (JDK 7 and Scala 2.11).

lock synchronized {
// The log start offset may now point to the middle of a segment, so we need to truncate the producer
// state to ensure that non-retained producers are evicted.
producerStateManager.truncateHead(logStartOffset)
Author:
@junrao One question I had is whether it is more appropriate to try and handle this case in maybeIncrementLogStartOffset?

Contributor:
logStartOffset is only going to change when some segments are deleted in deleteSegments() or when maybeIncrementLogStartOffset() is called. So, it seems that we don't need to do anything in the else clause since the other two cases are already covered.

Author:
Hmm.. Currently I don't have any logic in maybeIncrementLogStartOffset. Maybe this block needs to move there?

Contributor:
Yes, perhaps we can move the following 2 lines to maybeIncrementLogStartOffset() and have deleteSegments() call maybeIncrementLogStartOffset() instead of directly updating logStartOffset.

        producerStateManager.truncateHead(logStartOffset)
        updateFirstUnstableOffset()

Author:
Ack

@@ -551,21 +556,33 @@ class ProducerStateManager(val topicPartition: TopicPartition,
*/
def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName))

private def isProducerRetained(producerIdEntry: ProducerIdEntry, logStartOffset: Long): Boolean = {
producerIdEntry.lastOffset >= logStartOffset
Contributor:
So, when logStartOffset advances, we don't update producerIdEntry.currentTxnFirstOffset, which will be used to compute lastStableOffset. Could that lead to the case that lastStableOffset is < logStartOffset?

Author:
Yes, we discussed this briefly before. I think we agreed that we would keep the currentTxnFirstOffset preserved (since it is difficult to find the next higher offset for a transaction), but we would fix first unstable offset to return no lower than log start offset. Does that sound correct?

Contributor:
Yes, that sounds good.

hachikuji commented Jun 17, 2017:
@junrao I added some code to do this. See my comment in the commit. I fear we will have to live with the possibility of this allowing the LSO to temporarily diverge between replicas. Hopefully this is rare. In any case, there seems little else we can do about it this late in the game (though I'm open to ideas).
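
A minimal sketch of the clamping discussed in this thread, assuming the LogOffsetMetadata type and the producerStateManager.firstUnstableOffset accessor used elsewhere in Log.scala (not necessarily the exact code added in the commit): the log never reports a first unstable offset below logStartOffset, even though currentTxnFirstOffset itself is left untouched.

    private def updateFirstUnstableOffset(): Unit = lock synchronized {
      firstUnstableOffset = producerStateManager.firstUnstableOffset.map { offsetMetadata =>
        if (offsetMetadata.messageOffset < logStartOffset)
          LogOffsetMetadata(logStartOffset)   // clamp: never expose an LSO below the log start offset
        else
          offsetMetadata
      }
    }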

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5434/
Test PASSed (JDK 7 and Scala 2.11).

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5419/
Test PASSed (JDK 8 and Scala 2.12).

producerStateManager.evictUnretainedProducers(logStartOffset)
updateFirstUnstableOffset()
val newLogStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
leaderEpochCache.clearAndFlushEarliest(newLogStartOffset)
Contributor:
It seems that both line 1070 and 1071 can be moved into maybeIncrementLogStartOffset(). We can just call maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset) here. Inside maybeIncrementLogStartOffset, if startOffset changes, we do the leaderEpoch and producerState update.
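
A rough sketch of that restructuring, using the names from the snippets above (not the final code): maybeIncrementLogStartOffset() owns the logStartOffset update, and deleteSegments() simply calls it with the base offset of the first remaining segment, so the leader epoch cache and producer state are always adjusted together with the start offset.

    def maybeIncrementLogStartOffset(newLogStartOffset: Long): Unit = {
      lock synchronized {
        if (newLogStartOffset > logStartOffset) {
          logStartOffset = newLogStartOffset
          leaderEpochCache.clearAndFlushEarliest(logStartOffset)  // drop epoch entries below the new start offset
          producerStateManager.truncateHead(logStartOffset)       // evict producers whose last offset is no longer retained
          updateFirstUnstableOffset()                             // keep the first unstable offset at or above logStartOffset
        }
      }
    }

    // deleteSegments() would then call:
    //   maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)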

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5443/
Test PASSed (JDK 7 and Scala 2.11).

asfgit commented Jun 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5428/
Test PASSed (JDK 8 and Scala 2.12).

junrao commented Jun 17, 2017

@hachikuji : Thanks for the updated patch. LGTM. I will let you merge it.

hachikuji commented:
I ran the transactions system tests last night and we're looking good: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2017-06-17--001.1497703753--hachikuji--KAFKA-5435-ALT--49205c7/report.txt. I will go ahead and merge this to trunk and 0.11.0.

asfgit pushed a commit that referenced this pull request Jun 17, 2017
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #3361 from hachikuji/KAFKA-5435-ALT

(cherry picked from commit bcaee7f)
Signed-off-by: Jason Gustafson <jason@confluent.io>
asfgit closed this in bcaee7f Jun 17, 2017