forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
memorytopo.go
384 lines (329 loc) · 9.01 KB
/
memorytopo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
// Package memorytopo contains an implementation of the topo.Backend
// API based on an in-process memory map.
//
// At the moment, it is actually a topo.Impl implementation as well,
// based on faketopo. As we convert more and more code to the new
// file-based topo.Backend APIs, this will grow. Eventually, the topo.Impl
// interface will be retired.
package memorytopo
import (
"fmt"
"path"
"sort"
"strings"
"sync"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/test/faketopo"
)
// nextWatchIndex is the key handed to the next watch registered by Watch.
// It is a package-level counter, so it is shared by every MemoryTopo
// instance; Watch reads and increments it while holding the mutex of the
// instance it was called on.
var nextWatchIndex int
// MemoryTopo is an in-memory implementation of topo.Backend.
//
// Data is organized like a file system: every level of a path is a real
// directory node and only the leaves hold contents. That layout is meant to
// resemble file-system like servers such as ZooKeeper or Chubby, whereas the
// fake etcd version is closer to a node-based fake.
type MemoryTopo struct {
	// FakeTopo is embedded so its methods are promoted onto MemoryTopo.
	faketopo.FakeTopo

	// mu protects the following fields.
	mu sync.Mutex

	// cells is the root of the tree; it holds one child directory per cell.
	cells *node

	// generation is the source of version numbers. It is a single counter
	// for the whole tree and only ever grows, so a file that is created,
	// deleted and created again does not start over at version 1.
	generation uint64
}
// node is a single entry of the in-memory tree: either a directory or a file.
// Exactly one of contents or children is not nil.
type node struct {
	// name is the key under which the node is stored in its parent's
	// children map.
	name string

	// version is the version number assigned to the node when it was
	// created or last updated.
	version uint64

	// contents holds the file data; it is nil for a directory.
	contents []byte

	// children maps entry names to child nodes; it is nil for a file.
	children map[string]*node

	// parent is a pointer to the parent node.
	// It is set to nil in toplevel and cell node.
	parent *node

	// watches holds the notification channel of every active watch on this
	// file, keyed by watch index.
	watches map[int]chan *topo.WatchData
}
// isDirectory reports whether n is a directory, which is the case exactly
// when its children map has been allocated.
func (n *node) isDirectory() bool {
	isFile := n.children == nil
	return !isFile
}
// NodeVersion is the local topo.Version implementation.
// It wraps the uint64 version number stored on a node.
type NodeVersion uint64

// String returns the version number in base 10.
func (v NodeVersion) String() string {
	return fmt.Sprintf("%d", uint64(v))
}
// NewMemoryTopo builds a MemoryTopo whose root directory contains one empty
// directory for each name in cells. The root and the cell directories are
// created with a nil parent.
func NewMemoryTopo(cells []string) *MemoryTopo {
	mt := new(MemoryTopo)

	// The root is created first so it receives the first version number.
	root := mt.newDirectory("", nil)
	for i := range cells {
		name := cells[i]
		root.children[name] = mt.newDirectory(name, nil)
	}

	mt.cells = root
	return mt
}
// getNextVersion advances the generation counter by one and returns the new
// value. It does no locking of its own.
func (mt *MemoryTopo) getNextVersion() uint64 {
	next := mt.generation + 1
	mt.generation = next
	return next
}
// newFile allocates a file node holding contents, stamps it with a fresh
// version and gives it an empty watch table. The node is not linked into
// parent's children map; that is left to the caller.
func (mt *MemoryTopo) newFile(name string, contents []byte, parent *node) *node {
	f := &node{
		name:     name,
		contents: contents,
		parent:   parent,
	}
	f.version = mt.getNextVersion()
	f.watches = make(map[int]chan *topo.WatchData)
	return f
}
// newDirectory allocates an empty directory node and stamps it with a fresh
// version. Its watches map is left nil. The node is not linked into parent's
// children map; that is left to the caller.
func (mt *MemoryTopo) newDirectory(name string, parent *node) *node {
	d := &node{
		name:   name,
		parent: parent,
	}
	d.version = mt.getNextVersion()
	d.children = make(map[string]*node)
	return d
}
// nodeByPath resolves filePath inside cell and returns the node found there,
// which may be a file or a directory. It returns nil when a component does
// not exist or when the walk would have to descend below a file.
//
// filePath is split on "/" and the first element of the split is replaced by
// cell, so for a path with a leading slash the walk starts at the cell
// directory. Empty components are ignored.
func (mt *MemoryTopo) nodeByPath(cell, filePath string) *node {
	segments := strings.Split(filePath, "/")
	segments[0] = cell

	cur := mt.cells
	for _, seg := range segments {
		switch {
		case seg == "":
			// Empty component, usually from a trailing slash.
			continue
		case cur.children == nil:
			// cur is a file and there is still a component left to resolve.
			return nil
		}

		next, found := cur.children[seg]
		if !found {
			// Path doesn't exist.
			return nil
		}
		cur = next
	}
	return cur
}
// getOrCreatePath resolves filePath inside cell like nodeByPath does, except
// that every missing component is created as a directory along the way.
//
// It returns nil when the walk would have to descend below a file. When the
// last component is itself an existing file, that file node is returned.
func (mt *MemoryTopo) getOrCreatePath(cell, filePath string) *node {
	segments := strings.Split(filePath, "/")
	segments[0] = cell

	cur := mt.cells
	for _, seg := range segments {
		switch {
		case seg == "":
			// Empty component, usually from a trailing slash.
			continue
		case cur.children == nil:
			// cur is a file and there is still a component left to resolve.
			return nil
		}

		next, found := cur.children[seg]
		if !found {
			// Missing component: create it as a directory under cur.
			next = mt.newDirectory(seg, cur)
			cur.children[seg] = next
		}
		cur = next
	}
	return cur
}
// recursiveDelete unlinks n from its parent and then keeps walking upward,
// unlinking every ancestor directory that has become empty. The walk stops
// at the first ancestor that still has children, or at a node whose parent
// is nil, which is itself never removed.
func (mt *MemoryTopo) recursiveDelete(n *node) {
	for cur := n; cur.parent != nil; cur = cur.parent {
		delete(cur.parent.children, cur.name)
		if len(cur.parent.children) != 0 {
			// The parent is still in use, nothing more to prune.
			return
		}
	}
}
// ListDir is part of the topo.Backend interface.
//
// It returns the names of the entries directly under dirPath in cell, in
// sorted order. It returns topo.ErrNoNode when the path does not exist and a
// generic error when the path is a file.
func (mt *MemoryTopo) ListDir(ctx context.Context, cell, dirPath string) ([]string, error) {
	mt.mu.Lock()
	defer mt.mu.Unlock()

	dir := mt.nodeByPath(cell, dirPath)
	switch {
	case dir == nil:
		return nil, topo.ErrNoNode
	case !dir.isDirectory():
		return nil, fmt.Errorf("node %v in cell %v is not a directory", dirPath, cell)
	}

	// Map iteration order is not stable, so the names are sorted before
	// being returned.
	var names []string
	for name := range dir.children {
		names = append(names, name)
	}
	sort.Strings(names)
	return names, nil
}
// Create is part of topo.Backend interface.
//
// It stores contents as a new file at filePath in cell, creating any missing
// parent directories, and returns the version assigned to the new file. Nil
// contents are stored as an empty non-nil slice, because Get and Watch treat
// a node with nil contents as a directory.
//
// It returns topo.ErrNodeExists when the parent directory already has an
// entry with that name, and a generic error when any component of the parent
// path is a file.
func (mt *MemoryTopo) Create(ctx context.Context, cell, filePath string, contents []byte) (topo.Version, error) {
	if contents == nil {
		contents = []byte{}
	}

	mt.mu.Lock()
	defer mt.mu.Unlock()

	// Get the parent dir.
	dir, file := path.Split(filePath)
	p := mt.getOrCreatePath(cell, dir)

	// getOrCreatePath returns nil when a file sits in the middle of the
	// path, but returns the file node itself when the file is the last
	// component. A file has a nil children map, so inserting into it would
	// panic; both cases are rejected here.
	if p == nil || !p.isDirectory() {
		return nil, fmt.Errorf("trying to create file %v in cell %v in a path that contains files", filePath, cell)
	}

	// Check the file doesn't already exist.
	if _, ok := p.children[file]; ok {
		return nil, topo.ErrNodeExists
	}

	// Create the file.
	n := mt.newFile(file, contents, p)
	p.children[file] = n
	return NodeVersion(n.version), nil
}
// Update is part of topo.Backend interface.
//
// It replaces the contents of the file at filePath in cell and returns the
// new version. Nil contents are stored as an empty non-nil slice, because
// Get and Watch treat a node with nil contents as a directory.
//
// When version is nil the write is unconditional, and the file is created if
// it is missing but its parent directory exists. When version is not nil it
// must match the file's current version; it is type-asserted to NodeVersion
// without a check, so a value of another type panics.
//
// Errors:
//   - topo.ErrNoNode when the parent path does not exist, or when the file
//     does not exist and version is not nil.
//   - topo.ErrBadVersion when version does not match.
//   - a generic error when filePath is a directory, or when the file would
//     have to be created under a path whose last component is a file.
//
// Every watch registered on the file receives the new contents and version.
// The sends happen while mt.mu is held and block once a watch channel's
// buffer is full.
func (mt *MemoryTopo) Update(ctx context.Context, cell, filePath string, contents []byte, version topo.Version) (topo.Version, error) {
	if contents == nil {
		contents = []byte{}
	}

	mt.mu.Lock()
	defer mt.mu.Unlock()

	// Get the parent dir, we'll need it in case of creation.
	dir, file := path.Split(filePath)
	p := mt.nodeByPath(cell, dir)
	if p == nil {
		return nil, topo.ErrNoNode
	}

	// Get the existing file.
	n, ok := p.children[file]
	if !ok {
		// File doesn't exist, see if we need to create it.
		if version != nil {
			return nil, topo.ErrNoNode
		}

		// nodeByPath returns a file node when the last component of dir
		// is a file. A file has a nil children map, so inserting into it
		// would panic; report an error instead.
		if !p.isDirectory() {
			return nil, fmt.Errorf("trying to create file %v in cell %v in a path that contains files", filePath, cell)
		}

		n = mt.newFile(file, contents, p)
		p.children[file] = n
		return NodeVersion(n.version), nil
	}

	// Check if it's a directory.
	if n.isDirectory() {
		return nil, fmt.Errorf("Update(%v,%v) failed: it's a directory", cell, filePath)
	}

	// Check the version.
	if version != nil && n.version != uint64(version.(NodeVersion)) {
		return nil, topo.ErrBadVersion
	}

	// Now we can update.
	n.version = mt.getNextVersion()
	n.contents = contents

	// Call the watches
	for _, w := range n.watches {
		w <- &topo.WatchData{
			Contents: n.contents,
			Version:  NodeVersion(n.version),
		}
	}

	return NodeVersion(n.version), nil
}
// Get is part of topo.Backend interface.
//
// It returns the contents and version of the file at filePath in cell. The
// returned slice is the one stored on the node, not a copy. It returns
// topo.ErrNoNode when the path does not exist and a generic error when the
// path is a directory.
func (mt *MemoryTopo) Get(ctx context.Context, cell, filePath string) ([]byte, topo.Version, error) {
	mt.mu.Lock()
	defer mt.mu.Unlock()

	target := mt.nodeByPath(cell, filePath)
	switch {
	case target == nil:
		return nil, nil, topo.ErrNoNode
	case target.contents == nil:
		// Only directories have nil contents.
		return nil, nil, fmt.Errorf("cannot Get() directory %v in cell %v", filePath, cell)
	}

	return target.contents, NodeVersion(target.version), nil
}
// Delete is part of topo.Backend interface.
//
// It removes the file at filePath in cell, along with every parent directory
// that the removal leaves empty. When version is not nil it must match the
// file's current version; it is type-asserted to NodeVersion without a
// check, so a value of another type panics.
//
// It returns topo.ErrNoNode when the parent path or the file does not exist,
// topo.ErrBadVersion on a version mismatch, and a generic error when
// filePath is a directory.
//
// Every watch registered on the file receives a final topo.ErrNoNode
// notification and its channel is then closed. The sends happen while mt.mu
// is held.
func (mt *MemoryTopo) Delete(ctx context.Context, cell, filePath string, version topo.Version) error {
	mt.mu.Lock()
	defer mt.mu.Unlock()

	// Locate the directory that should contain the file.
	parentPath, base := path.Split(filePath)
	parent := mt.nodeByPath(cell, parentPath)
	if parent == nil {
		return topo.ErrNoNode
	}

	// Validate the target; cases are evaluated in order, so target is only
	// dereferenced once it is known to exist.
	target, found := parent.children[base]
	switch {
	case !found:
		return topo.ErrNoNode
	case target.isDirectory():
		return fmt.Errorf("Delete(%v,%v) failed: it's a directory", cell, filePath)
	case version != nil && target.version != uint64(version.(NodeVersion)):
		return topo.ErrBadVersion
	}

	// Unlink the file, pruning directories that become empty.
	mt.recursiveDelete(target)

	// Tell every watcher the file is gone and end its stream.
	for _, ch := range target.watches {
		ch <- &topo.WatchData{
			Err: topo.ErrNoNode,
		}
		close(ch)
	}

	return nil
}
// Watch is part of the topo.Backend interface.
//
// It registers a watch on the file at filePath in cell and returns the
// current contents and version, a channel that receives later notifications,
// and a function that cancels the watch. The channel is buffered with room
// for 100 notifications.
//
// When the path does not exist, or is a directory, only the first return
// value is set and it carries the error; the channel and the cancel function
// are nil.
//
// The cancel function looks the path up again under the lock. If the watch
// is still registered on the node found there, it is removed, sent a final
// topo.ErrInterrupted notification, and its channel is closed. Otherwise
// cancel does nothing.
func (mt *MemoryTopo) Watch(ctx context.Context, cell string, filePath string) (*topo.WatchData, <-chan *topo.WatchData, topo.CancelFunc) {
	mt.mu.Lock()
	defer mt.mu.Unlock()

	target := mt.nodeByPath(cell, filePath)
	switch {
	case target == nil:
		return &topo.WatchData{Err: topo.ErrNoNode}, nil, nil
	case target.contents == nil:
		// Only directories have nil contents.
		return &topo.WatchData{Err: fmt.Errorf("cannot watch directory %v in cell %v", filePath, cell)}, nil, nil
	}

	// Reserve the next index from the package-level counter and register
	// the channel under it.
	id := nextWatchIndex
	nextWatchIndex++
	updates := make(chan *topo.WatchData, 100)
	target.watches[id] = updates

	initial := &topo.WatchData{
		Contents: target.contents,
		Version:  NodeVersion(target.version),
	}

	stop := func() {
		// This can run at any time, so the watch is looked up again to
		// make sure it is still valid before it is touched.
		mt.mu.Lock()
		defer mt.mu.Unlock()

		cur := mt.nodeByPath(cell, filePath)
		if cur == nil {
			return
		}
		ch, registered := cur.watches[id]
		if !registered {
			return
		}
		delete(cur.watches, id)
		ch <- &topo.WatchData{Err: topo.ErrInterrupted}
		close(ch)
	}

	return initial, updates, stop
}
// GetKnownCells is part of the topo.Server interface.
//
// It returns the names of all cell directories except "global", in sorted
// order. The returned error is always nil.
func (mt *MemoryTopo) GetKnownCells(ctx context.Context) ([]string, error) {
	mt.mu.Lock()
	defer mt.mu.Unlock()

	var result []string
	for c := range mt.cells.children {
		if c != "global" {
			result = append(result, c)
		}
	}

	// Map iteration order is not stable between calls; sort so callers get
	// a deterministic list, as ListDir does.
	sort.Strings(result)
	return result, nil
}
// Compile-time check that *MemoryTopo implements topo.Impl.
var _ topo.Impl = (*MemoryTopo)(nil)