forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
lock.go
133 lines (113 loc) · 4.38 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zktopo
import (
	"fmt"
	"path"
	"sort"
	"strings"
	"time"

	log "github.com/golang/glog"
	"github.com/youtube/vitess/go/vt/topo"
	"github.com/youtube/vitess/go/zk"
	"golang.org/x/net/context"
	"launchpad.net/gozk/zookeeper"
)
/*
This file contains the lock management code for zktopo.Server
*/
// lockForAction creates the action node in zookeeper, waits for the
// queue lock, and logs a helpful diagnostic if it can't get it.
//
// actionDir is the zookeeper path the sequential, ephemeral action node
// is created from, and contents is the data stored in that node. The
// wait is bounded by the deadline of ctx, or by a default of 30 seconds
// when ctx carries no deadline. The Done channel of ctx is handed to
// zk.ObtainQueueLock as its interrupt signal.
//
// On success it returns the path of the created action node. On failure
// it returns an empty path together with topo.ErrTimeout,
// topo.ErrInterrupted, or an error describing the failure. In that case
// it also tries to delete the action node and to log the action that is
// most likely holding the lock; problems in either step are only logged.
func (zkts *Server) lockForAction(ctx context.Context, actionDir, contents string) (string, error) {
	// create the action path
	actionPath, err := zkts.zconn.Create(actionDir, contents, zookeeper.SEQUENCE|zookeeper.EPHEMERAL, zookeeper.WorldACL(zk.PERM_FILE))
	if err != nil {
		return "", err
	}

	// get the timeout from the context
	var timeout time.Duration
	deadline, ok := ctx.Deadline()
	if !ok {
		// enforce a default timeout
		timeout = 30 * time.Second
	} else {
		timeout = deadline.Sub(time.Now())
	}

	// get the interrupted channel from context or don't interrupt
	interrupted := ctx.Done()
	if interrupted == nil {
		interrupted = make(chan struct{})
	}

	err = zk.ObtainQueueLock(zkts.zconn, actionPath, timeout, interrupted)
	if err == nil {
		return actionPath, nil
	}

	// Translate the zk error into the error the topo layer expects.
	var errToReturn error
	switch err {
	case zk.ErrTimeout:
		errToReturn = topo.ErrTimeout
	case zk.ErrInterrupted:
		// the context failed, get the error from it
		if ctx.Err() == context.DeadlineExceeded {
			errToReturn = topo.ErrTimeout
		} else {
			errToReturn = topo.ErrInterrupted
		}
	default:
		errToReturn = fmt.Errorf("failed to obtain action lock: %v %v", actionPath, err)
	}

	// Regardless of the reason, try to cleanup. A failure here is not
	// fatal, but it is logged because a node left behind stays in the
	// queue until it is removed.
	log.Warningf("Failed to obtain action lock: %v", err)
	if delErr := zkts.zconn.Delete(actionPath, -1); delErr != nil {
		log.Warningf("Failed to delete action node %v: %v", actionPath, delErr)
	}

	// Show the other actions in the directory
	dir := path.Dir(actionPath)
	children, _, childrenErr := zkts.zconn.Children(dir)
	if childrenErr != nil {
		log.Warningf("Failed to get children of %v: %v", dir, childrenErr)
		return "", errToReturn
	}
	if len(children) == 0 {
		log.Warningf("No other action running, you may just try again now.")
		return "", errToReturn
	}

	// ZooKeeper returns children in no particular order. Sequential node
	// names end in a zero-padded counter, so sorting them puts the oldest
	// node, the one at the head of the queue, first.
	sort.Strings(children)
	childPath := path.Join(dir, children[0])
	data, _, getErr := zkts.zconn.Get(childPath)
	if getErr != nil {
		log.Warningf("Failed to get first action node %v (may have just ended): %v", childPath, getErr)
		return "", errToReturn
	}
	log.Warningf("------ Most likely blocking action: %v\n%v", childPath, data)
	return "", errToReturn
}
// unlockForAction releases a lock whose node is at lockPath.
//
// It first stores results in the action log, at the path obtained by
// turning the "/action/" directory of lockPath into "/actionlog/", and
// then deletes lockPath. If the action log entry cannot be created, the
// lock is deliberately left in place and the creation error is returned.
func (zkts *Server) unlockForAction(lockPath, results string) error {
	// Write the data to the actionlog. The lock node sits directly under
	// its "action" directory, so the last "/action/" in the path is the
	// one to rewrite; an earlier match could be a keyspace or shard that
	// happens to be named "action".
	actionLogPath := lockPath
	if i := strings.LastIndex(lockPath, "/action/"); i >= 0 {
		actionLogPath = lockPath[:i] + "/actionlog/" + lockPath[i+len("/action/"):]
	}
	if _, err := zk.CreateRecursive(zkts.zconn, actionLogPath, results, 0, zookeeper.WorldACL(zookeeper.PERM_ALL)); err != nil {
		log.Warningf("Cannot create actionlog path %v (check the permissions with 'zk stat'), will keep the lock, use 'zk rm' to clear the lock", actionLogPath)
		return err
	}

	// and delete the action
	return zk.DeleteRecursive(zkts.zconn, lockPath, -1)
}
// LockKeyspaceForAction is part of topo.Server interface.
// It takes the action lock of the given keyspace through lockForAction,
// storing contents in the lock node, and returns what lockForAction
// returns.
func (zkts *Server) LockKeyspaceForAction(ctx context.Context, keyspace, contents string) (string, error) {
	// The action directory is passed with a trailing slash so that the
	// sequential nodes are created as its children, not as its siblings.
	return zkts.lockForAction(ctx, path.Join(GlobalKeyspacesPath, keyspace, "action")+"/", contents)
}
// UnlockKeyspaceForAction is part of topo.Server interface.
// It releases the keyspace lock held at lockPath through
// unlockForAction, passing results along.
func (zkts *Server) UnlockKeyspaceForAction(ctx context.Context, keyspace, lockPath, results string) error {
	// ctx and keyspace are unused here: only lockPath and results are
	// forwarded.
	unlockErr := zkts.unlockForAction(lockPath, results)
	return unlockErr
}
// LockShardForAction is part of topo.Server interface.
// It takes the action lock of the given shard of the given keyspace
// through lockForAction, storing contents in the lock node, and returns
// what lockForAction returns.
func (zkts *Server) LockShardForAction(ctx context.Context, keyspace, shard, contents string) (string, error) {
	// The action directory is passed with a trailing slash so that the
	// sequential nodes are created as its children, not as its siblings.
	return zkts.lockForAction(ctx, path.Join(GlobalKeyspacesPath, keyspace, "shards", shard, "action")+"/", contents)
}
// UnlockShardForAction is part of topo.Server interface.
// It releases the shard lock held at lockPath through unlockForAction,
// passing results along.
func (zkts *Server) UnlockShardForAction(ctx context.Context, keyspace, shard, lockPath, results string) error {
	// ctx, keyspace and shard are unused here: only lockPath and results
	// are forwarded.
	unlockErr := zkts.unlockForAction(lockPath, results)
	return unlockErr
}