Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] Failed with librdkafka when enable idempotence option #2008

Open
NiuBlibing opened this issue Sep 25, 2023 · 1 comment
Open

[BUG] Failed with librdkafka when enable idempotence option #2008

NiuBlibing opened this issue Sep 25, 2023 · 1 comment
Labels

Comments

@NiuBlibing
Copy link

NiuBlibing commented Sep 25, 2023

Describe the bug
When set enable.idempotence true, the librdkafka will fail with the following

% Running producer loop. Press Ctrl-C to exit
%0|1695637496.333|FATAL|rdkafka#producer-1| [thrd:main]: Fatal error: Local: Required feature not supported by broker: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% Failed to produce to topic persistent://test/test-ns/testtopic: Local: Fatal error
% Flushing outstanding messages..
% Error: _FATAL: Fatal error: Local: Required feature not supported by broker: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% FATAL ERROR: _UNSUPPORTED_FEATURE: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% Terminating on fatal error
% Message delivery failed: Local: Purged in queue
% 1 message(s) produced, 0 delivered, 1 failed

To Reproduce
Steps to reproduce the behavior:

  1. use librdkafka's example idempotent_producer.c, set user name and password(I use sasl for kop)
  2. idempotent_producer

Expected behavior
Support enable.idempotence option

Additional context
pulsar version 3.0.0
kop version: 3.0.0.4
librdkafka: v1.9.2
broker.conf:

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://localhost:9092
kafkaAdvertisedListeners=PLAINTEXT://localhost:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
kafkaTransactionCoordinatorEnabled=true
brokerDeduplicationEnabled=true
kafkaBrokerId=1
saslAllowedMechanisms=PLAIN

idempotent_producer.patch

diff --git a/examples/idempotent_producer.c b/examples/idempotent_producer.c
index 1e799eaf..7c14fb6f 100644
--- a/examples/idempotent_producer.c
+++ b/examples/idempotent_producer.c
@@ -197,6 +197,35 @@ int main(int argc, char **argv) {
                 return 1;
         }
 
+        if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT",
+                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+            fprintf(stderr, "%s\n", errstr);
+            rd_kafka_conf_destroy(conf);
+            return 1;
+        }
+            if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN",
+                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+    {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+    if (rd_kafka_conf_set(conf, "sasl.username", "test/test-ns",
+                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+    {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+    if (rd_kafka_conf_set(conf, "sasl.password", "token:123456",
+                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+    {
+        fprintf(stderr, "%s\n", errstr);
+        rd_kafka_conf_destroy(conf);
+        return 1;
+    }
+
         /* Set the delivery report callback.
          * This callback will be called once per message to inform
          * the application if delivery succeeded or failed.
@BewareMyPower
Copy link
Collaborator

@Demogorgon314 Could you help with this issue?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants