forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
srv_vschema.go
70 lines (59 loc) · 2.13 KB
/
srv_vschema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package topo
import (
"fmt"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
vschemapb "github.com/youtube/vitess/go/vt/proto/vschema"
)
// This file contains the utility methods to manage SrvVSchema objects.
// WatchSrvVSchemaData is returned / streamed by WatchSrvVSchema.
// The WatchSrvVSchema API guarantees exactly one of Value or Err will be set.
type WatchSrvVSchemaData struct {
Value *vschemapb.SrvVSchema
Err error
}
// WatchSrvVSchema will set a watch on the SrvVSchema object.
// It has the same contract as Backend.Watch, but it also unpacks the
// contents into a SrvVSchema object.
func (ts Server) WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, CancelFunc) {
current, wdChannel, cancel := ts.Watch(ctx, cell, SrvVSchemaFile)
if current.Err != nil {
return &WatchSrvVSchemaData{Err: current.Err}, nil, nil
}
value := &vschemapb.SrvVSchema{}
if err := proto.Unmarshal(current.Contents, value); err != nil {
// Cancel the watch, drain channel.
cancel()
for range wdChannel {
}
return &WatchSrvVSchemaData{Err: fmt.Errorf("error unpacking initial SrvVSchema object: %v", err)}, nil, nil
}
changes := make(chan *WatchSrvVSchemaData, 10)
// The background routine reads any event from the watch channel,
// translates it, and sends it to the caller.
// If cancel() is called, the underlying Watch() code will
// send an ErrInterrupted and then close the channel. We'll
// just propagate that back to our caller.
go func() {
defer close(changes)
for wd := range wdChannel {
if wd.Err != nil {
// Last error value, we're done.
// wdChannel will be closed right after
// this, no need to do anything.
changes <- &WatchSrvVSchemaData{Err: wd.Err}
return
}
value := &vschemapb.SrvVSchema{}
if err := proto.Unmarshal(wd.Contents, value); err != nil {
cancel()
for range wdChannel {
}
changes <- &WatchSrvVSchemaData{Err: fmt.Errorf("error unpacking SrvVSchema object: %v", err)}
return
}
changes <- &WatchSrvVSchemaData{Value: value}
}
}()
return &WatchSrvVSchemaData{Value: value}, changes, cancel
}