Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial lockfree attempt #191

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions autopaho/queue/benchmarks/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package benchmarks

import (
"runtime"
"strings"
"testing"

"github.com/eclipse/paho.golang/autopaho/queue"
"github.com/eclipse/paho.golang/autopaho/queue/lockfree"
"github.com/eclipse/paho.golang/autopaho/queue/memory"
)

func benchmarkConcurrentEnqueueDequeue(b *testing.B, q queue.Queue) {
data := strings.NewReader("test data")
workers := runtime.GOMAXPROCS(0) // Use as many goroutines as there are available CPUs

b.ResetTimer()
b.ReportAllocs()
b.SetParallelism(workers) // Set the number of goroutines to use in parallel benchmarks

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := q.Enqueue(data); err != nil {
b.Fatal(err)
}
if err := q.Dequeue(); err != nil {
b.Fatal(err)
}
}
})
}

func seedQueue(q queue.Queue, count int) queue.Queue {
for i := 0; i < count; i++ {
data := strings.NewReader("test data")
q.Enqueue(data)
}
return q
}

func benchmarkEnqueue(b *testing.B, q queue.Queue) {
data := strings.NewReader("test data")

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if err := q.Enqueue(data); err != nil {
b.Fatal(err)
}
}
}

func benchmarkPeek(b *testing.B, q queue.Queue) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if _, err := q.Peek(); err != nil {
b.Fatal(err)
}
}
}

func benchmarkDequeue(b *testing.B, q queue.Queue) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q.Dequeue()
}
}

func BenchmarkEnqueueMemory(b *testing.B) {
queue := memory.New()
benchmarkEnqueue(b, queue)
}

func BenchmarkEnqueueLockFree(b *testing.B) {
queue := lockfree.New()
benchmarkEnqueue(b, queue)
}

func BenchmarkDequeueMemory(b *testing.B) {
q := memory.New()
seedQueue(q, 1000)
benchmarkEnqueue(b, q)
}

func BenchmarkDequeueLockFree(b *testing.B) {
q := lockfree.New()
seedQueue(q, 1000)
benchmarkDequeue(b, q)
}

func BenchmarkPeakMemory(b *testing.B) {
q := memory.New()
seedQueue(q, 1000)
benchmarkPeek(b, q)
}

func BenchmarkPeakLockFree(b *testing.B) {
q := lockfree.New()
seedQueue(q, 1000)
benchmarkPeek(b, q)
}

func BenchmarkConcurrentMemory(b *testing.B) {
q := memory.New()
benchmarkConcurrentEnqueueDequeue(b, q)
}

func BenchmarkConcurrentLockFree(b *testing.B) {
q := lockfree.New()
benchmarkConcurrentEnqueueDequeue(b, q)
}
135 changes: 135 additions & 0 deletions autopaho/queue/lockfree/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package lockfree

import (
"bytes"
"io"
"sync/atomic"
"unsafe"

"github.com/eclipse/paho.golang/autopaho/queue"
)

type Queue struct {
head unsafe.Pointer // *node
tail unsafe.Pointer // *node
waitChan unsafe.Pointer // *chan struct{}
}

type node struct {
value []byte
next unsafe.Pointer // *node
}

// NewLockFree creates a queue with a dummy node.
func New() *Queue {
dummy := &node{}
return &Queue{
head: unsafe.Pointer(dummy),
tail: unsafe.Pointer(dummy),
}
}

// Enqueue adds an item to the queue.
func (q *Queue) Enqueue(p io.Reader) error {
data, err := io.ReadAll(p)
if err != nil {
return err
}

n := &node{value: data}
for {
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&tail.next))
if tail == (*node)(atomic.LoadPointer(&q.tail)) { // Still the tail?
if next == nil {
if atomic.CompareAndSwapPointer(&tail.next, nil, unsafe.Pointer(n)) {
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(n))
// Signal that the queue is not empty if needed
q.signalNotEmpty()
return nil
}
} else {
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
}
}
}
}

// Dequeue removes the oldest item from the queue.
func (q *Queue) Dequeue() error {
for {
head := (*node)(atomic.LoadPointer(&q.head))
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&head.next))
if head == (*node)(atomic.LoadPointer(&q.head)) { // Still the head?
if head == tail {
if next == nil {
return queue.ErrEmpty // Queue is empty
}
// Tail falling behind, advance it
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
} else {
// Read value before CAS, otherwise another dequeue might free the next node
if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
return nil
}
}
}
}
}

// Peek retrieves the oldest item from the queue without removing it.
func (q *Queue) Peek() (io.ReadCloser, error) {
for {
head := (*node)(atomic.LoadPointer(&q.head))
next := (*node)(atomic.LoadPointer(&head.next))
if next != nil { // There is an item in the queue
return io.NopCloser(bytes.NewReader(next.value)), nil
}

if atomic.LoadPointer(&q.waitChan) != nil {
// The wait channel is set, meaning the queue may not be empty
continue // Retry the loop since the queue state may have changed
}
// The queue is empty
return nil, queue.ErrEmpty
}
}

// Wait returns a channel that is closed when there is something in the queue.
func (q *Queue) Wait() chan struct{} {
for {
if !q.isEmpty() {
// If the queue is not empty, return a closed channel
c := make(chan struct{})
close(c)
return c
}

// Attempt to create a wait channel if it doesn't exist
if atomic.LoadPointer(&q.waitChan) == nil {
newCh := make(chan struct{})
if atomic.CompareAndSwapPointer(&q.waitChan, nil, unsafe.Pointer(&newCh)) {
return newCh
}
}
}
}

// isEmpty checks if the queue is empty.
func (q *Queue) isEmpty() bool {
head := (*node)(atomic.LoadPointer(&q.head))
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&head.next))
return head == tail && next == nil
}

// signalNotEmpty signals that the queue is not empty.
func (q *Queue) signalNotEmpty() {
chPtr := atomic.LoadPointer(&q.waitChan)
if chPtr != nil {
ch := *(*chan struct{})(chPtr)
close(ch) // Close the channel to signal that the queue is not empty
atomic.StorePointer(&q.waitChan, nil) // Reset the wait channel pointer
}
}
Loading