Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.14
go-version: 1.24
id: go

- name: Check out code into the Go module directory
Expand Down
1 change: 0 additions & 1 deletion .tool-versions

This file was deleted.

131 changes: 127 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,140 @@

# go-buffer

`go-buffer` represents a buffer that asynchronously flushes its contents. It is useful for applications that need to aggregate data before writing it to an external storage. A buffer is flushed manually, or automatically when it becomes full or after an interval has elapsed, whichever comes first.
`go-buffer` represents a buffer that asynchronously flushes its contents. It is useful for applications that need to
aggregate data before writing it to an external storage. A buffer is flushed manually, or automatically when it becomes
full or after an interval has elapsed, whichever comes first.

## Installation

go get github.com/globocom/go-buffer

Go < 1.18:

go get github.com/globocom/go-buffer@v2

## Examples

> [!NOTE]
> For v2, see [Examples v2](#examples-v2).

### Size-triggered flush

```go
package main

import (
"time"

"github.com/globocom/go-buffer/v3"
)

func main() {
buff := buffer.New(
// call this function when the buffer needs flushing
func(items []string) {
for _, item := range items {
println(string)
}
},
// buffer can hold up to 5 items
buffer.WithSize(5),
)
// ensure the buffer
defer buff.Close()

buff.Push("item 1")
buff.Push("item 2")
buff.Push("item 3")
buff.Push("item 4")
buff.Push("item 5")

// block the current goroutine
time.Sleep(3 * time.Second)

println("done")
}
```

### Interval-triggered flush

```go
package main

import (
"time"

"github.com/globocom/go-buffer/v3"
)

func main() {
buff := buffer.New(
// call this function when the buffer needs flushing
func(items []string) {
for _, item := range items {
println(item)
}
},
// buffer can hold up to 5 items
buffer.WithSize(5),
// buffer will be flushed every second, regardless of
// how many items were pushed
buffer.WithFlushInterval(time.Second),
)
defer buff.Close()

buff.Push("item 1")
buff.Push("item 2")
buff.Push("item 3")

// block the current goroutine
time.Sleep(3 * time.Second)

println("done")
}
```

### Manual flush

```go
package main

import (
"time"

"github.com/globocom/go-buffer/v3"
)

func main() {
buff := buffer.New(
// call this function when the buffer needs flushing
func(items []string) {
for _, item := range items {
println(item)
}
},
// buffer can hold up to 5 items
buffer.WithSize(5),
)
defer buff.Close()

buff.Push("item 1")
buff.Push("item 2")
buff.Push("item 3")

// block the current goroutine
time.Sleep(3*time.Second)

buff.Flush()
println("done")
}
```

## Examples v2

### Size-triggered flush

```golang
```go
package main

import (
Expand Down Expand Up @@ -62,7 +185,7 @@ func main() {

### Interval-triggered flush

```golang
```go
package main

import (
Expand Down Expand Up @@ -100,7 +223,7 @@ func main() {

### Manual flush

```golang
```go
package main

import (
Expand Down
16 changes: 8 additions & 8 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package buffer_test
import (
"testing"

"github.com/globocom/go-buffer/v2"
"github.com/globocom/go-buffer/v3"
)

func BenchmarkBuffer(b *testing.B) {
noop := buffer.FlusherFunc(func([]interface{}) {})
noop := func([]int) {}

b.Run("push only", func(b *testing.B) {
sut := buffer.New(
noop,
buffer.WithSize(uint(b.N)+1),
buffer.WithFlusher(noop),
)
defer sut.Close()

for i := 0; i < b.N; i++ {
err := sut.Push(i)
for b.Loop() {
err := sut.Push(1)
if err != nil {
b.Fail()
}
Expand All @@ -26,13 +26,13 @@ func BenchmarkBuffer(b *testing.B) {

b.Run("push and flush", func(b *testing.B) {
sut := buffer.New(
noop,
buffer.WithSize(1),
buffer.WithFlusher(noop),
)
defer sut.Close()

for i := 0; i < b.N; i++ {
err := sut.Push(i)
for b.Loop() {
err := sut.Push(1)
if err != nil {
b.Fail()
}
Expand Down
65 changes: 36 additions & 29 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,42 @@ var (

type (
// Buffer represents a data buffer that is asynchronously flushed, either manually or automatically.
Buffer struct {
Buffer[T any] struct {
io.Closer
dataCh chan interface{}
flushCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
options *Options
flushFunc func([]T)
dataCh chan T
flushCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
options *Options
}
)

// New creates a new buffer instance with the provided flush function and options.
// It panics if provided with a nil flush function.
func New[T any](flushFunc func([]T), opts ...Option) *Buffer[T] {
if flushFunc == nil {
panic("flush function cannot be nil")
}

buffer := &Buffer[T]{
flushFunc: flushFunc,
dataCh: make(chan T),
flushCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
options: resolveOptions(opts...),
}
go buffer.consume()

return buffer
}

// Push appends an item to the end of the buffer.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// It returns an ErrTimeout if it cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Push(item interface{}) error {
func (buffer *Buffer[T]) Push(item T) error {
if buffer.closed() {
return ErrClosed
}
Expand All @@ -46,7 +67,7 @@ func (buffer *Buffer) Push(item interface{}) error {
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Flush() error {
func (buffer *Buffer[T]) Flush() error {
if buffer.closed() {
return ErrClosed
}
Expand All @@ -67,7 +88,7 @@ func (buffer *Buffer) Flush() error {
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again.
func (buffer *Buffer) Close() error {
func (buffer *Buffer[T]) Close() error {
if buffer.closed() {
return ErrClosed
}
Expand All @@ -90,7 +111,7 @@ func (buffer *Buffer) Close() error {
}
}

func (buffer Buffer) closed() bool {
func (buffer *Buffer[T]) closed() bool {
select {
case <-buffer.doneCh:
return true
Expand All @@ -99,9 +120,9 @@ func (buffer Buffer) closed() bool {
}
}

func (buffer *Buffer) consume() {
func (buffer *Buffer[T]) consume() {
count := 0
items := make([]interface{}, buffer.options.Size)
items := make([]T, buffer.options.Size)
mustFlush := false
ticker, stopTicker := newTicker(buffer.options.FlushInterval)

Expand All @@ -123,10 +144,10 @@ func (buffer *Buffer) consume() {

if mustFlush {
stopTicker()
buffer.options.Flusher.Write(items[:count])
buffer.flushFunc(items[:count])

count = 0
items = make([]interface{}, buffer.options.Size)
items = make([]T, buffer.options.Size)
mustFlush = false
ticker, stopTicker = newTicker(buffer.options.FlushInterval)
}
Expand All @@ -144,17 +165,3 @@ func newTicker(interval time.Duration) (<-chan time.Time, func()) {
ticker := time.NewTicker(interval)
return ticker.C, ticker.Stop
}

// New creates a new buffer instance with the provided options.
func New(opts ...Option) *Buffer {
buffer := &Buffer{
dataCh: make(chan interface{}),
flushCh: make(chan struct{}),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
options: resolveOptions(opts...),
}
go buffer.consume()

return buffer
}
Loading