// mapof.go, forked from puzpuzpuz/xsync.

//go:build go1.18
// +build go1.18

package xsync

import (
"fmt"
"math"
"sync"
"sync/atomic"
"unsafe"
)

// MapOf is like a Go map[string]V but is safe for concurrent
// use by multiple goroutines without additional locking or
// coordination. It follows the interface of sync.Map.
//
// A MapOf must not be copied after first use.
//
// MapOf uses a modified version of the Cache-Line Hash Table (CLHT)
// data structure: https://github.com/LPD-EPFL/CLHT
//
// CLHT is built around the idea of organizing the hash table in
// cache-line-sized buckets, so that on all modern CPUs an update
// operation completes with at most one cache-line transfer.
// Get operations involve no writes to memory and take no mutexes
// or any other kind of lock. Thanks to this design, MapOf
// outperforms sync.Map in all considered scenarios.
//
// One important difference from sync.Map is that only string keys
// are supported. That's because the Go standard library does not
// expose its built-in hash functions for interface{} values.
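//
// A minimal usage sketch (illustrative only, not part of the
// original file):
//
//	m := NewMapOf[int]()
//	m.Store("answer", 42)
//	if v, ok := m.Load("answer"); ok {
//		fmt.Println(v) // prints 42
//	}
//	m.Delete("answer")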
type MapOf[V any] struct {
table unsafe.Pointer // *mapTable
resizing int64 // resize in progress flag; updated atomically
resizeMu sync.Mutex // only used along with resizeCond
resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications)
totalGrowths int64
totalShrinks int64
}

// NewMapOf creates a new MapOf instance.
func NewMapOf[V any]() *MapOf[V] {
m := &MapOf[V]{}
m.resizeCond = *sync.NewCond(&m.resizeMu)
table := newMapTable(minMapTableLen)
atomic.StorePointer(&m.table, unsafe.Pointer(table))
return m
}

// Load returns the value stored in the map for a key, or the zero
// value if no value is present.
// The ok result indicates whether value was found in the map.
func (m *MapOf[V]) Load(key string) (value V, ok bool) {
hash := maphash64(key)
table := (*mapTable)(atomic.LoadPointer(&m.table))
bidx := bucketIdx(table, hash)
b := &table.buckets[bidx]
topHashes := atomic.LoadUint64(&b.topHashes)
for i := 0; i < entriesPerMapBucket; i++ {
if !topHashMatch(hash, topHashes, i) {
continue
}
atomic_snapshot:
// Start atomic snapshot: load the value pointer, then the key
// pointer, and then re-check the value pointer. If it is unchanged,
// the key/value pair was read consistently.
vp := atomic.LoadPointer(&b.values[i])
kp := atomic.LoadPointer(&b.keys[i])
if kp != nil && vp != nil {
if key == derefKey(kp) {
if uintptr(vp) == uintptr(atomic.LoadPointer(&b.values[i])) {
// Atomic snapshot succeeded.
return derefTypedValue[V](vp), true
}
// Concurrent update/remove. Go for another spin.
goto atomic_snapshot
}
}
}
return value, false
}

// Store sets the value for a key.
func (m *MapOf[V]) Store(key string, value V) {
m.doStore(key, value, false)
}

// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
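//
// A short sketch (illustrative only):
//
//	actual, loaded := m.LoadOrStore("hits", 1)
//	if loaded {
//		// "hits" was already present; actual holds the old value.
//	} else {
//		// 1 was stored; actual == 1.
//	}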
func (m *MapOf[V]) LoadOrStore(key string, value V) (actual V, loaded bool) {
return m.doStore(key, value, true)
}

// LoadAndStore returns the existing value for the key if present,
// while setting the new value for the key.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false otherwise.
func (m *MapOf[V]) LoadAndStore(key string, value V) (actual V, loaded bool) {
return m.doStore(key, value, false)
}
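
// doStore implements the common write path behind Store, LoadOrStore
// and LoadAndStore. The publication order matters to the lock-free
// readers: the top hash bits are set first, then the value pointer,
// then the key pointer, so a reader that observes a non-nil key also
// observes a valid value for it (see the snapshot loop in Load).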
func (m *MapOf[V]) doStore(key string, value V, loadIfExists bool) (V, bool) {
// Read-only path.
if loadIfExists {
if v, ok := m.Load(key); ok {
return v, true
}
}
// Write path.
hash := maphash64(key)
for {
var (
emptykp, emptyvp *unsafe.Pointer
emptyidx int
)
table := (*mapTable)(atomic.LoadPointer(&m.table))
bidx := bucketIdx(table, hash)
b := &table.buckets[bidx]
b.mu.Lock()
if m.newerTableExists(table) {
// Someone resized the table. Go for another attempt.
b.mu.Unlock()
continue
}
if m.resizeInProgress() {
// Resize is in progress. Wait, then go for another attempt.
b.mu.Unlock()
m.waitForResize()
continue
}
for i := 0; i < entriesPerMapBucket; i++ {
if b.keys[i] == nil {
if emptykp == nil {
emptykp = &b.keys[i]
emptyvp = &b.values[i]
emptyidx = i
}
continue
}
if !topHashMatch(hash, b.topHashes, i) {
continue
}
if key == derefKey(b.keys[i]) {
vp := b.values[i]
if loadIfExists {
b.mu.Unlock()
return derefTypedValue[V](vp), true
}
// In-place update case. Luckily we get a copy of the value on
// each call, thus the live value pointers are unique. Otherwise
// the atomic snapshot wouldn't be correct in case of multiple
// Store calls using the same value.
atomic.StorePointer(&b.values[i], unsafe.Pointer(&value))
b.mu.Unlock()
return derefTypedValue[V](vp), true
}
}
if emptykp != nil {
// Insertion case. First we update the value, then the key.
// This is important for atomic snapshot states.
atomic.StoreUint64(&b.topHashes, storeTopHash(hash, b.topHashes, emptyidx))
atomic.StorePointer(emptyvp, unsafe.Pointer(&value))
atomic.StorePointer(emptykp, unsafe.Pointer(&key))
b.mu.Unlock()
addSize(table, bidx, 1)
return value, false
}
// Need to grow the table. Then go for another attempt.
b.mu.Unlock()
m.resize(table, mapGrowHint)
}
}

func (m *MapOf[V]) newerTableExists(table *mapTable) bool {
curTablePtr := atomic.LoadPointer(&m.table)
return uintptr(curTablePtr) != uintptr(unsafe.Pointer(table))
}

func (m *MapOf[V]) resizeInProgress() bool {
return atomic.LoadInt64(&m.resizing) == 1
}

func (m *MapOf[V]) waitForResize() {
m.resizeMu.Lock()
for m.resizeInProgress() {
m.resizeCond.Wait()
}
m.resizeMu.Unlock()
}
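
// resize makes a single grow or shrink attempt. At most one resize
// runs at a time: the resizing flag is claimed with a CAS, and the
// losers block in waitForResize until the winner publishes the new
// table and broadcasts on resizeCond.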
func (m *MapOf[V]) resize(table *mapTable, hint mapResizeHint) {
var shrinkThreshold int64
tableLen := len(table.buckets)
// Fast path for shrink attempts.
if hint == mapShrinkHint {
shrinkThreshold = int64((tableLen * entriesPerMapBucket) / mapShrinkFraction)
if tableLen == minMapTableLen || sumSize(table) > shrinkThreshold {
return
}
}
// Slow path.
if !atomic.CompareAndSwapInt64(&m.resizing, 0, 1) {
// Someone else started resize. Wait for it to finish.
m.waitForResize()
return
}
var newTable *mapTable
switch hint {
case mapGrowHint:
// Grow the table by a factor of 2.
atomic.AddInt64(&m.totalGrowths, 1)
newTable = newMapTable(tableLen << 1)
case mapShrinkHint:
if sumSize(table) <= shrinkThreshold {
// Shrink the table by a factor of 2.
atomic.AddInt64(&m.totalShrinks, 1)
newTable = newMapTable(tableLen >> 1)
} else {
// No need to shrink. Wake up all waiters and give up.
m.resizeMu.Lock()
atomic.StoreInt64(&m.resizing, 0)
m.resizeCond.Broadcast()
m.resizeMu.Unlock()
return
}
default:
panic(fmt.Sprintf("unexpected resize hint: %d", hint))
}
copy:
for i := 0; i < tableLen; i++ {
copied, ok := copyBucket(&table.buckets[i], newTable)
if !ok {
// Table size is insufficient, need to grow it.
newTable = newMapTable(len(newTable.buckets) << 1)
goto copy
}
addSizeNonAtomic(newTable, uint64(i), copied)
}
// Publish the new table and wake up all waiters.
atomic.StorePointer(&m.table, unsafe.Pointer(newTable))
m.resizeMu.Lock()
atomic.StoreInt64(&m.resizing, 0)
m.resizeCond.Broadcast()
m.resizeMu.Unlock()
}

// LoadAndDelete deletes the value for a key, returning the previous
// value if any. The loaded result reports whether the key was
// present.
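//
// A short sketch (illustrative only):
//
//	if v, loaded := m.LoadAndDelete("answer"); loaded {
//		fmt.Println("removed:", v)
//	}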
func (m *MapOf[V]) LoadAndDelete(key string) (value V, loaded bool) {
hash := maphash64(key)
delete_attempt:
hintNonEmpty := 0
table := (*mapTable)(atomic.LoadPointer(&m.table))
bidx := bucketIdx(table, hash)
b := &table.buckets[bidx]
b.mu.Lock()
if m.newerTableExists(table) {
// Someone resized the table. Go for another attempt.
b.mu.Unlock()
goto delete_attempt
}
if m.resizeInProgress() {
// Resize is in progress. Wait, then go for another attempt.
b.mu.Unlock()
m.waitForResize()
goto delete_attempt
}
for i := 0; i < entriesPerMapBucket; i++ {
kp := b.keys[i]
if kp == nil || !topHashMatch(hash, b.topHashes, i) {
continue
}
if key == derefKey(kp) {
vp := b.values[i]
// Deletion case. First we update the value, then the key.
// This is important for atomic snapshot states.
atomic.StoreUint64(&b.topHashes, eraseTopHash(b.topHashes, i))
atomic.StorePointer(&b.values[i], nil)
atomic.StorePointer(&b.keys[i], nil)
leftEmpty := false
if hintNonEmpty == 0 {
leftEmpty = isEmpty(b)
}
b.mu.Unlock()
addSize(table, bidx, -1)
// Might need to shrink the table.
if leftEmpty {
m.resize(table, mapShrinkHint)
}
return derefTypedValue[V](vp), true
}
hintNonEmpty++
}
b.mu.Unlock()
return value, false
}

// Delete deletes the value for a key.
func (m *MapOf[V]) Delete(key string) {
m.LoadAndDelete(key)
}

// Range calls f sequentially for each key and value present in the
// map. If f returns false, range stops the iteration.
//
// Range does not necessarily correspond to any consistent snapshot
// of the Map's contents: no key will be visited more than once, but
// if the value for any key is stored or deleted concurrently, Range
// may reflect any mapping for that key from any point during the
// Range call.
//
// It is safe to modify the map while iterating it. However, the
// concurrent modification rules apply, i.e. the changes may not be
// reflected in the subsequently iterated entries.
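//
// A usage sketch (illustrative only, assuming a MapOf[int]):
//
//	m.Range(func(key string, value int) bool {
//		fmt.Println(key, value)
//		return true // return false to stop early
//	})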
func (m *MapOf[V]) Range(f func(key string, value V) bool) {
tablep := atomic.LoadPointer(&m.table)
table := *(*mapTable)(tablep)
bentries := make([]rangeEntry, 0, entriesPerMapBucket)
for i := range table.buckets {
copyRangeEntries(&table.buckets[i], &bentries)
for j := range bentries {
k := derefKey(bentries[j].key)
v := derefTypedValue[V](bentries[j].value)
if !f(k, v) {
return
}
}
}
}

// Size returns the current size of the map.
func (m *MapOf[V]) Size() int {
table := (*mapTable)(atomic.LoadPointer(&m.table))
return int(sumSize(table))
}

func derefTypedValue[V any](valuePtr unsafe.Pointer) (val V) {
return *(*V)(valuePtr)
}

// stats returns the map's statistics. This is an O(N) operation;
// use it for debug purposes only.
func (m *MapOf[V]) stats() mapStats {
stats := mapStats{
TotalGrowths: atomic.LoadInt64(&m.totalGrowths),
TotalShrinks: atomic.LoadInt64(&m.totalShrinks),
MinEntries: math.MaxInt32,
}
table := (*mapTable)(atomic.LoadPointer(&m.table))
stats.TableLen = len(table.buckets)
stats.Counter = int(sumSize(table))
stats.CounterLen = len(table.size)
for i := range table.buckets {
numEntries := 0
stats.Capacity += entriesPerMapBucket
b := &table.buckets[i]
for j := 0; j < entriesPerMapBucket; j++ {
if atomic.LoadPointer(&b.keys[j]) != nil {
stats.Size++
numEntries++
}
}
if numEntries < stats.MinEntries {
stats.MinEntries = numEntries
}
if numEntries > stats.MaxEntries {
stats.MaxEntries = numEntries
}
}
return stats
}