From 2af36710b94d1aaabb920014832a876cb6d2ff51 Mon Sep 17 00:00:00 2001 From: Samuel Cantero Date: Mon, 30 Oct 2023 14:58:07 -0300 Subject: [PATCH] Allow creating a consumer without group.id Currently if a group.id is not specified we allow the use of the consumer for fetching metadata and watermarks. Keeping this behaviour. --- src/consumer/base_consumer.rs | 47 +++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index fb2188774..01f1af462 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -37,6 +37,7 @@ where { client: Client, queue: NativeQueue, + group_id: Option, } impl FromClientConfig for BaseConsumer { @@ -78,14 +79,26 @@ where context, )?; - // Redirect rdkafka's main queue to the consumer queue so that we only - // need to listen to the consumer queue to observe events like - // rebalancings and stats. - unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) }; - let queue = client.consumer_queue().ok_or_else(|| { - KafkaError::ClientCreation("rdkafka consumer queue not available".to_string()) - })?; - Ok(BaseConsumer { client, queue }) + let group_id = config.get("group.id").map(|s| s.to_string()); + // If a group.id is not specified, we won't redirect the main queue to the consumer queue, + // allowing continued use of the consumer for fetching metadata and watermarks without the + // need to specify a group.id + let queue = if group_id.is_some() { + // Redirect rdkafka's main queue to the consumer queue so that we only need to listen + // to the consumer queue to observe events like rebalancings and stats. + unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) }; + client.consumer_queue().ok_or_else(|| { + KafkaError::ClientCreation("rdkafka consumer queue not available".to_string()) + })? + } else { + client.main_queue() + }; + + Ok(BaseConsumer { + client, + queue, + group_id, + }) } /// Polls the consumer for new messages. @@ -681,15 +694,17 @@ where { fn drop(&mut self) { trace!("Destroying consumer: {:?}", self.client.native_ptr()); - let err = unsafe { - rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr()) - }; - if !err.is_null() { - error!("Failed to close the consumer queue on drop"); - } + if self.group_id.is_some() { + let err = unsafe { + rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr()) + }; + if !err.is_null() { + error!("Failed to close the consumer queue on drop"); + } - while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 { - self.poll(Duration::from_millis(100)); + while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 { + self.poll(Duration::from_millis(100)); + } } trace!("Consumer destroyed: {:?}", self.client.native_ptr()); }