forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
118 lines (101 loc) · 3.19 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zktopo
import (
"fmt"
"path"
"sort"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/zk"
"launchpad.net/gozk/zookeeper"
)
// Server is the zookeeper topo.Server implementation.
type Server struct {
zconn zk.Conn
}
// Close is part of topo.Server interface.
func (zkts *Server) Close() {
zkts.zconn.Close()
}
// GetZConn returns the zookeeper connection for this Server.
func (zkts *Server) GetZConn() zk.Conn {
return zkts.zconn
}
// NewServer can be used to create a custom Server
// (for tests for instance) but it cannot change the globally
// registered one.
func NewServer(zconn zk.Conn) topo.Impl {
return &Server{zconn: zconn}
}
func init() {
zconn := zk.NewMetaConn()
stats.PublishJSONFunc("ZkMetaConn", zconn.String)
topo.RegisterServer("zookeeper", &Server{zconn: zconn})
}
//
// These helper methods are for ZK specific things
//
// PurgeActions removes all queued actions, leaving the action node
// itself in place.
//
// This inherently breaks the locking mechanism of the action queue,
// so this is a rare cleanup action, not a normal part of the flow.
//
// This can be used for tablets, shards and keyspaces.
func (zkts *Server) PurgeActions(zkActionPath string, canBePurged func(data string) bool) error {
if path.Base(zkActionPath) != "action" {
return fmt.Errorf("not action path: %v", zkActionPath)
}
children, _, err := zkts.zconn.Children(zkActionPath)
if err != nil {
return err
}
sort.Strings(children)
// Purge newer items first so the action queues don't try to process something.
for i := len(children) - 1; i >= 0; i-- {
actionPath := path.Join(zkActionPath, children[i])
data, _, err := zkts.zconn.Get(actionPath)
if err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE) {
return fmt.Errorf("PurgeActions(%v) err: %v", zkActionPath, err)
}
if !canBePurged(data) {
continue
}
err = zk.DeleteRecursive(zkts.zconn, actionPath, -1)
if err != nil && !zookeeper.IsError(err, zookeeper.ZNONODE) {
return fmt.Errorf("PurgeActions(%v) err: %v", zkActionPath, err)
}
}
return nil
}
// PruneActionLogs prunes old actionlog entries. Returns how many
// entries were purged (even if there was an error).
//
// There is a chance some processes might still be waiting for action
// results, but it is very very small.
func (zkts *Server) PruneActionLogs(zkActionLogPath string, keepCount int) (prunedCount int, err error) {
if path.Base(zkActionLogPath) != "actionlog" {
return 0, fmt.Errorf("not actionlog path: %v", zkActionLogPath)
}
// get sorted list of children
children, _, err := zkts.zconn.Children(zkActionLogPath)
if err != nil {
return 0, err
}
sort.Strings(children)
// see if nothing to do
if len(children) <= keepCount {
return 0, nil
}
for i := 0; i < len(children)-keepCount; i++ {
actionPath := path.Join(zkActionLogPath, children[i])
err = zk.DeleteRecursive(zkts.zconn, actionPath, -1)
if err != nil {
return prunedCount, fmt.Errorf("purge action err: %v", err)
}
prunedCount++
}
return prunedCount, nil
}