New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4400: Enabling configurable prefix for sink connectors' consume… #2143
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basic approach seems fine, but a couple of things:
- I think you missed a place where you'd need to change the call to
SinkUtils.consumerGroupId
:
/Users/ewencp/kafka.git/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:475: error: method consumerGroupId in class SinkUtils cannot be applied to given types;
if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-
It's possible this needs a KIP since it's technically public API. I'd imagine it would be a straightforward and uncontroversial one. @hachikuji @ijuma thoughts on this since we've started saying some other minor public interface changes like metrics don't necessarily need a KIP?
-
The name of the configuration sounds like it could be more general. We also have a JIRA filed to possibly simplify configs by just using a common prefix for the config/status/offset topics. Should we consider tying all of these together into the single connect prefix? This might also tie in with a connect cluster ID.
@@ -43,6 +43,10 @@ | |||
+ "than one, though, in case a server is down)."; | |||
public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; | |||
|
|||
public static final String CONNECT_PREFIX_CONFIG = "connect.prefix"; | |||
public static final String CONNECT_PREFIX_DOC = "Prefix for sink task consumer group."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the motivation for using this probably isn't too obvious to someone who isn't familiar with the implementation details, it'd probably be worth expanding this explanation a bit to say when you would use it.
…r group names. Author: Tianji Li <skyahead@gmail.com> Reviewers: Ewen Cheslack-Postava <me@ewencp.org>
ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG); | ||
String name = (String) validatedName.value(); | ||
|
||
if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these checks being removed? They were added because people would sometimes make the group names conflict, which results in confusing error messages from the group coordination protocol.
@@ -416,48 +416,6 @@ public void testCreateConnectorFailedCustomValidation() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void testConnectorNameConflictsWithWorkerGroupId() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides not removing this, it seems we should also probably have a unit test validating the new behavior for naming consumer groups in sinks?
…r group names.
Author: Tianji Li skyahead@gmail.com
Reviewers: Ewen Cheslack-Postava me@ewencp.org