Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add chanx #63

Merged
merged 1 commit into from
Jul 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions chanx/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chanx

import (
"errors"
)

var ErrIsEmpty = errors.New("ringbuffer is empty")

// RingBuffer is a ring buffer for common types.
// It never is full and always grows if it will be full.
// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers.
type RingBuffer struct {
buf []T
initialSize int
size int
r int // read pointer
w int // write pointer
}

func NewRingBuffer(initialSize int) *RingBuffer {
if initialSize <= 0 {
panic("initial size must be great than zero")
}
// initial size must >= 2
if initialSize == 1 {
initialSize = 2
}

return &RingBuffer{
buf: make([]T, initialSize),
initialSize: initialSize,
size: initialSize,
}
}

func (r *RingBuffer) Read() (T, error) {
if r.r == r.w {
return nil, ErrIsEmpty
}

v := r.buf[r.r]
r.r++
if r.r == r.size {
r.r = 0
}

return v, nil
}

func (r *RingBuffer) Pop() T {
v, err := r.Read()
if err == ErrIsEmpty { // Empty
panic(ErrIsEmpty.Error())
}

return v
}

func (r *RingBuffer) Peek() T {
if r.r == r.w { // Empty
panic(ErrIsEmpty.Error())
}

v := r.buf[r.r]
return v
}

func (r *RingBuffer) Write(v T) {
r.buf[r.w] = v
r.w++

if r.w == r.size {
r.w = 0
}

if r.w == r.r { // full
r.grow()
}
}

func (r *RingBuffer) grow() {
var size int
if r.size < 1024 {
size = r.size * 2
} else {
size = r.size + r.size/4
}

buf := make([]T, size)

copy(buf[0:], r.buf[r.r:])
copy(buf[r.size-r.r:], r.buf[0:r.r])

r.r = 0
r.w = r.size
r.size = size
r.buf = buf
}

func (r *RingBuffer) IsEmpty() bool {
return r.r == r.w
}

// Capacity returns the size of the underlying buffer.
func (r *RingBuffer) Capacity() int {
return r.size
}

func (r *RingBuffer) Len() int {
if r.r == r.w {
return 0
}

if r.w > r.r {
return r.w - r.r
}

return r.size - r.r + r.w
}

func (r *RingBuffer) Reset() {
r.r = 0
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
}
140 changes: 140 additions & 0 deletions chanx/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chanx

import (
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

func TestRingBuffer(t *testing.T) {
rb := NewRingBuffer(10)
v, err := rb.Read()
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty)

write := 0
read := 0

// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())

// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 9, rb.Len())

// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 20, rb.Capacity())
assert.Equal(t, 10, rb.Len())

for i := 0; i < 90; i++ {
rb.Write(i)
write += i
}

assert.Equal(t, 160, rb.Capacity())
assert.Equal(t, 100, rb.Len())

for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}

read += v.(int)
}

assert.Equal(t, write, read)

rb.Reset()
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
}

func TestRingBuffer_One(t *testing.T) {
rb := NewRingBuffer(1)
v, err := rb.Read()
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty)

write := 0
read := 0

// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())

// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 9, rb.Len())

// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 10, rb.Len())

for i := 0; i < 90; i++ {
rb.Write(i)
write += i
}

assert.Equal(t, 128, rb.Capacity())
assert.Equal(t, 100, rb.Len())

for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}

read += v.(int)
}

assert.Equal(t, write, read)

rb.Reset()
assert.Equal(t, 2, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
}
103 changes: 103 additions & 0 deletions chanx/unbounded_chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chanx

// T defines interface{}, and will be used for generic type after go 1.18 is released.
type T interface{}

// UnboundedChan is an unbounded chan.
// In is used to write without blocking, which supports multiple writers.
// and Out is used to read, which supports multiple readers.
// You can close the in channel if you want.
type UnboundedChan struct {
In chan<- T // channel for write
Out <-chan T // channel for read
buffer *RingBuffer // buffer
}

// Len returns len of In plus len of Out plus len of buffer.
func (c UnboundedChan) Len() int {
return len(c.In) + c.buffer.Len() + len(c.Out)
}

// BufLen returns len of the buffer.
func (c UnboundedChan) BufLen() int {
return c.buffer.Len()
}

// NewUnboundedChan creates the unbounded chan.
// in is used to write without blocking, which supports multiple writers.
// and out is used to read, which supports multiple readers.
// You can close the in channel if you want.
func NewUnboundedChan(initCapacity int) UnboundedChan {
return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}

// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer.
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
in := make(chan T, initInCapacity)
out := make(chan T, initOutCapacity)
ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}

go process(in, out, ch)

return ch
}

func process(in, out chan T, ch UnboundedChan) {
defer close(out)
loop:
for {
val, ok := <-in
if !ok { // in is closed
break loop
}

// out is not full
select {
case out <- val:
continue
default:
}

// out is full
ch.buffer.Write(val)
for !ch.buffer.IsEmpty() {
select {
case val, ok := <-in:
if !ok { // in is closed
break loop
}
ch.buffer.Write(val)

case out <- ch.buffer.Peek():
ch.buffer.Pop()
if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst
ch.buffer.Reset()
}
}
}
}

// drain
for !ch.buffer.IsEmpty() {
out <- ch.buffer.Pop()
}

ch.buffer.Reset()
}
Loading