Fix incomplete flush in the sender when flushing the client
In some cases, the sender's 'sendLoop' would pop a buffer from its queue
just as we were calling its 'flush' method. Since the input queue was then
empty, 'flush' would return instantly without waiting for the buffer
currently owned by the sender to be sent. This resulted in a partial flush.

We now synchronize 'flush' and 'sendLoop' to make sure flushing the
client sends everything to the wire. The side effect is that the workers
are locked for longer, since we keep them locked while flushing the
sender.
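
As an illustration of the handshake described above, here is a minimal,
self-contained Go sketch of the signal/acknowledge pattern between 'flush'
and 'sendLoop' (simplified names, a buffered string queue, and fmt.Println
standing in for the network write; this is not the library's actual code):

package main

import "fmt"

type sender struct {
	queue       chan string   // buffers waiting to be written
	flushSignal chan struct{} // handshake: request, then acknowledgement
}

func (s *sender) sendLoop() {
	for {
		select {
		case buf := <-s.queue:
			fmt.Println("write:", buf) // stand-in for the network write
		case <-s.flushSignal:
			// Drain everything still queued, then acknowledge, so that
			// flush() only returns once the queue is empty and no buffer
			// is left half-handled inside this loop.
			for drained := false; !drained; {
				select {
				case buf := <-s.queue:
					fmt.Println("write:", buf)
				default:
					drained = true
				}
			}
			s.flushSignal <- struct{}{} // acknowledge: flush is complete
		}
	}
}

func (s *sender) flush() {
	s.flushSignal <- struct{}{} // ask sendLoop to drain its queue
	<-s.flushSignal             // wait for the acknowledgement
}

func main() {
	s := &sender{queue: make(chan string, 8), flushSignal: make(chan struct{})}
	go s.sendLoop()
	s.queue <- "example.metric:1|c"
	s.flush() // returns only after "example.metric:1|c" has been written
}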
hush-hush committed Aug 24, 2020
1 parent b5f99e5 commit 1df04d3
Showing 3 changed files with 41 additions and 16 deletions.
36 changes: 24 additions & 12 deletions statsd/sender.go
@@ -28,20 +28,22 @@ type SenderMetrics struct {
 }
 
 type sender struct {
-	transport statsdWriter
-	pool      *bufferPool
-	queue     chan *statsdBuffer
-	metrics   *SenderMetrics
-	stop      chan struct{}
+	transport   statsdWriter
+	pool        *bufferPool
+	queue       chan *statsdBuffer
+	metrics     *SenderMetrics
+	stop        chan struct{}
+	flushSignal chan struct{}
 }
 
 func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender {
 	sender := &sender{
-		transport: transport,
-		pool:      pool,
-		queue:     make(chan *statsdBuffer, queueSize),
-		metrics:   &SenderMetrics{},
-		stop:      make(chan struct{}),
+		transport:   transport,
+		pool:        pool,
+		queue:       make(chan *statsdBuffer, queueSize),
+		metrics:     &SenderMetrics{},
+		stop:        make(chan struct{}),
+		flushSignal: make(chan struct{}),
 	}
 
 	go sender.sendLoop()
@@ -95,11 +97,17 @@ func (s *sender) sendLoop() {
 			s.write(buffer)
 		case <-s.stop:
 			return
+		case <-s.flushSignal:
+			// At that point we know that the workers are paused (the statsd client
+			// will pause them before calling sender.flush()).
+			// So we can fully flush the input queue
+			s.flushInputQueue()
+			s.flushSignal <- struct{}{}
 		}
 	}
 }
 
-func (s *sender) flush() {
+func (s *sender) flushInputQueue() {
 	for {
 		select {
 		case buffer := <-s.queue:
@@ -109,10 +117,14 @@ func (s *sender) flush() {
 		}
 	}
 }
+func (s *sender) flush() {
+	s.flushSignal <- struct{}{}
+	<-s.flushSignal
+}
 
 func (s *sender) close() error {
 	s.stop <- struct{}{}
 	<-s.stop
-	s.flush()
+	s.flushInputQueue()
 	return s.transport.Close()
 }
13 changes: 9 additions & 4 deletions statsd/statsd.go
@@ -373,9 +373,10 @@ func (c *Client) watch() {
 	}
 }
 
-// Flush forces a flush of all the queued dogstatsd payloads
-// This method is blocking and will not return until everything is sent
-// through the network
+// Flush forces a flush of all the queued dogstatsd payloads. This method is
+// blocking and will not return until everything is sent through the network.
+// In MutexMode, this will also block sampling new data to the client while the
+// workers and sender are flushed.
 func (c *Client) Flush() error {
 	if c == nil {
 		return ErrNoClient
@@ -384,8 +385,12 @@ func (c *Client) Flush() error {
 		c.agg.sendMetrics()
 	}
 	for _, w := range c.workers {
-		w.flush()
+		w.pause()
+		defer w.unpause()
+		w.flushUnsafe()
 	}
+	// Now that the workers are paused, the sender can flush the queue between
+	// the workers and the sender.
+	c.sender.flush()
 	return nil
 }
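
As a usage sketch (placeholder address and metric name; the import path
assumes the github.com/DataDog/datadog-go/statsd package), this is how a
caller would rely on the strengthened Flush guarantee before shutting down:

package main

import (
	"log"

	"github.com/DataDog/datadog-go/statsd"
)

func main() {
	// Placeholder address; point this at a running dogstatsd agent.
	client, err := statsd.New("127.0.0.1:8125")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	_ = client.Incr("example.requests", []string{"env:dev"}, 1)

	// Flush blocks until every queued payload has been written out. With this
	// change it also waits for the buffer currently held by the sender, and in
	// MutexMode it pauses the workers for the duration of the flush.
	if err := client.Flush(); err != nil {
		log.Printf("flush failed: %v", err)
	}
}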
8 changes: 8 additions & 0 deletions statsd/worker.go
@@ -94,6 +94,14 @@ func (w *worker) flush() {
 	w.Unlock()
 }
 
+func (w *worker) pause() {
+	w.Lock()
+}
+
+func (w *worker) unpause() {
+	w.Unlock()
+}
+
 // flush the current buffer. Lock must be held by caller.
 // flushed buffer written to the network asynchronously.
 func (w *worker) flushUnsafe() {
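
A side note on the deferred 'unpause' calls in 'Client.Flush': because
deferred functions only run when 'Flush' returns, every worker stays paused
while 'c.sender.flush()' runs, which is the "workers are locked for longer"
side effect mentioned in the commit message. A small standalone sketch of
that ordering (hypothetical worker type, not the library's):

package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for the library's worker type.
type worker struct {
	sync.Mutex
	id int
}

func (w *worker) pause()   { w.Lock() }
func (w *worker) unpause() { w.Unlock() }

func flushAll(workers []*worker) {
	for _, w := range workers {
		w.pause()
		defer w.unpause() // deferred: runs only when flushAll returns
		fmt.Printf("worker %d flushed while paused\n", w.id)
	}
	// All workers are still paused here, so a sender flush at this point
	// cannot race with workers appending new metrics to their buffers.
	fmt.Println("sender flushed while every worker is paused")
}

func main() {
	flushAll([]*worker{{id: 1}, {id: 2}})
}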
