/
zklock.go
151 lines (131 loc) · 3.71 KB
/
zklock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package cypress
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/go-zookeeper/zk"
"go.uber.org/zap"
)
var (
// ErrLockFailed is returned by Lock when a ZooKeeper call fails or a watch
// delivers an event of an unexpected type; the underlying cause is logged
// rather than wrapped.
ErrLockFailed = errors.New("failed to lock")
// ErrLockCancelled is returned by Lock when the caller's context is done
// before the lock could be acquired.
ErrLockCancelled = errors.New("context cancelled before lock is acquired")
)
// LockContext is an opaque bag of values handed back by a lock call and
// passed again to the matching unlock call. Its contents are defined by the
// implementation; it may be nil.
type LockContext map[string]interface{}

// DistributedLocker is an exclusive lock that can be acquired under a
// context and released later.
type DistributedLocker interface {
	// Lock acquires the lock. It returns the LockContext to pass to Unlock,
	// or an error if the lock was not acquired.
	Lock(ctx context.Context) (LockContext, error)
	// Unlock releases a lock previously obtained from Lock.
	Unlock(lockCtx LockContext)
}

// DistributedSharedLocker is a DistributedLocker that additionally offers an
// RLock/RUnlock pair.
// NOTE(review): the names suggest shared (reader) mode versus the exclusive
// Lock/Unlock pair; no implementation is visible here to confirm that.
type DistributedSharedLocker interface {
	DistributedLocker

	// RLock acquires the lock through the R-variant entry point and returns
	// the LockContext to pass to RUnlock.
	RLock(ctx context.Context) (LockContext, error)
	// RUnlock releases a lock previously obtained from RLock.
	RUnlock(lockCtx LockContext)
}
// ZkLock is a ZooKeeper-based distributed lock represented by a single
// ephemeral node: whoever creates the node holds the lock.
// It does not use sequential nodes, so waiters are not queued and a waiter
// may starve under contention.
type ZkLock struct {
// conn is the ZooKeeper connection used for every node operation.
conn *zk.Conn
// path is the node whose existence marks the lock as held.
path string
// localLock serializes use of this ZkLock inside the process; Lock keeps it
// held after a successful acquisition and Release unlocks it.
localLock *sync.Mutex
// acquired is true while this instance owns the node at path.
acquired bool
}
// NewZkLock returns an unlocked ZkLock that guards the node at path using
// conn. Nothing is sent to ZooKeeper until Lock is called.
func NewZkLock(conn *zk.Conn, path string) *ZkLock {
	return &ZkLock{
		conn:      conn,
		path:      path,
		localLock: new(sync.Mutex),
		acquired:  false,
	}
}
// Lock blocks until this caller has created the lock node at lock.path, the
// context is done, or ZooKeeper reports a problem.
//
// The in-process mutex is taken first, so only one goroutine per ZkLock
// talks to ZooKeeper at a time. On success the mutex stays held until
// Release is called; on every failure path it is unlocked before returning.
//
// Returns:
//   - (nil, nil) when the lock was acquired. ZkLock keeps no per-acquisition
//     state, so the LockContext is always nil.
//   - (nil, ErrLockCancelled) when ctx was done first. If the node had
//     already been created it is released again before returning.
//   - (nil, ErrLockFailed) when a ZooKeeper call fails or a watch delivers an
//     unexpected event type; the cause is logged.
//
// Lock panics with "bad zklock state" if the instance is marked acquired
// while the in-process mutex is free.
func (lock *ZkLock) Lock(ctx context.Context) (LockContext, error) {
	lock.localLock.Lock()
	unlockLocal := true
	defer func() {
		if unlockLocal {
			lock.localLock.Unlock()
		}
	}()

	if lock.acquired {
		panic("bad zklock state")
	}

	// Cancellation is bridged through two close-only channels so that neither
	// side can ever send on a channel the other side has closed:
	//   - cancelCh is closed by the watcher goroutine when ctx is done;
	//   - done is closed by this function on return to stop the watcher.
	var cancelled int32
	cancelCh := make(chan struct{})
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			// Set the flag before waking the waiter so the loop condition
			// observes it as soon as the select below returns.
			atomic.StoreInt32(&cancelled, 1)
			close(cancelCh)
		case <-done:
		}
	}()

	for atomic.LoadInt32(&cancelled) == 0 {
		exists, _, ch, err := lock.conn.ExistsW(lock.path)
		if err != nil {
			zap.L().Error("failed to check node", zap.String("path", lock.path), zap.Error(err))
			return nil, ErrLockFailed
		}

		if exists {
			// Held by someone else: wait for the node to go away or for
			// cancellation, then re-evaluate from the top.
			select {
			case event := <-ch:
				if event.Type != zk.EventNodeDeleted {
					zap.L().Error("unexpected event type", zap.String("path", lock.path), zap.Int32("type", int32(event.Type)))
					return nil, ErrLockFailed
				}
			case <-cancelCh:
			}
			continue
		}

		_, err = lock.conn.Create(lock.path, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		switch {
		case err == nil:
			// We created the node; drain the creation event raised by the
			// watch registered above.
			event := <-ch
			if event.Type != zk.EventNodeCreated {
				zap.L().Error("unexpected event, NodeCreated is expected", zap.String("path", lock.path), zap.Int32("event", int32(event.Type)))
				// The node exists and belongs to us, but we are reporting
				// failure: remove it so the lock is not left held.
				if delErr := lock.conn.Delete(lock.path, -1); delErr != nil {
					zap.L().Error("failed to delete lock node", zap.String("path", lock.path), zap.Error(delErr))
				}
				return nil, ErrLockFailed
			}
			lock.acquired = true
		case errors.Is(err, zk.ErrNodeExists):
			// Another process created the node between ExistsW and Create;
			// consume its creation event and retry.
			event := <-ch
			if event.Type != zk.EventNodeCreated {
				zap.L().Error("unexpected event type", zap.String("path", lock.path), zap.Int32("type", int32(event.Type)))
				return nil, ErrLockFailed
			}
		default:
			zap.L().Error("unexpected create error", zap.String("path", lock.path), zap.Error(err))
			return nil, ErrLockFailed
		}

		if lock.acquired {
			break
		}
	}

	// When the lock is held, the in-process mutex is unlocked by Release
	// instead of by the deferred function above.
	unlockLocal = !lock.acquired
	if atomic.LoadInt32(&cancelled) == 1 {
		if lock.acquired {
			lock.Release()
		}
		return nil, ErrLockCancelled
	}

	return nil, nil
}
// Release deletes the lock node, marks the lock as no longer acquired and
// unlocks the in-process mutex taken by Lock.
//
// A failed delete is logged at zap's Fatal level, which by default
// terminates the process after logging; the error is not returned.
// Release must only be called while the lock is held, because it
// unconditionally unlocks the in-process mutex.
func (lock *ZkLock) Release() {
	if err := lock.conn.Delete(lock.path, -1); err != nil {
		zap.L().Fatal("failed to delete lock node", zap.String("path", lock.path), zap.Error(err))
	}

	lock.acquired = false
	lock.localLock.Unlock()
}
// Unlock releases the lock by calling Release. The lockCtx argument is
// ignored; the method has this signature to match DistributedLocker.
func (lock *ZkLock) Unlock(lockCtx LockContext) {
lock.Release()
}