Skip to content

Commit

Permalink
Adapt consumer close to the event api
Browse files Browse the repository at this point in the history
  • Loading branch information
scanterog committed Nov 7, 2023
1 parent bb2aee0 commit 3b98f95
Showing 1 changed file with 10 additions and 36 deletions.
46 changes: 10 additions & 36 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::ptr;
use std::sync::Arc;
use std::time::Duration;

use log::warn;
use log::{error, warn};
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

Expand Down Expand Up @@ -37,7 +37,6 @@ where
{
client: Client<C>,
queue: NativeQueue,
static_member: bool,
}

impl FromClientConfig for BaseConsumer {
Expand All @@ -62,12 +61,6 @@ where
native_config: NativeClientConfig,
context: C,
) -> KafkaResult<BaseConsumer<C>> {
let mut static_member = false;
if let Some(group_instance_id) = config.get("group.instance.id") {
if !group_instance_id.is_empty() {
static_member = true;
}
}
unsafe {
rdsys::rd_kafka_conf_set_events(
native_config.ptr(),
Expand All @@ -92,11 +85,7 @@ where
let queue = client.consumer_queue().ok_or_else(|| {
KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
})?;
Ok(BaseConsumer {
client,
queue,
static_member,
})
Ok(BaseConsumer { client, queue })
}

/// Polls the consumer for new messages.
Expand Down Expand Up @@ -692,31 +681,16 @@ where
{
fn drop(&mut self) {
trace!("Destroying consumer: {:?}", self.client.native_ptr());
// If this consumer is configured for static membership, do not explicitly unsubscribe from
// the group. Note that static members will *not* receive a final revoke event when they
// shutdown.
if !self.static_member {
// We use the Event API rather than the Callback API.
// As we don't register a rebalance_cb, rd_kafka_consumer_close()
// will shortcut the rebalance_cb and do a forced unassign.
// This is undesired as the application might need the final
// revoke events before shutting down. Hence, we trigger
// an Unsubscribe() first, wait for that to propagate, and then
// close the consumer.
unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };

// Poll for rebalance events
loop {
self.poll(Duration::from_secs(5));
let qlen = unsafe { rdsys::rd_kafka_queue_length(self.queue.ptr()) };
if qlen == 0 {
break;
}
}
let err = unsafe {
rdsys::rd_kafka_consumer_close_queue(self.client.native_ptr(), self.queue.ptr())
};
if !err.is_null() {
error!("Failed to close the consumer queue on drop");
}

// TODO(sam): do we need to destroy the queue before calling close?
unsafe { rdsys::rd_kafka_consumer_close(self.client.native_ptr()) };
while unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) } != 1 {
self.poll(Duration::from_millis(100));
}
trace!("Consumer destroyed: {:?}", self.client.native_ptr());
}
}
Expand Down

0 comments on commit 3b98f95

Please sign in to comment.