
RFC: Batch consume API #23

Closed
binary132 opened this issue Dec 13, 2016 · 15 comments

@binary132

binary132 commented Dec 13, 2016

@edenhill: After discussing with @EVODelavega in Issue #13 I've come here to ask for input. I'm not very familiar with librdkafka, but I understand kafka.Consumer.Poll uses the channel-based API, which wraps a call to C._rk_queue_poll internally, to receive a single message.

What do you think about exposing a lower-level call to librdkafka poll functions, without the channel-based helper, which can return a slice of messages for the user to handle?

Our use-case is to fetch a batch of messages, then process sequentially in a loop, committing only upon reaching some condition.

I think the problematic part is that package users will then have to manage Kafka status messages. That could be deferred to be handled by the user, or helpers could be offered.

@edenhill
Contributor

I don't see a problem adding a PollBatch() ("batch" is such a confusing word in this context), but it seems like you need to wrap the Consumer anyway (to filter certain events), in which case you should be able to implement the batching behaviour with a timed channel consumer loop, right?

@binary132
Author

What I'm trying to get at is a way to use librdkafka through Go without the overhead of the Poll method, which uses the underlying channel machinery internally. For some reason I thought there was a librdkafka function that would return an array of pending messages; if not, just wrapping the underlying _rk_queue_poll function seems like it would be sufficient. I just don't want to have to go through the channel consumer, whether behind a synchronous wrapper or not.

@ewencp
Contributor

ewencp commented Dec 15, 2016

@edenhill I think the idea would be to add a C method to consumer.go that does something like:

ev = rd_kafka_consumer_poll(rk, long_timeout);  /* block for the first message */
while (ev != NULL) {
    append(msgs, ev);
    ev = rd_kafka_consumer_poll(rk, 0);         /* zero timeout: drain what's ready */
}
return msgs;                                    /* list of messages */

i.e. try to get multiple messages via fewer cgo calls as long as they are available. I realize we use different APIs in the Go implementation, but I think the real question here is whether it will be possible to use a single cgo call to return a set of messages rather than just one at a time.

@edenhill
Contributor

Yes, I understand, but I'm asking why this can't just be done with the existing channel interface?
Pseudo-code:

func readBatch(c *kafka.Consumer, wantedCnt int, myTimeout time.Duration) []*kafka.Message {
	msgs := make([]*kafka.Message, 0, wantedCnt)
outer:
	for len(msgs) < wantedCnt {
		select {
		case ev := <-c.Events():
			if msg, ok := ev.(*kafka.Message); ok {
				msgs = append(msgs, msg)
			}
		case <-time.After(myTimeout):
			break outer // timed out: return what we have
		}
	}
	return msgs
}

@ewencp
Contributor

ewencp commented Dec 15, 2016

Ah, my presumption was that the cgo <-> C transition cost was too expensive. I haven't done anything to verify that, just read previous threads where the one-message-per-call pattern seemed to become a performance issue.

@edenhill
Contributor

Possibly yes. @binary132 - can you try the channel-based batch approach and see if it is performant enough for your needs?
If not, we'll look into a more C-oriented alternative.

@binary132
Author

binary132 commented Dec 15, 2016

The question isn't a team requirement, just a suggestion / RFC. I don't think it's a premature optimization since this package (and librdkafka) are marketed as the highest-performing Kafka client options, and exporting an idiomatic API is crucial to a community-leading project like this. Generally speaking, package users should implement their own concurrency and channel behaviors. However, Sarama has already established precedent for a channel-based exported API. I find that decision disagreeable too, but it is what it is.

It may be that incurring a mutex lock and unlock on every received message, in addition to the CGo calls, is inconsequential compared to I/O performance limitations for the vast majority of users.

I would be happy to help contribute this feature if you think it would be valuable.

See https://www.datadoghq.com/blog/go-performance-tales/ for some of the reasoning around channels (again, this is an optimization perspective that is irrelevant to most users at the application level, but may be important at the client package / IO level.)

@edenhill
Contributor

Thanks for the feedback and some background, very interesting.

Generally speaking, package users should implement their own concurrency and channel behaviors.

Is this a general Go community standpoint? That packages should not provide their own channels?
As stated in the README we weren't sure if people were more interested in the channel or function APIs, and we really haven't gotten much wiser on that subject so that's why both are still in there.

One of the main drawbacks, performance-wise, of Go channels is the lack of batch support: it is well known that high-performance queues should be implemented in a batching fashion. That is, the consumer locks the queue, moves the entire queue's contents to a local queue, and then unlocks it. This cuts down on the number of times the queue is accessed by producer and consumer threads, which minimizes lock contention, improves cache coherency, allows smarter handling of buffers, etc. (This is how librdkafka's internal producer queue is implemented.)

So yes, I now understand your concern and I wholly agree that we should provide a batch consume call.

@EVODelavega

@edenhill I wouldn't say there's a hard-line community standpoint WRT channels and concurrency in packages. I suppose it depends. In the end, a package shouldn't (and can't) make assumptions about how it'll be used. For all you know, the runtime might be constrained to a single thread (runtime.GOMAXPROCS(1)). A package can't do anything about that, but the user of said package can choose the optimal solution given the runtime restrictions.
Packages, I think, should offer convenience, abstract some of the nitty-gritty, and ideally offer quick and easy solutions that cover the more common/generic use-cases. For specific use-cases, it falls to the user to wield the package in the way that makes the most sense. A low-level API tends to provide the most flexibility (although it's riskier to use, but that again is the user's responsibility).

TL;DR:

  • Is it a general community standpoint? AFAIK there is no general standpoint, but it makes sense.
  • The current (higher-level) solutions the package offers are great and should remain an integral part of it. They allow users, particularly at this early stage, to hack out quick prototypes or PoCs.
  • A low-level API would definitely add value to this package. Although it means exposing more of the underlying librdkafka library, it also means users can implement the best solution for their specific problems.
  • Performance: as with librdkafka, Kafka in general, or indeed anything in life, performance depends on what you make of it and what you want/need to do. Offering the channel-based API as an acceptable default that will suit most users' needs (especially when prototyping or hacking out a quick PoC) is a major selling point at this stage. A low-level API is harder to use, as is everything else when you're gunning for performance.

@binary132
Author

binary132 commented Dec 21, 2016

@edenhill: "batching" on Go channels can be done by sending slices, but then you're passing by reference and state can get hairy, plus you're potentially causing a lot of heap thrashing. On that note, a slice returned by a consumer escapes to the heap, so it will constantly cause allocations. It could be worth offering something akin to io.ReaderFrom, where the user manages allocation and passes in the slice to be read into, or maybe even implementing something like io.WriterTo on your consumer type! (Write directly to a socket from Kafka without a copy?) But I'm getting a bit ahead of myself here. 😅 A full performance analysis isn't the point of this RFC...

As to the community standard, I would argue that the Go standard library sets the highest standard for package design, code smell, etc., and hardly anything in the stdlib exports channel-based interfaces, apart from a few synchronization types such as time.Timer. But in this case I agree with @EVODelavega, especially since Sarama has already established precedent with a channel consumer for Kafka. However, I would implement it as a helper function (not a method) that consumes from a synchronous interface defined by this package (kafka.Consumer?). But that's also tangential to the RFC.

@edenhill glad we're in agreement! Let me know if there's anything I can do. I've actually been meaning to get more familiar with C idioms...

@edenhill edenhill changed the title RFC: Synchronous poll API RFC: Batch consume API Jan 20, 2017
@sandrom

sandrom commented Jul 6, 2017

Any news on this? I would also be very interested. At the moment I'm mostly sticking with Sarama, because the performance gains of librdkafka aren't really that big, and the pain of a C library then easily outweighs them compared to the pure Go implementation. Batching inbound messages and a less channel-centric API could change quite a bit here. Channels are unfortunately not exactly the fastest beast on earth: while they're easy to work with, I've consistently been able to push (sometimes dramatically) more performance out of mutex-based implementations. We can't forget that channels take a mutex too; that's not free. This might seem like premature optimization, but performance is the main reason for pulling a C library into a Go program. Otherwise you wouldn't go through the pain of it :)

@j-mie

j-mie commented Sep 21, 2018

Having something like this would be incredibly useful for doing bulk ElasticSearch index requests

@edenhill
Contributor

Closed in favour of #128

@shubhang93

Why was this closed in favour of #128? Aren't they unrelated? That issue is about running the library on Windows, but this one is about batch consumers.

@musinit

musinit commented May 17, 2023

I don't understand either why this was closed in favour of #128.
We need to consume batches from Poll() as well; this would be so great. Currently we have to implement our own solution around it.
