diff --git a/README.md b/README.md index 881e287..4bf5ddf 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,8 @@ Still pretty specific to gotable, but contains logic required to maintain graph #### Queue: Package contains both a normal and priority queue. Both implementations never block on send and grow as much as necessary. Both also only return errors if you attempt to push to a disposed queue and will not panic like sending a message on a closed channel. The priority queue also allows you to place items in priority order inside the queue. If you give a useful hint to the regular queue, it is actually faster than a channel. The priority queue is somewhat slow currently and targeted for an update to a Fibonacci heap. +Also included in the queue package is a MPMC threadsafe ring buffer. This is a block full/empty queue, but will return a blocked thread if the queue is disposed while a thread is blocked. This can be used to synchronize goroutines and ensure goroutines quit so objects can be GC'd. Threadsafety is acheived using only CAS operations making this queue quite fast. Benchmarks can be found in that package. + #### Range Tree: Useful to determine if n-dimensional points fall within an n-dimensional range. Not a typical range tree however, as we are actually using an n-dimensional sorted list of points as this proved to be simpler and faster than attempting a traditional range tree while saving space on any dimension greater than one. Inserts are typical BBST times at O(log n^d) where d is the number of dimensions. diff --git a/btree/palm/tree.go b/btree/palm/tree.go index 3b34584..a0e3f14 100644 --- a/btree/palm/tree.go +++ b/btree/palm/tree.go @@ -151,8 +151,8 @@ func (ptree *ptree) runOperations() { writeOperations[n] = append(writeOperations[n], toPerform.writes[i]) } - toPerform.signaler <- true ptree.runAdds(writeOperations) + toPerform.signaler <- true } func (ptree *ptree) recursiveSplit(n, parent, left *node, nodes *[]*node, keys *Keys) { diff --git a/queue/error.go b/queue/error.go index e728c8d..29c062c 100644 --- a/queue/error.go +++ b/queue/error.go @@ -16,8 +16,6 @@ limitations under the License. package queue -type DisposedError struct{} +import "errors" -func (de DisposedError) Error() string { - return `Queue has been disposed.` -} +var disposedError = errors.New(`Queue has been disposed.`) diff --git a/queue/priority_queue.go b/queue/priority_queue.go index 8722fc1..3ccfd57 100644 --- a/queue/priority_queue.go +++ b/queue/priority_queue.go @@ -107,7 +107,7 @@ func (pq *PriorityQueue) Put(items ...Item) error { pq.lock.Lock() if pq.disposed { pq.lock.Unlock() - return DisposedError{} + return disposedError } for _, item := range items { @@ -144,7 +144,7 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) { if pq.disposed { pq.lock.Unlock() - return nil, DisposedError{} + return nil, disposedError } var items []Item @@ -159,7 +159,7 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) { pq.disposeLock.Lock() if pq.disposed { pq.disposeLock.Unlock() - return nil, DisposedError{} + return nil, disposedError } pq.disposeLock.Unlock() diff --git a/queue/priority_queue_test.go b/queue/priority_queue_test.go index e900692..615f6ff 100644 --- a/queue/priority_queue_test.go +++ b/queue/priority_queue_test.go @@ -193,7 +193,7 @@ func TestEmptyPriorityGetWithDispose(t *testing.T) { wg.Wait() - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) } func TestPriorityGetPutDisposed(t *testing.T) { @@ -201,10 +201,10 @@ func TestPriorityGetPutDisposed(t *testing.T) { q.Dispose() _, err := q.Get(1) - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) err = q.Put(mockItem(1)) - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) } func BenchmarkPriorityQueue(b *testing.B) { diff --git a/queue/queue.go b/queue/queue.go index 8ba9f11..856ae3e 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -24,10 +24,31 @@ as opposed to panicking as with channels. Queues will grow with unbounded behavior as opposed to channels which can be buffered but will pause while a thread attempts to put to a full channel. -TODO: Unify the two types of queue to the same interface. -TODO: Implement an even faster lockless circular buffer. +Recently added is a lockless ring buffer using the same basic C design as +found here: + +http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + +Modified for use with Go with the addition of some dispose semantics providing +the capability to release blocked threads. This works for both puts +and gets, either will return an error if they are blocked and the buffer +is disposed. This could serve as a signal to kill a goroutine. All threadsafety +is acheived using CAS operations, making this buffer pretty quick. + +Benchmarks: +BenchmarkPriorityQueue-8 2000000 782 ns/op +BenchmarkQueue-8 2000000 671 ns/op +BenchmarkChannel-8 1000000 2083 ns/op +BenchmarkQueuePut-8 20000 84299 ns/op +BenchmarkQueueGet-8 20000 80753 ns/op +BenchmarkExecuteInParallel-8 20000 68891 ns/op +BenchmarkRBLifeCycle-8 10000000 177 ns/op +BenchmarkRBPut-8 30000000 58.1 ns/op +BenchmarkRBGet-8 50000000 26.8 ns/op + +TODO: We really need a Fibonacci heap for the priority queue. +TODO: Unify the types of queue to the same interface. */ - package queue import ( @@ -128,7 +149,7 @@ func (q *Queue) Put(items ...interface{}) error { if q.disposed { q.lock.Unlock() - return DisposedError{} + return disposedError } q.items = append(q.items, items...) @@ -163,7 +184,7 @@ func (q *Queue) Get(number int64) ([]interface{}, error) { if q.disposed { q.lock.Unlock() - return nil, DisposedError{} + return nil, disposedError } var items []interface{} @@ -177,7 +198,7 @@ func (q *Queue) Get(number int64) ([]interface{}, error) { sema.wg.Wait() // we are now inside the put's lock if q.disposed { - return nil, DisposedError{} + return nil, disposedError } items = q.items.get(number) sema.response.Done() @@ -201,7 +222,7 @@ func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, e if q.disposed { q.lock.Unlock() - return nil, DisposedError{} + return nil, disposedError } result := q.items.getUntil(checker) diff --git a/queue/queue.prof b/queue/queue.prof new file mode 100644 index 0000000..6bddb3b Binary files /dev/null and b/queue/queue.prof differ diff --git a/queue/queue_test.go b/queue/queue_test.go index ad28212..6536d9d 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -189,7 +189,7 @@ func TestEmptyGetWithDispose(t *testing.T) { wg.Wait() - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) } func TestGetPutDisposed(t *testing.T) { @@ -198,10 +198,10 @@ func TestGetPutDisposed(t *testing.T) { q.Dispose() _, err := q.Get(1) - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) err = q.Put(`a`) - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) } func BenchmarkQueue(b *testing.B) { @@ -289,7 +289,7 @@ func TestTakeUntilOnDisposedQueue(t *testing.T) { }) assert.Nil(t, result) - assert.IsType(t, DisposedError{}, err) + assert.IsType(t, disposedError, err) } func TestExecuteInParallel(t *testing.T) { diff --git a/queue/ring.go b/queue/ring.go new file mode 100644 index 0000000..9c137a9 --- /dev/null +++ b/queue/ring.go @@ -0,0 +1,158 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package queue + +import ( + "runtime" + "sync/atomic" +) + +// roundUp takes a uint64 greater than 0 and rounds it up to the next +// power of 2. +func roundUp(v uint64) uint64 { + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v |= v >> 32 + v++ + return v +} + +type node struct { + position uint64 + data interface{} +} + +type nodes []*node + +// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations +// only. A put on full or get on empty call will block until an item +// is put or retrieved. Calling Dispose on the RingBuffer will unblock +// any blocked threads with an error. This buffer is similar to the buffer +// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue +// with some minor additions. +type RingBuffer struct { + nodes nodes + queue, dequeue, mask, disposed uint64 +} + +func (rb *RingBuffer) init(size uint64) { + size = roundUp(size) + rb.nodes = make(nodes, size) + for i := uint64(0); i < size; i++ { + rb.nodes[i] = &node{position: i} + } + rb.mask = size - 1 // so we don't have to do this with every put/get operation +} + +// Put adds the provided item to the queue. If the queue is full, this +// call will block until an item is added to the queue or Dispose is called +// on the queue. An error will be returned if the queue is disposed. +func (rb *RingBuffer) Put(item interface{}) error { + var n *node + pos := atomic.LoadUint64(&rb.queue) +L: + for { + if atomic.LoadUint64(&rb.disposed) == 1 { + return disposedError + } + + n = rb.nodes[pos&rb.mask] + seq := atomic.LoadUint64(&n.position) + switch dif := seq - pos; { + case dif == 0: + if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) { + break L + } + case dif < 0: + panic(`Ring buffer in a compromised state during a put operation.`) + default: + pos = atomic.LoadUint64(&rb.queue) + } + runtime.Gosched() // free up the cpu before the next iteration + } + + n.data = item + atomic.StoreUint64(&n.position, pos+1) + return nil +} + +// Get will return the next item in the queue. This call will block +// if the queue is empty. This call will unblock when an item is added +// to the queue or Dispose is called on the queue. An error will be returned +// if the queue is disposed. +func (rb *RingBuffer) Get() (interface{}, error) { + var n *node + pos := atomic.LoadUint64(&rb.dequeue) +L: + for { + if atomic.LoadUint64(&rb.disposed) == 1 { + return nil, disposedError + } + + n = rb.nodes[pos&rb.mask] + seq := atomic.LoadUint64(&n.position) + switch dif := seq - (pos + 1); { + case dif == 0: + if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) { + break L + } + case dif < 0: + panic(`Ring buffer in compromised state during a get operation.`) + default: + pos = atomic.LoadUint64(&rb.dequeue) + } + runtime.Gosched() // free up cpu before next iteration + } + data := n.data + n.data = nil + atomic.StoreUint64(&n.position, pos+rb.mask+1) + return data, nil +} + +// Len returns the number of items in the queue. +func (rb *RingBuffer) Len() uint64 { + return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue) +} + +// Cap returns the capacity of this ring buffer. +func (rb *RingBuffer) Cap() uint64 { + return uint64(len(rb.nodes)) +} + +// Dispose will dispose of this queue and free any blocked threads +// in the Put and/or Get methods. Calling those methods on a disposed +// queue will return an error. +func (rb *RingBuffer) Dispose() { + atomic.CompareAndSwapUint64(&rb.disposed, 0, 1) +} + +// IsDisposed will return a bool indicating if this queue has been +// disposed. +func (rb *RingBuffer) IsDisposed() bool { + return atomic.LoadUint64(&rb.disposed) == 1 +} + +// NewRingBuffer will allocate, initialize, and return a ring buffer +// with the specified size. +func NewRingBuffer(size uint64) *RingBuffer { + rb := &RingBuffer{} + rb.init(size) + return rb +} diff --git a/queue/ring_test.go b/queue/ring_test.go new file mode 100644 index 0000000..09538f8 --- /dev/null +++ b/queue/ring_test.go @@ -0,0 +1,290 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRingInsert(t *testing.T) { + rb := NewRingBuffer(5) + assert.Equal(t, uint64(8), rb.Cap()) + + err := rb.Put(5) + if !assert.Nil(t, err) { + return + } + + result, err := rb.Get() + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, 5, result) +} + +func TestRingMultipleInserts(t *testing.T) { + rb := NewRingBuffer(5) + + err := rb.Put(1) + if !assert.Nil(t, err) { + return + } + + err = rb.Put(2) + if !assert.Nil(t, err) { + return + } + + result, err := rb.Get() + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, 1, result) + + result, err = rb.Get() + if assert.Nil(t, err) { + return + } + + assert.Equal(t, 2, result) +} + +func TestIntertwinedGetAndPut(t *testing.T) { + rb := NewRingBuffer(5) + err := rb.Put(1) + if !assert.Nil(t, err) { + return + } + + result, err := rb.Get() + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, 1, result) + + err = rb.Put(2) + if !assert.Nil(t, err) { + return + } + + result, err = rb.Get() + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, 2, result) +} + +func TestPutToFull(t *testing.T) { + rb := NewRingBuffer(3) + + for i := 0; i < 4; i++ { + err := rb.Put(i) + if !assert.Nil(t, err) { + return + } + } + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + err := rb.Put(4) + assert.Nil(t, err) + wg.Done() + }() + + go func() { + defer wg.Done() + result, err := rb.Get() + if !assert.Nil(t, err) { + return + } + + assert.Equal(t, 0, result) + }() + + wg.Wait() +} + +func TestRingGetEmpty(t *testing.T) { + rb := NewRingBuffer(3) + + var wg sync.WaitGroup + wg.Add(1) + + // want to kick off this consumer to ensure it blocks + go func() { + wg.Done() + result, err := rb.Get() + assert.Nil(t, err) + assert.Equal(t, 0, result) + wg.Done() + }() + + wg.Wait() + wg.Add(2) + + go func() { + defer wg.Done() + err := rb.Put(0) + assert.Nil(t, err) + }() + + wg.Wait() +} + +func TestRingLen(t *testing.T) { + rb := NewRingBuffer(4) + assert.Equal(t, uint64(0), rb.Len()) + + rb.Put(1) + assert.Equal(t, uint64(1), rb.Len()) + + rb.Get() + assert.Equal(t, uint64(0), rb.Len()) + + for i := 0; i < 4; i++ { + rb.Put(1) + } + assert.Equal(t, uint64(4), rb.Len()) + + rb.Get() + assert.Equal(t, uint64(3), rb.Len()) +} + +func TestDisposeOnGet(t *testing.T) { + numThreads := 8 + var wg sync.WaitGroup + wg.Add(numThreads) + rb := NewRingBuffer(4) + var spunUp sync.WaitGroup + spunUp.Add(numThreads) + + for i := 0; i < numThreads; i++ { + go func() { + spunUp.Done() + defer wg.Done() + _, err := rb.Get() + assert.NotNil(t, err) + }() + } + + spunUp.Wait() + + rb.Dispose() + + wg.Wait() + + assert.True(t, rb.IsDisposed()) +} + +func TestDisposeOnPut(t *testing.T) { + numThreads := 8 + var wg sync.WaitGroup + wg.Add(numThreads) + rb := NewRingBuffer(4) + var spunUp sync.WaitGroup + spunUp.Add(numThreads) + + // fill up the queue + for i := 0; i < 4; i++ { + rb.Put(i) + } + + // it's now full + for i := 0; i < numThreads; i++ { + go func(i int) { + spunUp.Done() + defer wg.Done() + err := rb.Put(i) + assert.NotNil(t, err) + }(i) + } + + spunUp.Wait() + + rb.Dispose() + + wg.Wait() + + assert.True(t, rb.IsDisposed()) +} + +func BenchmarkRBLifeCycle(b *testing.B) { + rb := NewRingBuffer(64) + + counter := uint64(0) + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + for { + _, err := rb.Get() + assert.Nil(b, err) + + if atomic.AddUint64(&counter, 1) == uint64(b.N) { + return + } + } + }() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + rb.Put(i) + } + + wg.Wait() +} + +func BenchmarkRBPut(b *testing.B) { + rbs := make([]*RingBuffer, 0, b.N) + + for i := 0; i < b.N; i++ { + rbs = append(rbs, NewRingBuffer(2)) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + rbs[i].Put(i) + } +} + +func BenchmarkRBGet(b *testing.B) { + rbs := make([]*RingBuffer, 0, b.N) + + for i := 0; i < b.N; i++ { + rbs = append(rbs, NewRingBuffer(2)) + rbs[i].Put(i) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + rbs[i].Get() + } +}