If processing of all messages in the batch succeeds, commit the offset.
If processing of any message fails, do not commit the offset; read the same batch again, retry processing the messages, and commit the offset once they all succeed.
Activity
achille-roussel commented on Oct 12, 2018
There’s already a way to do this via the FetchMessage/CommitMessages methods of the reader: the program can read messages one at a time and aggregate them into a batch, which can then be committed.
Would this fit your use case? If not, I’d be interested to hear more details about it.
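A minimal sketch of that fetch/aggregate/commit pattern, assuming a local broker, placeholder group/topic names, and an arbitrary batch size of 100 (none of these come from the thread):

```go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker address
		GroupID: "my-group",                 // placeholder consumer group
		Topic:   "my-topic",                 // placeholder topic
	})
	defer r.Close()

	ctx := context.Background()
	const batchSize = 100 // arbitrary example size

	for {
		// Read messages one at a time and aggregate them into a slice.
		msgs := make([]kafka.Message, 0, batchSize)
		for len(msgs) < batchSize {
			m, err := r.FetchMessage(ctx)
			if err != nil {
				log.Fatal(err)
			}
			msgs = append(msgs, m)
		}

		// Process the whole slice as one batch here, and only commit
		// the offsets once processing has succeeded.
		if err := r.CommitMessages(ctx, msgs...); err != nil {
			log.Fatal(err)
		}
	}
}
```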
sanchitlohia-ovo commented on Oct 12, 2018
@achille-roussel Thanks for the response. The way you suggested is correct, but there is no way to read the same batch again if I am specifying the GroupID in my reader. In my use case I have to specify the GroupID, and I cannot use reader.SetOffset(offset) to reread the same batch. Please suggest if there is any way.
achille-roussel commented on Nov 25, 2018
If you need access to the batch received by a single fetch request we'd need a new API indeed.
This isn't a use case that came up at Segment so it's unlikely that we will find time to dedicate to building this feature, but if you are available to work on it we would gladly review and merge the change.
AntonKuzValid commented on Jul 8, 2019
I have faced an issue with batch reading using the reader and the FetchMessage/CommitMessages methods. I made some simple tests (example here: https://play.golang.org/p/aeYfRBfWIlQ), and there are cases where, after I start a new consumer and a rebalance happens, the new consumer starts consuming messages that were already fetched but not yet committed, so I end up with duplicated data.
robfig commented on Oct 15, 2020
We have the same request. For example, we consume messages to load into ElasticSearch, which can index many more documents if you provide it a batch at a time instead of one by one. When using the Reader API, there is no way to access the batch of messages that Kafka returned, which would be the easiest and most efficient approach. If we use our own batch size, then we have to use an arbitrary timeout and introduce unnecessary delays whenever there is not a full batch of messages waiting on the topic.
I would be happy to contribute an implementation if anyone has guidance on what the API should be and the things that need to change.
robfig commented on Oct 20, 2020
Looking at the current Reader implementation, it uses Batches under the covers, sending the messages in each batch one at a time over a channel. I have two alternate proposals for enabling reading a batch at a time:

1. Add a "batch mode" bit to ReaderConfig:

type ReaderConfig struct {
	// Batched causes the Reader to return a Batch at a time instead of a Message at a time.
	// If this is set, callers must use FetchBatch/ReadBatch instead of their Message equivalents.
	Batched bool
}

func (*Reader) FetchBatch(context.Context) (*Batch, error)
func (*Reader) ReadBatch(context.Context) (*Batch, error)

2. Add a new BatchedReader:

// NewBatchedReader returns a reader which returns each batch read to callers
func NewBatchedReader(cfg ReaderConfig) *BatchedReader

// BatchedReader has a similar interface to Reader, except with FetchBatch/ReadBatch instead of *Message

Thoughts?
robfig commented on Oct 26, 2020
@achille-roussel wondering if you have any thoughts about the API?
robfig commented on May 9, 2021
Tagging @alecthomas and @dominicbarnes as two recently-active maintainers to provide some feedback on the API design.
achille-roussel commented on May 28, 2021
Hey @robfig! Thanks for following up on this!
Have you looked at potentially using the kafka.ConsumerGroup API that the package exposes? Paired with kafka.Conn or kafka.Client to fetch batches of messages, you should be able to gain greater control over the timing of commits.
Let us know if this is helpful. If not, let's explore adding new APIs to kafka.Reader to expose batches instead of individual messages 👍
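A rough sketch of that pairing (not from the library docs; the broker address, group, and topic names are placeholders): the kafka.ConsumerGroup handles membership and offset commits, while a per-partition kafka.Conn fetches whole batches. Rebalance cancellation and sentinel start offsets are not handled here.

```go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "my-group",                 // placeholder consumer group id
		Brokers: []string{"localhost:9092"}, // placeholder broker address
		Topics:  []string{"my-topic"},       // placeholder topic
	})
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		gen, err := group.Next(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		for topic, assignments := range gen.Assignments {
			for _, assignment := range assignments {
				topic, partition, offset := topic, assignment.ID, assignment.Offset
				gen.Start(func(ctx context.Context) {
					conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", topic, partition)
					if err != nil {
						log.Print(err)
						return
					}
					defer conn.Close()

					if _, err := conn.Seek(offset, kafka.SeekAbsolute); err != nil {
						log.Print(err)
						return
					}

					// Fetch one whole batch from the partition leader.
					batch := conn.ReadBatch(1, 10e6) // min and max bytes per fetch
					defer batch.Close()

					var msgs []kafka.Message
					for {
						m, err := batch.ReadMessage()
						if err != nil {
							break // io.EOF once the batch is drained
						}
						msgs = append(msgs, m)
						offset = m.Offset + 1
					}

					// Process msgs as a single batch, then commit the next
					// offset for this partition through the group generation.
					if err := gen.CommitOffsets(map[string]map[int]int64{topic: {partition: offset}}); err != nil {
						log.Print(err)
					}
				})
			}
		}
	}
}
```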
robfig commented on May 29, 2021
Hi @achille-roussel,
The ConsumerGroup API does not really do what I wanted. It does not expose the batch of messages, only one message at a time. You can get a batch of messages from a Conn, but that is specific to a partition and does not use consumer groups. We use this library quite a bit internally, and there are many places where we would use this API. We use consumer groups everywhere (which I think is typical practice), and many message types benefit from batched processing, but that is done unnecessarily inefficiently right now. In my benchmarks, I found that Conn.ReadBatch was significantly faster than reading from a ConsumerGroup, so I think there is a lot to gain. (This was a while ago; I'd have to look for the results.)
Thanks for considering,
Rob
robfig commented on Feb 15, 2022
I don't have a benchmark setup or any code to share, sorry. The code was the trivial code you might expect: a loop repeatedly calling FetchMessage or ReadBatch to read all the data in a partition. The Kafka connection code would not be helpful, and I'm not sure how I could even share the Kafka partition state. I'm no longer working on this project, so I won't be able to help further here, sorry.
vorandrew commented on Mar 6, 2022
Batch mode - it definitely must be done...
My thoughts: keep an active batch plus a prefetched batch.
When you are done processing the active batch:
In such a way you have
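A sketch of that active + prefetched idea as it could be layered on today's Reader (the fetchBatch helper, the channel-based prefetcher, and all sizes and names are illustrative, not an existing API): one goroutine keeps the next batch ready while the current one is being processed.

```go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

// fetchBatch is an illustrative helper: it aggregates n messages from the
// reader into a slice (a real version would also bound the wait time so a
// partially filled batch is not held forever).
func fetchBatch(ctx context.Context, r *kafka.Reader, n int) ([]kafka.Message, error) {
	msgs := make([]kafka.Message, 0, n)
	for len(msgs) < n {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			return nil, err
		}
		msgs = append(msgs, m)
	}
	return msgs, nil
}

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker address
		GroupID: "my-group",                 // placeholder consumer group
		Topic:   "my-topic",                 // placeholder topic
	})
	defer r.Close()

	ctx := context.Background()
	batches := make(chan []kafka.Message, 1) // room for one prefetched batch

	// Prefetcher: keeps the next batch ready while the loop below is
	// still processing the active one.
	go func() {
		defer close(batches)
		for {
			b, err := fetchBatch(ctx, r, 100)
			if err != nil {
				log.Print(err)
				return
			}
			batches <- b
		}
	}()

	for active := range batches {
		// Process the active batch here, then commit it; the prefetcher
		// has been filling the next batch in the meantime.
		if err := r.CommitMessages(ctx, active...); err != nil {
			log.Fatal(err)
		}
	}
}
```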
achille-roussel commented on Mar 18, 2022
Thanks for the suggestions @SantaCruz123
I agree it would be a useful improvement for the kafka.Reader type to support reading and committing batches. Would you be able to submit one or more PRs to implement the suggestions you made?
anatoly314 commented on Dec 29, 2023
Sorry guys, I couldn't find it in the manual and it's not clear from the discussion: is it already implemented? I.e., can I get messages from Kafka in a batch and commit them? If yes, I would be thankful if someone could point me to an example/tutorial of how to use it. Thank you in advance!
anatoly314 commented on Dec 29, 2023
And another question: you mentioned Conn.ReadBatch. Is it meant to be used by library consumers or only for internal use? I have a feeling that right now it's only for internal use, am I right?
mohamadrezza commented on Jul 31, 2024
Could you please provide an example?
I need to read messages in a batch and commit all of them... the problem is that reader.FetchMessage/ReadMessage read messages one by one, and I want to read them all at once.
anatoly-intel commented on Jul 31, 2024
@mohamadrezza as far as I understood:
Even if you use FetchMessage/ReadMessage, behind the scenes kafka-go reads messages in batches, so you actually get messages that have already been fetched.
When you commit the last read message, all previous messages are committed too.
So you can read as many messages as you need and then commit the last one.
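A small sketch of that commit-the-last-one pattern (the function name and batch size are illustrative; it assumes all fetched messages come from one partition, otherwise you would commit the last message per partition):

```go
package example

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

// consumeOnce fetches up to n messages from r (a Reader with a GroupID set)
// and commits only the last one read; that offset also acknowledges every
// earlier message of the same partition, so one commit covers the batch.
func consumeOnce(ctx context.Context, r *kafka.Reader, n int) error {
	msgs := make([]kafka.Message, 0, n)
	for len(msgs) < n {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			return err
		}
		msgs = append(msgs, m)
	}

	// ... process msgs as one batch here ...

	return r.CommitMessages(ctx, msgs[len(msgs)-1])
}
```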
mohamadrezza commented on Jul 31, 2024
Thanks @anatoly-intel.
How can I access a batch of messages at once? FetchMessage and ReadMessage only give one message at a time.
I want to receive an array of messages.
Is there an example for this scenario?
anatoly314 commented on Jul 31, 2024
I don't think it's possible now
krsoninikhil commented on Jun 8, 2025
Hey everyone, I was looking into implementing this batch reading API on the existing Reader. As others pointed out, FetchMessage already uses ReadBatch under the hood, so would holding off on returning messages until the count reaches the required batch size be the right approach to achieve this?
I've implemented the above approach in #1390 for review. If it looks good, then I can go ahead with some test cases. Looking forward to hearing from the maintainers.