Skip to content

Commit

Permalink
events buffer is variable, no need to poll for 1000 events, if the bu…
Browse files Browse the repository at this point in the history
…ffer is 10
  • Loading branch information
EVODelavega committed Dec 12, 2016
1 parent 119aba6 commit 3e9343c
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions kafka/consumer.go
Expand Up @@ -308,7 +308,7 @@ func NewConsumer(conf *ConfigMap) (*Consumer, error) {
c.readerTermChan = make(chan bool)

/* Start rdkafka consumer queue reader -> events writer goroutine */
go consumerReader(c, c.readerTermChan)
go consumerReader(c, c.readerTermChan, eventsChanSize)
}

return c, nil
Expand All @@ -329,15 +329,20 @@ func (c *Consumer) rebalance(ev Event) bool {
// consumerReader reads messages and events from the librdkafka consumer queue
// and posts them on the consumer channel.
// Runs until termChan closes
func consumerReader(c *Consumer, termChan chan bool) {
func consumerReader(c *Consumer, termChan chan bool, eventsChanSize int) {

out:
for true {
select {
case _ = <-termChan:
break out
default:
_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
_, term := c.handle.eventPoll(
c.events,
100,
eventsChanSize - len(c.events), // max events depends on available buffer
termChan,
)
if term {
break out
}
Expand Down

0 comments on commit 3e9343c

Please sign in to comment.