/
watcher.go
127 lines (108 loc) · 2.44 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package rediswatcher
import (
"encoding/json"
"github.com/ProtocolONE/auth1.protocol.one/pkg/persist"
"github.com/go-redis/redis"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Watcher publishes update events to Redis channels and dispatches events
// received on subscribed channels to the callbacks registered for them.
type Watcher struct {
// id identifies this instance; Update embeds it in published events and
// subscribe compares it with the ID of incoming events.
id uuid.UUID
// options holds the settings filled in by the WatcherOption setters passed
// to NewWatcher.
options WatcherOptions
// conn is the Redis client used for publishing and subscribing.
conn *redis.Client
// callback maps a channel name to the handler invoked with an event payload.
callback map[string]func(string)
// channels carries channel names from SetUpdateCallback to the subscribe loop.
channels chan string
// quit tells the subscribe loop to return; Close sends on it.
quit chan bool
}
// event is the message that Update marshals to JSON and publishes, and that
// subscribe unmarshals from received channel messages.
type event struct {
// Id is the ID of the Watcher that published the event.
Id uuid.UUID
// Payload is the identity string passed to Update; subscribe hands it to the
// callback registered for the channel.
Payload string
}
// NewWatcher returns a new watcher service bound to the given Redis client.
//
// Each setter is applied to the watcher's options, after which a background
// goroutine running the subscription loop is started. NewWatcher panics if a
// random instance ID cannot be generated; the background goroutine panics if
// the subscription loop returns an error.
func NewWatcher(client *redis.Client, setters ...WatcherOption) persist.Watcher {
	instanceID, err := uuid.NewRandom()
	if err != nil {
		panic(err)
	}

	watcher := &Watcher{
		id:       instanceID,
		conn:     client,
		callback: make(map[string]func(string)),
		channels: make(chan string),
		quit:     make(chan bool),
	}
	for i := range setters {
		setters[i](&watcher.options)
	}

	go func() {
		if err := watcher.subscribe(); err != nil {
			panic(err)
		}
	}()

	return watcher
}
// Close stops the watcher by signalling the subscription loop to return.
//
// The send on w.quit blocks until the loop receives it, so Close returns only
// after the loop has picked up the signal. It always returns nil.
func (w *Watcher) Close() error {
	w.quit <- true

	return nil
}
// SetUpdateCallback registers callback as the handler for events received on
// channel and hands the channel name to the subscription loop, which then
// subscribes to it.
//
// The send on w.channels blocks until the subscription loop receives it.
//
// NOTE(review): w.callback is written here and read by subscribe, which
// NewWatcher runs in a separate goroutine; no lock is visible in this file —
// confirm callers do not register callbacks while messages are being
// dispatched, or add synchronization.
func (w *Watcher) SetUpdateCallback(channel string, callback func(string)) {
// Store the handler before announcing the channel so that it is already in
// place when the loop subscribes.
w.callback[channel] = callback
w.channels <- channel
}
// Update publishes an event on channel so that watchers subscribed to it can
// invoke their update callback.
//
// The published message is the JSON encoding of an event carrying this
// watcher's ID and identity as the payload. Update returns the marshalling
// error, if any, and otherwise the error reported by the publish command.
func (w *Watcher) Update(channel string, identity string) error {
	msg := event{Id: w.id, Payload: identity}

	encoded, err := json.Marshal(&msg)
	if err != nil {
		return err
	}

	cmd := w.conn.Publish(channel, encoded)
	return cmd.Err()
}
// subscribe runs the watcher's event loop until a value is received on w.quit.
//
// The loop reacts to three inputs:
//   - a channel name on w.channels: subscribes the pub/sub connection to it;
//   - a pub/sub message: decodes it as an event and invokes the callback
//     registered for the message's channel;
//   - a value on w.quit: returns nil.
//
// The pub/sub connection is closed when the function returns. Failures are
// logged rather than returned, so the only value ever returned is nil.
func (w *Watcher) subscribe() error {
	ps := w.conn.Subscribe()
	defer func() {
		if err := ps.Close(); err != nil {
			zap.L().Error("Failed to close watcher subscription", zap.Error(err))
		}
	}()

	ch := ps.Channel()
	for {
		select {
		case channel := <-w.channels:
			if err := ps.Subscribe(channel); err != nil {
				// Include the error itself; the channel name alone does not
				// say why the subscription failed.
				zap.L().Error(
					"Failed to subscribe channel",
					zap.String("channel", channel),
					zap.Error(err))
			}
		case msg := <-ch:
			// Nothing is registered, so skip decoding altogether.
			if len(w.callback) == 0 {
				continue
			}

			ev := event{}
			if err := json.Unmarshal([]byte(msg.Payload), &ev); err != nil {
				zap.L().Error(
					"Failed to unmarshal channel payload",
					zap.String("channel", msg.Channel),
					zap.String("payload", msg.Payload),
					zap.Error(err))
				continue
			}

			// Drop events published by this instance unless RaiseOwn is set.
			if !w.options.RaiseOwn && w.id == ev.Id {
				continue
			}

			if h, ok := w.callback[msg.Channel]; ok {
				h(ev.Payload)
			}
		case <-w.quit:
			return nil
		}
	}
}