-
Notifications
You must be signed in to change notification settings - Fork 21
/
acquire.go
110 lines (90 loc) · 2.59 KB
/
acquire.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package lock
import (
"fmt"
"os"
"sort"
"time"
"golang.org/x/net/context"
etcd "github.com/coreos/etcd/client"
"gopkg.in/errgo.v1"
)
type Error struct {
hostname string
}
func (e *Error) Error() string {
return fmt.Sprintf("key is already locked by %s", e.hostname)
}
type Locker interface {
Acquire(key string, ttl uint64) (Lock, error)
WaitAcquire(key string, ttl uint64) (Lock, error)
Wait(key string) error
}
type EtcdLocker struct {
kapi etcd.KeysAPI
client etcd.Client
}
func NewEtcdLocker(client etcd.Client) Locker {
return &EtcdLocker{kapi: etcd.NewKeysAPI(client), client: client}
}
type Lock interface {
Release() error
}
type EtcdLock struct {
kapi etcd.KeysAPI
key string
index uint64
}
func (locker *EtcdLocker) Acquire(key string, ttl uint64) (Lock, error) {
return locker.acquire(locker.kapi, key, ttl, false)
}
func (locker *EtcdLocker) WaitAcquire(key string, ttl uint64) (Lock, error) {
return locker.acquire(locker.kapi, key, ttl, true)
}
func (locker *EtcdLocker) acquire(kapi etcd.KeysAPI, key string, ttl uint64, wait bool) (Lock, error) {
hasLock := false
key = addPrefix(key)
lock, err := addLockDirChild(locker.client, kapi, key)
if err != nil {
return nil, errgo.Mask(err)
}
for !hasLock {
res, err := kapi.Get(context.Background(), key, &etcd.GetOptions{Recursive: true, Sort: true})
if err != nil {
return nil, errgo.Mask(err)
}
if len(res.Node.Nodes) > 1 {
sort.Sort(res.Node.Nodes)
if res.Node.Nodes[0].CreatedIndex != lock.Node.CreatedIndex {
if !wait {
kapi.Delete(context.Background(), lock.Node.Key, &etcd.DeleteOptions{})
return nil, &Error{res.Node.Nodes[0].Value}
} else {
err = locker.Wait(lock.Node.Key)
if err != nil {
return nil, errgo.Mask(err)
}
}
} else {
// if the first index is the current one, it's our turn to lock the key
hasLock = true
}
} else {
// If there are only 1 node, it's our, lock is acquired
hasLock = true
}
}
// If we get the lock, set the ttl and return it
_, err = kapi.Set(context.Background(), lock.Node.Key, lock.Node.Value, &etcd.SetOptions{TTL: time.Duration(ttl) * time.Second})
if err != nil {
return nil, errgo.Mask(err)
}
return &EtcdLock{kapi, lock.Node.Key, lock.Node.CreatedIndex}, nil
}
func addLockDirChild(client etcd.Client, kapi etcd.KeysAPI, key string) (*etcd.Response, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, errgo.Notef(err, "fail to get hostname")
}
client.Sync(context.Background())
return kapi.CreateInOrder(context.Background(), key, hostname, &etcd.CreateInOrderOptions{})
}