Skip to content

Commit

Permalink
Pass arc by value rather than reference and fix generic type.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidblewett authored and scanterog committed Nov 7, 2023
1 parent b527a3e commit b897ec9
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ unsafe extern "C" fn native_message_queue_nonempty_cb(_: *mut RDKafka, opaque_pt
wakers.wake_all();
}

unsafe fn enable_nonempty_callback(queue: &Arc<NativeQueue>, wakers: &Arc<WakerSlab>) {
unsafe fn enable_nonempty_callback(queue: Arc<NativeQueue>, wakers: &Arc<WakerSlab>) {
rdsys::rd_kafka_queue_cb_event_enable(
queue.ptr(),
Some(native_message_queue_nonempty_cb),
Expand Down Expand Up @@ -193,7 +193,7 @@ where
let native_ptr = base.client().native_ptr() as usize;

let wakers = Arc::new(WakerSlab::new());
unsafe { enable_nonempty_callback(&base.get_queue(), &wakers) }
unsafe { enable_nonempty_callback(base.get_queue(), &wakers) }

// We need to make sure we poll the consumer at least once every max
// poll interval, *unless* the processing task has wedged. To accomplish
Expand Down Expand Up @@ -332,7 +332,7 @@ where
let wakers = Arc::new(WakerSlab::new());
unsafe {
rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
enable_nonempty_callback(&queue, &wakers);
enable_nonempty_callback(Arc::new(queue), &wakers);
}
StreamPartitionQueue {
queue,
Expand Down Expand Up @@ -555,7 +555,7 @@ where
///
/// If you want multiple independent views of a Kafka partition, create
/// multiple consumers, not multiple partition streams.
pub fn stream(&self) -> MessageStream<'_> {
pub fn stream(&self) -> MessageStream<'_, C> {
MessageStream::new(&self.wakers, &self.queue)
}

Expand Down

0 comments on commit b897ec9

Please sign in to comment.