KAFKA-15661: KIP-951: Server side changes #14444
Conversation
.setErrorCode(error.code())
.setSessionId(sessionId)
.setResponses(topicResponseList);
nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(
I don't think we need to populate the nodeEndpoints field for every Fetch/Produce response, which has some overhead. To my understanding, the info is only needed for the error condition.
That should be getting handled in KafkaApis, where we only add to nodeEndpoints if there is an error; otherwise it should be an empty list.
it makes sense, thanks
Nit: It would be good to add a comment here, something like
KafkaApis will manage the response, returning nodeEndpoints information only in case of an error.
added
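For readers following the thread, a minimal, self-contained sketch of the pattern being described (populate node endpoints only for partitions that hit an error). All names here are illustrative stand-ins, not the PR's KafkaApis code:

case class PartitionResult(partition: Int, errorCode: Short, leaderId: Int)
case class Endpoint(nodeId: Int, host: String, port: Int)

// Only look up and attach endpoints for partitions that failed; on the happy
// path the endpoint list stays empty, so there is no extra per-response cost.
def endpointsForErrors(results: Seq[PartitionResult],
                       lookup: Int => Option[Endpoint]): Seq[Endpoint] =
  results
    .filter(_.errorCode != 0)           // e.g. NOT_LEADER_OR_FOLLOWER
    .flatMap(r => lookup(r.leaderId))   // resolve the new leader's endpoint, if known
    .distinct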
Could we also create the nodeEndpoints together so we keep the final data object relatively clean? (I.e., since we generate the response data structure above, we could also generate the endpoints data structure above.)
reordered to make things cleaner
Thanks for the PR @chb2ab!
Looks good overall. Since the KIP is awaiting a final vote, should this PR wait until that goes through before merging?
List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
FetchResponseData data = new FetchResponseData();
nit: can we move the object creation closer to where it's updated and returned at the bottom of the method?
sgtm
* @param throttleTimeMs Time in milliseconds the response was throttled
* @param nodeEndpoints List of node endpoints
*/
@Deprecated
I'm confused: why is the new constructor marked as @Deprecated? If that's intentional, can you add a comment about what should be used instead? Thanks.
I didn't look too deeply into this before, but this was deprecated in https://issues.apache.org/jira/browse/KAFKA-9628 and the follow-up is described here https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L605-L608. I'm not sure how big of a change that would be, and it is a critical part of the code, so I suspect it would be better to separate it into a different PR. I'm also not sure how noticeable the performance benefit would be. For now I think adding to this comment should be enough.
I think all we would need to do is create the produceResponseData object. I don't think it would be too much work. I'm a little wary of adding more deprecated constructors.
Taking a closer look, it looks like there was an effort to build the response directly and not pass in data structures (new maps) that will just be converted via toData.
I started implementing this but I do think it's getting out of scope for this PR. These are my initial changes; to finish, I think we want to remove PartitionResponse completely and replace it with PartitionProduceResponse, otherwise we're just moving the conversion around. Having a deprecated constructor isn't ideal, but I think we should remove it with KAFKA-10730, not this PR. @jolshan what do you think?
KAFKA-10730 is a pretty dormant JIRA. I do agree that there is some level of conversion. I wonder if folks still have a strong opinion about this conversion.
Looking into this further, I see the change would need to be made to appendRecords and the ProducePartitionStatus. It doesn't look too crazy, but it's also understandable that this is not in the scope of this PR.
I wonder if KAFKA-9628 was premature in deprecating the constructor. I guess our options are leaving it deprecated and adding a deprecated method, or removing the deprecation until KAFKA-10730 is completed. (I almost just want to fix it so this doesn't happen in the future 😂 )
I'll try to find time to finish the changes for KAFKA-10730, I think refactoring the tests would take some time but overall I agree it doesn't seem too big.
I'm ok with removing the deprecation, but I suspect the incentive to do the refactoring will be lost, so leaving it for now.
@chb2ab It makes sense to do KAFKA-10730, but I agree with @jolshan that it is outside the scope of this PR and should be done independently. How about merging the PR with the constructor as @Deprecated, and then doing a follow-up PR for KAFKA-10730 (tackling the new constructor as well)?
@@ -210,6 +238,12 @@ public String toString() {
b.append(logStartOffset);
b.append(", recordErrors: ");
b.append(recordErrors);
b.append(", currentLeader: ");
if (currentLeader != null) {
I think that the following lines could be simply String.valueOf(currentLeader), right?
In fact, the errorMessage bit could be redone that way too.
In fact, I think that the StringBuilder code checks for nulls in its append() method.
Looking at the Java docs I think you're right; this could be replaced by b.append(currentLeader). I'm not sure why errorMessage was written this way. It looks like it was changed explicitly in this commit, but I don't see a reason for it, so I could probably change this as well.
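A quick standalone check of the behavior in question (plain Scala against java.lang.StringBuilder; nothing here is PR code): append(Object) goes through String.valueOf, so a null value is rendered as "null" without an explicit check.

val b = new java.lang.StringBuilder()
val currentLeader: AnyRef = null              // stand-in for the nullable field
b.append(", currentLeader: ").append(currentLeader)
// b.toString == ", currentLeader: null" -- no NullPointerException is thrown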
@@ -53,7 +53,9 @@
//
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch and the ReplicaId. Also,
// deprecate the old ReplicaId field and set its default value to -1. (KIP-903)
"validVersions": "0-15",
//
// Version 16 is the same as version 15.
At the risk of proving my ignorance, why do we bump the version number if nothing has changed? Is it so that the request has the same version number as the response (which is bumped)?
Yes, from my reading the version of the response is based on the version of the request, so we need to bump both.
@chb2ab that's correct.
Request and response version should always be the same :)
@@ -33,7 +33,7 @@
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
"validVersions": "0-10",
Does it make sense to add a comment about the version bump?
yes, forgot to include that.
case Left(x) =>
debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
partitionInfo.foreach { info =>
This foreach loop is overwriting the leaderId and leaderEpoch each time. Is that intentional? Is there a benefit to looping vs. just grabbing the last entry in the collection?
Looking at other uses of partitionInfo, I think this is a style choice. There can only be one partitionInfo in the Option returned by getPartitionInfo, so the foreach should only ever access one entry; I think this is just a more succinct way of accessing it.
getPartitionInfo returns an option. If it exists, foreach will access it. If it doesn't foreach does nothing. This is a common pattern in scala. Are we considering the case when the partition is not present?
We also don't need to set vars here. We could have a statement where we return a tuple or even just the partitionInfo.
val partitionInfo = partitionInfoOrError match {
  case Right(partitionInfo) =>
    partitionInfo
  case Left(error) =>
    debug(s"Unable to retrieve local leaderId and Epoch with error $error, falling back to metadata cache")
    metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
      case Some(partitionInfo) => partitionInfo
      case None => ??? // handle the case where we don't have the partition
    }
}
I think the confusion was that foreach implies a list of elements, but there can only be 1 here. I like the use of tuple and match/case here though, I will update to that
Yeah, using foreach for scala options is a common pattern.
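For anyone less familiar with the idiom, a small standalone example (made-up values, not Kafka code): foreach on an Option runs its body at most once, so it never "overwrites in a loop" in the usual sense.

val partitionInfo: Option[(Int, Int)] = Some((5, 42))   // (leaderId, leaderEpoch)

// Some(x) applies the function to x exactly once; None does nothing.
partitionInfo.foreach { case (leaderId, leaderEpoch) =>
  println(s"leaderId=$leaderId leaderEpoch=$leaderEpoch")
}

// The var-free equivalent suggested above: return a tuple with a default.
val (leaderId, leaderEpoch) = partitionInfo.getOrElse((-1, -1))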
Thank you Kirk, yeah I am going to wait for the KIP to be approved before making more changes.
leaderEpoch = x.getLeaderEpoch
case Left(x) =>
debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
any chance partitionInfo can be null?
I don't think so, getPartitionInfo returns an Option, the equivalent of null would be an empty option. We don't seem to null check this value elsewhere either.
I see
@@ -32,7 +32,9 @@
// records that cause the whole batch to be dropped. See KIP-467 for details.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
//
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields
Why do we need to bump the version if we are just adding tagged fields?
I'm not sure if this is absolutely necessary, I was going based off the KIP, but I do think there could be an issue with leaving the version the same. If a client is still using the old protocol definition and the server returns a message based on the new definition but with the same version number, wouldn't the client deserialize it incorrectly?
New tagged fields would be unknown to older clients, so they would ignore them. It would not affect their ability to deserialize.
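Rough intuition for why that is safe, as a simplified standalone sketch (this is not the real protocol code; tagged fields are framed with a tag and a length, so a reader that does not recognize a tag can skip it without losing its place):

case class TaggedField(tag: Int, payload: Array[Byte])

// An old reader keeps only the tags it knows and skips the rest, so an
// unknown tag never breaks deserialization of the fields it does understand.
def decodeKnown(fields: Seq[TaggedField], knownTags: Set[Int]): Map[Int, Array[Byte]] =
  fields.collect { case TaggedField(tag, payload) if knownTags(tag) => tag -> payload }.toMap

// e.g. decodeKnown(Seq(TaggedField(0, bytesA), TaggedField(1, bytesB)), Set(0))
// returns only tag 0; the newer tag 1 field is silently ignored.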
ok, I'm fine with keeping the version the same, but we should update the KIP. @msn-tldr do you see any issues with this?
I didn't realize this was brought up in previous discussions. It looks like we decided to bump up the version # to make it clearer which clients have implemented the feature, in an email from @dajac:
Personally, I would rather prefer to bump both versions and to add the tagged fields. This would allow us to better reason about what the client is supposed to do when we see the version on the server side. Otherwise, we will never know if the client uses this or not.
@hachikuji does this sound good?
@jolshan +1
@hachikuji Yeah, your point is totally valid. I was pushing for this with the java client (and potentially librdkafka) in mind. I think that it will make request analysis easier as you said.
@jolshan I think that it is hard to really define a policy for this. It mainly depends on whether there is a justification to require an epoch bump or not. In this case, I believe that there is one but this may not always be true.
I'm also wondering if there is a flaw with this approach. This would mean we need to bump the MV after all for inter-broker requests, right? #14444 (comment)
I bumped the MV for fetch requests in 3.7, I think it's fine since it isn't released yet, lmk if you see any issues.
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_5_IV1)) {
return 15;
return 16;
Hmm. Is this correct? In the upgrade scenario we will send request version 16 to brokers that may not have that version yet. I know we just ignore tagged fields, but I'm not sure I recall if we can handle version bumps.
If this is always just the latest version, should it be hardcoded?
I think you're right that this could cause issues during upgrade, I think using the latest version should be safe.
I'm just trying to figure out if we would see unsupported version errors during upgrades. I think we might.
Clusters with ibp 3.5 were guaranteed to support version 15 since that was the version we defined the ibp. I don't think we can just change the version because some clusters will have ibp 3.5, but not version 16 fetch requests. We could avoid this with the tagged fields, but since we are bumping the version, we run into a problem.
Ah I hadn't thought this through. I think this would need an IBP version bump to avoid errors during upgrades?
I'll remove this version bump, I hadn't realized it was only for replication fetches.
I remember why I added this now: we have a test that checks we're using the latest version for various APIs. I guess we would need to remove that for FETCH.
kafka/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
Lines 120 to 131 in 13b2edd
@Test
def shouldSendLatestRequestVersionsByDefault(): Unit = {
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
val config = KafkaConfig.fromProps(props)
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())
assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.interBrokerProtocolVersion.offsetForLeaderEpochRequestVersion)
assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.interBrokerProtocolVersion.listOffsetRequestVersion)
}
I chatted with David J about this offline. Since we are using MV/IBP bumps for fetches, a simple thing to do would be to pick up the newest MV for this release and include the fetch bump here.
The alternative is setting up the fetch path to use ApiVersions to ensure the correct version. But that might be out of scope for this change.
With either of these approaches we can keep the latest version for the replication fetches which would make things a little clearer.
Since we are using MV/IBP bumps for fetches, a simple thing to do would be to pick up the newest MV for this release and include the fetch bump here
ok, so my understanding is instead of bumping IBP to IBP_3_7_IV1 we would wait for the release of IBP_3_8_IV0 to bump the fetch version here to 16.
The alternative is setting up the fetch path to use ApiVersions to ensure the correct version. But that might be out of scope for this change.
yeah, I would need to look more into this, I'm not familiar enough to know how it might look.
Since we are using MV/IBP bumps for fetches, a simple thing to do would be to pick up the newest MV for this release and include the fetch bump here
ok, so my understanding is instead of bumping IBP to IBP_3_7_IV1 we would wait for the release of IBP_3_8_IV0 to bump the fetch version here to 16.
I should have made this a question. I confused myself with the release versions; I think we can use IBP_3_7_IV0 to bump the fetch version to 16 since it hasn't been released yet. Let me know if that makes sense to you.
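Roughly what this resolution amounts to, as a standalone sketch (not the real MetadataVersion code, which gates on the IBP_3_7_IV0 / IBP_3_5_IV1 constants): the inter-broker FETCH version is derived from the IBP, so v16 is only used once the whole cluster is at an IBP that guarantees support for it.

// Illustrative only: pick the inter-broker FETCH version from the IBP.
def fetchRequestVersion(ibpMajor: Int, ibpMinor: Int): Short = {
  def atLeast(major: Int, minor: Int): Boolean =
    ibpMajor > major || (ibpMajor == major && ibpMinor >= minor)

  if (atLeast(3, 7)) 16      // IBP_3_7_IV0+: brokers understand the KIP-951 tagged fields
  else if (atLeast(3, 5)) 15 // IBP_3_5_IV1+: KIP-903 ReplicaState
  else 13                    // older IBPs keep their previously pinned version (simplified)
}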
left a few comments
case None => (-1, -1)
}
}
val leaderNode: Node = metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName).getOrElse({
We shouldn't be passing through the inter-broker listener name; we should be using the listener used by the original request, to be consistent with the metadata request.
Is it simpler if we just consult the metadata cache? In KRaft mode, the metadata cache is the source of truth for partition leadership and is updated before the partition state gets updated.
will update the listener name.
I think the motivation for checking replica manager first is it may be faster than metadata cache.
My point in the previous comment is that this will never be the case with KRaft.
ok, I think replica manager is the more commonly used path which may have caching benefits but I'm not sure, does that make sense?
Hmmm -- I'm not sure I understand "more commonly used path".
ReplicaManager will have the partition if the broker hosts it. The metadatacache is meant to be a cache of all the partitions, so I don't think it loses out on "caching benefits"
The benefit of the replica manager is that it also contains the log of the partition. If the metadata cache is sufficient (which it seems to be) we should probably just use that.
I looked more into this and I see replica manager looks up the partition from a Pool object while metadata cache looks it up in the current image and creates a new UpdateMetadataPartitionState to return. I think we can avoid an allocation using the replica manager, also since the fetch/produce paths should have recently tried to read through replica manager I think it's more likely to give an in-memory cache hit than the metadata path. It still seems better to me to try from replica manager first, what do you all think?
the extra object allocation is not a big issue, since the new leader and new leader lookup are not done in the common case, only in erroring cases.
populating the new leader state from the Partition also doesn't work for cases where the partition gets deleted from the leader, for instance in cases with reassignments, so populating from the metadata cache is both more likely to have up-to-date information (in KRaft mode, which we should assume to be the default) and it handles NotLeader in more cases.
The metadata cache having more up to date information makes sense to me, but I don't follow the deletion case, would reading from the replica manager not return NOT_LEADER_OR_FOLLOWER there? It seems like we should still fallback to the metadata cache in that case
We would but we would actually look it up in the metadata cache twice in that path :)
ok, so my understanding is in the case of partition reassignments it would be better to go directly to metadata cache, but when moving leadership within the replica set it is better to go to replica manager first. I think we should prioritize moving leadership within the replica set here since it seems more common, what do you all think?
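The lookup order being debated, as a self-contained sketch (the case class and parameters below are stand-ins, not the ReplicaManager / MetadataCache APIs): try the local replica state first, fall back to the metadata cache, and finally report the (-1, -1) "unknown leader" sentinel.

case class LeaderInfo(leaderId: Int, leaderEpoch: Int)

def resolveLeader(fromReplicaManager: Either[String, LeaderInfo],
                  fromMetadataCache: => Option[LeaderInfo]): LeaderInfo =
  fromReplicaManager match {
    case Right(info) => info   // partition hosted locally: use its state directly
    case Left(_) =>            // e.g. NOT_LEADER_OR_FOLLOWER, or the partition was deleted
      // fall back to the metadata cache; if it also misses, return the unknown-leader sentinel
      fromMetadataCache.getOrElse(LeaderInfo(-1, -1))
  }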
if (request.header.apiVersion >= 10) {
status.currentLeader = {
status.error match {
case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH => |
produce requests should never receive FENCED_LEADER_EPOCH.
also, shouldn't this go in the above if block?
will move this into the if block.
why can't produce receive FENCED_LEADER_EPOCH?
I think the error is only returned on fetch requests
ok, removed FENCED_LEADER_EPOCH from the produce path
produce requests do not include a leader epoch => they can never get fenced leader epoch.
I reran all the failing tests locally and they passed. I'm not sure if there's anything else that needs to be done, but they seem like flaky tests.
}

public PartitionResponse(Errors error, long baseOffset, long lastOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
this(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
can we remove the ProduceResponseData prefixes here?
Or even better, can we just leave empty and use a default? Or does that bloat the constructors more?
I think I addressed this, the currentLeader parameter was unused so I removed it and set it to be a new LeaderIdAndEpoch by default. I also removed the prefix.
@@ -98,13 +116,20 @@ private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse>
.setLogAppendTimeMs(response.logAppendTime)
.setErrorMessage(response.errorMessage)
.setErrorCode(response.error.code())
.setCurrentLeader(response.currentLeader != null ? response.currentLeader : new LeaderIdAndEpoch())
do we need to set anything here if the response is null?
Or alternatively pass in the default and not have to do a check here.
based on the change I made in handleProduceRequest I don't think currentLeader can be null anymore, I removed this check
]}
]},
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "default": "0",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,
Should we be using the same tag as the CurrentLeader field?
same response as in FetchResponse.json
@@ -102,6 +104,15 @@
"about": "The preferred read replica for the consumer to use on its next fetch request"},
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
]}
]}, |
Should we be using the same tag here as diverging epoch?
it looks like tags are scoped to the list level so this isn't really the same tag. They also need to be contiguous within their scope so this gives an error if I try to tag NodeEndpoints to something other than 0.
.setLeaderId(leaderNode.leaderId)
.setLeaderEpoch(leaderNode.leaderEpoch)
case _ =>
null |
could this just be the default leaderIdAndEpoch?
yeah, looking again the currentLeader should already be set to the default, I removed the allocation
@@ -125,7 +125,6 @@ class ReplicaFetcherThreadTest {
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))

assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())
Hmmm -- not sure if we want to remove this.
If we do plan to address the MV on a followup, we should definitely call it out and file a JIRA that is a blocker for 3.7
it looks like IBP_3_7_IV0 was added already, I was confused. I bumped up the fetch version so removing this isn't necessary anymore
@chb2ab is there a JIRA for this work? If not, can we create one and format the title as the jira title?
done, lmk if anything's missing, it's my first AK JIRA. https://issues.apache.org/jira/browse/KAFKA-15661 Thank you for your reviews everyone btw.
Separating out the protocol changes from #14444 in an effort to more quickly unblock the client side PR. This is the protocol changes to populate the fields in KIP-951. On NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE the new leader ID and epoch are included in the response. The endpoint for the new leader is retrieved from the metadata cache. The new fields are all optional (tagged) and an IBP bump is required. https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client Reviewers: Justine Olshan <jolshan@confluent.io>, Mayank Shekhar Narula <mayanks.narula@gmail.com>
Thank you. I think the build is failing for something unrelated, could we try restarting it? This was the error
These build failures are out of control. I have to rebuild again.
I've seen all these failures on other builds (both 3.6 and trunk) today and yesterday.
This reverts commit f38b0d8. Trying to find the root cause of org.apache.kafka.tiered.storage.integration.ReassignReplicaShrinkTest failing in CI. Reviewers: Justine Olshan <jolshan@confluent.io>
…)" (apache#14738)" This reverts commit a98bd7d.
…14738)" (#14747) This KIP-951 commit was reverted to investigate the org.apache.kafka.tiered.storage.integration.ReassignReplicaShrinkTest test failure (#14738). A fix for that was merged in #14757, hence unreverting this change. This reverts commit a98bd7d. Reviewers: Justine Olshan <jolshan@confluent.io>, Mayank Shekhar Narula <mayanks.narula@gmail.com>
This is the server side changes to populate the fields in KIP-951. On NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE the new leader ID and epoch are retrieved from the local cache through ReplicaManager and included in the response, falling back to the metadata cache if they are unavailable there. The endpoint for the new leader is retrieved from the metadata cache. The new fields are all optional (tagged) and an IBP bump was required. https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client https://issues.apache.org/jira/browse/KAFKA-15661 Protocol changes: apache#14627 Testing Benchmarking described here https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client#KIP951:Leaderdiscoveryoptimisationsfortheclient-BenchmarkResults ./gradlew core:test --tests kafka.server.KafkaApisTest Reviewers: Justine Olshan <jolshan@confluent.io>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>, Fred Zheng <zhengyd2014@gmail.com>, Mayank Shekhar Narula <mayanks.narula@gmail.com>, Yang Yang <yayang@uber.com>, David Mao <dmao@confluent.io>, Kirk True <ktrue@confluent.io>
I was using the ZERO_UUID topicId instead of the actual topicId in the testFetchResponseContainsNewLeaderOnNotLeaderOrFollower introduced in #14444, updating as the actual topicId is more correct. Reviewers: Justine Olshan <jolshan@confluent.io>
These are the server-side changes to populate the fields in KIP-951. On NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE, the new leader ID and epoch are retrieved from the local cache through ReplicaManager and included in the response, falling back to the metadata cache if they are unavailable there. The endpoint for the new leader is retrieved from the metadata cache. The new fields are all optional (tagged) and an IBP bump is not required.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client
https://issues.apache.org/jira/browse/KAFKA-15661
Protocol changes: #14627
Testing
Benchmarking described here https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client#KIP951:Leaderdiscoveryoptimisationsfortheclient-BenchmarkResults
./gradlew core:test --tests kafka.server.KafkaApisTest