-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9839: Broker should accept control requests with newer broker epoch #8509
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. The changes look good to me. However, I wonder if we could also add unit tests in KafkaApisTest
to cover this change. What do you think?
@dajac I did initially add a unit test to |
ok to test |
@apovzner Yeah, you're right. It does not increase the overall coverage. I spent quite some time in the I do agree that adding a test which verifies that |
@dajac Sounds good! I added unit tests to KafkaApisTest as you suggested. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updated PR, @apovzner! I left few small comments.
@Test | ||
def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = { | ||
val currentBrokerEpoch = 1239875L | ||
testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the two above should use testStopReplicaRequest
instead of testUpdateMetadataRequest
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for catching this and others below!
def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { | ||
val controllerId = 0 | ||
val controllerEpoch = 5 | ||
val brokerEpoch = 230498320L |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
brokerEpoch
is not necessary, I suppose.
EasyMock.eq(request.context.correlationId), | ||
EasyMock.eq(controllerId), | ||
EasyMock.eq(controllerEpoch), | ||
EasyMock.eq(brokerEpoch + 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should rely on brokerEpochInRequest
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @apovzner. I just had a minor suggestion in addition to David's comments.
else if (brokerEpochInRequest == curBrokerEpoch) false | ||
else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch") | ||
} | ||
else brokerEpochInRequest < controller.brokerEpoch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short comment here may be helpful about the case where the controller sees the epoch bump first.
@dajac and @hachikuji thanks for your comments, I addressed them. |
ok to test |
retest this please |
2 similar comments
retest this please |
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apovzner Thanks for the updated PR. LGTM. I just left a small cosmetic comment.
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified | ||
// about the new broker epoch and sends a control request with this epoch before the broker learns about it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Alignment of the comment looks a bit weird. I would align it with the if/else.
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
retest this please |
ok to test |
…poch (#8509) A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
…t-for-generated-requests * apache-github/trunk: (366 commits) MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters. (apache#7982) MINOR: document how to escape json parameters to ducktape tests (apache#8546) KAFKA-9885; Evict last members of a group when the maximum allowed is reached (apache#8525) KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (apache#8524) KAFKA-9839; Broker should accept control requests with newer broker epoch (apache#8509) KAKFA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574) MINOR: Partition is under reassignment when adding and removing (apache#8364) MINOR: reduce allocations in log start and recovery checkpoints (apache#8467) MINOR: Remove unused foreign-key join class (apache#8547) HOTFIX: Fix broker bounce system tests (apache#8532) KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap. (apache#8224) KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol (apache#8326) MINOR: equals() should compare all fields for generated classes (apache#8539) KAFKA-9844; Fix race condition which allows more than maximum number of members(apache#8454) KAFKA-9823: Remember the sent generation for the coordinator request (apache#8445) KAFKA-9883: Add better error message when REST API forwards a request and leader is not known (apache#8536) KAFKA-9907: Switch default build to Scala 2.13 (apache#8537) MINOR: Some html fixes in Streams DSL documentation (apache#8503) MINOR: Enable fatal warnings with scala 2.13 (apache#8429) KAFKA-9852: Change the max duration that calls to the buffer pool can block from 2000ms to 10ms to reduce overall test runtime (apache#8464) ...
…poch (apache#8509) A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
…poch (apache#8509) A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
…poch (apache#8509) A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: When the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker.
With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch.
Committer Checklist (excluded from commit message)