/
async.go
87 lines (80 loc) · 2.07 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package apollo
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/apollo-client/apollo-go/log"
)
var (
chstr = make(chan string) // namespace channel
mns sync.Map // notifications sync map
)
// asyncApollo async get apollo config
func (c *Client) asyncApollo(namespace string, cb WatchCallback) error {
// init sync map notifcation
mns.Store(namespace, &Notifcation{NamespaceName: namespace, NotifcationID: -1})
// get apollo config first
status, apol, err := c.getConfigs(namespace, "")
if err != nil || status != http.StatusOK {
log.Errorf("watch namespace:%s, err:%v", namespace, err)
return fmt.Errorf("watch namespace:%s, err:%v", namespace, err)
}
// if success, callback function
if err = safeCallback(&apol, cb); err != nil {
log.Errorf("watch namespace:%s, err:%v", namespace, err)
return fmt.Errorf("watch namespace:%s, err:%v", namespace, err)
}
go func() {
// listen namespace channel
for nsp := range chstr {
if !strings.EqualFold(nsp, namespace) {
continue
}
ns, na, ne := c.getConfigs(namespace, apol.ReleaseKey)
fmt.Printf("configs namespace: %s, config:%v\n", nsp, na)
if ne != nil || ns != http.StatusOK {
continue
}
apol = na
_ = safeCallback(&apol, cb)
}
}()
return nil
}
// asyncNotifications async get notifications
func (c *Client) asyncNotifications() {
go func() {
ticker := time.NewTicker(c.opts.WatchInterval)
for range ticker.C {
// get all notifications
ns := make([]*Notifcation, 0)
mns.Range(func(key, value interface{}) bool {
n, ok := value.(*Notifcation)
if !ok {
log.Warnf("namespace notification err, namespace: %s", key)
return false
}
ns = append(ns, n)
return true
})
if len(ns) <= 0 {
continue
}
// get remote notifications
nns, nnn, nne := c.getNotifications(ns)
if nne != nil || nns != http.StatusOK {
continue
}
for _, n := range nnn {
if n == nil {
continue
}
// store notification and send namespace channel
mns.Store(n.NamespaceName, n)
chstr <- n.NamespaceName
}
}
}()
}