-
Notifications
You must be signed in to change notification settings - Fork 1
/
detector.go
67 lines (59 loc) · 1.46 KB
/
detector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package flux
import "sync"
func newLoopDetector() *loopDetector {
return &loopDetector{
waiting: map[StoreInterface][]StoreInterface{},
}
}
type loopDetector struct {
m sync.RWMutex
waiting map[StoreInterface][]StoreInterface
}
// finished marks a store as finished processing
func (w *loopDetector) finished(store StoreInterface) {
w.delete(store)
}
// request requests a wait. We return if a loop was not found, and if one is found we return the store
// that was waiting for this store.
func (w *loopDetector) request(store StoreInterface, waitFor ...StoreInterface) (loopFound bool, loopStore StoreInterface) {
// returns true if s1 is waiting for s2
var isWaiting func(s1, s2 StoreInterface) bool
isWaiting = func(s1, s2 StoreInterface) bool {
waits, ok := w.get(s1)
if !ok {
return false
}
for _, inner := range waits {
if inner == s2 {
return true
}
if isWaiting(inner, s2) {
return true
}
}
return false
}
for _, requested := range waitFor {
if isWaiting(requested, store) {
return true, requested
}
}
w.set(store, waitFor)
return false, nil
}
func (w *loopDetector) get(k StoreInterface) ([]StoreInterface, bool) {
w.m.RLock()
defer w.m.RUnlock()
v, ok := w.waiting[k]
return v, ok
}
func (w *loopDetector) set(k StoreInterface, v []StoreInterface) {
w.m.Lock()
defer w.m.Unlock()
w.waiting[k] = v
}
func (w *loopDetector) delete(k StoreInterface) {
w.m.Lock()
defer w.m.Unlock()
delete(w.waiting, k)
}