
Investigate why channels are faster #13

Closed
binary132 opened this issue Nov 14, 2016 · 10 comments

Comments

@binary132

I'm curious why channels are faster than the "function-based" consumer API. Has anyone profiled this?

@EVODelavega

Check the consumer_performance_test.go file; this has been benchmarked. Reading through consumer.go, it's pretty clear that both do the same thing (c.handle.eventPoll to get the events). However, the channel-based consumer fetches more than one message at a time, in a separate goroutine. The function-based (c.Poll) approach only fetches a single event, and the call c.Poll(100) essentially blocks the current goroutine for up to 100ms, making the overall approach slower (because of the additional function calls, etc.).

@gwenshap

gwenshap commented Dec 6, 2016

Why do you think c.Poll only fetches a single event? My understanding is that it wraps the Kafka fetch request, which can return any number of events (you can even configure a minimum). In addition, c.Poll(100) will only block if there are no new events, so in a standard benchmark where the topic is pre-loaded, I wouldn't expect any blocking at all.

@EVODelavega

EVODelavega commented Dec 6, 2016

The third argument in the call to c.handle.eventPoll is a hard-coded 1:

// Poll the consumer for messages or events.
//
// Will block for at most timeoutMs milliseconds
//
// The following callbacks may be triggered:
//   Subscribe()'s rebalanceCb
//
// Returns nil on timeout, else an Event
func (c *Consumer) Poll(timeoutMs int) (event Event) {
	ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
	return ev
}

Combined with the signature func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool), that means maxEvents is 1 for every c.Poll call. It can't work any other way, because the c.Poll function returns a single Event...

As for the blocking: yes, that's why I said that c.Poll will block the current goroutine for up to 100ms. If an event is fetched within the 100ms timeout, then obviously c.Poll will return earlier. My point was: yes, the difference in performance has been verified, and, looking at the code, I tried to provide an educated guess as to why channels are generally faster.

@binary132

binary132 commented Dec 6, 2016

Perhaps it would make sense to put a configurable receive batch size in kafka.Consumer? Then the user can specify the network behavior of the client, and the same value could be used for the eventPoll maxEvents parameter regardless of whether the user is using the "channel method" or the "function method".

@EVODelavega

@binary132 That would mean c.Poll should return a slice of len 0-N and cap N; however, this approach would probably cause the function-based approach to suffer from the same potential problem as the channel-based one (i.e. outdated messages/events being consumed). Imagine this:

ch := make(chan []Event, 10)
go func() {
    events, err := c.Poll(100, 10) // 10 being the max events to fetch
    if err != nil {
        log.Warn(err) // or something
    }
    ch <- events
}()
go func() {
    events, err := c.Poll(100, 5)
    if err != nil {
        log.Warn(err)
    }
    ch <- events
}()

Wouldn't this pose problems with respect to the order of the messages and how they're pushed onto the channel?

And, all things aside, the c.Poll function would then have to be implemented using yet another channel:

// current implementation:
ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
// should be
tch := make(chan bool)
ch := make(chan Event, maxEvents) // max events being the second param
_, err := c.handle.eventPoll(ch, timeoutMs, maxEvents, tch)
// handle err, check/wait for <-tch etc... (all the stuff consumeReader does, basically)
close(ch)
es := make([]Event, 0, maxEvents)
for e := range ch {
    es = append(es, e)
}
return es

And this is grossly oversimplifying it, in part because I'm going through the source while writing this response. Everything you need to do just to get c.Poll to return a slice rather than a single event adds a lot of complexity and potential issues. If you need to get 10 messages using a consumer, just create a new consumer and set go.events.channel.size to 10 in the config. Close the consumer after that and you're done.
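For reference, a hedged sketch of that workaround (the go.events.channel.* keys are real confluent-kafka-go config properties; the broker address and group id are placeholders, and this fragment is not runnable without a broker and the library import):

```go
// Size the events channel so the background fetcher buffers at most 10
// events, then consume via c.Events(). Assumes
// github.com/confluentinc/confluent-kafka-go/kafka is imported.
c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":        "localhost:9092", // placeholder
	"group.id":                 "batch-example",  // placeholder
	"go.events.channel.enable": true,
	"go.events.channel.size":   10,
})
if err != nil {
	log.Fatal(err)
}
defer c.Close()
```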

@binary132

binary132 commented Dec 6, 2016

On a quick first glance I had assumed handle.eventPoll returns a slice since it's called with a max receive size. I see now that it writes to the given channel in a loop, and may return a single message.

I looked over the librdkafka documentation, and now I see why a slice return value isn't right here. Maybe a compromise could be to offer the user a lower-level Consumer.PollQueue wrapper for C._rk_queue_poll (which is how the librdkafka docs say a client application is expected to use the API).

Introducing more channels introduces more mutex thrashing, and introducing more goroutines introduces more concurrent (i.e. unpredictable) behaviors the package user is unaware of. I know goroutines are used in other packages such as database/sql, but both of those factors make me uncomfortable as a package user.

@EVODelavega

@binary132 Adding a Poll-like function returning a slice of messages might be worth considering, although, to mitigate as many of the concurrency issues you mention as possible, a blocking call is probably required. If so, it might be worth considering passing a callback function to handle every fetched event as it is consumed. Perhaps create an issue suggesting something along the lines of:

// type alias for brevity
type EventCallback func(Event)

func (c *Consumer) PollQueue(timeoutMs, maxEvents int, callback EventCallback) int {
    // poll the underlying consumer, invoke callback per event,
    // and return the actual number of events processed?
}

@binary132

binary132 commented Dec 12, 2016

Yes, I'm saying a blocking call without the use of goroutines or callbacks would be a desirable API feature to some users, especially those for whom performance is the critical feature of this package. As to callbacks: Go isn't Node.js; callbacks in an exported API are almost never idiomatic Go, and adding a callback on every message would reintroduce the stack-thrashing performance drawback of the current synchronous Consumer.Poll.

My concern with simply exposing C._rk_queue_poll is that users will then also have to handle Kafka events other than messages, but maybe users who want a low-level sync call are willing to make the tradeoff?

The typical Go idiom is that the package user implements concurrency as desired. You'll notice there are nearly zero async calls in the Go stdlib, and even the async internals of database/sql (which relate to connection pools) are hidden behind a synchronous API. time.Timer is the first obvious exception that comes to mind, but it's a synchronization tool.

@binary132

binary132 commented Dec 12, 2016

This is important because Go has the capacity to match well-written C++ in performance, but in practice seldom does, and at times is considered a second-class citizen of the high-performance world. This is an issue at the user and package level rather than at the language level.

My team is using a channel-driven Go implementation of a Kafka client (github.com/Shopify/sarama), and confluent-kafka-go offers a potentially significant performance advantage. Since this is emphasized as a key feature, I think it's valid to at least offer the user a choice between a low-level synchronous wrapper and a concurrent helper, if not to have the concurrency implemented entirely as a helper type or package. But I won't press that, since the package has clearly been built around channels so far.

It is certainly worth exposing a synchronous client API as something other than a layer on top of a concurrent wrapper over an underlying synchronous poll call.

@EVODelavega

@binary132 I was just suggesting that, if you feel there is a clear, valid use case for a synchronous, (potentially) blocking call, then the best course of action would be to open a separate issue requesting that feature.

The suggestion/notion of callbacks stems from having used librdkafka a while back. Having a, for lack of a better word, sequential consumer suggests to me that you may want to handle each message before fetching the next. Why not pass a handler function when doing that? That function can then process the data and commit the offset. I'd consider that a fairly reasonable thing to do, no?
It's also because the synchronous approach would, almost by definition, be more low-level that I'd expect said API to expose more of the underlying librdkafka library, which does tend to use a fair number of callbacks (consume_cb, error_cb, etc.).

Either way, might be worth opening a new issue to get some feedback on whether or not this feature is likely to be added. The author of librdkafka also seems to be very active on this repo. There probably is no better place to ask than here 😄
