
KAFKA-3978; highwatermark should always be positive #4695

Merged
merged 7 commits into apache:trunk from lindong28:KAFKA-3978 on Mar 14, 2018

Conversation

@lindong28
Member

commented Mar 13, 2018

Partition high watermark may become -1 during partition reassignment. The bug was fixed and validated with a unit test in this patch.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
@lindong28

Member Author

commented Mar 13, 2018

Ping @ijuma @hachikuji for review.

@lindong28 lindong28 force-pushed the lindong28:KAFKA-3978 branch to e6724b8 Mar 13, 2018
@hachikuji
Contributor

left a comment

Thanks @lindong28! It would be good to get this into 1.1 since the fix seems straightforward. I feel like the root of the problem is unsafe use of sentinel values such as LogOffsetMetadata.UnknownOffsetMetadata. If we use Option[LogOffsetMetadata] instead, then we are forced to handle the None case.
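For illustration, here is a minimal sketch of the sentinel-free shape being suggested (simplified and hypothetical, not the actual patch; it reuses the readUncommitted call and exception type visible in the diff below):

  // Sketch only: return None instead of the UnknownOffsetMetadata sentinel,
  // so that every caller must decide explicitly what an out-of-range offset means for it.
  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
    try {
      val fetchDataInfo = readUncommitted(offset, 1)
      Some(fetchDataInfo.fetchOffsetMetadata)
    } catch {
      case _: OffsetOutOfRangeException => None
    }
  }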

@@ -1133,7 +1133,9 @@ class Log(@volatile var dir: File,
val fetchDataInfo = readUncommitted(offset, 1)
fetchDataInfo.fetchOffsetMetadata
} catch {
- case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+ case _: OffsetOutOfRangeException =>
+   val firstOffset = segments.firstEntry.getValue.baseOffset

@hachikuji
Contributor
Mar 13, 2018

Seems a little surprising to return the first offset in the log if the searched offset couldn't be found. I wonder if it would be clearer to change this function to return Option[LogOffsetMetadata] and to handle the None case in convertHWToLocalOffsetMetadata by converting to the start offset?

@lindong28
Author Member
Mar 13, 2018

Thanks for the review! Sure. It is fixed as suggested.

@@ -73,7 +73,7 @@ class Replica(val brokerId: Int,
* high frequency.
*/
def updateLogReadResult(logReadResult: LogReadResult) {
- if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
+ if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset && logReadResult.leaderLogEndOffset >= 0)

@hachikuji
Contributor
Mar 13, 2018

I guess this is not strictly needed since _lastCaughtUpTimeMs cannot become negative?

@lindong28
Author Member
Mar 13, 2018

Yeah, you are right. I can't remember why this change was made; I have removed it.

@@ -76,6 +76,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
super.tearDown()
}


@Test
def testHw(): Unit = {

@hachikuji
Contributor
Mar 13, 2018

nit: can we come up with a better name?

@lindong28
Author Member
Mar 13, 2018

Yeah I forgot to change it before uploading the patch. It is changed to testHwAfterPartitionReassignment now.

@@ -460,7 +460,8 @@ class Partition(val topic: String,
}.map(_.logEndOffset)
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
- if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+   (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {

@hachikuji
Contributor
Mar 13, 2018

Can you explain this addition? Is there a scenario where the new high watermark might be smaller than the old high watermark, but be on a newer segment?

@lindong28
Author Member
Mar 13, 2018

During my investigation of the negative hw, it appears that we have LogOffsetMetadata.UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0), whereas Replica.highWatermarkMetadata is initialized to LogOffsetMetadata(0, -1, -1). So strictly speaking we can have two instances of LogOffsetMetadata where the messageOffset of one instance is larger and the segmentBaseOffset of the other instance is larger.

After further investigation, I realized that this is not the root cause of the negative hw. But it still seems safer to add this additional check to make sure that we never decrease hw.

@lindong28
Author Member
Mar 13, 2018

Note that allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) in Partition.maybeIncrementLeaderHW may assign LogOffsetMetadata.UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0) as newHighWatermark in the case that the new leader has not received a FetchRequest from some follower.
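To make the scenario concrete, a rough sketch with illustrative values (it assumes the three-argument LogOffsetMetadata constructor and the OffsetOrdering referenced above, which orders metadata by message offset):

  // One follower has not sent a FetchRequest yet, so its log end offset is still the sentinel.
  val followerLeo = LogOffsetMetadata.UnknownOffsetMetadata   // new LogOffsetMetadata(-1, 0, 0)
  val leaderLeo = new LogOffsetMetadata(100, 50, 1024)        // offset 100 on the segment starting at 50
  val newHighWatermark = Seq(leaderLeo, followerLeo).min(new LogOffsetMetadata.OffsetOrdering)
  // newHighWatermark.messageOffset == -1

  val oldHighWatermark = new LogOffsetMetadata(0, -1, -1)     // the replica's initial high watermark metadata
  // Old condition: oldHighWatermark.onOlderSegment(newHighWatermark) is true (segment base -1 < 0),
  // so the high watermark would be moved backwards to -1.
  // New condition: 0 < -1 is false and the message offsets are not equal,
  // so the update is skipped and the high watermark never becomes negative.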

@hachikuji
Contributor
Mar 13, 2018

Thanks for the explanation. I agree it is safer with the additional check.

@ijuma
Contributor
Mar 13, 2018

Is it worth adding a comment here? It's a bit confusing otherwise.

@lindong28
Author Member
Mar 13, 2018

Would it be OK to say that the hw offset should never decrease? This additional check ensures that we have a total ordering of offset metadata; previously we did not have this total ordering.

@lindong28
Author Member
Mar 13, 2018

I added this comment:
Ensure that the high watermark offset does not decrease even if the segmentBaseOffset of the newer LogOffsetMetadata is larger

@lindong28 lindong28 force-pushed the lindong28:KAFKA-3978 branch to 2b42b27 Mar 13, 2018
@hachikuji
Contributor

left a comment

Thanks, the fix looks good. Just had some minor comments.

@@ -1128,12 +1128,12 @@ class Log(@volatile var dir: File,
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, return unknown offset metadata
*/
- def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+ def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {

@hachikuji
Contributor
Mar 13, 2018

nit: can you update the method doc?

@lindong28
Author Member
Mar 13, 2018

I should pay more attention to the doc. It is updated now.

"broker 101 should be the new leader", pause = 1L
)

assertTrue(newLeaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset >= 0)

@hachikuji
Contributor
Mar 13, 2018

Do you think we may as well validate the high watermark on both replicas?

@hachikuji
Contributor
Mar 13, 2018

Also, could the assertion be more precise? Seems we expect the high watermark to be 100?

@lindong28
Author Member
Mar 13, 2018

Sure. I have updated the patch as suggested.

@@ -167,7 +167,10 @@ class Replica(val brokerId: Int,

def convertHWToLocalOffsetMetadata() = {
if (isLocal) {
- highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+ highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {

@hachikuji
Contributor
Mar 13, 2018

Would it be useful to have a short comment explaining when we can hit this case?

@lindong28
Author Member
Mar 13, 2018

Sure. I added the following comment:

Convert hw to local offset metadata by reading the log at the hw offset. 
If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
@@ -76,6 +76,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
super.tearDown()
}


@hachikuji
Contributor
Mar 13, 2018

nit: some unneeded whitespace before and after this test

@lindong28
Author Member
Mar 13, 2018

Sure. It is removed now.

@lindong28 lindong28 force-pushed the lindong28:KAFKA-3978 branch to c589f52 Mar 13, 2018
@lindong28

Member Author

commented Mar 13, 2018

@hachikuji Thanks much for the detailed review. The patch has been updated as suggested.

@hachikuji

Contributor

commented Mar 13, 2018

@lindong28 Thanks for the updates. The fix LGTM for this case. I had one additional hardening suggestion. Inside Replica.highWatermark=, could we add a check for LogOffsetMetadata.UnknownOffsetMetadata? If it is unknown, we could either raise an exception or we could just use offset 0 or maybe even the log start offset.

@lindong28

Member Author

commented Mar 13, 2018

@hachikuji Thanks for all the careful thinking. It may be more general to check that hw offset >= 0 instead of checking hw metadata != LogOffsetMetadata.UnknownOffsetMetadata.

Currently Replica.highWatermark= does not have any logic for checking the value of hw. The value of hw should be guaranteed to be non-negative with the following:

  1. highWatermarkMetadata is initialized to be new LogOffsetMetadata(initialHighWatermarkValue), whose offset is guaranteed to be non-negative
  2. The first way of updating highWatermarkMetadata, i.e. Replica.convertHWToLocalOffsetMetadata(), should always assign a non-negative offset.
  3. The other way of updating highWatermarkMetadata, i.e. Replica.highWatermark=, should never decrease hw offset. This is currently guaranteed in Partition.maybeIncrementLeaderHW().

So we should not worry about a negative hw offset for now. If we want to harden the logic to prevent a problematic hw from being given to Replica.highWatermark=, maybe we can move the non-decreasing-hw logic from Partition.maybeIncrementLeaderHW() to Replica.highWatermark=. What do you think?

@hachikuji

Contributor

commented Mar 13, 2018

@lindong28 Yes, a more general check for offset >= 0 makes sense. I think if we are convinced that we have covered all of the callers, then we could just raise IllegalArgumentException. This provides a bit more protection from future regressions. Perhaps another option is to add the check to the public constructor of LogOffsetMetadata; we can use a private constructor for UnknownOffsetMetadata itself. This will make it easier to track down all the locations where a negative offset could be introduced. What do you think?

@lindong28

Member Author

commented Mar 13, 2018

@hachikuji Sure. I have updated the patch to check that offset >= 0 in Replica.highWatermark=. In the future we can also check offset >= 0 || this == UnknownOffsetMetadata in the constructor of LogOffsetMetadata. I tried it, but it caused a lot of unit tests to fail, most likely because many unit tests create LogOffsetMetadata with a negative offset directly. If we can wait, I can do that later. If we need the patch for 1.1 soon, maybe we can skip it for now.
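For reference, a hypothetical sketch of the guard being discussed; the exact message and placement in the real patch may differ, and the handling of non-local replicas is omitted:

  // Inside Replica: reject any attempt to move the high watermark to a negative offset.
  def highWatermark_=(newHighWatermark: LogOffsetMetadata): Unit = {
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException(
        s"High watermark offset should be non-negative, found: $newHighWatermark")
    highWatermarkMetadata = newHighWatermark
  }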

@hachikuji
Contributor

left a comment

LGTM. Thanks for the patch! I will merge after the build finishes.

/*
* Convert hw to local offset metadata by reading the log at the hw offset.
* If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
*/
def convertHWToLocalOffsetMetadata() = {

@hachikuji
Contributor
Mar 13, 2018

If you're adding another commit to address Ismael's comment, could you also change this method to have a Unit return type? It's a little confusing otherwise.

@lindong28
Author Member
Mar 13, 2018

Good catch. It is updated now to follow the same style as the other unit-typed methods in the class.

- if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+   // Ensure that the high watermark offset does not decrease even if the segmentBaseOffset of the newer LogOffsetMetadata is larger
+   (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {

@ijuma
Contributor
Mar 13, 2018

I think I'd describe it as:

"when we initially roll the segment, the offset doesn't change, but we want the high watermark to point to the last segment"

@hachikuji, does this seem OK to you?
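Put differently, with purely illustrative numbers: after a segment roll the high watermark's message offset is unchanged, but its metadata should move to the new active segment, which is exactly the case the added equality clause covers:

  // Before the roll: high watermark at offset 100 on the segment starting at offset 50.
  val oldHighWatermark = new LogOffsetMetadata(100, 50, 4096)
  // After the roll: same message offset, now the base of the new (empty) active segment.
  val newHighWatermark = new LogOffsetMetadata(100, 100, 0)
  // The message offsets are equal and oldHighWatermark.onOlderSegment(newHighWatermark) is true,
  // so the updated condition still advances the metadata without ever decreasing the offset.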

"broker 101 should be the new leader", pause = 1L
)

assertEquals(100, leaderServer.replicaManager.getReplicaOrException(topicPartition).highWatermark.messageOffset)

@hachikuji
Contributor
Mar 13, 2018

Was it intentional to change this assertion from using newLeaderServer to leaderServer?

@lindong28
Author Member
Mar 13, 2018

Sorry. Thanks for catching this. It is fixed now.

@hachikuji

Contributor

commented Mar 13, 2018

Thanks @lindong28. I spent a little extra time making sure I understood the bug. The basic requirement is that the initial high watermark on the leader is out of range. Once this happens, there is a window during which the remaining replica log end offsets will be unknown. As long as any one of them is unknown, the leader's high watermark will be stuck at -1. We would usually expect this to be a short window since replicas will be kicked out of the ISR if they aren't fetching, but Jun suggested that this state could last longer if there is some instability with the zookeeper session (the famous "invalid cached zk session" issue), the leader is unable to shrink the ISR, and the followers are no longer fetching. Anecdotally, that fits with the couple of cases I've seen in the field. In any case, I think the fix here does indeed address the issue.

@hachikuji hachikuji merged commit 6b08905 into apache:trunk Mar 14, 2018
0 of 3 checks passed
JDK 7 and Scala 2.11: Build triggered. sha1 is merged.
JDK 8 and Scala 2.12: Build triggered. sha1 is merged.
JDK 9 and Scala 2.12: Build triggered. sha1 is merged.
hachikuji added a commit that referenced this pull request Mar 14, 2018
Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. The bug was fixed and validated with a unit test in this patch.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
@junrao
Contributor

left a comment

@lindong28 : Thanks for finding the issue and patching it. One more comment below.

if (isLocal) {
- highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+ highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+   val firstOffset = log.get.logSegments.head.baseOffset

@junrao
Contributor
Mar 14, 2018

Should we use log.logStartOffset for firstOffset?

@lindong28
Author Member
Mar 14, 2018

@junrao Great point. Yeah logStartOffset should be better.

@hachikuji
Contributor
Mar 15, 2018

@lindong28 Were you planning to submit a follow-up for this change?

@lindong28
Author Member
Mar 16, 2018

@hachikuji Sure. I opened #4722 for this change.

@lindong28 lindong28 deleted the lindong28:KAFKA-3978 branch Mar 14, 2018
isolis added a commit to linkedin/kafka that referenced this pull request Sep 13, 2018
…che#4695)

Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. The bug was fixed and validated with a unit test in this patch.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
nimosunbit added a commit to sunbit-dev/kafka that referenced this pull request Nov 6, 2018
Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. The bug was fixed and validated with a unit test in this patch.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>