diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 73fafc94d..886d5bdee 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -577,10 +577,17 @@ where } fn commit_transaction>(&self, timeout: T) -> KafkaResult<()> { + // rd_kafka_commit_transaction will call flush but the user must call poll in order to + // server the event queue. In order to avoid blocking here forever on the base producer, + // we call Flush that will flush the outstanding messages and serve the event queue. + // https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src/rdkafka.h#L10231 + // The recommended timeout here is -1 (never, i.e, infinite). + let timeout = timeout.into(); + self.flush(timeout)?; let ret = unsafe { RDKafkaError::from_ptr(rdsys::rd_kafka_commit_transaction( self.native_ptr(), - timeout.into().as_millis(), + timeout.as_millis(), )) }; if ret.is_error() {