diff --git a/chanx/ringbuffer.go b/chanx/ringbuffer.go deleted file mode 100644 index ade0ed7..0000000 --- a/chanx/ringbuffer.go +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package chanx - -import ( - "errors" -) - -var ErrIsEmpty = errors.New("ringbuffer is empty") - -// RingBuffer is a ring buffer for common types. -// It never is full and always grows if it will be full. -// It is not thread-safe(goroutine-safe) so you must use Lock to use it in multiple writers and multiple readers. -type RingBuffer struct { - buf []T - initialSize int - size int - r int // read pointer - w int // write pointer -} - -func NewRingBuffer(initialSize int) *RingBuffer { - if initialSize <= 0 { - panic("initial size must be great than zero") - } - // initial size must >= 2 - if initialSize == 1 { - initialSize = 2 - } - - return &RingBuffer{ - buf: make([]T, initialSize), - initialSize: initialSize, - size: initialSize, - } -} - -func (r *RingBuffer) Read() (T, error) { - if r.r == r.w { - return nil, ErrIsEmpty - } - - v := r.buf[r.r] - r.r++ - if r.r == r.size { - r.r = 0 - } - - return v, nil -} - -func (r *RingBuffer) Pop() T { - v, err := r.Read() - if err == ErrIsEmpty { // Empty - panic(ErrIsEmpty.Error()) - } - - return v -} - -func (r *RingBuffer) Peek() T { - if r.r == r.w { // Empty - panic(ErrIsEmpty.Error()) - } - - v := r.buf[r.r] - return v -} - -func (r *RingBuffer) Write(v T) { - r.buf[r.w] = v - r.w++ - - if r.w == r.size { - r.w = 0 - } - - if r.w == r.r { // full - r.grow() - } -} - -func (r *RingBuffer) grow() { - var size int - if r.size < 1024 { - size = r.size * 2 - } else { - size = r.size + r.size/4 - } - - buf := make([]T, size) - - copy(buf[0:], r.buf[r.r:]) - copy(buf[r.size-r.r:], r.buf[0:r.r]) - - r.r = 0 - r.w = r.size - r.size = size - r.buf = buf -} - -func (r *RingBuffer) IsEmpty() bool { - return r.r == r.w -} - -// Capacity returns the size of the underlying buffer. -func (r *RingBuffer) Capacity() int { - return r.size -} - -func (r *RingBuffer) Len() int { - if r.r == r.w { - return 0 - } - - if r.w > r.r { - return r.w - r.r - } - - return r.size - r.r + r.w -} - -func (r *RingBuffer) Reset() { - r.r = 0 - r.w = 0 - r.size = r.initialSize - r.buf = make([]T, r.initialSize) -} diff --git a/chanx/ringbuffer_test.go b/chanx/ringbuffer_test.go deleted file mode 100644 index fa82eb7..0000000 --- a/chanx/ringbuffer_test.go +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package chanx - -import ( - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -func TestRingBuffer(t *testing.T) { - rb := NewRingBuffer(10) - v, err := rb.Read() - assert.Nil(t, v) - assert.Error(t, err, ErrIsEmpty) - - write := 0 - read := 0 - - // write one and read it - rb.Write(0) - v, err = rb.Read() - assert.NoError(t, err) - assert.Equal(t, 0, v) - assert.Equal(t, 1, rb.r) - assert.Equal(t, 1, rb.w) - assert.True(t, rb.IsEmpty()) - - // then write 10 - for i := 0; i < 9; i++ { - rb.Write(i) - write += i - } - assert.Equal(t, 10, rb.Capacity()) - assert.Equal(t, 9, rb.Len()) - - // write one more, the buffer is full so it grows - rb.Write(10) - write += 10 - assert.Equal(t, 20, rb.Capacity()) - assert.Equal(t, 10, rb.Len()) - - for i := 0; i < 90; i++ { - rb.Write(i) - write += i - } - - assert.Equal(t, 160, rb.Capacity()) - assert.Equal(t, 100, rb.Len()) - - for { - v, err := rb.Read() - if err == ErrIsEmpty { - break - } - - read += v.(int) - } - - assert.Equal(t, write, read) - - rb.Reset() - assert.Equal(t, 10, rb.Capacity()) - assert.Equal(t, 0, rb.Len()) - assert.True(t, rb.IsEmpty()) -} - -func TestRingBuffer_One(t *testing.T) { - rb := NewRingBuffer(1) - v, err := rb.Read() - assert.Nil(t, v) - assert.Error(t, err, ErrIsEmpty) - - write := 0 - read := 0 - - // write one and read it - rb.Write(0) - v, err = rb.Read() - assert.NoError(t, err) - assert.Equal(t, 0, v) - assert.Equal(t, 1, rb.r) - assert.Equal(t, 1, rb.w) - assert.True(t, rb.IsEmpty()) - - // then write 10 - for i := 0; i < 9; i++ { - rb.Write(i) - write += i - } - assert.Equal(t, 16, rb.Capacity()) - assert.Equal(t, 9, rb.Len()) - - // write one more, the buffer is full so it grows - rb.Write(10) - write += 10 - assert.Equal(t, 16, rb.Capacity()) - assert.Equal(t, 10, rb.Len()) - - for i := 0; i < 90; i++ { - rb.Write(i) - write += i - } - - assert.Equal(t, 128, rb.Capacity()) - assert.Equal(t, 100, rb.Len()) - - for { - v, err := rb.Read() - if err == ErrIsEmpty { - break - } - - read += v.(int) - } - - assert.Equal(t, write, read) - - rb.Reset() - assert.Equal(t, 2, rb.Capacity()) - assert.Equal(t, 0, rb.Len()) - assert.True(t, rb.IsEmpty()) -} diff --git a/chanx/unbounded_chan.go b/chanx/unbounded_chan.go deleted file mode 100644 index 28f692a..0000000 --- a/chanx/unbounded_chan.go +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package chanx - -// T defines interface{}, and will be used for generic type after go 1.18 is released. -type T interface{} - -// UnboundedChan is an unbounded chan. -// In is used to write without blocking, which supports multiple writers. -// and Out is used to read, which supports multiple readers. -// You can close the in channel if you want. -type UnboundedChan struct { - In chan<- T // channel for write - Out <-chan T // channel for read - buffer *RingBuffer // buffer -} - -// Len returns len of In plus len of Out plus len of buffer. -func (c UnboundedChan) Len() int { - return len(c.In) + c.buffer.Len() + len(c.Out) -} - -// BufLen returns len of the buffer. -func (c UnboundedChan) BufLen() int { - return c.buffer.Len() -} - -// NewUnboundedChan creates the unbounded chan. -// in is used to write without blocking, which supports multiple writers. -// and out is used to read, which supports multiple readers. -// You can close the in channel if you want. -func NewUnboundedChan(initCapacity int) UnboundedChan { - return NewUnboundedChanSize(initCapacity, initCapacity, initCapacity) -} - -// NewUnboundedChanSize is like NewUnboundedChan but you can set initial capacity for In, Out, Buffer. -func NewUnboundedChanSize(initInCapacity, initOutCapacity, initBufCapacity int) UnboundedChan { - in := make(chan T, initInCapacity) - out := make(chan T, initOutCapacity) - ch := UnboundedChan{In: in, Out: out, buffer: NewRingBuffer(initBufCapacity)} - - go process(in, out, ch) - - return ch -} - -func process(in, out chan T, ch UnboundedChan) { - defer close(out) -loop: - for { - val, ok := <-in - if !ok { // in is closed - break loop - } - - // out is not full - select { - case out <- val: - continue - default: - } - - // out is full - ch.buffer.Write(val) - for !ch.buffer.IsEmpty() { - select { - case val, ok := <-in: - if !ok { // in is closed - break loop - } - ch.buffer.Write(val) - - case out <- ch.buffer.Peek(): - ch.buffer.Pop() - if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize { // after burst - ch.buffer.Reset() - } - } - } - } - - // drain - for !ch.buffer.IsEmpty() { - out <- ch.buffer.Pop() - } - - ch.buffer.Reset() -} diff --git a/container/chan/unbounded_chan.go b/container/chan/unbounded_chan.go new file mode 100644 index 0000000..f225d8e --- /dev/null +++ b/container/chan/unbounded_chan.go @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gxchan + +import ( + "github.com/dubbogo/gost/container/queue" +) + +// UnboundedChan is a chan that could grow if the number of elements exceeds the capacity. +type UnboundedChan struct { + in chan interface{} + out chan interface{} + queue *gxqueue.CircularUnboundedQueue +} + +// NewUnboundedChan creates an instance of UnboundedChan. +func NewUnboundedChan(capacity int) *UnboundedChan { + ch := &UnboundedChan{ + in: make(chan interface{}, capacity/3), + out: make(chan interface{}, capacity/3), + queue: gxqueue.NewCircularUnboundedQueue(capacity - 2*(capacity/3)), + } + + go ch.run() + + return ch +} + +// In returns write-only chan +func (ch *UnboundedChan) In() chan<- interface{} { + return ch.in +} + +// Out returns read-only chan +func (ch *UnboundedChan) Out() <-chan interface{} { + return ch.out +} + +func (ch *UnboundedChan) Len() int { + return len(ch.in) + len(ch.out) + ch.queue.Len() +} + +func (ch *UnboundedChan) run() { + defer func() { + close(ch.out) + }() + + for { + val, ok := <-ch.in + if !ok { + // `ch.in` was closed and queue has no elements + return + } + + select { + // data was written to `ch.out` + case ch.out <- val: + continue + // `ch.out` is full, move the data to `ch.queue` + default: + ch.queue.Push(val) + } + + for !ch.queue.IsEmpty() { + select { + case val, ok := <-ch.in: + if !ok { + ch.closeWait() + return + } + ch.queue.Push(val) + case ch.out <- ch.queue.Peek(): + ch.queue.Pop() + } + } + ch.shrinkQueue() + } +} + +func (ch *UnboundedChan) shrinkQueue() { + if ch.queue.IsEmpty() && ch.queue.Cap() > ch.queue.InitialSize() { + ch.queue.Reset() + } +} + +func (ch *UnboundedChan) closeWait() { + for !ch.queue.IsEmpty() { + ch.out <- ch.queue.Pop() + } +} diff --git a/chanx/unbounded_chan_test.go b/container/chan/unbounded_chan_test.go similarity index 59% rename from chanx/unbounded_chan_test.go rename to container/chan/unbounded_chan_test.go index 0963759..671d5f3 100644 --- a/chanx/unbounded_chan_test.go +++ b/container/chan/unbounded_chan_test.go @@ -15,69 +15,67 @@ * limitations under the License. */ -package chanx +package gxchan import ( "sync" "testing" ) -func TestMakeUnboundedChan(t *testing.T) { - ch := NewUnboundedChan(100) +import ( + "github.com/stretchr/testify/assert" +) + +func TestUnboundedChan(t *testing.T) { + ch := NewUnboundedChan(300) + + var count int for i := 1; i < 200; i++ { - ch.In <- int64(i) + ch.In() <- i + } + + for i := 1; i < 60; i++ { + v, _ := <-ch.Out() + count += v.(int) } - var count int64 - var wg sync.WaitGroup + assert.Equal(t, 100, ch.queue.Cap()) + + for i := 200; i <= 1200; i++ { + ch.In() <- i + } + assert.Equal(t, 1600, ch.queue.Cap()) + + wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - - for v := range ch.Out { - count += v.(int64) + var icount int + for v := range ch.Out() { + count += v.(int) + icount++ + if icount == 900 { + break + } } }() - for i := 200; i <= 1000; i++ { - ch.In <- int64(i) - } - close(ch.In) - wg.Wait() - if count != 500500 { - t.Fatalf("expected 500500 but got %d", count) - } -} - -func TestMakeUnboundedChanSize(t *testing.T) { - ch := NewUnboundedChanSize(10, 50, 100) - - for i := 1; i < 200; i++ { - ch.In <- int64(i) - } + close(ch.In()) - var count int64 - var wg sync.WaitGroup + // buffer should be empty wg.Add(1) go func() { defer wg.Done() - - for v := range ch.Out { - count += v.(int64) + for v := range ch.Out() { + count += v.(int) } }() - for i := 200; i <= 1000; i++ { - ch.In <- int64(i) - } - close(ch.In) - wg.Wait() - if count != 500500 { - t.Fatalf("expected 500500 but got %d", count) - } + assert.Equal(t, 720600, count) + } diff --git a/container/queue/circular_unbounded_queue.go b/container/queue/circular_unbounded_queue.go new file mode 100644 index 0000000..645afeb --- /dev/null +++ b/container/queue/circular_unbounded_queue.go @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gxqueue + +const ( + fastGrowThreshold = 1024 +) + +// CircularUnboundedQueue is a circular structure and will grow automatically if it exceeds the capacity. +type CircularUnboundedQueue struct { + data []interface{} + head, tail int + isize int // initial size +} + +func NewCircularUnboundedQueue(size int) *CircularUnboundedQueue { + if size < 0 { + panic("size should be greater than zero") + } + return &CircularUnboundedQueue{ + data: make([]interface{}, size+1), + isize: size, + } +} + +func (q *CircularUnboundedQueue) IsEmpty() bool { + return q.head == q.tail +} + +func (q *CircularUnboundedQueue) Push(t interface{}) { + q.data[q.tail] = t + + q.tail = (q.tail + 1) % len(q.data) + if q.tail == q.head { + q.grow() + } +} + +func (q *CircularUnboundedQueue) Pop() interface{} { + if q.IsEmpty() { + panic("queue has no element") + } + + t := q.data[q.head] + q.head = (q.head + 1) % len(q.data) + + return t +} + +func (q *CircularUnboundedQueue) Peek() interface{} { + if q.IsEmpty() { + panic("queue has no element") + } + return q.data[q.head] +} + +func (q *CircularUnboundedQueue) Cap() int { + return len(q.data) - 1 +} + +func (q *CircularUnboundedQueue) Len() int { + head, tail := q.head, q.tail + if head > tail { + tail += len(q.data) + } + return tail - head +} + +func (q *CircularUnboundedQueue) Reset() { + q.data = make([]interface{}, q.isize+1) + q.head, q.tail = 0, 0 +} + +func (q *CircularUnboundedQueue) InitialSize() int { + return q.isize +} + +func (q *CircularUnboundedQueue) grow() { + oldsize := len(q.data) - 1 + var newsize int + if oldsize < fastGrowThreshold { + newsize = oldsize * 2 + } else { + newsize = oldsize + oldsize/4 + } + + newdata := make([]interface{}, newsize+1) + copy(newdata[0:], q.data[q.head:]) + copy(newdata[len(q.data)-q.head:], q.data[:q.head]) + + q.data = newdata + q.head, q.tail = 0, oldsize+1 +} diff --git a/container/queue/circular_unbounded_queue_test.go b/container/queue/circular_unbounded_queue_test.go new file mode 100644 index 0000000..f8b3d21 --- /dev/null +++ b/container/queue/circular_unbounded_queue_test.go @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gxqueue + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestCircularUnboundedQueueWithoutGrowing(t *testing.T) { + queue := NewCircularUnboundedQueue(10) + + queue.Reset() + + // write 1 element + queue.Push(1) + assert.Equal(t, 1, queue.Len()) + assert.Equal(t, 10, queue.Cap()) + // peek and pop + assert.Equal(t, 1, queue.Peek()) + assert.Equal(t, 1, queue.Pop()) + // inspect len and cap + assert.Equal(t, 0, queue.Len()) + assert.Equal(t, 10, queue.Cap()) + + // write 8 elements + for i := 0; i < 8; i++ { + queue.Push(i) + } + assert.Equal(t, 8, queue.Len()) + assert.Equal(t, 10, queue.Cap()) + + var v interface{} + // pop 5 elements + for i := 0; i < 5; i++ { + v = queue.Pop() + assert.Equal(t, i, v) + } + assert.Equal(t, 3, queue.Len()) + assert.Equal(t, 10, queue.Cap()) + + // write 6 elements + for i := 0; i < 6; i++ { + queue.Push(i) + } + assert.Equal(t, 9, queue.Len()) + assert.Equal(t, 10, queue.Cap()) +} + +func TestBufferWithGrowing(t *testing.T) { + // size < fastGrowThreshold + queue := NewCircularUnboundedQueue(10) + + // write 11 elements + for i := 0; i < 11; i++ { + queue.Push(i) + } + + assert.Equal(t, 11, queue.Len()) + assert.Equal(t, 20, queue.Cap()) + + queue.Reset() + assert.Equal(t, 0, queue.Len()) + assert.Equal(t, 10, queue.Cap()) + + queue = NewCircularUnboundedQueue(fastGrowThreshold) + + // write fastGrowThreshold+1 elements + for i := 0; i < fastGrowThreshold+1; i++ { + queue.Push(i) + } + + assert.Equal(t, fastGrowThreshold+1, queue.Len()) + assert.Equal(t, fastGrowThreshold+fastGrowThreshold/4, queue.Cap()) + + queue.Reset() + assert.Equal(t, 0, queue.Len()) + assert.Equal(t, fastGrowThreshold, queue.Cap()) +}