forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tee.go
229 lines (195 loc) · 6.79 KB
/
tee.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helpers
import (
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
// TeeFactory is a topo.Factory whose connections are backed by two
// topo.Server instances at once. It is intended for the period during
// which a deployment migrates from one topo.Server to another.
//
//   - primary serves every read and receives every write. It is also
//     the source of MasterParticipation objects.
//   - secondary receives a copy of each write; most failures on it are
//     logged instead of being returned to the caller.
//   - locks are taken on primary then secondary, or on secondary then
//     primary when reverseLockOrder is true.
type TeeFactory struct {
	primary, secondary *topo.Server
	reverseLockOrder   bool
}
// HasGlobalReadOnlyCell is part of the topo.Factory interface.
// It always reports false, whatever serverAddr and root are.
func (f *TeeFactory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
	return false
}
// Create is part of the topo.Factory interface.
//
// It asks both underlying servers for their connection to cell and
// wraps the pair in a TeeConn. An error from either server is returned
// as is. The serverAddr and root arguments are not used.
func (f *TeeFactory) Create(cell, serverAddr, root string) (topo.Conn, error) {
	ctx := context.Background()

	pConn, err := f.primary.ConnForCell(ctx, cell)
	if err != nil {
		return nil, err
	}
	sConn, err := f.secondary.ConnForCell(ctx, cell)
	if err != nil {
		return nil, err
	}

	// Default lock order is primary first, secondary second.
	conn := &TeeConn{
		primary:    pConn,
		secondary:  sConn,
		lockFirst:  pConn,
		lockSecond: sConn,
	}
	if f.reverseLockOrder {
		// Reversed: secondary is locked first, then primary.
		conn.lockFirst, conn.lockSecond = sConn, pConn
	}
	return conn, nil
}
// NewTee returns a new topo.Server built on a TeeFactory that wraps the
// given primary and secondary servers. reverseLockOrder selects which of
// the two is locked first. The factory is registered with an empty
// server address and an empty root.
func NewTee(primary, secondary *topo.Server, reverseLockOrder bool) (*topo.Server, error) {
	return topo.NewWithFactory(&TeeFactory{
		primary:          primary,
		secondary:        secondary,
		reverseLockOrder: reverseLockOrder,
	}, "" /*serverAddress*/, "" /*root*/)
}
// TeeConn implements the topo.Conn interface on top of two underlying
// connections.
type TeeConn struct {
	// primary and secondary are the two wrapped connections.
	primary, secondary topo.Conn

	// lockFirst and lockSecond are the connections Lock acquires, in
	// that order.
	lockFirst, lockSecond topo.Conn
}
// Close is part of the topo.Conn interface.
// It closes the primary connection, then the secondary one.
func (c *TeeConn) Close() {
	for _, conn := range []topo.Conn{c.primary, c.secondary} {
		conn.Close()
	}
}
// ListDir is part of the topo.Conn interface.
// The listing comes from the primary connection only.
func (c *TeeConn) ListDir(ctx context.Context, dirPath string, full bool) ([]topo.DirEntry, error) {
	return c.primary.ListDir(ctx, dirPath, full)
}
// Create is part of the topo.Conn interface.
//
// The file is created on primary first; if that fails, secondary is not
// touched. It is then created on secondary as well. Unlike Update and
// Delete, a secondary failure here is returned to the caller. On
// success the version reported by primary is returned.
func (c *TeeConn) Create(ctx context.Context, filePath string, contents []byte) (topo.Version, error) {
	version, err := c.primary.Create(ctx, filePath, contents)
	if err != nil {
		return nil, err
	}

	// The only tolerated secondary error is "already exists", in which
	// case the existing file is overwritten with an unconditional update.
	if _, createErr := c.secondary.Create(ctx, filePath, contents); createErr != nil {
		if createErr != topo.ErrNodeExists {
			return nil, createErr
		}
		if _, updateErr := c.secondary.Update(ctx, filePath, contents, nil); updateErr != nil {
			return nil, updateErr
		}
	}
	return version, nil
}
// Update is part of the topo.Conn interface.
//
// The versioned update is applied to primary; its error, if any, is
// returned and secondary is left alone. Otherwise secondary is updated
// too, with a nil version, and a failure there is only logged. The
// version reported by primary is returned.
func (c *TeeConn) Update(ctx context.Context, filePath string, contents []byte, version topo.Version) (topo.Version, error) {
	newVersion, err := c.primary.Update(ctx, filePath, contents, version)
	if err != nil {
		// Primary rejected the write: do not touch secondary.
		return nil, err
	}

	// Secondary gets an unconditional update (nil version).
	_, secondaryErr := c.secondary.Update(ctx, filePath, contents, nil)
	if secondaryErr != nil {
		log.Warningf("secondary.Update(%v,unconditonal) failed: %v", filePath, secondaryErr)
	}
	return newVersion, nil
}
// Get is part of the topo.Conn interface.
// The contents and version are read from the primary connection only.
func (c *TeeConn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, error) {
	return c.primary.Get(ctx, filePath)
}
// Delete is part of the topo.Conn interface.
//
// The versioned delete is applied to primary; its error, if any, is
// returned and secondary is left alone. Otherwise the file is deleted
// from secondary with a nil version. A secondary failure is logged,
// never returned, and topo.ErrNoNode from secondary is ignored.
func (c *TeeConn) Delete(ctx context.Context, filePath string, version topo.Version) error {
	err := c.primary.Delete(ctx, filePath, version)
	if err != nil {
		// Nothing was deleted on primary, so stop here.
		return err
	}

	// Secondary gets an unconditional delete (nil version).
	switch secondaryErr := c.secondary.Delete(ctx, filePath, nil); secondaryErr {
	case nil, topo.ErrNoNode:
		// Deleted, or the node was already gone: nothing to report.
	default:
		log.Warningf("secondary.Delete(%v) failed: %v", filePath, secondaryErr)
	}
	return nil
}
// Watch is part of the topo.Conn interface.
// The watch is established on the primary connection only.
func (c *TeeConn) Watch(ctx context.Context, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) {
	return c.primary.Watch(ctx, filePath)
}
//
// Lock management.
//
// teeTopoLockDescriptor implements the topo.LockDescriptor interface.
// It holds the two descriptors returned by the underlying connections,
// along with the TeeConn and the directory path that was locked.
type teeTopoLockDescriptor struct {
	c       *TeeConn
	dirPath string

	firstLockDescriptor, secondLockDescriptor topo.LockDescriptor
}
// Lock is part of the topo.Conn interface.
//
// It locks dirPath on lockFirst and then on lockSecond. If the second
// lock cannot be taken, the first one is released again (a failure to
// release is only logged) and the error from the second lock is
// returned. On success the returned descriptor tracks both locks.
func (c *TeeConn) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	first, err := c.lockFirst.Lock(ctx, dirPath, contents)
	if err != nil {
		return nil, err
	}

	second, err := c.lockSecond.Lock(ctx, dirPath, contents)
	if err != nil {
		// Do not leave the first lock held when the pair is incomplete.
		if unlockErr := first.Unlock(ctx); unlockErr != nil {
			log.Warningf("Failed to unlock lockFirst after failed lockSecond lock for %v: %v", dirPath, unlockErr)
		}
		return nil, err
	}

	ld := &teeTopoLockDescriptor{
		c:                    c,
		dirPath:              dirPath,
		firstLockDescriptor:  first,
		secondLockDescriptor: second,
	}
	return ld, nil
}
// Check is part of the topo.LockDescriptor interface.
// The first lock is checked before the second one; the second is only
// consulted when the first reports no error.
func (ld *teeTopoLockDescriptor) Check(ctx context.Context) error {
	err := ld.firstLockDescriptor.Check(ctx)
	if err == nil {
		err = ld.secondLockDescriptor.Check(ctx)
	}
	return err
}
// Unlock is part of the topo.LockDescriptor interface.
//
// Locks are released in the reverse of acquisition order: second, then
// first. Both releases are always attempted. If both fail, the error
// from the first lock is logged and the error from the second lock is
// returned; otherwise whichever error occurred (if any) is returned.
func (ld *teeTopoLockDescriptor) Unlock(ctx context.Context) error {
	secondErr := ld.secondLockDescriptor.Unlock(ctx)
	firstErr := ld.firstLockDescriptor.Unlock(ctx)

	switch {
	case secondErr == nil:
		return firstErr
	case firstErr != nil:
		// Only one error can be returned; keep a trace of the other.
		log.Warningf("First Unlock(%v) failed: %v", ld.dirPath, firstErr)
	}
	return secondErr
}
// NewMasterParticipation is part of the topo.Conn interface.
// The participation object is created by the primary connection only.
func (c *TeeConn) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) {
	return c.primary.NewMasterParticipation(name, id)
}