KAFKA-8662; Fix producer metadata error handling and consumer manual assignment #7086
Conversation
   */
-  @Test(expected = classOf[TopicAuthorizationException])
+  @Test
Are the consumer and admin client cases tested already?
@ijuma Thanks for the review. This test verifies producer and consumer. AdminClient doesn't cache topics in its metadata, so that should be ok.
The test comment only mentions the producer, so it should probably be updated. Since this class is about end-to-end testing, I don't think it should care about implementation details. That is, we should have at least one test for the admin client where a batch of topics is partially authorized.
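The partial-authorization scenario suggested above can be illustrated with a small self-contained sketch (hypothetical class and method names, not the real AdminClient API): a batched describe over several topics should yield per-topic outcomes, so only the unauthorized topics fail rather than the whole batch.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a batched describe-topics call: each topic in
// the batch gets its own result instead of the whole batch failing.
class BatchDescribe {
    private final Set<String> authorized;

    BatchDescribe(Collection<String> authorizedTopics) {
        this.authorized = new HashSet<>(authorizedTopics);
    }

    // Maps each requested topic to either "described" or a per-topic error.
    Map<String, String> describe(Collection<String> topics) {
        Map<String, String> results = new HashMap<>();
        for (String topic : topics) {
            results.put(topic, authorized.contains(topic)
                    ? "described"
                    : "TopicAuthorizationException");
        }
        return results;
    }
}
```

The real AdminClient exposes a future per topic, which is what makes this per-topic failure mode possible for a batch request.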
@rajinisivaram Good find and thanks for the fix. If I understand correctly, the solution here is to only raise metadata errors for the specific topic that a send is directed to. If the user sends to 'foo', they will only see fatal errors or recoverable metadata errors for 'foo', which is clear and intuitive.

So if there is an unauthorized topic, we will continue holding it in Metadata and we will continue fetching it. The error will just be ignored unless the user attempts another send to it. After reading the description of the bug, I had sort of expected that the solution would be to drop the invalid or unauthorized topic from metadata rather than waiting for expiration. I'm wondering if it would be reasonable to do this in addition to the focused exception handling that is implemented here. Thoughts?

Anyway, I'll take a closer look at the PR tomorrow. Just wanted to make sure I understood things at a high level. By the way, the consumer bug seems unrelated. I don't feel too strongly about it, but it seems like it could be a separate patch?
@hachikuji Thanks for the review. Yes, the current fix takes us back to the behaviour pre-2.3.0. We retain the topic in Metadata after an authorization exception, but throw the exception only if it corresponds to the topic in the request. The topic is removed only on expiry.

I did consider removing it in this PR, but decided it was safer to keep the previous behaviour. In the good path case, when you set an ACL and then produce to a topic, there is a timing window when you can see a TopicAuthorizationException because the cache on the leader broker was not yet updated. In this case, leaving the topic in there means that the next request after the ACL was refreshed would succeed. I wasn't sure if removing the topic would cause an unnecessary delay in this case.

I also thought I could remove the topic just for InvalidTopicException in this PR, since invalid topics don't suddenly become valid, but then again I wasn't sure if, for example, a deleted topic would appear as an invalid topic. Finally I went with the option of retaining the old behaviour. It is possible that we used to have that behaviour simply because it was harder to remove the topic from the metadata and propagate the exception in the old code. So it is worth investigating and fixing this, but perhaps in a separate PR?
Yes, I'd suggest restoring pre-2.3.0 behavior, cherry-picking the fix to the 2.3 branch, and doing any other riskier changes via a separate PR.
Looks good overall. I just had one question.
@@ -272,24 +267,22 @@ public synchronized void update(int requestVersion, MetadataResponse response, l

    private void maybeSetMetadataError(Cluster cluster) {
        // if we encounter any invalid topics, cache the exception to later throw to the user
nit: this comment seems a little stale. We could probably just drop it.
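As a sketch of the pattern under review in this hunk (a simplified stand-in, not the actual Metadata class): instead of caching a single exception for the whole metadata instance, the update records which topics came back invalid or unauthorized, so that a later send can check only its own destination topic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified sketch: track per-topic metadata errors from the latest response.
class PerTopicMetadataErrors {
    private final Set<String> invalidTopics = new HashSet<>();
    private final Set<String> unauthorizedTopics = new HashSet<>();

    // topicErrors maps topic name -> error code string from the response.
    void update(Map<String, String> topicErrors) {
        invalidTopics.clear();
        unauthorizedTopics.clear();
        for (Map.Entry<String, String> entry : topicErrors.entrySet()) {
            if ("INVALID_TOPIC_EXCEPTION".equals(entry.getValue()))
                invalidTopics.add(entry.getKey());
            else if ("TOPIC_AUTHORIZATION_FAILED".equals(entry.getValue()))
                unauthorizedTopics.add(entry.getKey());
        }
    }

    // A send to `topic` should fail only if that topic itself has an error.
    boolean hasErrorFor(String topic) {
        return invalidTopics.contains(topic) || unauthorizedTopics.contains(topic);
    }
}
```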
-        return updateVersion() > lastVersion || isClosed();
+        boolean done = updateVersion() > lastVersion || isClosed();
+        // Propagate fatal exceptions if we haven't yet processed required metadata version to avoid unnecessary wait.
+        // If metadata has been updated to the required version, don't clear error state so that caller can process
Not sure I follow this. Aren't fatal errors always thrown before topic errors?
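A minimal sketch of the wait logic in this hunk (hypothetical class, simplified from the real code): the waiter is done once the metadata version advances or the client is closed, and a fatal error is propagated only while the required update has not completed, so a caller whose update already succeeded is left to handle topic-level errors itself.

```java
// Simplified sketch of the "done" check around metadata waits.
class MetadataWait {
    private int updateVersion = 0;
    private boolean closed = false;
    private RuntimeException fatalError;

    void setUpdateVersion(int v) { updateVersion = v; }
    void setFatalError(RuntimeException e) { fatalError = e; }

    // Returns true when the waiter can stop. Fatal errors are thrown only
    // if the required version has not yet been reached, so a completed
    // update leaves error handling to the caller.
    boolean checkDone(int lastVersion) {
        boolean done = updateVersion > lastVersion || closed;
        if (!done && fatalError != null) {
            RuntimeException error = fatalError;
            fatalError = null;
            throw error;
        }
        return done;
    }
}
```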
@@ -289,27 +288,80 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
    }
  }

-  protected def setAclsAndProduce(tp: TopicPartition) {
+  private def setAcls(tp: TopicPartition) {
Maybe setReadAndWriteAcls?
@rajinisivaram I pushed a minor change so that
LGTM. I will let someone else take a quick look since I made some small tweaks.
Failures are unrelated; JDK 11 with Scala 2.12 failed while checking out the repo due to a DNS resolution failure.
retest this please
@hachikuji Thanks for the review and updating
…assignment (#7086)

Ensure that Producer#send() throws topic metadata exceptions only for the topic being sent to and not for other topics in the producer's metadata instance. Also removes topics from consumer's metadata when a topic is removed using manual assignment.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
* apache-github/2.3:
  MINOR: Update documentation for enabling optimizations (apache#7099)
  MINOR: Remove stale streams producer retry default docs. (apache#6844)
  KAFKA-8635; Skip client poll in Sender loop when no request is sent (apache#7085)
  KAFKA-8615: Change to track partition time breaks TimestampExtractor (apache#7054)
  KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (apache#7094)
  KAFKA-8602: Separate PR for 2.3 branch (apache#7092)
  KAFKA-8530; Check for topic authorization errors in OffsetFetch response (apache#6928)
  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (apache#7086)
  KAFKA-8637: WriteBatch objects leak off-heap memory (apache#7050)
  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (apache#7021)
  HOT FIX: close RocksDB objects in correct order (apache#7076)
  KAFKA-7157: Fix handling of nulls in TimestampConverter (apache#7070)
  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (apache#5705)
  Fixes apache#8198 KStreams testing docs use non-existent method pipe (apache#6678)
  KAFKA-5998: fix checkpointableOffsets handling (apache#7030)
  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (apache#7072)
  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)
  MINOR: add upgrade text (apache#7013)
  Bump version to 2.3.1-SNAPSHOT
…ndling and consumer manual assignment (apache#7086)

TICKET = KAFKA-8662
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [27eba24]
ORIGINAL_DESCRIPTION =
Ensure that Producer#send() throws topic metadata exceptions only for the topic being sent to and not for other topics in the producer's metadata instance. Also removes topics from consumer's metadata when a topic is removed using manual assignment.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

(cherry picked from commit 27eba24)
Producer adds a topic to its Metadata instance when a send is requested. If the metadata request for the topic fails (e.g. due to an authorization failure), we retain the topic in Metadata and continue to attempt refresh until a hard-coded expiry time of 5 minutes. Due to changes introduced in 460e46c, subsequent sends to any topic, including valid authorized topics, reported authorization failures for any topic in the metadata, rather than just the topic the send was directed to. As a result, the producer remained unusable for 5 minutes if a send was requested on an unauthorized topic. This PR fails a send only if metadata for the topic being sent to has an error (or there is a fatal exception such as an authentication failure).
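The producer-side fix described above can be illustrated with a self-contained simulation (hypothetical names, no real Kafka client involved): an authorization failure recorded for one topic must not fail sends to other topics held in the same metadata instance.

```java
import java.util.HashSet;
import java.util.Set;

// Simulation of the fixed behavior: send() consults only the error state
// of its own destination topic.
class SimulatedProducer {
    private final Set<String> unauthorizedTopics = new HashSet<>();

    // Pretend a metadata refresh reported this topic as unauthorized.
    void markUnauthorized(String topic) {
        unauthorizedTopics.add(topic);
    }

    // Returns true if the send is accepted; throws only when the
    // destination topic itself is unauthorized.
    boolean send(String topic) {
        if (unauthorizedTopics.contains(topic))
            throw new IllegalStateException("TopicAuthorizationException: " + topic);
        return true;
    }
}
```

Before the fix, the equivalent check consulted the error state of every topic in metadata, which is why one unauthorized topic made the whole producer unusable until expiry.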
Consumer adds a topic to its Metadata instance on subscribe() or assign(). Even though assign() is not incremental and replaces the existing assignment, new assignments were being added to existing topics in SubscriptionState#groupSubscriptions, which is used for fetching topic metadata. This PR does a replace for manual assignment alone.
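The consumer-side fix can be sketched the same way (a simplified stand-in for SubscriptionState, with hypothetical method names): a manual assign() replaces the tracked topic set instead of adding to it, so topics dropped from the assignment also drop out of metadata fetches.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the subscription state used for metadata fetches.
class SimulatedSubscriptionState {
    private final Set<String> groupSubscription = new HashSet<>();

    // assign() is not incremental: replace the set rather than add to it.
    void assignFromUser(Collection<String> topics) {
        groupSubscription.clear();
        groupSubscription.addAll(topics);
    }

    Set<String> metadataTopics() {
        return new HashSet<>(groupSubscription);
    }
}
```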