Skip to content

Commit

Permalink
Pass BaseConsumer to ConsumerContext::rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
loewenheim committed Nov 28, 2023
1 parent 0c5c131 commit c811175
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
9 changes: 6 additions & 3 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{error, warn};
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, NativeQueue};
use crate::client::{Client, NativeClient, NativeQueue};
use crate::config::{
ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
};
Expand Down Expand Up @@ -188,8 +188,7 @@ where
// The TPL is owned by the Event and will be destroyed when the event is destroyed.
// Dropping it here will lead to double free.
let mut tpl = ManuallyDrop::new(tpl);
self.context()
.rebalance(self.client.native_client(), err, &mut tpl);
self.context().rebalance(self, err, &mut tpl);
}
_ => {
let buf = unsafe {
Expand Down Expand Up @@ -353,6 +352,10 @@ where
pub fn closed(&self) -> bool {
unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
}

pub(crate) fn native_client(&self) -> &NativeClient {
self.client.native_client()
}
}

impl<C> Consumer<C> for BaseConsumer<C>
Expand Down
15 changes: 8 additions & 7 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::Duration;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, ClientContext, NativeClient};
use crate::client::{Client, ClientContext};
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::log::{error, trace};
Expand Down Expand Up @@ -43,15 +43,15 @@ pub enum Rebalance<'a> {
/// be specified.
///
/// See also the [`ClientContext`] trait.
pub trait ConsumerContext: ClientContext {
pub trait ConsumerContext: ClientContext + Sized {
/// Implements the default rebalancing strategy and calls the
/// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
/// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
/// method is overridden, it will be responsibility of the user to call them
/// if needed.
fn rebalance(
&self,
native_client: &NativeClient,
base_consumer: &BaseConsumer<Self>,
err: RDKafkaRespErr,
tpl: &mut TopicPartitionList,
) {
Expand All @@ -66,9 +66,10 @@ pub trait ConsumerContext: ClientContext {
};

trace!("Running pre-rebalance with {:?}", rebalance);
self.pre_rebalance(&rebalance);
self.pre_rebalance(base_consumer, &rebalance);

trace!("Running rebalance with {:?}", rebalance);
let native_client = base_consumer.native_client();
// Execute rebalance
unsafe {
match err {
Expand All @@ -93,18 +94,18 @@ pub trait ConsumerContext: ClientContext {
}
}
trace!("Running post-rebalance with {:?}", rebalance);
self.post_rebalance(&rebalance);
self.post_rebalance(base_consumer, &rebalance);
}

/// Pre-rebalance callback. This method will run before the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}

/// Post-rebalance callback. This method will run after the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}

// TODO: convert pointer to structure
/// Post commit callback. This method will run after a group of offsets was
Expand Down

0 comments on commit c811175

Please sign in to comment.