-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4591: Create Topic Policy (KIP-108) #2361
Conversation
Will post additional tests soon. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just few nits.
topicErrors.put(topic, Errors.forException(e)); | ||
Errors error = Errors.forException(e); | ||
// Avoid populating the error message if it's a generic one | ||
String message = error.message().equals(e.getMessage()) ? null : e.getMessage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... This seems kind of annoying for non-java clients. Is it really worth the savings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it though? Don't they have to map error codes for all requests anyway? The idea was to only include an error message when it provided information over what the error code provides. Do you think sending it back always would be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that's fair.
Map<String, Errors> errors = new HashMap<>(); | ||
errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION); | ||
errors.put("t2", Errors.LEADER_NOT_AVAILABLE); | ||
Map<String, CreateTopicsResponse.Error> errors = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't want to have tests for the two versions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, working on that and more tests.
} catch { | ||
case e: Throwable => | ||
warn(s"Error processing create topic request for topic $topic with arguments $arguments", e) | ||
CreateTopicMetadata(topic, Map(), Errors.forException(e)) | ||
info(s"Error processing create topic request for topic $topic with arguments $arguments", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why downgrade to info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is related to the other comment of yours with regards to logging. To me, warn
implies that there's something that needs attention. Invalid topic creation requests are normal in self-serve environments. Does this not make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess its a matter of taste :) IMO, it requires attention since it can indicate an unhappy user - maybe reaching out to the user to discuss why he's trying to do what he does? or to explain why he can't? Maybe not all admins are as caring :)
If you are not convinced, leave it as is. We can figure out if admins complain later :)
s"for the following topics: ${duplicateTopics.keySet.mkString(",")}" | ||
// We can send the error message in the response for version 1, so we don't have to log it any more | ||
if (request.header.apiVersion == 0) | ||
warn(errorMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still need to log. There's the client log and the broker log - IMO, create topic is rare enough and partially an SRE responsibility so the admins will want to monitor for failed create-topics too. Lets give them the option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create topics can be somewhat common in self-serve environments. This error only happens if someone sends duplicate topics in their requests. It's not clear to me why a SRE would care about that? It's clearly user error and whoever is creating a topic would just fix it and move on. It would be good to understand from @granthenke what was his reasoning for this warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm with Ismael. Since we return the error message to the user in the new version, logging it on the broker seems redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a Kafka admin, I'd want it highlighted to my attention. But maybe I'm unique - we can leave it that was and see if anyone else cares :)
duplicateTopics.keySet.map((_, new CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap | ||
} else Map.empty | ||
|
||
val completeResults = results ++ duplicatedTopicsResults | ||
sendResponseCallback(completeResults) | ||
} | ||
|
||
adminManager.createTopics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not 100% related - but there is a private createTopic method in this file (KafkaApis), and I got a bit confused on why it isn't used. It think it is only for internal topics, but maybe add a comment explaining?
continue; | ||
} else { | ||
throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + ". " + createTopicsResponse.errors().get(internalTopicConfig.name()).name()); | ||
throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.message()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, much better!
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Mentioned offline, but I thought I would raise it here. I'm not sure |
…pic-policy * apache/trunk: MINOR: Remove unneeded client used API lists MINOR: Cluster file generator should produce valid json KAFKA-4565; Separation of Internal and External traffic (KIP-103) KAFKA-4626; Add KafkaConsumer#close change to upgrade notes KAFKA-4581; Fail early if multiple client login modules in sasl.jaas.config MINOR: Minor improvements in consumer close timeout handling MINOR: Remove unnecessary options in SCRAM test jaas config MINOR: avoid closing over both pre & post-transform record in WorkerSourceTask MINOR: make methods introduced in KAFKA-4490 consistent with KIP-100 KAFKA-3209: KIP-66: single message transforms MINOR: fix JavaDoc in SegmentIterator
…pic-policy * apache/trunk: KAFKA-4381: Add per partition lag metrics to the consumer MINOR: Remove unnecessary store info from TopologyBuilder KAFKA-3739: Add no-arg constructor for WindowedSerdes in Streams HOTFIX: Added another broker to smoke test KAFKA-4627; Fix timing issue in consumer close tests
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@@ -167,9 +176,12 @@ private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout, s | |||
} | |||
struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray()); | |||
struct.set(TIMEOUT_KEY_NAME, timeout); | |||
if (version >= 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If validateOnly
is true, I wonder if we should raise an exception if the version is 0. It seems a little dangerous otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can do that in the builder since the constructor is private?
Refer to this link for build results (access rights to CI server needed): |
Overall looks good to me. I think Gwen and Jason brought up some good minor points, to which I have nothing to add. |
About the exception, I'll create a custom exception as @hachikuji suggested. I'll also rename |
@@ -98,7 +111,7 @@ class AdminManager(val config: KafkaConfig, | |||
val results = metadata.map { createTopicMetadata => | |||
// ignore topics that already have errors | |||
if (createTopicMetadata.error == Errors.NONE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check needs to be updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha, I was trying to diagnose some test failures and reached the exact same conclusion at about the same time. There are quite a few places with the same issue. Will push an update ASAP.
@@ -488,6 +490,8 @@ object KafkaConfig { | |||
"create a topic with a replication factor of 3, set min.insync.replicas to 2, and " + | |||
"produce with acks of \"all\". This will ensure that the producer raises an exception " + | |||
"if a majority of replicas do not receive a write." | |||
|
|||
val CreateTopicsPolicyClassNameDoc = "The create topics policy class that should be used for validation." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth mentioning the interface name that users should extend here?
Quite dangerous otherwise.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@@ -113,6 +114,8 @@ public Builder(Map<String, TopicDetails> topics, int timeout, boolean validateOn | |||
|
|||
@Override | |||
public CreateTopicsRequest build() { | |||
if (validateOnly && version() == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably worth a test case for this.
@ijuma LGTM. I'm ready to merge when you add the additional tests. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
|
||
public interface CreateTopicPolicy { | ||
|
||
class RequestMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be nice to have a toString
for this.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…pic-policy * apache/trunk: KAFKA-3853; Extend OffsetFetch API to allow fetching all offsets for a consumer group (KIP-88)
c1a2144
to
e371971
Compare
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Author: Ismael Juma <ismael@juma.me.uk> Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Gwen Shapira <cshapi@gmail.com>, Jason Gustafson <jason@confluent.io> Closes apache#2361 from ijuma/kafka-4591-create-topic-policy
No description provided.