Why the "topics" of SinkTask cannot be overridden in SinkConnector.taskConfigs(maxTasks)? #106

Closed
tony-lijinwen opened this issue Aug 25, 2016 · 4 comments

Comments

@tony-lijinwen

tony-lijinwen commented Aug 25, 2016

I found that the "topics" setting cannot be overridden by Connector.taskConfigs(int maxTasks): even if I return task configs in which I have already overridden topics, Kafka Connect still uses the original value in Worker.connectorTaskConfigs. Is this a bug? I think it should first check the config returned from Connector.taskConfigs and, only if it does not contain a topics setting, fall back to the original configuration.

Here is the detailed implementation:
1. I wrote a new MongoDB connector; its main config options are as follows:

# I want to create 3 tasks, with each task subscribing to only one topic
tasks.max=3
topics=topic1,topic2,topic3

2. I want each task to subscribe to only one topic, so I generate one task config per topic and override the topics option in each:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>();
    // One task config per topic: copy the connector config and override "topics".
    for (String topic : originalConfig.get("topics").split(",")) {
        Map<String, String> config = new HashMap<>(originalConfig);
        config.put("topics", topic);
        configs.add(config);
    }
    return configs;
}
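
For context, originalConfig above is presumably the connector-level properties map saved when the connector starts; a minimal sketch (the field name is taken from the snippet, the rest is an assumption):

// Sketch only: keep the connector-level props so taskConfigs() can copy them per task.
private Map<String, String> originalConfig;

@Override
public void start(Map<String, String> props) {
    this.originalConfig = props;
}
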
@tony-lijinwen tony-lijinwen changed the title Why the "topics" of SinkTask cannot be overrided in SinkConnector.taskConfigs(maxTasks)? Why the "topics" of SinkTask cannot be overridden in SinkConnector.taskConfigs(maxTasks)? Aug 25, 2016
@ewencp
Contributor

ewencp commented Aug 29, 2016

@tony-lijinwen This is intentional, and that config is intended to be managed by the framework. The intent is that the framework leverages the existing consumer group functionality and manages balancing work amongst tasks itself. It can be a bit confusing to a connector developer since framework-level and connector-level configs are a bit conflated, but this keeps things a lot simpler and easier to understand for users.

Do you have a reason why you need to split the subscriptions up so they are different among the tasks? At a bare minimum, the framework would at least have to make sure the original subscription was covered, which could actually be quite difficult once we add support for regex subscriptions (which the underlying consumers support, but which has not been extended to Kafka Connect yet). This is a request I haven't seen before, so I'm curious about the use case.
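
To illustrate the behavior being described, here is a rough sketch of the worker-side merge (not the actual Worker.connectorTaskConfigs source; connector, connectorProps, and maxTasks are placeholder names):

// Rough sketch: for sink connectors, the worker re-applies the connector-level
// "topics" value to every task config, so per-task overrides are discarded.
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
    Map<String, String> merged = new HashMap<>(taskProps);
    merged.put("topics", connectorProps.get("topics"));  // framework-managed
    taskConfigs.add(merged);
}
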

@tony-lijinwen
Author

tony-lijinwen commented Aug 31, 2016

@ewencp Thanks for your reply. The reason I split the subscriptions is: I have three topics; two of them carry huge messages that are not urgent, and one carries small messages that are urgent. As far as I know, if one consumer subscribes to more than one topic, it consumes the messages FIFO (i.e. if the other topics contain huge messages, the urgent messages will be blocked). So I want to split the subscriptions to ensure the urgent messages can be handled as soon as possible.

@sv2000

sv2000 commented Oct 22, 2016

I have the same requirement as the OP. I don't know how the OP resolved this issue with Kafka Connect. It would be nice to have a partition assignment scheme that assigns one topic to each sink task, given that the SinkConnector cannot change the assignment among the tasks.

@cotedm

cotedm commented Jan 19, 2017

@tony-lijinwen @sv2000 I'm curious as to why you would not just have separate connector instances if you need the consumption of the topic data to be handled in a different way. Thinking about this outside of the connect related concepts, if I needed to consume data with different sizes and levels of urgency, I would logically expect to have different consumer configurations to accommodate those needs.
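
For example, a sketch of running one connector instance per topic in standalone mode (file names, connector names, and the connector class are placeholders):

# urgent-sink.properties -- dedicated connector for the urgent topic
name=urgent-sink
connector.class=com.example.MongoDbSinkConnector
tasks.max=1
topics=topic3

# bulk-sink.properties -- separate connector for the large, non-urgent topics
name=bulk-sink
connector.class=com.example.MongoDbSinkConnector
tasks.max=2
topics=topic1,topic2
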

This is really more of a general Connect API question than one specific to the HDFS connector, so I would propose moving this discussion to a Kafka JIRA instead if you would like to continue it. Here is the place to open a JIRA detailing the needs that can't currently be accommodated: https://issues.apache.org/jira/browse/KAFKA

@cotedm cotedm closed this as completed Jan 19, 2017