diff --git a/group_consumer.go b/group_consumer.go index 4bafca5..301bccf 100644 --- a/group_consumer.go +++ b/group_consumer.go @@ -157,6 +157,14 @@ func (gc *GroupConsumer[T]) Close() []InnerAck { return gc.lostAcks } +// Fetch basic stats about buffers: +// 1. Number of fetched entries +// 2. Number of pending acks +// 3. Number of unprocessed ack errors +func (gc *GroupConsumer[T]) Stats() (int, int, int) { + return len(gc.fetchChan), len(gc.ackChan), len(gc.ackErrChan) +} + // consumeLoop fills the consumeChan with new messages from all sources. // It is the last goroutine to exit and does remaing ack message recovery. func (gc *GroupConsumer[T]) consumeLoop() {