forked from vitessio/vitess
/
zkcustomrule.go
178 lines (152 loc) · 5.39 KB
/
zkcustomrule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zkcustomrule
import (
"flag"
"reflect"
"sync"
"time"
log "github.com/golang/glog"
"github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver"
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/rules"
"github.com/youtube/vitess/go/vt/topo/zk2topo"
)
// Command-line flags selecting where the custom rules are read from.
// activateZkCustomRules only enables the Zookeeper rule source when both
// flags are non-empty.
var (
	// zkRuleServer is the address of the Zookeeper server holding the rules.
	zkRuleServer = flag.String("zkcustomrules_address", "", "zookeeper server to get rules from")

	// zkRulePath is the Zookeeper node path the rules are stored under.
	zkRulePath = flag.String("zkcustomrules_path", "", "path in the zookeeper server to get rules from")
)
const (
	// invalidQueryRulesVersion is the version reported before any rules
	// have been loaded from Zookeeper.
	invalidQueryRulesVersion int64 = -1

	// zkCustomRuleSource is the name under which the Zookeeper-backed
	// rule source is registered with the query service.
	zkCustomRuleSource string = "ZK_CUSTOM_RULE"
)
// ZkCustomRule is the Zookeeper-backed implementation of CustomRuleManager.
// It keeps a cached copy of the rules stored at a Zookeeper path and pushes
// changes to the query service.
type ZkCustomRule struct {
	// zconn is the Zookeeper connection. Set at construction time.
	zconn zk2topo.Conn
	// path is the Zookeeper path of the rules file. Set at construction time.
	path string

	// mu protects all the following fields.
	mu sync.Mutex
	// watch is the Zookeeper watch channel used to listen for data change
	// notifications on path.
	watch <-chan zk.Event
	// currentRuleSet is the most recently applied set of rules.
	currentRuleSet *rules.Rules
	// currentRuleSetVersion is the version of currentRuleSet, taken from the
	// Zookeeper modification version (Mzxid) of the node.
	currentRuleSetVersion int64
	// done is closed by Stop to terminate the polling goroutine.
	done chan struct{}
}
// NewZkCustomRule builds a ZkCustomRule that reads its rules from the given
// path on the given Zookeeper server. The returned value starts with an empty
// rule set at invalidQueryRulesVersion; call Start to load the rules and begin
// watching for changes.
func NewZkCustomRule(server, path string) *ZkCustomRule {
	zkcr := new(ZkCustomRule)
	zkcr.zconn = zk2topo.Connect(server)
	zkcr.path = path
	zkcr.currentRuleSet = rules.New()
	zkcr.currentRuleSetVersion = invalidQueryRulesVersion
	zkcr.done = make(chan struct{})
	return zkcr
}
// Start registers the Zookeeper watch, loads the initial rules and pushes
// them to qsc, then launches the polling goroutine. If either the watch or
// the initial load fails, the error is returned and no goroutine is started.
func (zkcr *ZkCustomRule) Start(qsc tabletserver.Controller) (err error) {
	if err = zkcr.refreshWatch(); err != nil {
		return err
	}
	if err = zkcr.refreshData(qsc, false); err != nil {
		return err
	}

	go zkcr.poll(qsc)
	return nil
}
// refreshWatch asks Zookeeper for a new watch channel on zkcr.path and stores
// it in zkcr.watch. On failure the previous channel is left in place, a
// warning is logged and the error is returned.
func (zkcr *ZkCustomRule) refreshWatch() error {
	// Only the watch channel is needed here; data and stat are discarded.
	_, _, events, err := zkcr.zconn.GetW(context.Background(), zkcr.path)
	if err == nil {
		zkcr.watch = events
		return nil
	}

	log.Warningf("Fail to get a valid watch from ZK service: %v", err)
	return err
}
// refreshData reads the rules node from Zookeeper and updates the cached
// rule set. When the new rules differ from the cached ones they are also
// pushed to the query service through qsc.SetQueryRules.
//
// If nodeRemoval is true the fetched data is not parsed and an empty rule
// set is applied instead. A read failure is returned to the caller; a JSON
// parse failure is only logged and reported as success, leaving the cache
// untouched.
//
// NOTE(review): the Get below runs even when nodeRemoval is true; confirm it
// can succeed for a deleted node, otherwise the empty-rules path is never
// reached.
func (zkcr *ZkCustomRule) refreshData(qsc tabletserver.Controller, nodeRemoval bool) error {
	raw, stat, err := zkcr.zconn.Get(context.Background(), zkcr.path)
	if err != nil {
		log.Warningf("Error encountered when trying to get data and watch from Zk: %v", err)
		return err
	}

	fetched := rules.New()
	if !nodeRemoval {
		if parseErr := fetched.UnmarshalJSON([]byte(raw)); parseErr != nil {
			log.Warningf("Error unmarshaling query rules %v, original data '%s'", parseErr, raw)
			return nil
		}
	}

	zkcr.mu.Lock()
	defer zkcr.mu.Unlock()

	// The version is recorded even when the rule content is unchanged.
	zkcr.currentRuleSetVersion = stat.Mzxid
	if reflect.DeepEqual(zkcr.currentRuleSet, fetched) {
		return nil
	}

	// Hand out separate copies so the cache and the query service never
	// share the same Rules value.
	zkcr.currentRuleSet = fetched.Copy()
	qsc.SetQueryRules(zkCustomRuleSource, fetched.Copy())
	log.Infof("Custom rule version %v fetched from Zookeeper and applied to vttablet", zkcr.currentRuleSetVersion)
	return nil
}
// sleepDuringZkFailure is how long poll backs off after a failed Zookeeper
// call before it tries again.
const sleepDuringZkFailure time.Duration = 30 * time.Second

// poll runs until Stop closes zkcr.done. It waits on the Zookeeper watch
// channel and reacts as follows:
//   - node created / data changed / deleted: reload the rules via refreshData;
//   - session event, or the watch channel having been closed: obtain a new
//     watch via refreshWatch and reload the rules.
//
// A receive on a closed channel returns immediately with a zero Event, which
// matches none of the event types above. The closed case is therefore
// detected explicitly; without it the loop would spin without ever getting a
// new watch. Back-off sleeps after failures are cut short by Stop.
func (zkcr *ZkCustomRule) poll(qsc tabletserver.Controller) {
	for {
		select {
		case <-zkcr.done:
			return
		case event, ok := <-zkcr.watch:
			if !ok {
				// The watch channel was closed: it will never deliver
				// another event, so a new one is required.
				if !zkcr.rewatch(qsc) {
					return
				}
				continue
			}
			switch event.Type {
			case zk.EventNodeCreated, zk.EventNodeDataChanged, zk.EventNodeDeleted:
				if err := zkcr.refreshData(qsc, event.Type == zk.EventNodeDeleted); err != nil {
					// Back off to avoid busy waiting during connection
					// re-establishment.
					if !zkcr.backoff() {
						return
					}
				}
			case zk.EventSession:
				if !zkcr.rewatch(qsc) {
					return
				}
			}
		}
	}
}

// rewatch obtains a fresh watch channel and then reloads the rules, so that
// a change made while no watch was registered is picked up. It returns false
// if Stop was called while backing off after a failed refreshWatch.
func (zkcr *ZkCustomRule) rewatch(qsc tabletserver.Controller) bool {
	if err := zkcr.refreshWatch(); err != nil {
		// Back off to avoid busy waiting during connection re-establishment.
		if !zkcr.backoff() {
			return false
		}
	}
	// refreshData logs its own failures. The error is deliberately not acted
	// on here: the next watch event or closed-channel detection triggers
	// another attempt.
	_ = zkcr.refreshData(qsc, false)
	return true
}

// backoff waits for sleepDuringZkFailure. It returns true when the full
// delay elapsed and false when zkcr.done was closed first.
func (zkcr *ZkCustomRule) backoff() bool {
	timer := time.NewTimer(sleepDuringZkFailure)
	defer timer.Stop()

	select {
	case <-zkcr.done:
		return false
	case <-timer.C:
		return true
	}
}
// Stop shuts the rule source down: it closes the done channel, which makes
// the polling goroutine return, and then closes the Zookeeper connection.
// Stop must be called at most once; a second call would close done again
// and panic.
func (zkcr *ZkCustomRule) Stop() {
	close(zkcr.done)
	zkcr.zconn.Close()
}
// GetRules returns a copy of the cached rules together with their version.
// The returned error is always nil.
func (zkcr *ZkCustomRule) GetRules() (qrs *rules.Rules, version int64, err error) {
	zkcr.mu.Lock()
	defer zkcr.mu.Unlock()

	// Copy under the lock so callers never share the cached rule set.
	qrs = zkcr.currentRuleSet.Copy()
	version = zkcr.currentRuleSetVersion
	return qrs, version, nil
}
// activateZkCustomRules enables the Zookeeper dynamic custom rule mechanism
// for qsc. It does nothing unless both the zkcustomrules_address and
// zkcustomrules_path flags are set. Otherwise it registers the rule source,
// starts a ZkCustomRule and arranges for it to be stopped on termination.
//
// A failure to start is logged rather than ignored: in that case no polling
// goroutine is running and the rules will not be updated. Stop is still
// registered so the Zookeeper connection gets closed on termination.
func activateZkCustomRules(qsc tabletserver.Controller) {
	if *zkRuleServer == "" || *zkRulePath == "" {
		return
	}

	qsc.RegisterQueryRuleSource(zkCustomRuleSource)

	zkCustomRule := NewZkCustomRule(*zkRuleServer, *zkRulePath)
	if err := zkCustomRule.Start(qsc); err != nil {
		log.Errorf("Unable to start Zookeeper custom rule source (server %q, path %q): %v", *zkRuleServer, *zkRulePath, err)
	}

	servenv.OnTerm(zkCustomRule.Stop)
}
// init hooks activateZkCustomRules into the tabletserver registration list
// by appending it to tabletserver.RegisterFunctions.
func init() {
	tabletserver.RegisterFunctions = append(tabletserver.RegisterFunctions, activateZkCustomRules)
}