Skip to content

Commit

Permalink
Allow creating a consumer without group.id
Browse files Browse the repository at this point in the history
Currently if a group.id is not specified we allow the use of the consumer
for fetching metadata and watermarks. Keeping this behaviour.
  • Loading branch information
scanterog committed Nov 7, 2023
1 parent 3b98f95 commit 2af3671
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ where
{
client: Client<C>,
queue: NativeQueue,
group_id: Option<String>,
}

impl FromClientConfig for BaseConsumer {
Expand Down Expand Up @@ -78,14 +79,26 @@ where
context,
)?;

// Redirect rdkafka's main queue to the consumer queue so that we only
// need to listen to the consumer queue to observe events like
// rebalancings and stats.
unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
let queue = client.consumer_queue().ok_or_else(|| {
KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
})?;
Ok(BaseConsumer { client, queue })
let group_id = config.get("group.id").map(|s| s.to_string());
// If a group.id is not specified, we won't redirect the main queue to the consumer queue,
// allowing continued use of the consumer for fetching metadata and watermarks without the
// need to specify a group.id
let queue = if group_id.is_some() {
// Redirect rdkafka's main queue to the consumer queue so that we only need to listen
// to the consumer queue to observe events like rebalancings and stats.
unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
client.consumer_queue().ok_or_else(|| {
KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
})?
} else {
client.main_queue()
};

Ok(BaseConsumer {
client,
queue,
group_id,
})
}

/// Polls the consumer for new messages.
Expand Down Expand Up @@ -681,15 +694,17 @@ where
{
fn drop(&mut self) {
trace!("Destroying consumer: {:?}", self.client.native_ptr());
let err = unsafe {
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
};
if !err.is_null() {
error!("Failed to close the consumer queue on drop");
}
if self.group_id.is_some() {
let err = unsafe {
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
};
if !err.is_null() {
error!("Failed to close the consumer queue on drop");
}

while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
self.poll(Duration::from_millis(100));
while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
self.poll(Duration::from_millis(100));
}
}
trace!("Consumer destroyed: {:?}", self.client.native_ptr());
}
Expand Down

0 comments on commit 2af3671

Please sign in to comment.