KAFKA-13598: enable idempotence producer by default and validate the configs #11691
if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
        config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
if ((!config.hasKeyInOriginals(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
        config.hasKeyInOriginals(RECONNECT_BACKOFF_MS_CONFIG)) {
Refactor the `originals().containsKey` usage. `originals()` makes a copy of the configs, and in most cases we only want to call `containsKey`. Checking the key directly in `AbstractConfig` avoids the unnecessary map copy.
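A minimal sketch of the idea in this comment, under stated assumptions: `SketchConfig` is a hypothetical stand-in and not Kafka's `AbstractConfig`. It only illustrates why a direct key check is cheaper than calling an accessor that copies the map first.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config holder; not Kafka's AbstractConfig.
final class SketchConfig {
    private final Map<String, Object> originals;

    SketchConfig(Map<String, Object> originals) {
        this.originals = new HashMap<>(originals);
    }

    // Returns a fresh copy on every call, so callers cannot mutate internal state.
    Map<String, Object> originals() {
        return new HashMap<>(originals);
    }

    // Checks the backing map directly; no copy is made.
    boolean hasKeyInOriginals(String key) {
        return originals.containsKey(key);
    }
}
```

With this shape, `config.originals().containsKey(k)` allocates a new map per call, while `config.hasKeyInOriginals(k)` does not.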
final boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
final boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (userConfiguredTransactions && !userConfiguredIdempotence)
    log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
            ProducerConfig.TRANSACTIONAL_ID_CONFIG);
We no longer override the default `ENABLE_IDEMPOTENCE_CONFIG`, because it already defaults to true. There is no need to inform users.
private static int configureInflightRequests(ProducerConfig config) {
    if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
        throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
                " to use the idempotent producer.");
    }
    return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}

private static short configureAcks(ProducerConfig config, Logger log) {
    boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
    short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));

    if (config.idempotenceEnabled()) {
        if (!userConfiguredAcks)
            log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
        else if (acks != -1)
            throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
                    "producer. Otherwise we cannot guarantee idempotence.");
    }
    return acks;
}
Move this idempotence config validation into `ProducerConfig#postProcessParsedConfig`.
private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
    boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
    boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);

    if (userConfiguredTransactions && !userConfiguredIdempotence) {
        configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
    }
}
This method is unnecessary, since `ENABLE_IDEMPOTENCE_CONFIG` now defaults to true.
}

private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object> configs) {
We won't override any configs now, change the method name.
boolean userConfiguredInflightRequests = hasKeyInOriginals(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
if (userConfiguredInflightRequests && 5 < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
    throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
            " to use the idempotent producer.");
}
Add the validation for `MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION` here.
boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);

if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
boolean userConfiguredIdempotence = this.hasKeyInOriginals(ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = this.hasKeyInOriginals(TRANSACTIONAL_ID_CONFIG);
boolean idempotenceEnabled = !userConfiguredIdempotence || this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
The key fix: before this PR, we only treated idempotence as enabled (`idempotenceEnabled`) when the user explicitly set the config and its value was `true`:
boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
Now it is changed as follows to fix the issue:
boolean idempotenceEnabled = !userConfiguredIdempotence || this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
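To make the difference between the two expressions concrete, here is a small standalone sketch. The class and method names are hypothetical; `value` stands for the parsed `enable.idempotence` value, which is the default (`true`) when the user sets nothing.

```java
// Hypothetical helper that isolates the two boolean expressions from the comment above.
final class IdempotenceCheckSketch {
    // Pre-PR logic: enabled only if the user set the key explicitly and its value is true.
    static boolean before(boolean userConfigured, boolean value) {
        return userConfigured && value;
    }

    // Logic proposed in this comment: enabled unless the user explicitly set the key to false.
    static boolean after(boolean userConfigured, boolean value) {
        return !userConfigured || value;
    }

    public static void main(String[] args) {
        // The user sets nothing, so the parsed value is the default (true).
        System.out.println(before(false, true)); // prints "false": the default was ignored
        System.out.println(after(false, true));  // prints "true": the default is honored
        // The user explicitly disables idempotence: both versions agree.
        System.out.println(before(true, false)); // prints "false"
        System.out.println(after(true, false));  // prints "false"
    }
}
```

The two versions differ only in the row where the user did not configure the key, which is exactly the case the new default was meant to cover.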
@hachikuji @vvcephei , @ijuma , please take a look. Thanks. |
Thanks @showuon for this nice cleanup!
I left a few questions and suggestions
@@ -226,6 +227,11 @@ public Password getPassword(String key) {
        return keys;
    }

    public boolean hasKeyInOriginals(String configKey) {
What about `originalsContainsKey()`?
@@ -226,6 +227,11 @@ public Password getPassword(String key) {
        return keys;
    }

    public boolean hasKeyInOriginals(String configKey) {
        Objects.requireNonNull(configKey, "config key cannot be null");
Do we need this null check? Shouldn't we rely on the behaviour of the underlying Map?
Makes sense! We can rely on the underlying map.
import java.util.TreeMap;
import java.util.ArrayList;
Let's keep the current import order
Weird, not sure why IntelliJ "helped" me reorder it. Updated!
@@ -240,6 +242,7 @@ public void testAcksAndIdempotenceForIdempotentProducers() {
        Properties invalidProps2 = new Properties() {{
                putAll(baseProps);
                setProperty(ProducerConfig.ACKS_CONFIG, "1");
                // explicitly enable idempotence should still throw exception
`enable` -> `enabling`
ProducerMetadata metadata = mock(ProducerMetadata.class);

// Return empty cluster 4 times and cluster from then on
when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
if (isIdempotenceEnabled) {
Should we change the comment above?
@@ -170,6 +170,7 @@ public void configure(final WorkerConfig config) {
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0
Ah :( I assumed Connect was benefiting from idempotency automatically, it looks like it isn't the case
Yes, it might be another PR (or KIP) to enable it in Connect.
@@ -2383,6 +2384,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {

  private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
    producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
    producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
Do we need to explicitly set it?
Yes, because idempotence is disabled at the beginning of the `AuthorizerIntegrationTest` test suite.
You're right, I missed that!
@@ -143,6 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
  val adminClients = Buffer[Admin]()

  producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
  producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
Could we instead drop acks=1? I don't think it is important in this test.
Did you miss this comment?
Sorry, I forgot to reply to this comment. No, we should not remove the `acks=1` config to enable the idempotent producer here. This `AuthorizerIntegrationTest` test suite covers three kinds of producer: a normal producer (created here), an idempotent producer (created in the `buildIdempotentProducer` method), and a transactional producer (created in the `buildTransactionalProducer` method). If we removed `acks=1` here, we would no longer be able to test the normal producer cases.
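The three producer flavours described in this reply could be sketched as plain `Properties` objects. This is an illustration only: `ProducerPropsSketch` and its method names are hypothetical, and switching to `acks=all` for the idempotent variants is an assumption based on the validation rule in this PR rather than something shown in the test suite itself. The string keys are the standard producer config names.

```java
import java.util.Properties;

// Hypothetical helper; not part of AuthorizerIntegrationTest.
final class ProducerPropsSketch {
    // Base settings from the suite: a plain producer with acks=1 and idempotence off.
    static Properties normalProducer() {
        Properties props = new Properties();
        props.setProperty("acks", "1");
        props.setProperty("enable.idempotence", "false");
        return props;
    }

    // Idempotent producer: idempotence is switched back on explicitly.
    static Properties idempotentProducer() {
        Properties props = normalProducer();
        props.setProperty("acks", "all"); // assumption: idempotence requires acks=all
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    // Transactional producer: an idempotent producer plus a transactional.id.
    static Properties transactionalProducer(String transactionalId) {
        Properties props = idempotentProducer();
        props.setProperty("transactional.id", transactionalId);
        return props;
    }
}
```

Keeping the base configuration non-idempotent is what lets the suite exercise all three code paths from one shared starting point.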
@@ -1674,6 +1674,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
      producerProps.put(ProducerConfig.RETRIES_CONFIG, _retries.toString)
      producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, _deliveryTimeoutMs.toString)
      producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs.toString)
      // disable the idempotence since some tests want to test the cases when retries=0, and these tests are not testing producers
      producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false.toString)
Let's use "false"
setReadAndWriteAcls(tp2)
sendRecords(producer, numRecords, tp2)
// in idempotence producer, we need to create another producer because the previous one is in FATEL_ERROR state (due to authorization error)
`FATEL_ERROR` -> `FATAL_ERROR`
Do we really expect the idempotent producer to not be usable anymore?
I think that's how it is designed. I added more comments to make it clear:
// // in idempotence producer, we need to create another producer because the previous one is in FATEL_ERROR state (due to authorization error)
// If the transaction state in FATEL_ERROR, it'll never transit to other state. check TransactionManager#isTransitionValid for detail
Thank you.
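As a rough illustration of what "never transitions to another state" means, here is a deliberately simplified sketch. The enum values and the rule below are hypothetical and much smaller than the real `TransactionManager#isTransitionValid`; they only model a terminal fatal state.

```java
// Hypothetical, simplified state model; not Kafka's TransactionManager.
final class TerminalStateSketch {
    enum State { READY, ABORTABLE_ERROR, FATAL_ERROR }

    // Assumed rule for this sketch: any state may move to FATAL_ERROR,
    // but once in FATAL_ERROR no transition to a different state is valid.
    static boolean isTransitionValid(State source, State target) {
        return target == State.FATAL_ERROR || source != State.FATAL_ERROR;
    }
}
```

Under such a rule, a producer that has hit a fatal error cannot recover in place, which is why the test creates a new producer instead of reusing the old one.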
Let's still fix the typo, it should be FATAL_ERROR
ah, you're right, I forgot it.
I found there are flaky tests introduced by this PR. I'll fix them first. Thanks. |
@mimaison, I've updated the PR and fixed the flaky tests. The tests failed because the producer has two threads: the producer thread and the sender thread. Before the latest commit, I wrongly assumed that the sender thread would run after the producer send, so that the call-count verification would pass. (ex: I fixed it by removing the need to fetch metadata in the sender thread, so that we can focus on testing the producer behavior as expected.
Are you saying that the change to enable idempotence by default in 3.0 was broken? And we shipped 3.1 with the same critical bug? |
Thanks @showuon for the patch. Left a couple comments.
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
boolean userConfiguredIdempotence = this.originalsContainsKey(ENABLE_IDEMPOTENCE_CONFIG);
I think this can be simplified a little bit more by getting rid of this variable. How about this?
boolean userConfiguredTransactions = this.originalsContainsKey(TRANSACTIONAL_ID_CONFIG);
boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
if (!idempotenceEnabled && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}
return idempotenceEnabled;
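The simplified check suggested above can be tried in isolation with a plain map. This is a sketch under assumptions: `TransactionalIdCheckSketch` is hypothetical, `getOrDefault` models the parsed value with its default of `true`, and `IllegalArgumentException` stands in for Kafka's `ConfigException`.

```java
import java.util.Map;

// Hypothetical standalone version of the suggested check; not ProducerConfig itself.
final class TransactionalIdCheckSketch {
    static boolean idempotenceEnabled(Map<String, Object> userConfigs) {
        boolean userConfiguredTransactions = userConfigs.containsKey("transactional.id");
        // Models getBoolean(ENABLE_IDEMPOTENCE_CONFIG): the user's value, or the default (true).
        boolean idempotenceEnabled = (Boolean) userConfigs.getOrDefault("enable.idempotence", Boolean.TRUE);
        if (!idempotenceEnabled && userConfiguredTransactions) {
            // Stand-in for ConfigException in this sketch.
            throw new IllegalArgumentException("Cannot set a transactional.id without also enabling idempotence.");
        }
        return idempotenceEnabled;
    }
}
```

Because the parsed value already reflects the default, the separate `userConfiguredIdempotence` flag is not needed: idempotence can only be `false` here if the user explicitly disabled it.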
Good suggestion! I was affected by the original implementation. Updated.
@@ -226,6 +227,10 @@ public Password getPassword(String key) {
        return keys;
    }

    public boolean originalsContainsKey(String configKey) {
AbstractConfig is sadly a public class. Can we do the PR without this?
Oops! OK, reverted.
@ijuma Sadly so. I think the configuration would even have displayed |
With this change, if a user explicitly sets So I wonder if instead we should disable idempotency when |
@mimaison , do you mean:
And we only do some special check for Is this what you mean? |
I too think it makes sense to apply the same logic to If we decide to make this change, we should also update the docs of these configs to mention this behavior. |
I have to admit that I find all these breakages that are cropping up to be a bit disturbing. I like the idea to default to stronger guarantees, but if the cost is that a lot of existing applications just crash on upgrade, it's not a great tradeoff. The above logic is fairly complex, but it seems warranted in this case, so I'm +1 as well. |
KAFKA-13673 is created for this proposal. |
I agree with the suggestions here. Especially since the idempotence change did not land in 3.0.0 as originally intended. We can continue the discussion in #11788. |
…configs (apache#11691)
In v3.0, we changed the default value for `enable.idempotence` to true, but we didn't adjust the validator and the `idempotence` enabled check method. So if a user didn't explicitly enable idempotence, this feature won't be turned on. This patch addresses the problem, cleans up associated logic, and fixes tests that broke as a result of properly applying the new default. Specifically it does the following:
1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect whether `idempotence` is enabled or not
2. remove some unnecessary config overrides and checks, since the `acks`, `retries` and `enable.idempotence` configs already have the right defaults.
3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of the `ProducerConfig` class.
4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid copying the `originals` configs when we only want to check the existence of a key.
5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0. After this PR, some tests broke because of behavior differences between the idempotent and non-idempotent producer.
6. add additional tests to validate configuration behavior
Reviewers: Kirk True <kirk@mustardgrain.com>, Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mimaison@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Remove 3.2 notes which were accidentally included after cherry-picking apache#11691. Add the section about the idempotence bug in 3.1 notable items. Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
This PR broke this system test:
|
@rondagostino , yes, we are aware of it, and that's why we have #11788 to disable idempotent producer when there are config conflicts. Could you please run again with the build including #11788 change? Thanks. |
@showuon It still fails at that commit. |
@rondagostino , could I get the error logs? Thanks. |
Here is
Note this line, which seems suspicious (though I am unfamiliar with this tools/what it does/how it does it, so maybe it is irrelevant).
|
Test passes with this change, but I don't understand why.
|
@showuon (or |
No, I'm not Yang Yu :) |
…ate the configs (apache#11691)" This reverts commit 8093331.
…ate the configs (apache#11691)" This reverts commit 43cbf17.
…nd validate the configs (apache#11691)"" This reverts commit dcc96c2.
In short, after v3.0, the default producer still does not have idempotence enabled.
In v3.0, we made `enable.idempotence` default to true, but we didn't adjust the validator and the `idempotence` enabled check method, so if a user didn't explicitly enable idempotence, the feature isn't turned on. In this PR, I did the following:
1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect whether `idempotence` is enabled or not
2. remove some unnecessary config overrides and checks, since the `acks`, `retries` and `enable.idempotence` configs already have the right defaults.
3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of the `ProducerConfig` class.
4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid copying the `originals` configs when we only want to check the existence of a key.
Committer Checklist (excluded from commit message)