Skip to content

Commit

Permalink
Expose assignment_lost
Browse files Browse the repository at this point in the history
  • Loading branch information
mborst authored and davidblewett committed Jun 9, 2023
1 parent 0048649 commit 83e9a50
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.md
Expand Up @@ -5,6 +5,7 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
## Unreleased

* Add support for the cluster mock API.
* Expose assignment_lost method on the consumer.

## 0.31.0 (2023-05-17)

Expand Down
4 changes: 4 additions & 0 deletions src/consumer/base_consumer.rs
Expand Up @@ -443,6 +443,10 @@ where
}
}

fn assignment_lost(&self) -> bool {
unsafe { rdsys::rd_kafka_assignment_lost(self.client.native_ptr()) == 1 }
}

fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
let mut tpl_ptr = ptr::null_mut();
let assignment_error =
Expand Down
15 changes: 15 additions & 0 deletions src/consumer/mod.rs
Expand Up @@ -306,6 +306,21 @@ where
/// Returns the current partition assignment.
fn assignment(&self) -> KafkaResult<TopicPartitionList>;

/// Check whether the consumer considers the current assignment to have been lost
/// involuntarily.
///
/// This method is only applicable for use with a high level subscribing consumer. Assignments
/// are revoked immediately when determined to have been lost, so this method is only useful
/// when reacting to a rebalance or from within a rebalance_cb. Partitions
/// that have been lost may already be owned by other members in the group and therefore
/// commiting offsets, for example, may fail.
///
/// Calling rd_kafka_assign(), rd_kafka_incremental_assign() or rd_kafka_incremental_unassign()
/// resets this flag.
///
/// Returns true if the current partition assignment is considered lost, false otherwise.
fn assignment_lost(&self) -> bool;

/// Retrieves the committed offsets for topics and partitions.
fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList>
where
Expand Down
4 changes: 4 additions & 0 deletions src/consumer/stream_consumer.rs
Expand Up @@ -396,6 +396,10 @@ where
self.base.incremental_unassign(assignment)
}

fn assignment_lost(&self) -> bool {
self.base.assignment_lost()
}

fn seek<T: Into<Timeout>>(
&self,
topic: &str,
Expand Down

0 comments on commit 83e9a50

Please sign in to comment.