From 115ecd8c605ab8fe7fc6242cc1431e1b16a1de53 Mon Sep 17 00:00:00 2001 From: Michael Benford Date: Tue, 8 Sep 2020 15:27:48 -0300 Subject: [PATCH] Prevent panics from happening when the buffer is closed --- buffer.go | 45 +++++++++++++++++++++++++++++++++++------ buffer_test.go | 54 ++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/buffer.go b/buffer.go index 084ae4a..8fae11e 100644 --- a/buffer.go +++ b/buffer.go @@ -9,6 +9,8 @@ import ( var ( // ErrTimeout indicates an operation has timed out. ErrTimeout = errors.New("operation timed-out") + // ErrClosed indicates the buffer is closed and can no longer be used. + ErrClosed = errors.New("buffer is closed") ) type ( @@ -23,9 +25,15 @@ type ( } ) -// Push appends an item to the end of the buffer. It times out if it cannot be -// performed in a timely fashion. +// Push appends an item to the end of the buffer. +// +// It returns an ErrTimeout if if cannot be performed in a timely fashion, and +// an ErrClosed if the buffer has been closed. func (buffer *Buffer) Push(item interface{}) error { + if buffer.closed() { + return ErrClosed + } + select { case buffer.dataCh <- item: return nil @@ -34,9 +42,15 @@ func (buffer *Buffer) Push(item interface{}) error { } } -// Flush outputs the buffer to a permanent destination. It times out if it cannot be -// performed in a timely fashion. +// Flush outputs the buffer to a permanent destination. +// +// It returns an ErrTimeout if if cannot be performed in a timely fashion, and +// an ErrClosed if the buffer has been closed. func (buffer *Buffer) Flush() error { + if buffer.closed() { + return ErrClosed + } + select { case buffer.flushCh <- struct{}{}: return nil @@ -45,9 +59,19 @@ func (buffer *Buffer) Flush() error { } } -// Close flushes the buffer and prevents it from being further used. If it succeeds, -// the buffer cannot be used after it has been closed as all further operations will panic. +// Close flushes the buffer and prevents it from being further used. +// +// It returns an ErrTimeout if if cannot be performed in a timely fashion, and +// an ErrClosed if the buffer has already been closed. +// +// An ErrTimeout can either mean that a flush could not be triggered, or it can +// mean that a flush was triggered but it has not finished yet. In any case it is +// safe to call Close again. func (buffer *Buffer) Close() error { + if buffer.closed() { + return ErrClosed + } + select { case buffer.closeCh <- struct{}{}: // noop @@ -66,6 +90,15 @@ func (buffer *Buffer) Close() error { } } +func (buffer Buffer) closed() bool { + select { + case <-buffer.doneCh: + return true + default: + return false + } +} + func (buffer *Buffer) consume() { count := 0 items := make([]interface{}, buffer.options.Size) diff --git a/buffer_test.go b/buffer_test.go index f74055a..1fe72e6 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -126,6 +126,21 @@ var _ = Describe("Buffer", func() { Expect(err2).To(Succeed()) Expect(err3).To(MatchError(buffer.ErrTimeout)) }) + + It("fails when the buffer is closed", func() { + // arrange + sut := buffer.New( + buffer.WithSize(2), + buffer.WithFlusher(flusher), + ) + _ = sut.Close() + + // act + err := sut.Push(1) + + // assert + Expect(err).To(MatchError(buffer.ErrClosed)) + }) }) Context("Flushing", func() { @@ -204,6 +219,21 @@ var _ = Describe("Buffer", func() { // assert Expect(err).To(MatchError(buffer.ErrTimeout)) }) + + It("fails when the buffer is closed", func() { + // arrange + sut := buffer.New( + buffer.WithSize(2), + buffer.WithFlusher(flusher), + ) + _ = sut.Close() + + // act + err := sut.Flush() + + // assert + Expect(err).To(MatchError(buffer.ErrClosed)) + }) }) Context("Closing", func() { @@ -244,7 +274,7 @@ var _ = Describe("Buffer", func() { Expect(err).To(MatchError(buffer.ErrTimeout)) }) - It("allow Close to be called again if it fails", func() { + It("fails when the buffer is closed", func() { // arrange flusher.Func = func() { time.Sleep(2 * time.Second) } @@ -253,22 +283,34 @@ var _ = Describe("Buffer", func() { buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Close() // act err := sut.Close() // assert - Expect(err).To(MatchError(buffer.ErrTimeout)) + Expect(err).To(MatchError(buffer.ErrClosed)) + }) + It("allows Close to be called again if it fails", func() { // arrange - time.Sleep(time.Second) + flusher.Func = func() { time.Sleep(2 * time.Second) } + + sut := buffer.New( + buffer.WithSize(1), + buffer.WithFlusher(flusher), + buffer.WithCloseTimeout(time.Second), + ) + _ = sut.Push(1) // act - err = sut.Close() + err1 := sut.Close() + time.Sleep(time.Second) + err2 := sut.Close() // assert - Expect(err).To(BeNil()) + Expect(err1).To(MatchError(buffer.ErrTimeout)) + Expect(err2).To(Succeed()) }) }) })