forked from ZhengHe-MD/agollo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
poller.go
124 lines (101 loc) · 2.59 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package agollo
import (
"context"
"encoding/json"
"net/http"
"time"
)
// this is a static check
var _ poller = (*longPoller)(nil)
// poller fetch confi updates
type poller interface {
// start poll updates
start()
// preload fetch all config to local cache, and update all notifications
preload() error
// stop poll updates
stop()
}
// notificationHandler handle namespace update notification
type notificationHandler func(namespace string) error
// longPoller implement poller interface
type longPoller struct {
conf *Conf
pollerInterval time.Duration
ctx context.Context
cancel context.CancelFunc
requester requester
notifications *notificatonRepo
handler notificationHandler
}
// newLongPoller create a Poller
func newLongPoller(conf *Conf, interval time.Duration, handler notificationHandler) poller {
poller := &longPoller{
conf: conf,
pollerInterval: interval,
requester: newHTTPRequester(&http.Client{Timeout: longPoolTimeout}),
notifications: new(notificatonRepo),
handler: handler,
}
for _, namespace := range conf.NameSpaceNames {
poller.notifications.setNotificationID(namespace, defaultNotificationID)
}
return poller
}
func (p *longPoller) start() {
go p.watchUpdates()
}
func (p *longPoller) preload() error {
return p.pumpUpdates()
}
func (p *longPoller) watchUpdates() {
p.ctx, p.cancel = context.WithCancel(context.Background())
defer p.cancel()
timer := time.NewTimer(p.pollerInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
p.pumpUpdates()
timer.Reset(p.pollerInterval)
case <-p.ctx.Done():
return
}
}
}
func (p *longPoller) stop() {
p.cancel()
}
func (p *longPoller) updateNotificationConf(notification *notification) {
p.notifications.setNotificationID(notification.NamespaceName, notification.NotificationID)
}
// pumpUpdates fetch updated namespace, handle updated namespace then update notification id
func (p *longPoller) pumpUpdates() error {
var ret error
updates, err := p.poll()
if err != nil {
return err
}
for _, update := range updates {
if err := p.handler(update.NamespaceName); err != nil {
ret = err
continue
}
p.updateNotificationConf(update)
}
return ret
}
// poll until a update or timeout
func (p *longPoller) poll() ([]*notification, error) {
notifications := p.notifications.toString()
url := notificationURL(p.conf, notifications)
bts, err := p.requester.request(url)
if err != nil || len(bts) == 0 {
return nil, err
}
var ret []*notification
if err := json.Unmarshal(bts, &ret); err != nil {
return nil, err
}
return ret, nil
}