KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams #12988

Open
wants to merge 7 commits into
base: trunk

ashmeet13
Contributor

@ashmeet13 ashmeet13 commented Dec 14, 2022

Streams hard-codes a few configurations. The documentation currently refers to five such configs:

the four listed under "Parameters controlled by Kafka Streams", plus enable.auto.commit.
Three of those four also appear under "Default Values", which lists configs that users may set but that fall back to a Streams default when unset.

This PR warns the user when a configuration they set is overridden by Streams.
Because the documentation overlaps, I went through the code and separated the configs into a few categories:

  • Non-configurable consumer configs when EOS is disabled:
      • enable.auto.commit
      • allow.auto.create.topics
  • Non-configurable consumer configs when EOS is enabled:
      • isolation.level
  • Non-configurable producer configs when EOS is enabled:
      • enable.idempotence
      • max.in.flight.requests.per.connection
      • transactional.id
  • Default consumer configs which can be configured:
      • auto.offset.reset
      • max.poll.records
  • Default producer configs which can be configured:
      • linger.ms

StreamsConfig already had code to log a warning when a non-configurable property was set, but it did not warn when allow.auto.create.topics was configured. I have added that warning and removed the code that set allow.auto.create.topics directly within getMainConsumerConfigs.

Now it follows the existing code path, which:

  1. Fills in missing default configurations.
  2. Logs warnings and overrides the settings for non-configurable properties.

Permalink to checkIfUnexpectedUserSpecifiedConsumerConfig
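
The warn-and-override path described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual StreamsConfig code: the class name, the `dropUnexpectedUserConfigs` helper, and the warning text are hypothetical, and the map of controlled configs only contains the two consumer configs named in this description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class NonConfigurableCheckSketch {

    // Illustrative only: consumer configs that Streams controls, with the value it enforces.
    static final Map<String, Object> STREAMS_CONTROLLED = Map.of(
        "enable.auto.commit", "false",
        "allow.auto.create.topics", "false");

    /**
     * Removes every Streams-controlled key from the user-supplied props (mutating the map)
     * and returns one warning message per removed key. A real implementation would
     * log these messages at WARN level instead of returning them.
     */
    static List<String> dropUnexpectedUserConfigs(final Map<String, Object> clientProps) {
        final List<String> warnings = new ArrayList<>();
        for (final Map.Entry<String, Object> controlled : STREAMS_CONTROLLED.entrySet()) {
            final String name = controlled.getKey();
            if (clientProps.containsKey(name)) {
                final Object userValue = clientProps.remove(name);
                warnings.add(String.format(
                    "Unexpected user-specified config %s=%s will be ignored; Streams uses %s",
                    name, userValue, controlled.getValue()));
            }
        }
        return warnings;
    }
}
```

Calling the helper with a mutable map that contains both allow.auto.create.topics and max.poll.records would drop only the former and leave the user-configurable max.poll.records untouched.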

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ashmeet13
Contributor Author

ashmeet13 commented Dec 14, 2022

Since I am fairly new to the code base, I will spend some more time identifying any other configs I might have missed that fall under any of these buckets.
@ableegoldman could you please have a look at this PR?

Contributor

@ableegoldman ableegoldman left a comment

Thanks for the PR! One config in particular that is kind of a special case is the ProducerConfig default.partitioner

It's not that Streams overrides this (as is the case for other configs, like allow.auto.create.topics) but you just can't use it within Streams without breaking your app. We have a special StreamPartitioner interface for Streams users to plug in rather than using this config.

I guess rather than adding special handling, we could just add it to the PRODUCER_DEFAULT_OVERRIDES, similar to what you did here already, and just override it to null.

@ashmeet13
Contributor Author

Hi @ableegoldman, thank you for the review!
I went through the producer configs and was not able to find any config named default.partitioner. Were you referring to the partitioner.class config?
If not, could you please point me to where I can find default.partitioner in the code?

@mjsax mjsax added the streams label Dec 28, 2022
@ableegoldman
Contributor

Ah, yeah, sorry -- it's partitioner.class

@mjsax
Member

mjsax commented Feb 14, 2023

What's the status of this PR?

@github-actions

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot added the stale Stale PRs label Jun 23, 2023
@ashmeet13
Contributor Author

ashmeet13 commented Jul 22, 2023

I'll be picking this back up. My bad for dropping it in the middle.
Will update soon.
Issue - https://issues.apache.org/jira/browse/KAFKA-14405

@github-actions github-actions bot removed the stale Stale PRs label Jul 23, 2023
@mjsax
Member

mjsax commented Jul 25, 2023

Thanks! Great to hear.

@ashmeet13
Contributor Author

ashmeet13 commented Aug 16, 2023

Hi @mjsax, a quick question:
From what I understand, there are three different types of consumer configs: global, main and restore.

If a user sets the property consumer.allow.auto.create.topics to true, we override it to false because Streams controls this property.
But this can be bypassed by setting main.consumer.allow.auto.create.topics = true, in which case the property stays true.

Is this expected behaviour?

@mjsax
Member

mjsax commented Sep 8, 2023

Sorry for the late reply. I was OOO. Well, if it can be bypassed, it sounds like a bug.

Because there are multiple consumers, we use consumer. to allow users to change a config for all consumers. If you want to change a config for a specific consumer (or if you want to configure two consumers differently), you would use main.consumer. et al. If both prefixes (generic and specific) are used, the consumer-specific prefix takes precedence over the general consumer. prefix. Does this make sense? It's basically a "config hierarchy" (flexible and powerful, but maybe a little hard to understand on first encounter...)

But no matter what prefix is used, we should not allow users to bypass what Streams tries to overwrite. (Btw: just setting allow.auto.create.topics = true, i.e., without the consumer. prefix, is technically also valid, and we would pass a config without prefix into the consumer, too, so this case must also be covered.)
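
The "config hierarchy" described above can be illustrated with a small lookup sketch. It is only an approximation of the precedence rules as stated in this comment (specific prefix, then consumer., then no prefix); the class and the `resolve` method are hypothetical and do not exist in StreamsConfig.

```java
import java.util.Map;

final class ConsumerPrefixPrecedenceSketch {

    static final String CONSUMER_PREFIX = "consumer.";

    /**
     * Looks up a consumer config for one specific consumer, most specific key first:
     * "<specificPrefix><name>", then "consumer.<name>", then the unprefixed "<name>".
     * Returns null if the user did not set the config under any of the three keys.
     */
    static Object resolve(final Map<String, Object> userProps,
                          final String specificPrefix,
                          final String name) {
        if (userProps.containsKey(specificPrefix + name)) {
            return userProps.get(specificPrefix + name);
        }
        if (userProps.containsKey(CONSUMER_PREFIX + name)) {
            return userProps.get(CONSUMER_PREFIX + name);
        }
        return userProps.get(name);
    }
}
```

With both consumer.max.poll.records and main.consumer.max.poll.records set, `resolve(props, "main.consumer.", "max.poll.records")` would return the main.consumer. value, while the same lookup with "restore.consumer." would fall back to the consumer. value.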

@ashmeet13
Contributor Author

Got it @mjsax -
Sharing the code that seems to be causing this bypass. Currently, to fetch any consumer config (main, restore or global) we use a common function, getCommonConsumerConfigs.

It's within getCommonConsumerConfigs that we check and override the configs controlled by Streams -

    private Map<String, Object> getCommonConsumerConfigs() {
        // Fetch all consumer props starting with "consumer."
        final Map<String, Object> clientProvidedProps =
            getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

        // Clean out any properties that were set but need to be controlled by Streams
        checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
        checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

        // Create a config map of the preferred props and merge it with the cleaned props from above
        final Map<String, Object> consumerProps =
            new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
        consumerProps.putAll(clientProvidedProps);
        return consumerProps;
    }

And the logic within getMainConsumerConfigs is -

    public Map<String, Object> getMainConsumerConfigs(...) {
        // Fetch the props starting with "consumer." after cleaning
        // any props that needed to be overwritten
        final Map<String, Object> consumerProps = getCommonConsumerConfigs();

        // Get main consumer override props i.e. the ones
        // starting with "main.consumer." and merge the two maps.
        // These entries are not re-checked, which is what allows the bypass.
        final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
        for (final Map.Entry<String, Object> entry : mainConsumerProps.entrySet()) {
            consumerProps.put(entry.getKey(), entry.getValue());
        }

        // Continue processing and filling in other required configs
    }

Do you think I've understood this piece correctly?
If so, should the fix for this go into this PR itself?

@mjsax
Member

mjsax commented Oct 13, 2023

Thanks for digging into this -- I think you are spot on -- it seems we should extract a method that sets the KS-controlled configs, and refactor getMainConsumerConfigs to first call getCommonConsumerConfigs(), then apply the main.consumer configs, and in a last step call the new method to set the KS-controlled configs.

I assume we need to do something similar for the restore and global consumers? -- To be fair, I was actually aware that something is off and still have an (old and stale) local branch adding corresponding tests to StreamsConfigTest to verify that the overwrite hierarchy works as expected... Would be great if you could also look into this test...
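
The three-step order proposed here (common configs, then consumer-specific configs, then KS-controlled configs last) might look roughly like the sketch below. It is a simplified stand-in that works on a plain map rather than on StreamsConfig: the class, both helper methods, and the single-entry map of controlled configs are hypothetical, and warning logging is left out.

```java
import java.util.HashMap;
import java.util.Map;

final class EnforceControlledConfigsLastSketch {

    // Illustrative only: a single Streams-controlled consumer config and its enforced value.
    static final Map<String, Object> STREAMS_CONTROLLED =
        Map.of("allow.auto.create.topics", "false");

    /** Copies every entry whose key starts with the prefix into target, with the prefix stripped. */
    static void copyWithPrefixStripped(final Map<String, Object> source,
                                       final String prefix,
                                       final Map<String, Object> target) {
        for (final Map.Entry<String, Object> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    /** Builds the configs for one consumer, e.g. specificPrefix = "main.consumer.". */
    static Map<String, Object> consumerConfigs(final Map<String, Object> userProps,
                                               final String specificPrefix) {
        final Map<String, Object> result = new HashMap<>();
        copyWithPrefixStripped(userProps, "consumer.", result);    // step 1: common consumer configs
        copyWithPrefixStripped(userProps, specificPrefix, result); // step 2: consumer-specific configs
        result.putAll(STREAMS_CONTROLLED);                         // step 3: Streams has the last word
        return result;
    }
}
```

Because the controlled values are applied last, it no longer matters which prefix the user chose, and the same method can serve the main, restore and global consumers by passing a different prefix.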

@ashmeet13
Contributor Author

Got it! I'll make this change. For now, I have gone through the code and the following two references and compiled a list of configs that are somehow "controlled" by KS. I'm sharing the producer configs here, and the consumer configs will follow soon.

Producer Configs with EoS Disabled

1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner

Producer Configs with EoS Enabled

1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
3. [Fixed] enable.idempotence = true
4. [Validate] max.in.flight.requests.per.connection <= 5
5. [Fixed] [NoDefault] transactional.id = <appId>-<generatedSuffix>
6. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
7. [Editable] [CustomDefault] transaction.timeout.ms = 10000
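
As a rough illustration of what the [Validate] entry in the list above could mean in code, here is a minimal sketch of a bounds check for max.in.flight.requests.per.connection under EOS. The class, the method, and the exception type are hypothetical (the actual check is not shown in this thread), and returning 5 when the config is unset is an assumption of this sketch.

```java
import java.util.Map;

final class MaxInFlightCheckSketch {

    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    static final int EOS_MAX_IN_FLIGHT_LIMIT = 5;

    /**
     * Returns the user's value if it is within the limit, or the limit itself
     * if the config is unset (an assumption of this sketch). Throws if the
     * user's value exceeds the limit.
     */
    static int validatedMaxInFlight(final Map<String, Object> producerProps) {
        final Object raw = producerProps.get(MAX_IN_FLIGHT);
        if (raw == null) {
            return EOS_MAX_IN_FLIGHT_LIMIT;
        }
        // Config values may arrive as Integer or String, so parse via toString().
        final int value = Integer.parseInt(raw.toString().trim());
        if (value > EOS_MAX_IN_FLIGHT_LIMIT) {
            throw new IllegalArgumentException(
                MAX_IN_FLIGHT + " must be <= " + EOS_MAX_IN_FLIGHT_LIMIT
                    + " when EOS is enabled, but was " + value);
        }
        return value;
    }
}
```

Unlike the [Fixed] entries, a [Validate] config is not silently replaced: a value within the limit is kept as the user set it, and only an out-of-range value is rejected.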

@mjsax
Member

mjsax commented Nov 29, 2023

@ashmeet13 -- Any update on this PR? We are coming up to 3.7 release code freeze deadline. Might be nice to finish this on time?

@ashmeet13
Contributor Author

Hi @mjsax apologies for the delay. Pushing this soon.

@mjsax
Member

mjsax commented Feb 17, 2024

@ashmeet13 -- do you still have interest to finish this PR?

@@ -1140,6 +1145,7 @@ public class StreamsConfig extends AbstractConfig {
    static {
        final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
        tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
        tempProducerDefaultOverrides.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "none");
Member

Default is already null -- why do we need to set it?

@@ -1883,3 +1886,20 @@ public static void main(final String[] args) {
            System.out.println(CONFIG.toHtml(4, config -> "streamsconfigs_" + config));
        }
    }


public Map<String, Object> getMainConsumerConfigs(...) {
Member

StreamsConfig is public API and we cannot just modify it w/o a KIP. -- Also, why do we need this new method to begin with? We already have getMainConsumerConfigs(...).

@@ -530,6 +530,14 @@ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
        assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false));
    }

    @Test
    public void shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() {
Member

This should apply to all consumers, right? Should we extend the test accordingly?

Should we also capture the logs and verify that the WARN is printed (not sure if necessary)?

@mjsax
Member

mjsax commented Apr 30, 2024

@ashmeet13 -- any updates on this PR?

4 participants