Skip to content

Commit

Permalink
Adapt commit_transaction to the event api
Browse files Browse the repository at this point in the history
  • Loading branch information
scanterog committed Nov 7, 2023
1 parent 74ff52a commit bb2aee0
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,17 @@ where
}

fn commit_transaction<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
// rd_kafka_commit_transaction will call flush but the user must call poll in order to
// server the event queue. In order to avoid blocking here forever on the base producer,
// we call Flush that will flush the outstanding messages and serve the event queue.
// https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src/rdkafka.h#L10231
// The recommended timeout here is -1 (never, i.e, infinite).
let timeout = timeout.into();
self.flush(timeout)?;
let ret = unsafe {
RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction(
self.native_ptr(),
timeout.into().as_millis(),
timeout.as_millis(),
))
};
if ret.is_error() {
Expand Down

0 comments on commit bb2aee0

Please sign in to comment.