forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
srv_keyspace.go
78 lines (64 loc) · 2.33 KB
/
srv_keyspace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package topo
import (
"fmt"
"path"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// This file contains the utility methods to manage SrvKeyspace objects.

// srvKeyspaceFileName builds the slash-separated path of the SrvKeyspace
// file for the given keyspace: keyspaces/<keyspace>/<SrvKeyspaceFile>.
// The result is cleaned by path.Join.
func srvKeyspaceFileName(keyspace string) string {
	elems := []string{"keyspaces", keyspace, SrvKeyspaceFile}
	return path.Join(elems...)
}
// WatchSrvKeyspaceData is the item produced by WatchSrvKeyspace, both as its
// immediate result and for every notification streamed on its channel.
// The WatchSrvKeyspace API guarantees exactly one of Value or Err will be set.
type WatchSrvKeyspaceData struct {
	// Value is the unpacked SrvKeyspace object; it is set when Err is nil.
	Value *topodatapb.SrvKeyspace
	// Err is the watch error or the unpacking error; it is set when Value
	// is nil.
	Err error
}
// WatchSrvKeyspace starts a watch on the SrvKeyspace file of the given
// keyspace in the given cell. It follows the same contract as Backend.Watch,
// except that the raw contents are unmarshalled into SrvKeyspace objects
// before being handed to the caller.
//
// It returns:
//   - the current value, or an error if the watch could not be established
//     or the current contents could not be unpacked;
//   - a channel on which later changes (and the final error) are delivered;
//   - the function that cancels the watch.
//
// When the first return value carries an error, both the channel and the
// cancel function are nil.
func (ts Server) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (*WatchSrvKeyspaceData, <-chan *WatchSrvKeyspaceData, CancelFunc) {
	initial, updates, cancel := ts.Watch(ctx, cell, srvKeyspaceFileName(keyspace))
	if initial.Err != nil {
		return &WatchSrvKeyspaceData{Err: initial.Err}, nil, nil
	}

	// stopAndDrain cancels the underlying watch and then consumes the raw
	// channel until it is closed.
	stopAndDrain := func() {
		cancel()
		for range updates {
		}
	}

	first := new(topodatapb.SrvKeyspace)
	if err := proto.Unmarshal(initial.Contents, first); err != nil {
		// The watch is already running: shut it down before reporting.
		stopAndDrain()
		return &WatchSrvKeyspaceData{Err: fmt.Errorf("error unpacking initial SrvKeyspace object: %v", err)}, nil, nil
	}

	out := make(chan *WatchSrvKeyspaceData, 10)

	// Translate every raw watch event into a WatchSrvKeyspaceData and
	// forward it. If cancel() is called, the underlying Watch() code will
	// send an ErrInterrupted and then close its channel; that error is
	// simply passed along to our caller. out is closed when this goroutine
	// exits.
	go func() {
		defer close(out)
		for {
			update, open := <-updates
			if !open {
				return
			}

			if update.Err != nil {
				// This is the last event: the raw channel gets closed
				// right after it, so there is nothing left to clean up.
				out <- &WatchSrvKeyspaceData{Err: update.Err}
				return
			}

			decoded := new(topodatapb.SrvKeyspace)
			err := proto.Unmarshal(update.Contents, decoded)
			if err == nil {
				out <- &WatchSrvKeyspaceData{Value: decoded}
				continue
			}

			// Undecodable contents end the watch: stop it, drain it, and
			// only then report the failure as the final event.
			stopAndDrain()
			out <- &WatchSrvKeyspaceData{Err: fmt.Errorf("error unpacking SrvKeyspace object: %v", err)}
			return
		}
	}()

	return &WatchSrvKeyspaceData{Value: first}, out, cancel
}