Skip to content

Commit

Permalink
fix(producer): replace time.After with time.Timer to avoid high memor…
Browse files Browse the repository at this point in the history
…y usage (#2355)
  • Loading branch information
napallday committed Oct 4, 2022
1 parent 50ceb93 commit 0162486
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions async_producer.go
Expand Up @@ -891,7 +891,7 @@ type brokerProducer struct {
abandoned chan struct{}

buffer *produceSet
timer <-chan time.Time
timer *time.Timer
timerFired bool

closing error
Expand All @@ -900,6 +900,7 @@ type brokerProducer struct {

func (bp *brokerProducer) run() {
var output chan<- *produceSet
var timerChan <-chan time.Time
Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

for {
Expand Down Expand Up @@ -969,12 +970,14 @@ func (bp *brokerProducer) run() {
}

if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
bp.timer = time.NewTimer(bp.parent.conf.Producer.Flush.Frequency)
timerChan = bp.timer.C
}
case <-bp.timer:
case <-timerChan:
bp.timerFired = true
case output <- bp.buffer:
bp.rollOver()
timerChan = nil
case response, ok := <-bp.responses:
if ok {
bp.handleResponse(response)
Expand Down Expand Up @@ -1034,6 +1037,9 @@ func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool)
}

func (bp *brokerProducer) rollOver() {
if bp.timer != nil {
bp.timer.Stop()
}
bp.timer = nil
bp.timerFired = false
bp.buffer = newProduceSet(bp.parent)
Expand Down

0 comments on commit 0162486

Please sign in to comment.