Messages are not acknowledged, so we have no way to tell whether a message has been read.
Adding a signal that stores the offset handles this:
await consumer.store_offset()
and an adapted subscription:
consumer = await client.subscribe(
    stream="your-stream",
    message_handler=on_message_received,
    consumer_config=ConsumerConfig(
        reference="my-consumer-group",
        offset_spec=OffsetSpecification.next(),
    ),
)
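To illustrate why storing the offset under a consumer reference acts as the missing "read" signal, here is a minimal, library-independent sketch. It does not use the real client API: `InMemoryOffsetStore`, `query_offset`, `consume` and the `limit` parameter are hypothetical names made up for this illustration, and the in-memory dictionary only stands in for whatever the broker does on its side.

```python
import asyncio


class InMemoryOffsetStore:
    """Hypothetical stand-in for broker-side offset tracking (assumption)."""

    def __init__(self):
        self._offsets = {}

    async def store_offset(self, reference, stream, offset):
        # Remember the last offset this consumer reference has processed.
        self._offsets[(reference, stream)] = offset

    async def query_offset(self, reference, stream):
        # Returns None when nothing has been stored for this reference yet.
        return self._offsets.get((reference, stream))


async def consume(stream, messages, store, reference, handler, limit=None):
    """Handle messages after the last stored offset, storing each new offset."""
    last = await store.query_offset(reference, stream)
    start = 0 if last is None else last + 1
    handled = 0
    for offset in range(start, len(messages)):
        if limit is not None and handled >= limit:
            break  # simulate a consumer that stops early
        await handler(messages[offset])
        # Store only after the handler succeeded: this is the "read" signal.
        await store.store_offset(reference, stream, offset)
        handled += 1
    return handled


async def demo():
    store = InMemoryOffsetStore()
    messages = ["a", "b", "c", "d"]
    seen = []

    async def on_message_received(message):
        seen.append(message)

    # First run stops after two messages, as if the consumer had crashed.
    await consume("your-stream", messages, store,
                  "my-consumer-group", on_message_received, limit=2)
    # Second run resumes after the stored offset instead of starting over.
    await consume("your-stream", messages, store,
                  "my-consumer-group", on_message_received)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the offset is stored only after the handler returns, a crash in the middle of a message means that message is delivered again on restart, so the handler should tolerate duplicates.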