-
Notifications
You must be signed in to change notification settings - Fork 5
/
ringbuffer.go
156 lines (120 loc) · 3.95 KB
/
ringbuffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package lockless_generic_ring_buffer
import (
"errors"
"runtime"
"sync"
"sync/atomic"
)
var (
	// MaxUint32 is the largest value representable by a uint32.
	// NOTE(review): not referenced anywhere in this file — confirm external
	// users before removing.
	MaxUint32 = ^uint32(0)
	// MaxConsumerError is returned by CreateConsumer when every consumer slot
	// is already claimed.
	// NOTE(review): Go convention would name this ErrMaxConsumer; renaming
	// would break existing callers, so it is left as-is.
	MaxConsumerError = errors.New("max amount of consumers reached cannot create any more")
)
// RingBuffer is a generic ring buffer read concurrently by up to a fixed
// number of consumers. Positions are monotonically increasing uint32 values;
// uint32 wraparound is exploited for full/empty arithmetic (see Write), and
// values are only reduced modulo length at the final slice access.
type RingBuffer[T any] struct {
	buffer          []T
	length          uint32
	headPointer     uint32 // next position to write
	maxReaderIndex  uint32 // one past the highest claimed consumer slot
	readersPosition []*uint32 // per-consumer read position; nil = slot unclaimed
	consumerLock    sync.Mutex // guards consumer bookkeeping only, never the data path
}
// Consumer is a handle onto one claimed reader slot of a RingBuffer; id is the
// index of this consumer's entry in the ring's readersPosition slice.
type Consumer[T any] struct {
	ring *RingBuffer[T]
	id   uint32
}
// CreateBuffer constructs a ring buffer holding size elements with room for up
// to maxConsumer concurrent consumers. All consumer slots start unclaimed
// (nil) and the head pointer starts at zero.
func CreateBuffer[T any](size uint32, maxConsumer uint32) RingBuffer[T] {
	storage := make([]T, size)
	positions := make([]*uint32, maxConsumer)
	return RingBuffer[T]{
		buffer:          storage,
		length:          size,
		readersPosition: positions,
	}
}
/*
CreateConsumer claims the first free slot in the consumerPosition array and returns a Consumer bound to it. A nil
entry represents an unclaimed/not used slot; when no slot is free, MaxConsumerError is returned.

The mutex only serializes consumer bookkeeping and is never taken by Write or readIndex, so the read/write data path
stays lockless. For best performance, consumers should be preallocated before starting buffer operations.
*/
func (ringbuffer *RingBuffer[T]) CreateConsumer() (Consumer[T], error) {
	ringbuffer.consumerLock.Lock()
	defer ringbuffer.consumerLock.Unlock()

	// Sentinel must be one past the last consumer slot. The original used
	// ringbuffer.length (the buffer size); when size != maxConsumer and all
	// slots were taken, the full-buffer check below never fired and the
	// assignment indexed out of range.
	var insertIndex = uint32(len(ringbuffer.readersPosition))
	// Locate the first empty slot; if none is empty the sentinel survives.
	for i, consumer := range ringbuffer.readersPosition {
		if consumer == nil {
			insertIndex = uint32(i)
			break
		}
	}
	if int(insertIndex) == len(ringbuffer.readersPosition) {
		return Consumer[T]{}, MaxConsumerError
	}
	// Widen the active-reader window when claiming a slot past the current
	// maximum (the first nil can be at most maxReaderIndex, so +1 is exact).
	if insertIndex >= ringbuffer.maxReaderIndex {
		atomic.AddUint32(&ringbuffer.maxReaderIndex, 1)
	}
	// Start one behind head so the consumer's first Get returns the next
	// value written. headPointer is mutated with atomic.AddUint32 by
	// concurrent Writers, so it must be loaded atomically here.
	var readPosition = atomic.LoadUint32(&ringbuffer.headPointer) - 1
	ringbuffer.readersPosition[insertIndex] = &readPosition
	return Consumer[T]{
		id:   insertIndex,
		ring: ringbuffer,
	}, nil
}
// Remove releases this consumer's slot back to the ring buffer so a later
// CreateConsumer call can reuse it.
func (consumer *Consumer[T]) Remove() {
	consumer.ring.removeConsumer(consumer.id)
}
// Get advances this consumer to the next position and returns the value there,
// blocking (via scheduler yields) until a writer has published it.
func (consumer *Consumer[T]) Get() T {
	return consumer.ring.readIndex(consumer.id)
}
// removeConsumer marks the consumer's slot unclaimed (nil) so it no longer
// constrains Write and can be handed out again by CreateConsumer.
func (ringbuffer *RingBuffer[T]) removeConsumer(consumerId uint32) {
	ringbuffer.consumerLock.Lock()
	defer ringbuffer.consumerLock.Unlock()
	ringbuffer.readersPosition[consumerId] = nil
	// Shrink the active-reader window when the highest claimed slot is freed.
	// Decrement atomically — Write reads maxReaderIndex concurrently without
	// the lock, and a plain -- is a data race; adding ^uint32(0) is the
	// sync/atomic-documented way to subtract 1.
	if consumerId == ringbuffer.maxReaderIndex-1 {
		atomic.AddUint32(&ringbuffer.maxReaderIndex, ^uint32(0))
	}
}
// Write blocks until at least one slot in the buffer is free, then stores
// value at the head and atomically advances the head pointer.
func (ringbuffer *RingBuffer[T]) Write(value T) {
	var lastRead uint32
	var currentRead uint32
	var i uint32
	/*
		We block until at least one space is available in the buffer to write.
		Overflow properties of uint32 are utilized to keep slice index arithmetic correct: adding the buffer length to
		each consumer read position lets us determine the least-advanced consumer even across wraparound.
		For example: buffer of size 2
		uint8 head = 1
		uint8 tail = 255
		tail + 2 => 1 with overflow, same as head, i.e. the buffer is full.
		Concurrent reader progress during the scan does not matter: positions only move toward the head pointer, so a
		stale value has no effect on data integrity. Worst case it causes another yield to the scheduler and a recheck
		of the current minimum consumer.
	*/
	for {
		lastRead = ringbuffer.headPointer + ringbuffer.length
		// Scan only the active window [0, maxReaderIndex). The original used
		// i <= maxReaderIndex, which reads one slot past the window and
		// panics with index-out-of-range once every consumer slot is claimed
		// (maxReaderIndex == len(readersPosition)).
		activeReaders := atomic.LoadUint32(&ringbuffer.maxReaderIndex)
		for i = 0; i < activeReaders; i++ {
			if ringbuffer.readersPosition[i] == nil {
				continue
			}
			// Reader positions advance via atomic.AddUint32 in readIndex, so
			// load them atomically to avoid a data race.
			currentRead = atomic.LoadUint32(ringbuffer.readersPosition[i]) + ringbuffer.length
			if currentRead < lastRead {
				lastRead = currentRead
			}
		}
		if lastRead > ringbuffer.headPointer {
			break
		}
		runtime.Gosched()
	}
	ringbuffer.buffer[ringbuffer.headPointer%ringbuffer.length] = value
	atomic.AddUint32(&ringbuffer.headPointer, 1)
}
// readIndex atomically claims the consumer's next position, spins (yielding to
// the scheduler) until a writer has published that slot, and returns the value.
func (ringbuffer *RingBuffer[T]) readIndex(consumerId uint32) T {
	var newIndex = atomic.AddUint32(ringbuffer.readersPosition[consumerId], 1)
	// headPointer is advanced with atomic.AddUint32 in Write, so it must be
	// loaded atomically here: the original plain read was a data race, and
	// the Go memory model does not guarantee a racy spin loop ever observes
	// the writer's update.
	for newIndex >= atomic.LoadUint32(&ringbuffer.headPointer) {
		runtime.Gosched()
	}
	return ringbuffer.buffer[newIndex%ringbuffer.length]
}