Skip to content

Commit

Permalink
ftr: add chanx (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
LaurenceLiZhixin committed Jul 10, 2021
1 parent c144977 commit d6ee0be
Show file tree
Hide file tree
Showing 4 changed files with 469 additions and 0 deletions.
143 changes: 143 additions & 0 deletions chanx/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chanx

import (
"errors"
)

var ErrIsEmpty = errors.New("ringbuffer is empty")

// RingBuffer is a ring buffer for common types.
// It never is full and always grows if it will be full.
// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers.
type RingBuffer struct {
buf []T
initialSize int
size int
r int // read pointer
w int // write pointer
}

func NewRingBuffer(initialSize int) *RingBuffer {
if initialSize <= 0 {
panic("initial size must be great than zero")
}
// initial size must >= 2
if initialSize == 1 {
initialSize = 2
}

return &RingBuffer{
buf: make([]T, initialSize),
initialSize: initialSize,
size: initialSize,
}
}

func (r *RingBuffer) Read() (T, error) {
if r.r == r.w {
return nil, ErrIsEmpty
}

v := r.buf[r.r]
r.r++
if r.r == r.size {
r.r = 0
}

return v, nil
}

func (r *RingBuffer) Pop() T {
v, err := r.Read()
if err == ErrIsEmpty { // Empty
panic(ErrIsEmpty.Error())
}

return v
}

func (r *RingBuffer) Peek() T {
if r.r == r.w { // Empty
panic(ErrIsEmpty.Error())
}

v := r.buf[r.r]
return v
}

func (r *RingBuffer) Write(v T) {
r.buf[r.w] = v
r.w++

if r.w == r.size {
r.w = 0
}

if r.w == r.r { // full
r.grow()
}
}

func (r *RingBuffer) grow() {
var size int
if r.size < 1024 {
size = r.size * 2
} else {
size = r.size + r.size/4
}

buf := make([]T, size)

copy(buf[0:], r.buf[r.r:])
copy(buf[r.size-r.r:], r.buf[0:r.r])

r.r = 0
r.w = r.size
r.size = size
r.buf = buf
}

func (r *RingBuffer) IsEmpty() bool {
return r.r == r.w
}

// Capacity returns the size of the underlying buffer.
func (r *RingBuffer) Capacity() int {
return r.size
}

func (r *RingBuffer) Len() int {
if r.r == r.w {
return 0
}

if r.w > r.r {
return r.w - r.r
}

return r.size - r.r + r.w
}

func (r *RingBuffer) Reset() {
r.r = 0
r.w = 0
r.size = r.initialSize
r.buf = make([]T, r.initialSize)
}
140 changes: 140 additions & 0 deletions chanx/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chanx

import (
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

func TestRingBuffer(t *testing.T) {
rb := NewRingBuffer(10)
v, err := rb.Read()
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty)

write := 0
read := 0

// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())

// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 9, rb.Len())

// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 20, rb.Capacity())
assert.Equal(t, 10, rb.Len())

for i := 0; i < 90; i++ {
rb.Write(i)
write += i
}

assert.Equal(t, 160, rb.Capacity())
assert.Equal(t, 100, rb.Len())

for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}

read += v.(int)
}

assert.Equal(t, write, read)

rb.Reset()
assert.Equal(t, 10, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
}

func TestRingBuffer_One(t *testing.T) {
rb := NewRingBuffer(1)
v, err := rb.Read()
assert.Nil(t, v)
assert.Error(t, err, ErrIsEmpty)

write := 0
read := 0

// write one and read it
rb.Write(0)
v, err = rb.Read()
assert.NoError(t, err)
assert.Equal(t, 0, v)
assert.Equal(t, 1, rb.r)
assert.Equal(t, 1, rb.w)
assert.True(t, rb.IsEmpty())

// then write 10
for i := 0; i < 9; i++ {
rb.Write(i)
write += i
}
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 9, rb.Len())

// write one more, the buffer is full so it grows
rb.Write(10)
write += 10
assert.Equal(t, 16, rb.Capacity())
assert.Equal(t, 10, rb.Len())

for i := 0; i < 90; i++ {
rb.Write(i)
write += i
}

assert.Equal(t, 128, rb.Capacity())
assert.Equal(t, 100, rb.Len())

for {
v, err := rb.Read()
if err == ErrIsEmpty {
break
}

read += v.(int)
}

assert.Equal(t, write, read)

rb.Reset()
assert.Equal(t, 2, rb.Capacity())
assert.Equal(t, 0, rb.Len())
assert.True(t, rb.IsEmpty())
}
103 changes: 103 additions & 0 deletions chanx/unbounded_chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package chanx

// T defines interface{}, and will be used for generic type after go 1.18 is released.
type T interface{}

// UnboundedChan is an unbounded chan.
// In is used to write without blocking, which supports multiple writers.
// and Out is used to read, which supports multiple readers.
// You can close the in channel if you want.
type UnboundedChan struct {
In chan<- T // channel for write
Out <-chan T // channel for read
buffer *RingBuffer // buffer
}

// Len returns len of In plus len of Out plus len of buffer.
func (c UnboundedChan) Len() int {
return len(c.In) + c.buffer.Len() + len(c.Out)
}

// BufLen returns len of the buffer.
func (c UnboundedChan) BufLen() int {
return c.buffer.Len()
}

// NewUnboundedChan creates the unbounded chan.
// in is used to write without blocking, which supports multiple writers.
// and out is used to read, which supports multiple readers.
// You can close the in channel if you want.
func NewUnboundedChan(initCapacity int) UnboundedChan {
return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity)
}

// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer.
func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan {
in := make(chan T, initInCapacity)
out := make(chan T, initOutCapacity)
ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)}

go process(in, out, ch)

return ch
}

func process(in, out chan T, ch UnboundedChan) {
defer close(out)
loop:
for {
val, ok := <-in
if !ok { // in is closed
break loop
}

// out is not full
select {
case out <- val:
continue
default:
}

// out is full
ch.buffer.Write(val)
for !ch.buffer.IsEmpty() {
select {
case val, ok := <-in:
if !ok { // in is closed
break loop
}
ch.buffer.Write(val)

case out <- ch.buffer.Peek():
ch.buffer.Pop()
if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst
ch.buffer.Reset()
}
}
}
}

// drain
for !ch.buffer.IsEmpty() {
out <- ch.buffer.Pop()
}

ch.buffer.Reset()
}
Loading

0 comments on commit d6ee0be

Please sign in to comment.