-
Notifications
You must be signed in to change notification settings - Fork 69
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
lizhixin.lzx
committed
Jul 10, 2021
1 parent
c144977
commit dac8cd4
Showing
4 changed files
with
401 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package chanx | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
var ErrIsEmpty = errors.New("ringbuffer is empty") | ||
|
||
// RingBuffer is a ring buffer for common types. | ||
// It never is full and always grows if it will be full. | ||
// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers. | ||
type RingBuffer struct { | ||
buf []T | ||
initialSize int | ||
size int | ||
r int // read pointer | ||
w int // write pointer | ||
} | ||
|
||
func NewRingBuffer(initialSize int) *RingBuffer { | ||
if initialSize <= 0 { | ||
panic("initial size must be great than zero") | ||
} | ||
// initial size must >= 2 | ||
if initialSize == 1 { | ||
initialSize = 2 | ||
} | ||
|
||
return &RingBuffer{ | ||
buf: make([]T, initialSize), | ||
initialSize: initialSize, | ||
size: initialSize, | ||
} | ||
} | ||
|
||
func (r *RingBuffer) Read() (T, error) { | ||
if r.r == r.w { | ||
return nil, ErrIsEmpty | ||
} | ||
|
||
v := r.buf[r.r] | ||
r.r++ | ||
if r.r == r.size { | ||
r.r = 0 | ||
} | ||
|
||
return v, nil | ||
} | ||
|
||
func (r *RingBuffer) Pop() T { | ||
v, err := r.Read() | ||
if err == ErrIsEmpty { // Empty | ||
panic(ErrIsEmpty.Error()) | ||
} | ||
|
||
return v | ||
} | ||
|
||
func (r *RingBuffer) Peek() T { | ||
if r.r == r.w { // Empty | ||
panic(ErrIsEmpty.Error()) | ||
} | ||
|
||
v := r.buf[r.r] | ||
return v | ||
} | ||
|
||
func (r *RingBuffer) Write(v T) { | ||
r.buf[r.w] = v | ||
r.w++ | ||
|
||
if r.w == r.size { | ||
r.w = 0 | ||
} | ||
|
||
if r.w == r.r { // full | ||
r.grow() | ||
} | ||
} | ||
|
||
func (r *RingBuffer) grow() { | ||
var size int | ||
if r.size < 1024 { | ||
size = r.size * 2 | ||
} else { | ||
size = r.size + r.size/4 | ||
} | ||
|
||
buf := make([]T, size) | ||
|
||
copy(buf[0:], r.buf[r.r:]) | ||
copy(buf[r.size-r.r:], r.buf[0:r.r]) | ||
|
||
r.r = 0 | ||
r.w = r.size | ||
r.size = size | ||
r.buf = buf | ||
} | ||
|
||
func (r *RingBuffer) IsEmpty() bool { | ||
return r.r == r.w | ||
} | ||
|
||
// Capacity returns the size of the underlying buffer. | ||
func (r *RingBuffer) Capacity() int { | ||
return r.size | ||
} | ||
|
||
func (r *RingBuffer) Len() int { | ||
if r.r == r.w { | ||
return 0 | ||
} | ||
|
||
if r.w > r.r { | ||
return r.w - r.r | ||
} | ||
|
||
return r.size - r.r + r.w | ||
} | ||
|
||
func (r *RingBuffer) Reset() { | ||
r.r = 0 | ||
r.w = 0 | ||
r.size = r.initialSize | ||
r.buf = make([]T, r.initialSize) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package chanx | ||
|
||
import ( | ||
"testing" | ||
) | ||
|
||
import ( | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestRingBuffer(t *testing.T) { | ||
rb := NewRingBuffer(10) | ||
v, err := rb.Read() | ||
assert.Nil(t, v) | ||
assert.Error(t, err, ErrIsEmpty) | ||
|
||
write := 0 | ||
read := 0 | ||
|
||
// write one and read it | ||
rb.Write(0) | ||
v, err = rb.Read() | ||
assert.NoError(t, err) | ||
assert.Equal(t, 0, v) | ||
assert.Equal(t, 1, rb.r) | ||
assert.Equal(t, 1, rb.w) | ||
assert.True(t, rb.IsEmpty()) | ||
|
||
// then write 10 | ||
for i := 0; i < 9; i++ { | ||
rb.Write(i) | ||
write += i | ||
} | ||
assert.Equal(t, 10, rb.Capacity()) | ||
assert.Equal(t, 9, rb.Len()) | ||
|
||
// write one more, the buffer is full so it grows | ||
rb.Write(10) | ||
write += 10 | ||
assert.Equal(t, 20, rb.Capacity()) | ||
assert.Equal(t, 10, rb.Len()) | ||
|
||
for i := 0; i < 90; i++ { | ||
rb.Write(i) | ||
write += i | ||
} | ||
|
||
assert.Equal(t, 160, rb.Capacity()) | ||
assert.Equal(t, 100, rb.Len()) | ||
|
||
for { | ||
v, err := rb.Read() | ||
if err == ErrIsEmpty { | ||
break | ||
} | ||
|
||
read += v.(int) | ||
} | ||
|
||
assert.Equal(t, write, read) | ||
|
||
rb.Reset() | ||
assert.Equal(t, 10, rb.Capacity()) | ||
assert.Equal(t, 0, rb.Len()) | ||
assert.True(t, rb.IsEmpty()) | ||
} | ||
|
||
func TestRingBuffer_One(t *testing.T) { | ||
rb := NewRingBuffer(1) | ||
v, err := rb.Read() | ||
assert.Nil(t, v) | ||
assert.Error(t, err, ErrIsEmpty) | ||
|
||
write := 0 | ||
read := 0 | ||
|
||
// write one and read it | ||
rb.Write(0) | ||
v, err = rb.Read() | ||
assert.NoError(t, err) | ||
assert.Equal(t, 0, v) | ||
assert.Equal(t, 1, rb.r) | ||
assert.Equal(t, 1, rb.w) | ||
assert.True(t, rb.IsEmpty()) | ||
|
||
// then write 10 | ||
for i := 0; i < 9; i++ { | ||
rb.Write(i) | ||
write += i | ||
} | ||
assert.Equal(t, 16, rb.Capacity()) | ||
assert.Equal(t, 9, rb.Len()) | ||
|
||
// write one more, the buffer is full so it grows | ||
rb.Write(10) | ||
write += 10 | ||
assert.Equal(t, 16, rb.Capacity()) | ||
assert.Equal(t, 10, rb.Len()) | ||
|
||
for i := 0; i < 90; i++ { | ||
rb.Write(i) | ||
write += i | ||
} | ||
|
||
assert.Equal(t, 128, rb.Capacity()) | ||
assert.Equal(t, 100, rb.Len()) | ||
|
||
for { | ||
v, err := rb.Read() | ||
if err == ErrIsEmpty { | ||
break | ||
} | ||
|
||
read += v.(int) | ||
} | ||
|
||
assert.Equal(t, write, read) | ||
|
||
rb.Reset() | ||
assert.Equal(t, 2, rb.Capacity()) | ||
assert.Equal(t, 0, rb.Len()) | ||
assert.True(t, rb.IsEmpty()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package chanx | ||
|
||
// T defines interface{}, and will be used for generic type after go 1.18 is released. | ||
type T interface{} | ||
|
||
// UnboundedChan is an unbounded chan. | ||
// In is used to write without blocking, which supports multiple writers. | ||
// and Out is used to read, which supports multiple readers. | ||
// You can close the in channel if you want. | ||
type UnboundedChan struct { | ||
In chan<- T // channel for write | ||
Out <-chan T // channel for read | ||
buffer *RingBuffer // buffer | ||
} | ||
|
||
// Len returns len of In plus len of Out plus len of buffer. | ||
func (c UnboundedChan) Len() int { | ||
return len(c.In) + c.buffer.Len() + len(c.Out) | ||
} | ||
|
||
// BufLen returns len of the buffer. | ||
func (c UnboundedChan) BufLen() int { | ||
return c.buffer.Len() | ||
} | ||
|
||
// NewUnboundedChan creates the unbounded chan. | ||
// in is used to write without blocking, which supports multiple writers. | ||
// and out is used to read, which supports multiple readers. | ||
// You can close the in channel if you want. | ||
func NewUnboundedChan(initCapacity int) UnboundedChan { | ||
return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity) | ||
} | ||
|
||
// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer. | ||
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan { | ||
in := make(chan T, initInCapacity) | ||
out := make(chan T, initOutCapacity) | ||
ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)} | ||
|
||
go process(in, out, ch) | ||
|
||
return ch | ||
} | ||
|
||
func process(in, out chan T, ch UnboundedChan) { | ||
defer close(out) | ||
loop: | ||
for { | ||
val, ok := <-in | ||
if !ok { // in is closed | ||
break loop | ||
} | ||
|
||
// out is not full | ||
select { | ||
case out <- val: | ||
continue | ||
default: | ||
} | ||
|
||
// out is full | ||
ch.buffer.Write(val) | ||
for !ch.buffer.IsEmpty() { | ||
select { | ||
case val, ok := <-in: | ||
if !ok { // in is closed | ||
break loop | ||
} | ||
ch.buffer.Write(val) | ||
|
||
case out <- ch.buffer.Peek(): | ||
ch.buffer.Pop() | ||
if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst | ||
ch.buffer.Reset() | ||
} | ||
} | ||
} | ||
} | ||
|
||
// drain | ||
for !ch.buffer.IsEmpty() { | ||
out <- ch.buffer.Pop() | ||
} | ||
|
||
ch.buffer.Reset() | ||
} |
Oops, something went wrong.