Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Still pretty specific to gotable, but contains logic required to maintain graph
#### Queue:
Package contains both a normal and priority queue. Both implementations never block on send and grow as much as necessary. Both also only return errors if you attempt to push to a disposed queue and will not panic like sending a message on a closed channel. The priority queue also allows you to place items in priority order inside the queue. If you give a useful hint to the regular queue, it is actually faster than a channel. The priority queue is somewhat slow currently and targeted for an update to a Fibonacci heap.

Also included in the queue package is a MPMC threadsafe ring buffer. This is a block full/empty queue, but will return a blocked thread if the queue is disposed while a thread is blocked. This can be used to synchronize goroutines and ensure goroutines quit so objects can be GC'd. Threadsafety is acheived using only CAS operations making this queue quite fast. Benchmarks can be found in that package.

#### Range Tree:
Useful to determine if n-dimensional points fall within an n-dimensional range. Not a typical range tree however, as we are actually using an n-dimensional sorted list of points as this proved to be simpler and faster than attempting a traditional range tree while saving space on any dimension greater than one. Inserts are typical BBST times at O(log n^d) where d is the number of dimensions.

Expand Down
2 changes: 1 addition & 1 deletion btree/palm/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (ptree *ptree) runOperations() {
writeOperations[n] = append(writeOperations[n], toPerform.writes[i])
}

toPerform.signaler <- true
ptree.runAdds(writeOperations)
toPerform.signaler <- true
}

func (ptree *ptree) recursiveSplit(n, parent, left *node, nodes *[]*node, keys *Keys) {
Expand Down
6 changes: 2 additions & 4 deletions queue/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ limitations under the License.

package queue

type DisposedError struct{}
import "errors"

func (de DisposedError) Error() string {
return `Queue has been disposed.`
}
var disposedError = errors.New(`Queue has been disposed.`)
6 changes: 3 additions & 3 deletions queue/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (pq *PriorityQueue) Put(items ...Item) error {
pq.lock.Lock()
if pq.disposed {
pq.lock.Unlock()
return DisposedError{}
return disposedError
}

for _, item := range items {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) {

if pq.disposed {
pq.lock.Unlock()
return nil, DisposedError{}
return nil, disposedError
}

var items []Item
Expand All @@ -159,7 +159,7 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) {
pq.disposeLock.Lock()
if pq.disposed {
pq.disposeLock.Unlock()
return nil, DisposedError{}
return nil, disposedError
}
pq.disposeLock.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions queue/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,18 @@ func TestEmptyPriorityGetWithDispose(t *testing.T) {

wg.Wait()

assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)
}

func TestPriorityGetPutDisposed(t *testing.T) {
q := NewPriorityQueue(1)
q.Dispose()

_, err := q.Get(1)
assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)

err = q.Put(mockItem(1))
assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)
}

func BenchmarkPriorityQueue(b *testing.B) {
Expand Down
35 changes: 28 additions & 7 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,31 @@ as opposed to panicking as with channels. Queues will grow with unbounded
behavior as opposed to channels which can be buffered but will pause
while a thread attempts to put to a full channel.

TODO: Unify the two types of queue to the same interface.
TODO: Implement an even faster lockless circular buffer.
Recently added is a lockless ring buffer using the same basic C design as
found here:

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

Modified for use with Go with the addition of some dispose semantics providing
the capability to release blocked threads. This works for both puts
and gets, either will return an error if they are blocked and the buffer
is disposed. This could serve as a signal to kill a goroutine. All threadsafety
is acheived using CAS operations, making this buffer pretty quick.

Benchmarks:
BenchmarkPriorityQueue-8 2000000 782 ns/op
BenchmarkQueue-8 2000000 671 ns/op
BenchmarkChannel-8 1000000 2083 ns/op
BenchmarkQueuePut-8 20000 84299 ns/op
BenchmarkQueueGet-8 20000 80753 ns/op
BenchmarkExecuteInParallel-8 20000 68891 ns/op
BenchmarkRBLifeCycle-8 10000000 177 ns/op
BenchmarkRBPut-8 30000000 58.1 ns/op
BenchmarkRBGet-8 50000000 26.8 ns/op
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alignment is a little off here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmarking does this for some reason. Pretty annoying.


TODO: We really need a Fibonacci heap for the priority queue.
TODO: Unify the types of queue to the same interface.
*/

package queue

import (
Expand Down Expand Up @@ -128,7 +149,7 @@ func (q *Queue) Put(items ...interface{}) error {

if q.disposed {
q.lock.Unlock()
return DisposedError{}
return disposedError
}

q.items = append(q.items, items...)
Expand Down Expand Up @@ -163,7 +184,7 @@ func (q *Queue) Get(number int64) ([]interface{}, error) {

if q.disposed {
q.lock.Unlock()
return nil, DisposedError{}
return nil, disposedError
}

var items []interface{}
Expand All @@ -177,7 +198,7 @@ func (q *Queue) Get(number int64) ([]interface{}, error) {
sema.wg.Wait()
// we are now inside the put's lock
if q.disposed {
return nil, DisposedError{}
return nil, disposedError
}
items = q.items.get(number)
sema.response.Done()
Expand All @@ -201,7 +222,7 @@ func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, e

if q.disposed {
q.lock.Unlock()
return nil, DisposedError{}
return nil, disposedError
}

result := q.items.getUntil(checker)
Expand Down
Binary file added queue/queue.prof
Binary file not shown.
8 changes: 4 additions & 4 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestEmptyGetWithDispose(t *testing.T) {

wg.Wait()

assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)
}

func TestGetPutDisposed(t *testing.T) {
Expand All @@ -198,10 +198,10 @@ func TestGetPutDisposed(t *testing.T) {
q.Dispose()

_, err := q.Get(1)
assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)

err = q.Put(`a`)
assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)
}

func BenchmarkQueue(b *testing.B) {
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestTakeUntilOnDisposedQueue(t *testing.T) {
})

assert.Nil(t, result)
assert.IsType(t, DisposedError{}, err)
assert.IsType(t, disposedError, err)
}

func TestExecuteInParallel(t *testing.T) {
Expand Down
158 changes: 158 additions & 0 deletions queue/ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright 2014 Workiva, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue

import (
"runtime"
"sync/atomic"
)

// roundUp takes a uint64 greater than 0 and rounds it up to the next
// power of 2.
func roundUp(v uint64) uint64 {
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v |= v >> 32
v++
return v
}

type node struct {
position uint64
data interface{}
}

type nodes []*node

// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations
// only. A put on full or get on empty call will block until an item
// is put or retrieved. Calling Dispose on the RingBuffer will unblock
// any blocked threads with an error. This buffer is similar to the buffer
// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
// with some minor additions.
type RingBuffer struct {
nodes nodes
queue, dequeue, mask, disposed uint64
}

func (rb *RingBuffer) init(size uint64) {
size = roundUp(size)
rb.nodes = make(nodes, size)
for i := uint64(0); i < size; i++ {
rb.nodes[i] = &node{position: i}
}
rb.mask = size - 1 // so we don't have to do this with every put/get operation
}

// Put adds the provided item to the queue. If the queue is full, this
// call will block until an item is added to the queue or Dispose is called
// on the queue. An error will be returned if the queue is disposed.
func (rb *RingBuffer) Put(item interface{}) error {
var n *node
pos := atomic.LoadUint64(&rb.queue)
L:
for {
if atomic.LoadUint64(&rb.disposed) == 1 {
return disposedError
}

n = rb.nodes[pos&rb.mask]
seq := atomic.LoadUint64(&n.position)
switch dif := seq - pos; {
case dif == 0:
if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) {
break L
}
case dif < 0:
panic(`Ring buffer in a compromised state during a put operation.`)
default:
pos = atomic.LoadUint64(&rb.queue)
}
runtime.Gosched() // free up the cpu before the next iteration
}

n.data = item
atomic.StoreUint64(&n.position, pos+1)
return nil
}

// Get will return the next item in the queue. This call will block
// if the queue is empty. This call will unblock when an item is added
// to the queue or Dispose is called on the queue. An error will be returned
// if the queue is disposed.
func (rb *RingBuffer) Get() (interface{}, error) {
var n *node
pos := atomic.LoadUint64(&rb.dequeue)
L:
for {
if atomic.LoadUint64(&rb.disposed) == 1 {
return nil, disposedError
}

n = rb.nodes[pos&rb.mask]
seq := atomic.LoadUint64(&n.position)
switch dif := seq - (pos + 1); {
case dif == 0:
if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) {
break L
}
case dif < 0:
panic(`Ring buffer in compromised state during a get operation.`)
default:
pos = atomic.LoadUint64(&rb.dequeue)
}
runtime.Gosched() // free up cpu before next iteration
}
data := n.data
n.data = nil
atomic.StoreUint64(&n.position, pos+rb.mask+1)
return data, nil
}

// Len returns the number of items in the queue.
func (rb *RingBuffer) Len() uint64 {
return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
}

// Cap returns the capacity of this ring buffer.
func (rb *RingBuffer) Cap() uint64 {
return uint64(len(rb.nodes))
}

// Dispose will dispose of this queue and free any blocked threads
// in the Put and/or Get methods. Calling those methods on a disposed
// queue will return an error.
func (rb *RingBuffer) Dispose() {
atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
}

// IsDisposed will return a bool indicating if this queue has been
// disposed.
func (rb *RingBuffer) IsDisposed() bool {
return atomic.LoadUint64(&rb.disposed) == 1
}

// NewRingBuffer will allocate, initialize, and return a ring buffer
// with the specified size.
func NewRingBuffer(size uint64) *RingBuffer {
rb := &RingBuffer{}
rb.init(size)
return rb
}
Loading