-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13673: disable idempotence when config conflicts #11788
Conversation
@@ -514,7 +514,7 @@ private TransactionManager configureTransactionState(ProducerConfig config, | |||
LogContext logContext) { | |||
TransactionManager transactionManager = null; | |||
|
|||
if (config.idempotenceEnabled()) { | |||
if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor: the idempotenceEnabled()
method is only to get the config of ENABLE_IDEMPOTENCE_CONFIG
now.
boolean idempotenceEnabled() { | ||
boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG); | ||
boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG); | ||
if (!idempotenceEnabled && userConfiguredTransactions) | ||
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); | ||
|
||
return idempotenceEnabled; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the transaction.id
config validation into validation method, so that we don't need to validate it each time when checking idempotenceEnabled
.
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
Show resolved
Hide resolved
if (userConfiguredIdempotence) { | ||
throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer."); | ||
} | ||
log.warn("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log warning to users to let them know the enable.idempotence
is disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One question is what we want to do with this longer term. Do we want to upgrade this to an error in Apache Kafka 4.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree we directly throw exception after AK 4.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this some more, not sure there's a lot of value in forcing users to set idempotence=false
in cases where they're setting acks=1|0
or retries=0
. So, I'd change the warning to info for these cases.
max.in.flight.requests.per.connection
is different since it's an implementation constraint that idempotence
doesn't work when it's > 5 vs inherent to the configuration. For this one, I'd have a warning and mention that it will become an error in Kafka 4.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me. I've updated the PR. Also appended a note to upgrade doc. Please take a look. Thank you.
@mimaison @hachikuji @vvcephei , call for review. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Luke for the PR! Should we also update the docs of these configs?
docs/upgrade.html
Outdated
@@ -73,7 +74,8 @@ <h5><a id="upgrade_311_notable" href="#upgrade_311_notable">Notable changes in 3 | |||
<ul> | |||
<li>A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set | |||
<code>enable.idempotence</code> to true. See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details. | |||
This issue was fixed and the default is properly applied.</li> | |||
This issue was fixed and the default is properly applied. | |||
However, if <code>enable.idempotence</code> is unset and the other configs conflicts with it, idempotence will be disabled to avoid break existing producers.</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
break
-> breaking
Same below
if (userConfiguredIdempotence) { | ||
throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer."); | ||
} | ||
log.info("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would read better if we remove config
from the sentence.
+ ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible " | ||
+ "values are set, a <code>ConfigException</code> will be thrown."; | ||
+ ACKS_CONFIG + "</code> must be 'all'. If incompatible values are set, a <code>ConfigException</code> will be thrown. " | ||
+ "The default value is `true`. But if incompatible values are set and this config is not set explicitly, idempotent producer will be disabled automatically."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If these values are not explicitly set by the user, suitable values will be chosen.
-> remove this line because we already default all the values to suitable values.
Added:
The default value is 'true'. But if incompatible values are set and this config is not set explicitly, idempotent producer will be disabled automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should update the documentation for the 3 configs mentioned here too. I have a suggestion to try and make this particular one a bit clearer regarding the defaults, will post it a bit later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we update MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC
, RETRIES_DOC
and ACKS_DOC
to mention the impact on idempotence?
Also, for this description, I was thinking something like:
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires
" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "
to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+ " (with message ordering preserved for any allowable value)," + RETRIES_CONFIG + "
to be greater than 0, and"
must be 'all'.
+ ACKS_CONFIG + "Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a difficult one. The update looks better than my current change. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@showuon Thanks for the patch. The approach seems reasonable. At least I cannot think of a better alternative. If the user is explicitly opting into weaker semantics, then it seems fair to disable idempotence. The inflight limitation is vexing, but I hope we can do something about that separately. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a full pass and left some more comments. This is a bit complex, so trying to make it as clear to users as possible.
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
docs/upgrade.html
Outdated
@@ -21,7 +21,7 @@ | |||
|
|||
<h5><a id="upgrade_320_notable" href="#upgrade_320_notable">Notable changes in 3.2.0</a></h5> | |||
<ul> | |||
<li>Idempotence for the producer is enabled by default. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, | |||
<li>Idempotence for the producer is enabled by default if no conflicting configurations are set explicitly. In 3.0.0 and 3.1.0, a bug prevented this default from being applied, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, after reading this again, I think we can simply say "if no conflicting configurations are set". The "explicitly" feels redundant. Same for the two other paragraphs below. Yes, I know I suggested this text. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries. I agree we should make it as clear as possible, and this update indeed makes it better!
+ ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible " | ||
+ "values are set, a <code>ConfigException</code> will be thrown."; | ||
+ ACKS_CONFIG + "</code> must be 'all'. If incompatible values are set, a <code>ConfigException</code> will be thrown. " | ||
+ "The default value is `true`. But if incompatible values are set and this config is not set explicitly, idempotent producer will be disabled automatically."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we update MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC
, RETRIES_DOC
and ACKS_DOC
to mention the impact on idempotence?
Also, for this description, I was thinking something like:
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires
" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "
to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+ " (with message ordering preserved for any allowable value)," + RETRIES_CONFIG + "
to be greater than 0, and"
must be 'all'.
+ ACKS_CONFIG + "Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.
Thoughts?
@ijuma , please take a look again when available. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to the one minor point inline, I think we should add a note to MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC, RETRIES_DOC and ACKS_DOC to mention the impact on idempotence. Basically copy the part from the idempotence docs so that people can understand the implications of setting the configs within their own documentation.
} | ||
|
||
// validate `transaction.id` after validating idempotence dependant configs because `enable.idempotence` config might be overridden | ||
idempotenceEnabled = idempotenceEnabled && !shouldDisableIdempotence; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can set this to false
in the shouldDisableIdempotence
block? Seems a bit more natural.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion! Updated.
|
||
/** <code>retries</code> */ | ||
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; | ||
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." | ||
+ " Note that this retry is no different than if the client resent the record upon receiving the error." | ||
+ " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the" | ||
+ " Allowing retries and disabling <code>enable.idempotence</code> but without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side fix: currently, we said:
Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records
which is not quite right, because we didn't have the idempotence before.
Updated to:
Allowing retries and disabling enable.idempotence but without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to reword this a bit given the new defaults. Move the following nearer the top:
Users should generally"
+ " prefer to leave this config unset and instead use" + DELIVERY_TIMEOUT_MS_CONFIG + "
to control
Move the paragraph regarding what happens if idempotence is disabled to the end.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma Agree! Below is what I've updated. I think it looks better.
I ran almost full kafka test suite. Got in total around 23 failures, which is great compared to |
Thanks for verifying it @michalxo ! |
@ijuma Can you take another look? I'm just waiting on this before getting started on 3.0.1. |
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering | ||
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5; | ||
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)." | ||
+ " Note additionally that enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove "Note". "Additionally, enabling..." reads better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates and the patience. LGTM. I left a minor final comment below, but no re-review required after you address it.
+ " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally" | ||
+ " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control" | ||
+ " retry behavior." | ||
+ " <p>" | ||
+ "<p>" | ||
+ "Allowing retries and disabling <code>enable.idempotence</code> but without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It reads weird to say
disabling
enable.idempotence
I would write the sentence as:
Allowing retries while setting
enable.idempotence
tofalse
and" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "
to 1 will potentially change the"
I would also move this paragraph below:
Enabling idempotence requires this config value to be greater than 0...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma , thanks for the comment. I've updated as below (though I'm not 100% sure I catch your point about this one: I would also move this paragraph below...
.
This is my update. I think we can merge this PR first to cut the RC for v3.0.1 first. And we can have follow-up PR if you have any other comments. Thank you for your review!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant to have "Note that enabling idempotence requires this config..." before "Allowing retries...". And break the two parts with a paragraph.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Updated. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config/server.properties
Outdated
|
||
# The interval at which log segments are checked to see if they can be deleted according | ||
# to the retention policies | ||
log.retention.check.interval.ms=300000 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these changes accidental?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops! Let me revert it!
Back porting to 3.0 and 3.1 since we merged e6db0ca into these branches |
Disable idempotence when conflicting config values for acks, retries and max.in.flight.requests.per.connection are set by the user. For the former two configs, we log at info level when we disable idempotence due to conflicting configs. For the latter, we log at warn level since it's due to an implementation detail that is likely to be surprising. This mitigates compatibility impact of enabling idempotence by default. Added unit tests to verify the change in behavior. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Mickael Maison <mickael.maison@gmail.com>
Disable idempotence when conflicting config values for acks, retries and max.in.flight.requests.per.connection are set by the user. For the former two configs, we log at info level when we disable idempotence due to conflicting configs. For the latter, we log at warn level since it's due to an implementation detail that is likely to be surprising. This mitigates compatibility impact of enabling idempotence by default. Added unit tests to verify the change in behavior. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Mickael Maison <mickael.maison@gmail.com>
Merged to trunk, 3.1 and 3.0. |
@mimaison I did the backports. |
Thanks! |
Disable idempotence when conflicting config values for acks, retries and max.in.flight.requests.per.connection are set by the user. For the former two configs, we log at info level when we disable idempotence due to conflicting configs. For the latter, we log at warn level since it's due to an implementation detail that is likely to be surprising. This mitigates compatibility impact of enabling idempotence by default. Added unit tests to verify the change in behavior. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Mickael Maison <mickael.maison@gmail.com>
Disable idempotence when conflicting config values for acks, retries and max.in.flight.requests.per.connection are set by the user. For the former two configs, we log at info level when we disable idempotence due to conflicting configs. For the latter, we log at warn level since it's due to an implementation detail that is likely to be surprising. This mitigates compatibility impact of enabling idempotence by default. Added unit tests to verify the change in behavior. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Mickael Maison <mickael.maison@gmail.com>
apache#11788)"" This reverts commit 38bee4a.
apache#11788)"" This reverts commit 38bee4a.
Disable idempotence when conflicting config values for
acks
,retries
and
max.in.flight.requests.per.connection
are set by the user. For theformer two configs, we log at
info
level when we disable idempotencedue to conflicting configs. For the latter, we log at
warn
level sinceit's due to an implementation detail that is likely to be surprising.
This mitigates compatibility impact of enabling idempotence by default.
Added unit tests to verify the change in behavior.
Committer Checklist (excluded from commit message)