Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13759: Disable idempotence by default in producers instantiated by Connect #11933

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -648,6 +648,12 @@ static Map<String, Object> producerConfigs(ConnectorTaskId id,
// These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
Expand Down
Expand Up @@ -511,6 +511,12 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);

Map<String, Object> consumerProps = new HashMap<>(originals);
Expand Down
Expand Up @@ -91,6 +91,12 @@ public void configure(final WorkerConfig config) {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);

Map<String, Object> consumerProps = new HashMap<>(originals);
Expand Down
Expand Up @@ -170,7 +170,12 @@ public void configure(final WorkerConfig config) {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // disable idempotence since retries is force to 0
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);

Map<String, Object> consumerProps = new HashMap<>(originals);
Expand Down
Expand Up @@ -213,6 +213,12 @@ public void setup() {
defaultProducerConfigs.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
// By default, producers that are instantiated and used by Connect have idempotency disabled even after idempotency became
// default for Kafka producers. This is chosen to avoid breaking changes when Connect contacts Kafka brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
defaultProducerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
Expand Down
10 changes: 10 additions & 0 deletions docs/upgrade.html
Expand Up @@ -25,6 +25,11 @@ <h5><a id="upgrade_320_notable" href="#upgrade_320_notable">Notable changes in 3
which meant that idempotence remained disabled unless the user had explicitly set <code>enable.idempotence</code> to true
(See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details).
This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.</li>
<li>A notable exception is Connect that by default disables idempotent behavior for all of its
producers in order to uniformly support using a wide range of Kafka broker versions.
Users can change this behavior to enable idempotence for some or all producers
via Connect worker and/or connector configuration. Connect may enable idempotent producers
by default in a future major release.</li>
</ul>

<h4><a id="upgrade_3_1_0" href="#upgrade_3_1_0">Upgrading to 3.1.0 from any version 0.8.x through 3.0.x</a></h4>
Expand Down Expand Up @@ -75,6 +80,11 @@ <h5><a id="upgrade_311_notable" href="#upgrade_311_notable">Notable changes in 3
A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set
<code>enable.idempotence</code> to true. See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details.
This issue was fixed and the default is properly applied.</li>
<li>A notable exception is Connect that by default disables idempotent behavior for all of its
producers in order to uniformly support using a wide range of Kafka broker versions.
Users can change this behavior to enable idempotence for some or all producers
via Connect worker and/or connector configuration. Connect may enable idempotent producers
by default in a future major release.</li>
</ul>

<h5><a id="upgrade_310_notable" href="#upgrade_310_notable">Notable changes in 3.1.0</a></h5>
Expand Down