
Way to get batch messages and commit if the batch is successful #123

@sanchitlohia-ovo

Description

There is no way to implement the following flow:

  1. Read a batch of messages from Kafka.
  2. Process all the messages.
  3. If every message is processed successfully, commit the offset.
  4. If processing any message fails, do not commit the offset; read the same batch again, retry processing, and commit the offset once it succeeds.

Activity

achille-roussel (Contributor) commented on Oct 12, 2018

There's already a way to do this via the FetchMessage/CommitMessages methods of the reader: the program can read messages one at a time and aggregate them into a batch, which can then be committed.
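
For example, a rough sketch of that pattern (broker address, topic, group ID, batch size, and the process function are all placeholders):

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

// process stands in for whatever per-batch work the application does.
func process(msgs []kafka.Message) error { return nil }

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
	})
	defer r.Close()

	ctx := context.Background()
	for {
		// Aggregate messages into a batch of (at most) 100.
		batch := make([]kafka.Message, 0, 100)
		for len(batch) < 100 {
			msg, err := r.FetchMessage(ctx)
			if err != nil {
				log.Fatal(err)
			}
			batch = append(batch, msg)
		}

		// Process the whole batch, and only commit once everything succeeded.
		if err := process(batch); err != nil {
			// Note: this reader will not re-deliver the uncommitted messages by itself.
			log.Printf("batch failed, not committing: %v", err)
			continue
		}
		if err := r.CommitMessages(ctx, batch...); err != nil {
			log.Fatal(err)
		}
	}
}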

Would this fit your use case? If not, I’d be interested to hear more details about it.

sanchitlohia-ovo (Author) commented on Oct 12, 2018

@achille-roussel Thanks for the response. The approach you suggested is correct, but there is no way to read the same batch again when a GroupID is specified in my reader. In my use case I have to specify the GroupID, and I cannot use reader.SetOffset(offset) to re-read the same batch. Please suggest if there is any way.

achille-roussel (Contributor) commented on Nov 25, 2018

If you need access to the batch received by a single fetch request, we would indeed need a new API.

This isn't a use case that came up at Segment so it's unlikely that we will find time to dedicate to building this feature, but if you are available to work on it we would gladly review and merge the change.

AntonKuzValid commented on Jul 8, 2019

I have run into an issue with batch reading using the reader's FetchMessage/CommitMessages methods. I made some simple tests (example here: https://play.golang.org/p/aeYfRBfWIlQ) and hit cases where, after I started a new consumer, a rebalance happened and the new consumer began consuming messages that were already fetched but not yet committed, so I got duplicated data.

robfig commented on Oct 15, 2020

We have the same request. For example, we consume messages to load into ElasticSearch, which can index many more documents if you provide it a batch at a time, instead of one-by-one. When using the Reader API, there is no way to access the batch of messages that Kafka returned, which would be the easiest and most efficient thing. If we use our own batch size, then we have to use an arbitrary timeout and introduce unnecessary delays whenever there is not a full batch of messages waiting on the topic.

I would be happy to contribute an implementation if anyone has guidance on what the API should be and the things that need to change.

robfig commented on Oct 20, 2020

Looking at the current Reader implementation, it uses Batches under the covers, sending the messages in each batch one at a time over a channel. I have two alternate proposals for enabling reading a batch at a time:

  1. Add a "batch mode" bit to ReaderConfig:

type ReaderConfig struct {
  // Batched causes the Reader to return a Batch at a time instead of a Message at a time.
  // If this is set, callers must use FetchBatch/ReadBatch instead of their Message equivalents.
  Batched bool
}

func (*Reader) FetchBatch(context.Context) (*Batch, error)
func (*Reader) ReadBatch(context.Context) (*Batch, error)

  2. Add a new BatchedReader type:

// NewBatchedReader returns a reader which returns each batch read to callers.
func NewBatchedReader(cfg ReaderConfig) *BatchedReader

// BatchedReader has a similar interface to Reader, except with FetchBatch/ReadBatch instead of their Message equivalents.

Thoughts?

robfig commented on Oct 26, 2020

@achille-roussel wondering if you have any thoughts about the API?

robfig commented on May 9, 2021

Tagging @alecthomas and @dominicbarnes as two recently-active maintainers to provide some feedback on the API design.

achille-roussel (Contributor) commented on May 28, 2021

Hey @robfig! Thanks for following up on this!

Have you looked at potentially using the kafka.ConsumerGroup API that the package exposes? Paired with kafka.Conn or kafka.Client to fetch batches of messages, you should be able to gain greater control over the timing of commits.
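
Roughly, that combination could look something like the sketch below: the consumer group handles assignments while kafka.Conn fetches whole batches, and offsets are only committed once the entire batch has been processed. Broker address, topic, group ID, byte sizes, and the error handling are all simplified placeholders.

package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "my-group",
		Brokers: []string{"localhost:9092"},
		Topics:  []string{"my-topic"},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		gen, err := group.Next(context.Background())
		if err != nil {
			return
		}
		for topic, assignments := range gen.Assignments {
			for _, a := range assignments {
				topic, partition, offset := topic, a.ID, a.Offset
				gen.Start(func(ctx context.Context) {
					conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", topic, partition)
					if err != nil {
						return
					}
					defer conn.Close()

					// Start from the last committed offset for this partition.
					// (Handling of a brand-new group with no committed offset is omitted.)
					if _, err := conn.Seek(offset, kafka.SeekAbsolute); err != nil {
						return
					}
					conn.SetReadDeadline(time.Now().Add(10 * time.Second))

					batch := conn.ReadBatch(1, 1e6) // min/max bytes per fetch
					defer batch.Close()

					last := int64(-1)
					for {
						msg, err := batch.ReadMessage()
						if err != nil {
							break // io.EOF (or the read deadline) marks the end of the batch
						}
						// Process msg here; on failure, return without committing so the
						// batch is fetched again from the committed offset.
						last = msg.Offset
					}
					if last >= 0 {
						// Commit the offset following the last successfully processed message.
						_ = gen.CommitOffsets(map[string]map[int]int64{topic: {partition: last + 1}})
					}
				})
			}
		}
	}
}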

Let us know if this is helpful. If not, let's explore adding new APIs to kafka.Reader to expose batches instead of individual messages 👍

robfig commented on May 29, 2021

Hi @achille-roussel ,
The ConsumerGroup API does not really do what I wanted. It does not expose the batch of messages, only one message at a time. You can get a batch of messages from a Conn, but that is specific to a partition and does not use consumer groups. We use this library quite a bit internally and would have many places to use this API. We use consumer groups everywhere (which I think is typical practice), and many message types benefit from batched processing, but right now that processing is unnecessarily inefficient. In my benchmarks, I found that Conn.ReadBatch was significantly faster than reading from a ConsumerGroup, so I think there is a lot to gain. (This was a while ago; I'd have to look for the results.)
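
For reference, the partition-level path I benchmarked looks roughly like this (assuming the usual context, log, time, and kafka-go imports; broker address, topic, and partition are placeholders):

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // 10KB minimum, 1MB maximum per fetch
defer batch.Close()

for {
	msg, err := batch.ReadMessage()
	if err != nil {
		break // io.EOF (or the read deadline) ends the batch
	}
	// Note: there is no consumer group and no offset commit on this path.
	log.Printf("offset %d: %s", msg.Offset, msg.Value)
}
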
Thanks for considering,
Rob

3 remaining items

robfig commented on Feb 15, 2022

I don't have a benchmark setup or any code to share, sorry. The code was the trivial code you might expect -- a loop repeatedly calling FetchMessage or ReadBatch to read all the data in a partition. The Kafka connection code would not be helpful and I'm not sure how I could even share the Kafka partition state. I'm no longer working on this project so I won't be able to help further here, sorry.

vorandrew commented on Mar 6, 2022

Batch mode definitely must be done...

My thoughts: an active batch plus a prefetched batch.

When you are done processing the active batch:

  1. Synchronously switch to the prefetched batch.
  2. Asynchronously commit the offset of the last message of the finished batch and prefetch the next batch.

This way you get:

  1. An "at least once" guarantee: if a crash happens, you reprocess only one batch.
  2. Fetches and offset commits become asynchronous, so no added latency.
  3. Offsets are committed less frequently.
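
Here is a rough sketch of this double-buffering idea on top of the existing FetchMessage/CommitMessages API (reader configuration, batch size, and the process function are placeholders; this is not an API the library provides today):

package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

// process stands in for the application's per-batch work.
func process(msgs []kafka.Message) {}

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "my-group",
		Topic:   "my-topic",
	})
	defer r.Close()

	ctx := context.Background()

	// fetchBatch aggregates up to 100 messages into one "batch".
	fetchBatch := func() []kafka.Message {
		batch := make([]kafka.Message, 0, 100)
		for len(batch) < 100 {
			m, err := r.FetchMessage(ctx)
			if err != nil {
				log.Fatal(err)
			}
			batch = append(batch, m)
		}
		return batch
	}

	batches := make(chan []kafka.Message)   // hands the prefetched batch to the processor
	finished := make(chan kafka.Message, 1) // last message of the batch that just finished

	// This goroutine owns the reader: it prefetches the next batch while the
	// previous one is being processed, and commits a batch only after the
	// processor reports it as finished.
	go func() {
		batches <- fetchBatch() // first batch, nothing to commit yet
		for {
			next := fetchBatch() // prefetch while the active batch is processed
			last := <-finished   // wait for the active batch to finish
			batches <- next      // 1. switch the processor to the prefetched batch
			// 2. commit the finished batch; committing its last message marks earlier
			// offsets in that partition as consumed (with multiple partitions,
			// committing the whole batch would be safer).
			if err := r.CommitMessages(ctx, last); err != nil {
				log.Fatal(err)
			}
		}
	}()

	// The processor only ever touches messages, never the reader.
	for batch := range batches {
		process(batch)
		finished <- batch[len(batch)-1]
	}
}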

achille-roussel (Contributor) commented on Mar 18, 2022

Thanks for the suggestions @SantaCruz123

I agree it would be a useful improvement for the kafka.Reader type to support reading and committing batches.

Would you be able to submit one or more PRs to implement the suggestions you made?

anatoly314 commented on Dec 29, 2023

Sorry guys, I couldn't find it in the manual and it's not clear from the discussion. Is this already implemented? I.e., can I get a batch of messages from Kafka and commit it? If so, I would be thankful if someone could point me to an example/tutorial of how to use it. Thank you in advance!

anatoly314 commented on Dec 29, 2023

And another question: you mentioned Conn.ReadBatch; is it meant to be used by library consumers or only for internal use? I have a feeling that right now it's only for internal use, am I right?

mohamadrezza commented on Jul 31, 2024

There's already a way to do this via the FetchMessage/CommitMessages methods of the reader: the program can read messages one at a time and aggregate them into a batch, which can then be committed.

Would this fit your use case? If not, I'd be interested to hear more details about it.

Could you please provide an example? I need to read messages in a batch and commit all of them... the problem is that reader.FetchMessage/ReadMessage read messages one by one, and I want to read them all at once.

anatoly-intel commented on Jul 31, 2024

@mohamadrezza as far as I understood:

  1. Even if you use FetchMessage/ReadMessage, behind the scenes kafka-go reads messages in batches, so you actually get messages that have already been fetched.
  2. When you commit the last read message, all previous messages are committed too.

So you can read as many messages as you need and then commit the last one.
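
For example, a small sketch of that pattern (the reader is assumed to be configured with a GroupID, the usual context and kafka-go imports are assumed, and the function name and batch size are just for illustration):

// readAndCommitLast fetches up to n messages, processes them, and then commits
// only the last one.
func readAndCommitLast(ctx context.Context, r *kafka.Reader, n int) error {
	msgs := make([]kafka.Message, 0, n)
	for len(msgs) < n {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			return err
		}
		msgs = append(msgs, m)
	}

	// ... process all n messages here ...

	// Kafka commits are positional: committing the last message also marks every
	// earlier offset in that partition as consumed. If the reader consumes from
	// several partitions, committing all fetched messages (msgs...) is safer.
	return r.CommitMessages(ctx, msgs[len(msgs)-1])
}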

mohamadrezza commented on Jul 31, 2024

@mohamadrezza as far as I understood:

  1. Even if you use FetchMessage/ReadMessage, behind the scenes kafka-go reads messages in batches, so you actually get messages that have already been fetched.
  2. When you commit the last read message, all previous messages are committed too.

So you can read as many messages as you need and then commit the last one.

Thanks @anatoly-intel. How can I access a batch of messages at once? FetchMessage and ReadMessage only give one message at a time. I want to receive an array of messages. Is there an example for this scenario?

anatoly314 commented on Jul 31, 2024

I don't think it's possible now

krsoninikhil commented on Jun 8, 2025

Hey everyone, I was looking into implementing this batch reading API on the existing Reader. As others pointed out, FetchMessage already uses ReadBatch under the hood, so would holding off on returning messages until the count reaches the required batch size be the right approach?

I've implemented this approach in #1390 for review. If it looks good, I can go ahead and add some test cases. Looking forward to hearing from the maintainers.
