-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4635: Client Compatibility follow-ups #2414
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@@ -13,18 +13,18 @@ | |||
package org.apache.kafka.common.errors; | |||
|
|||
/** | |||
* Indicates that a request cannot be completed because an obsolete broker | |||
* Indicates that a request cannot be completed because an outdated broker | |||
* does not support the required functionality. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about this and it seems that we can also get this error if the client requires a protocol version that is too old for the broker. If that's correct, then maybe the name could be misleading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good point. We should have a separate exception for when the client is outdated.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
One additional thing: it would be good to mention in |
I stole the upgrade notes bit and added it to #2445 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, left a few minor comments.
if (header.apiVersion() == latestClientVersion) { | ||
log.trace("Sending {} to node {}.", request, nodeId); | ||
} else { | ||
log.debug("Using older server API v{} to send {} to node {}.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@junrao, is this what you had in mind regarding logging of cases where we are downgrading the protocol version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this looks good to me.
@@ -18,6 +18,9 @@ | |||
Users upgrading from versions prior to 0.10.1.x should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0 | |||
can upgrade the brokers one at a time: shut down the broker, update the code, and restart it. | |||
|
|||
<p>Starting with version 0.10.2, the Kafka Java client is compatible with older brokers. Version 0.10.2 clients can talk to version 0.10.0 or newer brokers. | |||
However, if your brokers are older than 0.10.0, you must upgrade the broker before rolling out new clients. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about it, we should probably mention in the class javadoc of KafkaConsumer
and KafkaProducer
the behaviour with regards to older brokers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah.
P.S. I removed the release notes from this patch, since it is being moved to KAFKA-4578.
…r OffsetAndTimestamp
* Convert OutdatedBrokerException -> UnsupportedVersionException. * NodeApiVersions: distinguish versions which are too old from those which are too new
Thanks for the reviews, all. As requested, I removed the release notes from this patch, since it is being moved to KAFKA-4578. Let me rebase to get rid of these conflicts.... |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@@ -88,7 +87,7 @@ public Builder setTargetTimes(Map<TopicPartition, Long> partitionTimestamps) { | |||
public ListOffsetRequest build() { | |||
short version = version(); | |||
if (version < minVersion) { | |||
throw new ObsoleteBrokerException("The broker is too old to send this request."); | |||
throw new UnsupportedVersionException("The broker is too old to understand this request."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel bad for the broker. It's not too old; it's just fallen behind the times a bit.
By the way, could we add some more info to this message, such as which api is used, which version is needed and which is supported by the broker. Same for the other below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I added information about the API being called and the version sent (versus required) to the exception.
For the exception below, it should not actually be reachable unless there is a programming error, so it is better off as a RuntimeException rather than an UnsupportedVersionException.
*/ | ||
public final class OffsetAndTimestamp { | ||
// The timestamp should never be negative, unless it is invalid. This could happen when |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment still makes me uncomfortable. For a public-facing class, it would be nice to have surer guarantee about the class invariants (e.g. by checking that the timestamp is not negative in the constructor). I wonder if there is a different way we can represent the sentinel value internally in Fetcher.handleListOffsetResponse
. For example maybe we just need a static instance that we can use in Fetcher
whenever we need the sentinel and which we can check for using ==
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that allowing negatives here feels uncomfortable. Unfortunately, it reflects the fundamental fact that old servers are not sending back timestamps, and new servers are. I tried to create a totally separate code path for the old servers, but it was simply too much code duplication. Also unfortunately, because we need to read the offset from the structures, we cannot use static instances.
Probably the best way around this problem is to stop using the public type internally. Then we can have an internal type where the timestamp is optional, and a public type where it is not. This also makes it less likely that we'll break the public API by making an internal change, so it shoudl be a good thing overall. The downside is that copying will be needed, but I don't think the amount of copying is significant for this use-case.
@@ -290,19 +289,16 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long | |||
// information itself. It is also the case when discoverBrokerVersions is set to false. | |||
if (versionInfo == null) { | |||
if ((!discoverBrokerVersions) && (log.isTraceEnabled())) | |||
log.trace("No version information found when sending message of type {} to node {}", | |||
clientRequest.apiKey(), nodeId); | |||
log.trace("No version information found when sending message of type {} to node {}. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We use a single space after a period throughout the rest of the code. It would be good to stay consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
this.timestamp = timestamp; | ||
assert(this.timestamp >= 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We typically avoid Java language asserts because they are disabled by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, 2 minor comments, LGTM otherwise.
@@ -290,19 +289,16 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long | |||
// information itself. It is also the case when discoverBrokerVersions is set to false. | |||
if (versionInfo == null) { | |||
if ((!discoverBrokerVersions) && (log.isTraceEnabled())) | |||
log.trace("No version information found when sending message of type {} to node {}", | |||
clientRequest.apiKey(), nodeId); | |||
log.trace("No version information found when sending message of type {} to node {}. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this log message correct? It seems to imply that we looked for the version information and didn't find it. But it seems like the feature is simply disabled due to discoverBrokerVersions = false
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! The sense of the if statement is inverted-- let me fix that.
@@ -60,7 +60,7 @@ public boolean isAllTopicPartitions() { | |||
@Override | |||
public OffsetFetchRequest build() { | |||
if (isAllTopicPartitions() && version() < 2) | |||
throw new ObsoleteBrokerException("The broker is too old to send this request."); | |||
throw new UnsupportedVersionException("The broker is too old to understand this request."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to include version
in this message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the fixes! One tiny nit which can be fixed when merging.
} | ||
return usableVersion; | ||
if (usableVersion == API_NOT_ON_NODE) | ||
throw new UnsupportedVersionException("The remote node does not support " + apiKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "node" is a bit generic for a user message. Maybe it should be "broker"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the idea here is that NodeApiVersions is intended to handle arbitrary nodes, not just brokers. The idea is that it should be able to be used server-side eventually. Maybe "peer" or "remote" would be a better name, but then the class should be renamed to PeerApiVersions / RemoteApiVersions, and that seemed like a big change for a cleanup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Kafka, brokers are still called brokers when the communication is between brokers (the inter.broker.protocol.version
config, for example). So, it would not look out of place although I am aware that other projects use different wording, as you said.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thought was that we would use it when communicating with clients as well, not just brokers.
I can change it for now if it seems too vague, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the message to use broker
for now. It's true that it looks a bit out of place given the class name, but the class is internal and we can rename it later. The message may be seen by users and it's good to make it as clear as possible.
I'll merge this to trunk and 0.10.2 once Jenkins gives the green light. |
Refer to this link for build results (access rights to CI server needed): |
Author: Colin P. Mccabe <cmccabe@confluent.io> Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk> Closes #2414 from cmccabe/KAFKA-4635 (cherry picked from commit 8827a5b) Signed-off-by: Ismael Juma <ismael@juma.me.uk>
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Author: Colin P. Mccabe <cmccabe@confluent.io> Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk> Closes apache#2414 from cmccabe/KAFKA-4635
No description provided.