forked from blocklessnetwork/b7s
-
Notifications
You must be signed in to change notification settings - Fork 1
/
waitmap.go
109 lines (88 loc) · 2.53 KB
/
waitmap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package waitmap
import (
"context"
"sync"
)
// WaitMap is a key-value store that enables not only setting and getting
// values from a map, but also waiting until a value for a key becomes available.
//
// Important: since this implementation is tied pretty closely to how it will be used
// (as an internal package), it has the peculiar behavior of only the first `Set` setting
// the value. Subsequent `Set` calls are recorded, but don't change the returned value.
type WaitMap struct {
	// Embedded mutex guarding both maps below; every method locks it before touching them.
	sync.Mutex
	// m holds, per key, every value passed to Set, in call order.
	// Readers only ever return the first element.
	m map[string][]any
	// subs holds, per key, the channels of callers currently blocked in Wait or WaitFor.
	// Set sends the new value on each channel and then drops the key's entry.
	subs map[string][]chan any
}
// New returns an empty WaitMap with both of its internal maps allocated,
// so it is ready for use straight away.
func New() *WaitMap {
	return &WaitMap{
		m:    make(map[string][]any),
		subs: make(map[string][]chan any),
	}
}
// Set records value under key. Values accumulate in call order, so the first
// value set for a key stays the one reported by readers. Every subscriber
// currently waiting on key is handed the new value, after which the key's
// subscriber list is discarded.
func (w *WaitMap) Set(key string, value any) {
	w.Lock()
	defer w.Unlock()

	// Appending to a missing (nil) entry creates the slice on demand.
	w.m[key] = append(w.m[key], value)

	// Hand the value to everyone waiting on this key, then forget them:
	// each subscription is good for exactly one delivery.
	waiters := w.subs[key]
	for i := range waiters {
		waiters[i] <- value
	}
	delete(w.subs, key)
}
// Wait blocks until a value for key is available and returns it.
// If the key already has values, the first one set is returned immediately.
// Otherwise the call subscribes to the key and returns the value delivered by
// the next Set for it. Wait has no timeout and blocks indefinitely if the key
// is never set.
func (w *WaitMap) Wait(key string) any {
	w.Lock()
	// Unlock cannot be deferred: the lock has to be released before blocking on
	// the channel, otherwise Set could never acquire it to deliver the value.
	if values, ok := w.m[key]; ok {
		w.Unlock()
		return values[0]
	}

	// No value yet, so subscribe to the next Set for this key. The channel is
	// buffered (as in WaitFor) because Set sends while holding the mutex: with a
	// buffer of one the send completes immediately instead of stalling Set, and
	// with it every other map operation, until this goroutine is scheduled to
	// receive. Each subscription receives at most one value, so one slot suffices.
	ch := make(chan any, 1)
	w.subs[key] = append(w.subs[key], ch)
	w.Unlock()

	return <-ch
}
// WaitFor waits for a value for key to become available, but gives up as soon
// as ctx is done. It returns the value and true on success, or nil and false
// if ctx finished first. If the key already has a value when WaitFor is called,
// the first value set is returned immediately without consulting ctx.
func (w *WaitMap) WaitFor(ctx context.Context, key string) (any, bool) {
	w.Lock()
	// Unlock cannot be deferred: the lock has to be released before blocking,
	// otherwise Set could never acquire it to deliver the value.
	if values, ok := w.m[key]; ok {
		w.Unlock()
		return values[0], true
	}

	// No value yet, so subscribe to the next Set for this key.
	// Use a buffered channel since we might bail before collecting our value;
	// the buffer lets Set complete its send even if nobody receives.
	ch := make(chan any, 1)
	w.subs[key] = append(w.subs[key], ch)
	w.Unlock()

	select {
	case value := <-ch:
		return value, true
	case <-ctx.Done():
	}

	// We gave up. Withdraw the subscription so abandoned channels do not pile up
	// in subs for keys that are never (or only much later) set.
	w.Lock()
	defer w.Unlock()

	// If Set ran in the meantime the key's entry is already gone and the loop
	// below simply finds nothing to remove.
	waiters := w.subs[key]
	for i, waiter := range waiters {
		if waiter != ch {
			continue
		}
		last := len(waiters) - 1
		copy(waiters[i:], waiters[i+1:])
		waiters[last] = nil // drop the stale reference held by the backing array
		waiters = waiters[:last]
		break
	}
	if len(waiters) == 0 {
		delete(w.subs, key)
	} else {
		w.subs[key] = waiters
	}

	return nil, false
}
// Get returns the current value for key without waiting.
// The second result reports whether the key has been set; when it is false,
// the first result is a plain nil.
func (w *WaitMap) Get(key string) (any, bool) {
	w.Lock()
	defer w.Unlock()

	values, ok := w.m[key]
	if !ok {
		// Return an untyped nil. Returning the nil slice itself would box it
		// into a non-nil `any`, so a caller's `v == nil` check would fail.
		return nil, false
	}

	// As noted on the WaitMap type, this is special behavior because of the way
	// this map will be used: only the first value set for a key is ever
	// reported; later ones are recorded but never returned.
	return values[0], true
}