diff --git a/buffer.go b/buffer.go index 5a40320..614339c 100644 --- a/buffer.go +++ b/buffer.go @@ -17,6 +17,7 @@ type ( io.Closer dataCh chan interface{} flushCh chan struct{} + closeCh chan struct{} doneCh chan struct{} options *Options } @@ -44,19 +45,22 @@ func (buffer *Buffer) Flush() error { } } -// Close flushes the buffer and prevents it from being further used. It times -// out if it cannot be performed in a timely fashion. -// The buffer must not be used after it has been closed as all further operations will panic. +// Close flushes the buffer and prevents it from being further used. The buffer +// cannot be used after it has been closed as all further operations will panic. func (buffer *Buffer) Close() error { - close(buffer.dataCh) - close(buffer.flushCh) + close(buffer.closeCh) + var err error select { case <-buffer.doneCh: - return nil + err = nil case <-time.After(buffer.options.CloseTimeout): - return ErrTimeout + err = ErrTimeout } + + close(buffer.dataCh) + close(buffer.flushCh) + return err } func (buffer *Buffer) consume() { @@ -75,8 +79,10 @@ func (buffer *Buffer) consume() { mustFlush = count >= len(items) case <-ticker: mustFlush = count > 0 - case _, open := <-buffer.flushCh: - isOpen = open + case <-buffer.flushCh: + mustFlush = count > 0 + case <-buffer.closeCh: + isOpen = false mustFlush = count > 0 } @@ -90,7 +96,7 @@ func (buffer *Buffer) consume() { } stopTicker() - buffer.doneCh <- struct{}{} + close(buffer.doneCh) } func newTicker(interval time.Duration) (<-chan time.Time, func()) { @@ -124,6 +130,7 @@ func New(opts ...Option) *Buffer { buffer := &Buffer{ dataCh: make(chan interface{}), flushCh: make(chan struct{}), + closeCh: make(chan struct{}), doneCh: make(chan struct{}), options: options, }