Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
test:
@go run github.com/onsi/ginkgo/ginkgo -keepGoing -progress -timeout 1m -race
@go run github.com/onsi/ginkgo/ginkgo -keepGoing -progress -timeout 1m -race --randomizeAllSpecs --randomizeSuites
23 changes: 13 additions & 10 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,25 @@ func (buffer *Buffer) Flush() error {
}
}

// Close flushes the buffer and prevents it from being further used. The buffer
// cannot be used after it has been closed as all further operations will panic.
// Close flushes the buffer and prevents it from being further used. If it succeeds,
// the buffer cannot be used after it has been closed as all further operations will panic.
func (buffer *Buffer) Close() error {
close(buffer.closeCh)
select {
case buffer.closeCh <- struct{}{}:
// noop
case <-time.After(buffer.options.CloseTimeout):
return ErrTimeout
}

var err error
select {
case <-buffer.doneCh:
err = nil
close(buffer.dataCh)
close(buffer.flushCh)
close(buffer.closeCh)
return nil
case <-time.After(buffer.options.CloseTimeout):
err = ErrTimeout
return ErrTimeout
}

close(buffer.dataCh)
close(buffer.flushCh)
return err
}

func (buffer *Buffer) consume() {
Expand Down
27 changes: 27 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,33 @@ var _ = Describe("Buffer", func() {
// assert
Expect(err).To(MatchError(buffer.ErrTimeout))
})

It("allow Close to be called again if it fails", func() {
// arrange
flusher.Func = func() { time.Sleep(2 * time.Second) }

sut := buffer.New(
buffer.WithSize(1),
buffer.WithFlusher(flusher),
buffer.WithCloseTimeout(time.Second),
)
_ = sut.Push(1)

// act
err := sut.Close()

// assert
Expect(err).To(MatchError(buffer.ErrTimeout))

// arrange
time.Sleep(time.Second)

// act
err = sut.Close()

// assert
Expect(err).To(BeNil())
})
})
})

Expand Down