forked from cloudfoundry-attic/app-manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bbs_helpers.go
92 lines (76 loc) · 1.95 KB
/
bbs_helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package shared
import (
"reflect"
"time"
"github.com/cloudfoundry/storeadapter"
)
// RetryIndefinitelyOnStoreTimeout invokes callback until it returns anything
// other than storeadapter.ErrorTimeout, sleeping one second after each
// timeout before trying again. The first non-timeout result (nil included)
// is handed back to the caller. There is no upper bound on the number of
// attempts.
func RetryIndefinitelyOnStoreTimeout(callback func() error) error {
	result := callback()
	for result == storeadapter.ErrorTimeout {
		// Back off briefly so a struggling store is not hammered.
		time.Sleep(time.Second)
		result = callback()
	}
	return result
}
// WatchWithFilter starts a watch on path via store.Watch and forwards the
// events accepted by filter onto outChan.
//
// Because outChan and filter are passed as interface{} values, their shapes
// are validated with reflection up front, and any violation panics:
//
//   - filter must be a func with exactly one parameter whose type is
//     assignable to storeadapter.WatchEvent, and exactly two results, the
//     second of which is a bool.
//   - outChan must be a bidirectional channel whose element type can receive
//     filter's first result.
//
// For every watch event the filter is called; when its bool result is true
// the first result is sent on outChan, otherwise the event is dropped.
//
// The returned send-only channel stops the watch: sending a value on it or
// closing it makes the forwarding goroutine close the underlying watch's stop
// channel and exit. The returned receive-only channel carries an error
// received from the underlying watch's error channel, after which forwarding
// ends. Whenever the goroutine exits (stop requested, event stream closed, or
// error channel activity) it closes both outChan and the returned error
// channel.
//
// A stop request is honoured even while the goroutine is blocked handing an
// event to outChan or an error to the error channel, so a consumer that has
// stopped draining those channels cannot leak the goroutine.
func WatchWithFilter(store storeadapter.StoreAdapter, path string, outChan interface{}, filter interface{}) (chan<- bool, <-chan error) {
	filterType := reflect.TypeOf(filter)
	// reflect.TypeOf(nil) is a nil Type; guard so a nil filter yields the
	// intended message instead of a nil-pointer crash.
	if filterType == nil || filterType.Kind() != reflect.Func {
		panic("filter must be a func")
	}
	if filterType.NumIn() != 1 {
		panic("filter must take one argument")
	}
	if !filterType.In(0).AssignableTo(reflect.TypeOf(storeadapter.WatchEvent{})) {
		panic("filter must take a single WatchEvent argument")
	}
	if filterType.NumOut() != 2 {
		panic("filter must return two arguments")
	}
	if filterType.Out(1).Kind() != reflect.Bool {
		panic("filter must return a bool as its second argument")
	}

	chanType := reflect.TypeOf(outChan)
	// Same nil-Type guard as for filter.
	if chanType == nil || chanType.Kind() != reflect.Chan {
		panic("outChan must be a channel")
	}
	if chanType.ChanDir() != reflect.BothDir {
		panic("outChan should be bidirectional")
	}
	if !filterType.Out(0).AssignableTo(chanType.Elem()) {
		panic("filter must return an object, as its first argument, that can be passed into outChan")
	}

	filterValue := reflect.ValueOf(filter)
	chanValue := reflect.ValueOf(outChan)

	stopOuter := make(chan bool)
	errsOuter := make(chan error)

	events, stopInner, errsInner := store.Watch(path)

	go func() {
		defer chanValue.Close()
		defer close(errsOuter)

		// Reflected handle on the stop channel so it can take part in the
		// dynamic select used for sending on outChan.
		stopValue := reflect.ValueOf(stopOuter)

		for {
			select {
			case <-stopOuter:
				close(stopInner)
				return

			case event, ok := <-events:
				if !ok {
					return
				}

				out := filterValue.Call([]reflect.Value{reflect.ValueOf(event)})
				if !out[1].Bool() {
					continue
				}

				// Send the filtered value, but give up if a stop request
				// arrives first; a plain Send would block forever once the
				// consumer stops reading outChan.
				chosen, _, _ := reflect.Select([]reflect.SelectCase{
					{Dir: reflect.SelectSend, Chan: chanValue, Send: out[0]},
					{Dir: reflect.SelectRecv, Chan: stopValue},
				})
				if chosen == 1 {
					close(stopInner)
					return
				}

			case err, ok := <-errsInner:
				if ok {
					// Deliver the error unless the caller asks to stop
					// before anyone receives it.
					select {
					case errsOuter <- err:
					case <-stopOuter:
						close(stopInner)
					}
				}
				return
			}
		}
	}()

	return stopOuter, errsOuter
}