From 99a01047e8762a9738ca13e3ba64361d3bb7bc11 Mon Sep 17 00:00:00 2001 From: Michael Benford Date: Sun, 2 Jan 2022 12:40:51 -0300 Subject: [PATCH] Add support for type parameters Add support for type parameters so that there is no need for any type-asserting inside the flush function. Also change New's signature to make the flush function mandatory, since it makes no sense to create a buffer without a way to flush out its contents. --- .github/workflows/go.yml | 2 +- .tool-versions | 1 - README.md | 131 +++++++++++++++++++++++++++++++++++++-- bench_test.go | 16 ++--- buffer.go | 65 ++++++++++--------- buffer_test.go | 109 ++++++++++++++++---------------- flusher.go | 15 ----- go.mod | 15 ++++- go.sum | 1 - options.go | 13 ---- options_test.go | 14 +---- 11 files changed, 241 insertions(+), 141 deletions(-) delete mode 100644 .tool-versions delete mode 100644 flusher.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 5950fd7..6fea364 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -16,7 +16,7 @@ jobs: - name: Set up Go 1.x uses: actions/setup-go@v2 with: - go-version: ^1.14 + go-version: 1.24 id: go - name: Check out code into the Go module directory diff --git a/.tool-versions b/.tool-versions deleted file mode 100644 index 6973eed..0000000 --- a/.tool-versions +++ /dev/null @@ -1 +0,0 @@ -golang 1.14 diff --git a/README.md b/README.md index 9cfec63..a3e113a 100644 --- a/README.md +++ b/README.md @@ -14,17 +14,140 @@ # go-buffer -`go-buffer` represents a buffer that asynchronously flushes its contents. It is useful for applications that need to aggregate data before writing it to an external storage. A buffer is flushed manually, or automatically when it becomes full or after an interval has elapsed, whichever comes first. +`go-buffer` represents a buffer that asynchronously flushes its contents. It is useful for applications that need to +aggregate data before writing it to an external storage. A buffer is flushed manually, or automatically when it becomes +full or after an interval has elapsed, whichever comes first. ## Installation go get github.com/globocom/go-buffer +Go < 1.18: + + go get github.com/globocom/go-buffer@v2 + ## Examples +> [!NOTE] +> For v2, see [Examples v2](#examples-v2). + +### Size-triggered flush + +```go +package main + +import ( + "time" + + "github.com/globocom/go-buffer/v3" +) + +func main() { + buff := buffer.New( + // call this function when the buffer needs flushing + func(items []string) { + for _, item := range items { + println(string) + } + }, + // buffer can hold up to 5 items + buffer.WithSize(5), + ) + // ensure the buffer + defer buff.Close() + + buff.Push("item 1") + buff.Push("item 2") + buff.Push("item 3") + buff.Push("item 4") + buff.Push("item 5") + + // block the current goroutine + time.Sleep(3 * time.Second) + + println("done") +} +``` + +### Interval-triggered flush + +```go +package main + +import ( + "time" + + "github.com/globocom/go-buffer/v3" +) + +func main() { + buff := buffer.New( + // call this function when the buffer needs flushing + func(items []string) { + for _, item := range items { + println(item) + } + }, + // buffer can hold up to 5 items + buffer.WithSize(5), + // buffer will be flushed every second, regardless of + // how many items were pushed + buffer.WithFlushInterval(time.Second), + ) + defer buff.Close() + + buff.Push("item 1") + buff.Push("item 2") + buff.Push("item 3") + + // block the current goroutine + time.Sleep(3 * time.Second) + + println("done") +} +``` + +### Manual flush + +```go +package main + +import ( + "time" + + "github.com/globocom/go-buffer/v3" +) + +func main() { + buff := buffer.New( + // call this function when the buffer needs flushing + func(items []string) { + for _, item := range items { + println(item) + } + }, + // buffer can hold up to 5 items + buffer.WithSize(5), + ) + defer buff.Close() + + buff.Push("item 1") + buff.Push("item 2") + buff.Push("item 3") + + // block the current goroutine + time.Sleep(3*time.Second) + + buff.Flush() + println("done") +} +``` + +## Examples v2 + ### Size-triggered flush -```golang +```go package main import ( @@ -62,7 +185,7 @@ func main() { ### Interval-triggered flush -```golang +```go package main import ( @@ -100,7 +223,7 @@ func main() { ### Manual flush -```golang +```go package main import ( diff --git a/bench_test.go b/bench_test.go index 9c07000..8b3ad93 100644 --- a/bench_test.go +++ b/bench_test.go @@ -3,21 +3,21 @@ package buffer_test import ( "testing" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) func BenchmarkBuffer(b *testing.B) { - noop := buffer.FlusherFunc(func([]interface{}) {}) + noop := func([]int) {} b.Run("push only", func(b *testing.B) { sut := buffer.New( + noop, buffer.WithSize(uint(b.N)+1), - buffer.WithFlusher(noop), ) defer sut.Close() - for i := 0; i < b.N; i++ { - err := sut.Push(i) + for b.Loop() { + err := sut.Push(1) if err != nil { b.Fail() } @@ -26,13 +26,13 @@ func BenchmarkBuffer(b *testing.B) { b.Run("push and flush", func(b *testing.B) { sut := buffer.New( + noop, buffer.WithSize(1), - buffer.WithFlusher(noop), ) defer sut.Close() - for i := 0; i < b.N; i++ { - err := sut.Push(i) + for b.Loop() { + err := sut.Push(1) if err != nil { b.Fail() } diff --git a/buffer.go b/buffer.go index 8fae11e..163af8e 100644 --- a/buffer.go +++ b/buffer.go @@ -15,21 +15,42 @@ var ( type ( // Buffer represents a data buffer that is asynchronously flushed, either manually or automatically. - Buffer struct { + Buffer[T any] struct { io.Closer - dataCh chan interface{} - flushCh chan struct{} - closeCh chan struct{} - doneCh chan struct{} - options *Options + flushFunc func([]T) + dataCh chan T + flushCh chan struct{} + closeCh chan struct{} + doneCh chan struct{} + options *Options } ) +// New creates a new buffer instance with the provided flush function and options. +// It panics if provided with a nil flush function. +func New[T any](flushFunc func([]T), opts ...Option) *Buffer[T] { + if flushFunc == nil { + panic("flush function cannot be nil") + } + + buffer := &Buffer[T]{ + flushFunc: flushFunc, + dataCh: make(chan T), + flushCh: make(chan struct{}), + closeCh: make(chan struct{}), + doneCh: make(chan struct{}), + options: resolveOptions(opts...), + } + go buffer.consume() + + return buffer +} + // Push appends an item to the end of the buffer. // -// It returns an ErrTimeout if if cannot be performed in a timely fashion, and +// It returns an ErrTimeout if it cannot be performed in a timely fashion, and // an ErrClosed if the buffer has been closed. -func (buffer *Buffer) Push(item interface{}) error { +func (buffer *Buffer[T]) Push(item T) error { if buffer.closed() { return ErrClosed } @@ -46,7 +67,7 @@ func (buffer *Buffer) Push(item interface{}) error { // // It returns an ErrTimeout if if cannot be performed in a timely fashion, and // an ErrClosed if the buffer has been closed. -func (buffer *Buffer) Flush() error { +func (buffer *Buffer[T]) Flush() error { if buffer.closed() { return ErrClosed } @@ -67,7 +88,7 @@ func (buffer *Buffer) Flush() error { // An ErrTimeout can either mean that a flush could not be triggered, or it can // mean that a flush was triggered but it has not finished yet. In any case it is // safe to call Close again. -func (buffer *Buffer) Close() error { +func (buffer *Buffer[T]) Close() error { if buffer.closed() { return ErrClosed } @@ -90,7 +111,7 @@ func (buffer *Buffer) Close() error { } } -func (buffer Buffer) closed() bool { +func (buffer *Buffer[T]) closed() bool { select { case <-buffer.doneCh: return true @@ -99,9 +120,9 @@ func (buffer Buffer) closed() bool { } } -func (buffer *Buffer) consume() { +func (buffer *Buffer[T]) consume() { count := 0 - items := make([]interface{}, buffer.options.Size) + items := make([]T, buffer.options.Size) mustFlush := false ticker, stopTicker := newTicker(buffer.options.FlushInterval) @@ -123,10 +144,10 @@ func (buffer *Buffer) consume() { if mustFlush { stopTicker() - buffer.options.Flusher.Write(items[:count]) + buffer.flushFunc(items[:count]) count = 0 - items = make([]interface{}, buffer.options.Size) + items = make([]T, buffer.options.Size) mustFlush = false ticker, stopTicker = newTicker(buffer.options.FlushInterval) } @@ -144,17 +165,3 @@ func newTicker(interval time.Duration) (<-chan time.Time, func()) { ticker := time.NewTicker(interval) return ticker.C, ticker.Stop } - -// New creates a new buffer instance with the provided options. -func New(opts ...Option) *Buffer { - buffer := &Buffer{ - dataCh: make(chan interface{}), - flushCh: make(chan struct{}), - closeCh: make(chan struct{}), - doneCh: make(chan struct{}), - options: resolveOptions(opts...), - } - go buffer.consume() - - return buffer -} diff --git a/buffer_test.go b/buffer_test.go index 869bbbc..d509466 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -6,7 +6,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) var _ = Describe("Buffer", func() { @@ -20,37 +20,38 @@ var _ = Describe("Buffer", func() { It("creates a new Buffer instance", func() { // act sut := buffer.New( + flusher.Flush, buffer.WithSize(10), - buffer.WithFlusher(flusher), ) // assert Expect(sut).NotTo(BeNil()) }) + It("panics when provided an invalid flusher", func() { + Expect(func() { + buffer.New[string]( + nil, + buffer.WithSize(1), + ) + }).To(Panic()) + }) + Context("invalid options", func() { It("panics when provided an invalid size", func() { Expect(func() { buffer.New( + flusher.Flush, buffer.WithSize(0), ) }).To(Panic()) }) - It("panics when provided an invalid flusher", func() { - Expect(func() { - buffer.New( - buffer.WithSize(1), - buffer.WithFlusher(nil), - ) - }).To(Panic()) - }) - It("panics when provided an invalid flush interval", func() { Expect(func() { buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithFlushInterval(-1), ) }).To(Panic()) @@ -59,8 +60,8 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid push timeout", func() { Expect(func() { buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithPushTimeout(-1), ) }).To(Panic()) @@ -69,8 +70,8 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid flush timeout", func() { Expect(func() { buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithFlushTimeout(-1), ) }).To(Panic()) @@ -79,8 +80,8 @@ var _ = Describe("Buffer", func() { It("panics when provided an invalid close timeout", func() { Expect(func() { buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(-1), ) }).To(Panic()) @@ -92,14 +93,14 @@ var _ = Describe("Buffer", func() { It("pushes items into the buffer when Push is called", func() { // arrange sut := buffer.New( + flusher.Flush, buffer.WithSize(3), - buffer.WithFlusher(flusher), ) // act - err1 := sut.Push(1) - err2 := sut.Push(2) - err3 := sut.Push(3) + err1 := sut.Push("a") + err2 := sut.Push("b") + err3 := sut.Push("c") // assert Expect(err1).To(Succeed()) @@ -111,15 +112,15 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { select {} } sut := buffer.New( + flusher.Flush, buffer.WithSize(2), - buffer.WithFlusher(flusher), buffer.WithPushTimeout(time.Second), ) // act - err1 := sut.Push(1) - err2 := sut.Push(2) - err3 := sut.Push(3) + err1 := sut.Push("a") + err2 := sut.Push("b") + err3 := sut.Push("c") // assert Expect(err1).To(Succeed()) @@ -130,13 +131,13 @@ var _ = Describe("Buffer", func() { It("fails when the buffer is closed", func() { // arrange sut := buffer.New( + flusher.Flush, buffer.WithSize(2), - buffer.WithFlusher(flusher), ) _ = sut.Close() // act - err := sut.Push(1) + err := sut.Push("a") // assert Expect(err).To(MatchError(buffer.ErrClosed)) @@ -147,20 +148,20 @@ var _ = Describe("Buffer", func() { It("flushes the buffer when it fills up", func(done Done) { // arrange sut := buffer.New( + flusher.Flush, buffer.WithSize(5), - buffer.WithFlusher(flusher), ) // act - _ = sut.Push(1) - _ = sut.Push(2) - _ = sut.Push(3) - _ = sut.Push(4) - _ = sut.Push(5) + _ = sut.Push("a") + _ = sut.Push("b") + _ = sut.Push("c") + _ = sut.Push("d") + _ = sut.Push("e") // assert result := <-flusher.Done - Expect(result.Items).To(ConsistOf(1, 2, 3, 4, 5)) + Expect(result.Items).To(ConsistOf("a", "b", "c", "d", "e")) close(done) }) @@ -169,17 +170,17 @@ var _ = Describe("Buffer", func() { interval := 3 * time.Second start := time.Now() sut := buffer.New( + flusher.Flush, buffer.WithSize(5), - buffer.WithFlusher(flusher), buffer.WithFlushInterval(interval), ) // act - _ = sut.Push(1) + _ = sut.Push("a") // assert result := <-flusher.Done - Expect(result.Items).To(ConsistOf(1)) + Expect(result.Items).To(ConsistOf("a")) Expect(result.Time).To(BeTemporally("~", start, interval+time.Second)) close(done) }, 5) @@ -187,11 +188,11 @@ var _ = Describe("Buffer", func() { It("flushes the buffer when Flush is called", func(done Done) { // arrange sut := buffer.New( + flusher.Flush, buffer.WithSize(3), - buffer.WithFlusher(flusher), ) - _ = sut.Push(1) - _ = sut.Push(2) + _ = sut.Push("a") + _ = sut.Push("b") // act err := sut.Flush() @@ -199,7 +200,7 @@ var _ = Describe("Buffer", func() { // assert result := <-flusher.Done Expect(err).To(Succeed()) - Expect(result.Items).To(ConsistOf(1, 2)) + Expect(result.Items).To(ConsistOf("a", "b")) close(done) }) @@ -207,11 +208,11 @@ var _ = Describe("Buffer", func() { // arrange flusher.Func = func() { time.Sleep(3 * time.Second) } sut := buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithFlushTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Push("a") // act err := sut.Flush() @@ -222,9 +223,9 @@ var _ = Describe("Buffer", func() { It("fails when the buffer is closed", func() { // arrange - sut := buffer.New( + sut := buffer.New[string]( + flusher.Flush, buffer.WithSize(2), - buffer.WithFlusher(flusher), ) _ = sut.Close() @@ -240,11 +241,11 @@ var _ = Describe("Buffer", func() { It("flushes the buffer and closes it when Close is called", func(done Done) { // arrange sut := buffer.New( + flusher.Flush, buffer.WithSize(3), - buffer.WithFlusher(flusher), ) - _ = sut.Push(1) - _ = sut.Push(2) + _ = sut.Push("a") + _ = sut.Push("b") // act err := sut.Close() @@ -252,7 +253,7 @@ var _ = Describe("Buffer", func() { // assert result := <-flusher.Done Expect(err).To(Succeed()) - Expect(result.Items).To(ConsistOf(1, 2)) + Expect(result.Items).To(ConsistOf("a", "b")) close(done) }) @@ -261,11 +262,11 @@ var _ = Describe("Buffer", func() { flusher.Func = func() { time.Sleep(2 * time.Second) } sut := buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Push("a") // act err := sut.Close() @@ -279,8 +280,8 @@ var _ = Describe("Buffer", func() { flusher.Func = func() { time.Sleep(2 * time.Second) } sut := buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) _ = sut.Close() @@ -297,11 +298,11 @@ var _ = Describe("Buffer", func() { flusher.Func = func() { time.Sleep(2 * time.Second) } sut := buffer.New( + flusher.Flush, buffer.WithSize(1), - buffer.WithFlusher(flusher), buffer.WithCloseTimeout(time.Second), ) - _ = sut.Push(1) + _ = sut.Push("a") // act err1 := sut.Close() @@ -323,11 +324,11 @@ type ( WriteCall struct { Time time.Time - Items []interface{} + Items []string } ) -func (flusher *MockFlusher) Write(items []interface{}) { +func (flusher *MockFlusher) Flush(items []string) { call := &WriteCall{ Time: time.Now(), Items: items, diff --git a/flusher.go b/flusher.go deleted file mode 100644 index e1078cd..0000000 --- a/flusher.go +++ /dev/null @@ -1,15 +0,0 @@ -package buffer - -type ( - // Flusher represents a destination of buffered data. - Flusher interface { - Write(items []interface{}) - } - - // FlusherFunc represents a flush function. - FlusherFunc func(items []interface{}) -) - -func (fn FlusherFunc) Write(items []interface{}) { - fn(items) -} diff --git a/go.mod b/go.mod index 8dd618c..3dae8d4 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,19 @@ -module github.com/globocom/go-buffer/v2 +module github.com/globocom/go-buffer/v3 -go 1.13 +go 1.24 require ( github.com/onsi/ginkgo v1.13.0 github.com/onsi/gomega v1.10.1 ) + +require ( + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/nxadm/tail v1.4.4 // indirect + golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 // indirect + golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect + golang.org/x/text v0.3.2 // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect + gopkg.in/yaml.v2 v2.3.0 // indirect +) diff --git a/go.sum b/go.sum index 5276e40..e63d6ae 100644 --- a/go.sum +++ b/go.sum @@ -11,7 +11,6 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= diff --git a/options.go b/options.go index dda28d2..794b64e 100644 --- a/options.go +++ b/options.go @@ -8,7 +8,6 @@ import ( const ( invalidSize = "size cannot be zero" - invalidFlusher = "flusher cannot be nil" invalidInterval = "interval must be greater than zero (%s)" invalidTimeout = "timeout cannot be negative (%s)" ) @@ -17,7 +16,6 @@ type ( // Configuration options. Options struct { Size uint - Flusher Flusher FlushInterval time.Duration PushTimeout time.Duration FlushTimeout time.Duration @@ -35,13 +33,6 @@ func WithSize(size uint) Option { } } -// WithFlusher sets the flusher that should be used to write out the buffer. -func WithFlusher(flusher Flusher) Option { - return func(options *Options) { - options.Flusher = flusher - } -} - // WithFlushInterval sets the interval between automatic flushes. func WithFlushInterval(interval time.Duration) Option { return func(options *Options) { @@ -74,9 +65,6 @@ func validateOptions(options *Options) error { if options.Size == 0 { return errors.New(invalidSize) } - if options.Flusher == nil { - return errors.New(invalidFlusher) - } if options.FlushInterval < 0 { return fmt.Errorf(invalidInterval, "FlushInterval") } @@ -96,7 +84,6 @@ func validateOptions(options *Options) error { func resolveOptions(opts ...Option) *Options { options := &Options{ Size: 0, - Flusher: nil, FlushInterval: 0, PushTimeout: time.Second, FlushTimeout: time.Second, diff --git a/options_test.go b/options_test.go index bff0846..18a52d6 100644 --- a/options_test.go +++ b/options_test.go @@ -6,7 +6,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/globocom/go-buffer/v2" + "github.com/globocom/go-buffer/v3" ) var _ = Describe("Options", func() { @@ -21,18 +21,6 @@ var _ = Describe("Options", func() { Expect(opts.Size).To(BeIdenticalTo(uint(10))) }) - It("sets up flusher", func() { - // arrange - opts := &buffer.Options{} - flusher := func(items []interface{}) {} - - // act - buffer.WithFlusher(buffer.FlusherFunc(flusher))(opts) - - // assert - Expect(opts.Flusher).NotTo(BeNil()) - }) - It("sets up flush interval", func() { // arrange opts := &buffer.Options{}