forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
watch.go
146 lines (130 loc) · 4.07 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package etcdtopo
import (
"fmt"
"sync"
"github.com/coreos/go-etcd/etcd"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
)
func newWatchData(valueType dataType, node *etcd.Node) *topo.WatchData {
bytes, err := rawDataFromNodeValue(valueType, node.Value)
if err != nil {
return &topo.WatchData{Err: err}
}
return &topo.WatchData{
Contents: bytes,
Version: EtcdVersion(node.ModifiedIndex),
}
}
// Watch is part of the topo.Backend interface
func (s *Server) Watch(ctx context.Context, cellName, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) {
cell, err := s.getCell(cellName)
if err != nil {
return &topo.WatchData{Err: fmt.Errorf("Watch cannot get cell: %v", err)}, nil, nil
}
// Special paths where we need to be backward compatible.
var valueType dataType
valueType, filePath = oldTypeAndFilePath(filePath)
// Get the initial version of the file
initial, err := cell.Get(filePath, false /* sort */, false /* recursive */)
if err != nil {
// generic error
return &topo.WatchData{Err: convertError(err)}, nil, nil
}
if initial.Node == nil {
// node doesn't exist
return &topo.WatchData{Err: topo.ErrNoNode}, nil, nil
}
wd := newWatchData(valueType, initial.Node)
if wd.Err != nil {
return wd, nil, nil
}
// mu protects the stop channel. We need to make sure the 'cancel'
// func can be called multiple times, and that we don't close 'stop'
// more than once.
mu := sync.Mutex{}
stop := make(chan bool)
cancel := func() {
mu.Lock()
defer mu.Unlock()
if stop != nil {
close(stop)
stop = nil
}
}
notifications := make(chan *topo.WatchData, 10)
// This watch go routine will stop if the 'stop' channel is closed.
// Otherwise it will try to watch everything in a loop, and send events
// to the 'watch' channel.
// In any case, the Watch call will close the 'watch' channel.
// Note we pass in the 'stop' channel as a parameter because
// the go routine can take some time to start, and if someone
// calls 'cancel' before the go routine starts, stop will be nil (note
// this happens in the topo unit test, that cals cancel() right
// after setting the watch).
watchChannel := make(chan *etcd.Response)
watchError := make(chan error)
go func(stop chan bool) {
// We start watching from the etcd version we got
// during the get, and not from the ModifiedIndex of
// the node, as the node might be older than the
// retention period of the server.
versionToWatch := initial.EtcdIndex + 1
_, err := cell.Client.Watch(filePath, versionToWatch, false /* recursive */, watchChannel, stop)
// Watch will only return a non-nil error, otherwise
// it keeps on watching. Send the error down.
watchError <- err
close(watchError)
}(stop)
// This go routine is the main event handling routine:
// - it will stop if 'stop' is closed.
// - if it receives a notification from the watch, it will forward it
// to the notifications channel.
go func() {
defer close(notifications)
for resp := range watchChannel {
if resp.Action == "delete" || resp.Action == "compareAndDelete" {
// Node doesn't exist any more, we can
// stop watching. Swallow the watchError.
mu.Lock()
if stop == nil {
// Watch was already interrupted
mu.Unlock()
return
}
close(stop)
stop = nil
mu.Unlock()
<-watchError
notifications <- &topo.WatchData{Err: topo.ErrNoNode}
return
}
wd := newWatchData(valueType, resp.Node)
notifications <- wd
if wd.Err != nil {
// Error packing / unpacking data,
// stop the watch. Swallow the watchError.
mu.Lock()
if stop == nil {
// Watch was already interrupted
mu.Unlock()
return
}
close(stop)
stop = nil
mu.Unlock()
<-watchError
notifications <- &topo.WatchData{Err: wd.Err}
return
}
}
// Watch terminated, because of an error. Recover the error,
// and translate the interruption error.
err := <-watchError
if err == etcd.ErrWatchStoppedByUser {
err = topo.ErrInterrupted
}
notifications <- &topo.WatchData{Err: err}
}()
return wd, notifications, cancel
}