diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 582271a19633..4adf6ff5e041 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -648,6 +648,12 @@ static Map producerConfigs(ConnectorTaskId id, // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker, // but this may compromise the delivery guarantees of Kafka Connect. producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + // By default, Connect disables idempotent behavior for all producers, even though idempotence became + // default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support + // idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8). + // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent + // gets approved and scheduled for release. + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 669c72b224df..94b98cb9eb38 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -511,6 +511,12 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + // By default, Connect disables idempotent behavior for all producers, even though idempotence became + // default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support + // idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8). + // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent + // gets approved and scheduled for release. + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId); Map consumerProps = new HashMap<>(originals); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 313baf72c58c..f3cbb686fff5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -91,6 +91,12 @@ public void configure(final WorkerConfig config) { producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + // By default, Connect disables idempotent behavior for all producers, even though idempotence became + // default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support + // idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8). + // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent + // gets approved and scheduled for release. + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId); Map consumerProps = new HashMap<>(originals); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index c2aeba808012..3ba6996da8ab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -170,7 +170,12 @@ public void configure(final WorkerConfig config) { producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class - producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0 + // By default, Connect disables idempotent behavior for all producers, even though idempotence became + // default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support + // idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8). + // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent + // gets approved and scheduled for release. + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // disable idempotence since retries is force to 0 ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId); Map consumerProps = new HashMap<>(originals); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 2b210794207a..dcd9286480e8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -213,6 +213,12 @@ public void setup() { defaultProducerConfigs.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + // By default, producers that are instantiated and used by Connect have idempotency disabled even after idempotency became + // default for Kafka producers. This is chosen to avoid breaking changes when Connect contacts Kafka brokers that do not support + // idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8). + // These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent + // gets approved and scheduled for release. + defaultProducerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all"); defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); diff --git a/docs/upgrade.html b/docs/upgrade.html index ddcee32053eb..e7700272b39c 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -25,6 +25,11 @@
Notable changes in 3 which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See KAFKA-13598for more details). This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0. +
  • A notable exception is Connect that by default disables idempotent behavior for all of its + producers in order to uniformly support using a wide range of Kafka broker versions. + Users can change this behavior to enable idempotence for some or all producers + via Connect worker and/or connector configuration. Connect may enable idempotent producers + by default in a future major release.
  • Upgrading to 3.1.0 from any version 0.8.x through 3.0.x

    @@ -75,6 +80,11 @@
    Notable changes in 3 A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set enable.idempotence to true. See KAFKA-13598for more details. This issue was fixed and the default is properly applied. +
  • A notable exception is Connect that by default disables idempotent behavior for all of its + producers in order to uniformly support using a wide range of Kafka broker versions. + Users can change this behavior to enable idempotence for some or all producers + via Connect worker and/or connector configuration. Connect may enable idempotent producers + by default in a future major release.
  • Notable changes in 3.1.0