Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
var (
// ErrTimeout indicates an operation has timed out.
ErrTimeout = errors.New("operation timed-out")
// ErrClosed indicates the buffer is closed and can no longer be used.
ErrClosed = errors.New("buffer is closed")
)

type (
Expand All @@ -23,9 +25,15 @@ type (
}
)

// Push appends an item to the end of the buffer. It times out if it cannot be
// performed in a timely fashion.
// Push appends an item to the end of the buffer.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Push(item interface{}) error {
if buffer.closed() {
return ErrClosed
}

select {
case buffer.dataCh <- item:
return nil
Expand All @@ -34,9 +42,15 @@ func (buffer *Buffer) Push(item interface{}) error {
}
}

// Flush outputs the buffer to a permanent destination. It times out if it cannot be
// performed in a timely fashion.
// Flush outputs the buffer to a permanent destination.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Flush() error {
if buffer.closed() {
return ErrClosed
}

select {
case buffer.flushCh <- struct{}{}:
return nil
Expand All @@ -45,9 +59,19 @@ func (buffer *Buffer) Flush() error {
}
}

// Close flushes the buffer and prevents it from being further used. If it succeeds,
// the buffer cannot be used after it has been closed as all further operations will panic.
// Close flushes the buffer and prevents it from being further used.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has already been closed.
//
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again.
func (buffer *Buffer) Close() error {
if buffer.closed() {
return ErrClosed
}

select {
case buffer.closeCh <- struct{}{}:
// noop
Expand All @@ -66,6 +90,15 @@ func (buffer *Buffer) Close() error {
}
}

func (buffer Buffer) closed() bool {
select {
case <-buffer.doneCh:
return true
default:
return false
}
}

func (buffer *Buffer) consume() {
count := 0
items := make([]interface{}, buffer.options.Size)
Expand Down
54 changes: 48 additions & 6 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,21 @@ var _ = Describe("Buffer", func() {
Expect(err2).To(Succeed())
Expect(err3).To(MatchError(buffer.ErrTimeout))
})

It("fails when the buffer is closed", func() {
// arrange
sut := buffer.New(
buffer.WithSize(2),
buffer.WithFlusher(flusher),
)
_ = sut.Close()

// act
err := sut.Push(1)

// assert
Expect(err).To(MatchError(buffer.ErrClosed))
})
})

Context("Flushing", func() {
Expand Down Expand Up @@ -204,6 +219,21 @@ var _ = Describe("Buffer", func() {
// assert
Expect(err).To(MatchError(buffer.ErrTimeout))
})

It("fails when the buffer is closed", func() {
// arrange
sut := buffer.New(
buffer.WithSize(2),
buffer.WithFlusher(flusher),
)
_ = sut.Close()

// act
err := sut.Flush()

// assert
Expect(err).To(MatchError(buffer.ErrClosed))
})
})

Context("Closing", func() {
Expand Down Expand Up @@ -244,7 +274,7 @@ var _ = Describe("Buffer", func() {
Expect(err).To(MatchError(buffer.ErrTimeout))
})

It("allow Close to be called again if it fails", func() {
It("fails when the buffer is closed", func() {
// arrange
flusher.Func = func() { time.Sleep(2 * time.Second) }

Expand All @@ -253,22 +283,34 @@ var _ = Describe("Buffer", func() {
buffer.WithFlusher(flusher),
buffer.WithCloseTimeout(time.Second),
)
_ = sut.Push(1)
_ = sut.Close()

// act
err := sut.Close()

// assert
Expect(err).To(MatchError(buffer.ErrTimeout))
Expect(err).To(MatchError(buffer.ErrClosed))
})

It("allows Close to be called again if it fails", func() {
// arrange
time.Sleep(time.Second)
flusher.Func = func() { time.Sleep(2 * time.Second) }

sut := buffer.New(
buffer.WithSize(1),
buffer.WithFlusher(flusher),
buffer.WithCloseTimeout(time.Second),
)
_ = sut.Push(1)

// act
err = sut.Close()
err1 := sut.Close()
time.Sleep(time.Second)
err2 := sut.Close()

// assert
Expect(err).To(BeNil())
Expect(err1).To(MatchError(buffer.ErrTimeout))
Expect(err2).To(Succeed())
})
})
})
Expand Down