/
sync.go
231 lines (201 loc) · 6.14 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package utils
import (
"context"
"fmt"
"github.com/Laisky/zap"
"net/http"
"sync/atomic"
"time"
"github.com/Laisky/graphql"
"github.com/dgrijalva/jwt-go"
"github.com/pkg/errors"
)
// Defaults applied by NewLaiskyRemoteLock and AcquireLock when no option overrides them.
const (
// defaultLaiskyRemoteLockTokenUserKey is the JWT claim key whose value is read as the user ID.
defaultLaiskyRemoteLockTokenUserKey = "uid"
// defaultLaiskyRemoteLockAuthCookieName is the cookie name under which the token is sent.
defaultLaiskyRemoteLockAuthCookieName = "general"
// defaultLaiskyRemoteLockTimeout is the default timeout of the HTTP client.
defaultLaiskyRemoteLockTimeout = 5 * time.Second
// defaultLaiskyRemoteLockRenewalDuration is the default lock duration, sent as duration_sec.
defaultLaiskyRemoteLockRenewalDuration = 10 * time.Second
// defaultLaiskyRemoteLockRenewalInterval is the default ticker interval of the renewal loop.
defaultLaiskyRemoteLockRenewalInterval = 1 * time.Second
// defaultLaiskyRemoteLockIsRenewal leaves automatic renewal off by default.
defaultLaiskyRemoteLockIsRenewal = false
// defaultLaiskyRemoteLockMaxRetry is the default maxRetry bound checked by the renewal loop.
defaultLaiskyRemoteLockMaxRetry = 3
)
// Mutex is a lock that can be acquired without blocking.
//
// Its state lives in a single uint32 that is only touched through sync/atomic:
// 0 means free, 1 means held.
type Mutex struct {
	l uint32
}

// NewMutex returns a pointer to a new Mutex in the unlocked state.
func NewMutex() *Mutex {
	return new(Mutex)
}

// TryLock attempts to take the lock without waiting.
// It reports true when the state was switched from free to held by this call.
func (m *Mutex) TryLock() bool {
	acquired := atomic.CompareAndSwapUint32(&m.l, 0, 1)
	return acquired
}

// IsLocked reports whether the lock is currently held.
func (m *Mutex) IsLocked() bool {
	state := atomic.LoadUint32(&m.l)
	return state == 1
}

// TryRelease attempts to free the lock.
// It reports true when the state was switched from held to free by this call.
func (m *Mutex) TryRelease() bool {
	released := atomic.CompareAndSwapUint32(&m.l, 1, 0)
	return released
}

// ForceRelease unconditionally marks the lock as free, whatever its current state.
func (m *Mutex) ForceRelease() {
	atomic.StoreUint32(&m.l, 0)
}
// SpinLock repeatedly tries to acquire the lock, sleeping step between attempts.
//
// It returns as soon as TryLock succeeds, or once more than timeout has elapsed
// (measured with Clock.GetUTCNow) since the call started. Nothing is returned, so
// the caller cannot tell from this call alone which of the two happened.
func (m *Mutex) SpinLock(step, timeout time.Duration) {
	begin := Clock.GetUTCNow()
	for !m.TryLock() {
		if elapsed := Clock.GetUTCNow().Sub(begin); elapsed > timeout {
			return
		}

		time.Sleep(step)
	}
}
// LaiskyRemoteLock acquires locks through Laisky's GraphQL API.
type LaiskyRemoteLock struct {
	// cli is the GraphQL client used to send the lock mutations.
	cli *graphql.Client

	// token is the raw JWT handed to the client as a cookie value.
	token string

	// tokenCookieName is the name of the cookie that carries token.
	tokenCookieName string

	// userID is the string value of the token's "uid" claim.
	userID string

	// timeout is used as the HTTP client timeout.
	timeout time.Duration
}
// LaiskyRemoteLockOptFunc is a functional option that mutates a LaiskyRemoteLock.
// NewLaiskyRemoteLock applies every option before it builds the GraphQL client.
type LaiskyRemoteLockOptFunc func(*LaiskyRemoteLock)
// WithLaiskyRemoteLockTimeout returns an option that overrides the lock's timeout,
// the value NewLaiskyRemoteLock passes to the HTTP client as its Timeout.
func WithLaiskyRemoteLockTimeout(timeout time.Duration) LaiskyRemoteLockOptFunc {
	setTimeout := func(lock *LaiskyRemoteLock) {
		lock.timeout = timeout
	}
	return setTimeout
}
// headerTransport pairs a base http.RoundTripper with a map of header names to values.
// NOTE(review): no method on, or use of, this type appears in the visible code —
// presumably intended to add headers to outgoing requests; confirm or remove.
type headerTransport struct {
base http.RoundTripper
headers map[string]string
}
// NewLaiskyRemoteLock builds a LaiskyRemoteLock that talks to the GraphQL endpoint api.
//
// token is parsed with ParseJWTTokenWithoutValidate, and its "uid" claim, which must
// be a string, becomes the lock's user ID. opts are applied in order on top of the
// default cookie name and timeout. The GraphQL client is then created with an HTTP
// client using that timeout and with the token attached as a cookie.
//
// An error is returned when the token cannot be parsed or the "uid" claim is not a string.
func NewLaiskyRemoteLock(api, token string, opts ...LaiskyRemoteLockOptFunc) (l *LaiskyRemoteLock, err error) {
	claims, err := ParseJWTTokenWithoutValidate(token)
	if err != nil {
		return nil, errors.Wrap(err, "token invalidate")
	}

	uid, isString := claims[defaultLaiskyRemoteLockTokenUserKey].(string)
	if !isString {
		return nil, fmt.Errorf("unknown typo of %v, should be string", defaultLaiskyRemoteLockTokenUserKey)
	}

	l = &LaiskyRemoteLock{
		token:           token,
		tokenCookieName: defaultLaiskyRemoteLockAuthCookieName,
		userID:          uid,
		timeout:         defaultLaiskyRemoteLockTimeout,
	}
	for i := range opts {
		opts[i](l)
	}

	// The client is built last so that it sees the option-adjusted timeout and cookie name.
	httpCli := &http.Client{Timeout: l.timeout}
	l.cli = graphql.NewClient(api, httpCli, graphql.WithCookie(l.tokenCookieName, l.token))
	return l, nil
}
// acquireLockMutation is the GraphQL mutation sent to acquire or renew a lock.
// The struct tag binds the lock_name, is_renewal and duration_sec variables;
// AcquireLock reads the boolean field after the mutation call to learn the outcome.
type acquireLockMutation struct {
AcquireLock bool `graphql:"AcquireLock(lock_name: $lock_name, is_renewal: $is_renewal, duration_sec: $duration_sec)"`
}
// acquireLockOption collects the tunables of a single AcquireLock call.
type acquireLockOption struct {
	// renewalInterval is the ticker interval of the renewal loop.
	renewalInterval time.Duration

	// duration is the lock duration sent to the server as duration_sec.
	duration time.Duration

	// isRenewal is sent as is_renewal and, when the lock was acquired,
	// also starts the background renewal loop.
	isRenewal bool

	// maxRetry bounds the renewal loop's retry counter.
	maxRetry int
}
// AcquireLockOptFunc is a functional option that mutates the acquireLockOption
// of a single AcquireLock call; AcquireLock applies the options over its defaults.
type AcquireLockOptFunc func(*acquireLockOption)
// WithAcquireLockDuration returns an option that sets the lock duration, which
// AcquireLock sends to the server as duration_sec.
//
// A duration that is zero or negative is reported through Logger.Panic. The check
// runs when the option is constructed, not when it is applied.
func WithAcquireLockDuration(duration time.Duration) AcquireLockOptFunc {
	if duration <= 0 {
		Logger.Panic("duration should greater than 0", zap.Duration("duration", duration))
	}

	setDuration := func(o *acquireLockOption) {
		o.duration = duration
	}
	return setDuration
}
// WithAcquireLockRenewalInterval returns an option that sets how often the
// renewal loop re-sends the lock mutation.
//
// An interval shorter than 100ms is reported through Logger.Panic; exactly 100ms
// is accepted. The check runs when the option is constructed, not when it is applied.
func WithAcquireLockRenewalInterval(renewalInterval time.Duration) AcquireLockOptFunc {
	const minInterval = 100 * time.Millisecond
	if renewalInterval < minInterval {
		Logger.Panic("renewalInterval must greater than 100ms", zap.Duration("renewalInterval", renewalInterval))
	}

	setInterval := func(o *acquireLockOption) {
		o.renewalInterval = renewalInterval
	}
	return setInterval
}
// WithAcquireLockIsRenewal returns an option that turns automatic lock renewal
// on or off for an AcquireLock call.
func WithAcquireLockIsRenewal(isRenewal bool) AcquireLockOptFunc {
	setIsRenewal := func(o *acquireLockOption) {
		o.isRenewal = isRenewal
	}
	return setIsRenewal
}
// WithAcquireLockMaxRetry returns an option that sets maxRetry, the bound the
// renewal loop checks its retry counter against.
//
// A negative value is reported through Logger.Panic when the option is constructed.
// Note that zero passes the check even though the panic message reads
// "greater than 0".
func WithAcquireLockMaxRetry(maxRetry int) AcquireLockOptFunc {
	if maxRetry < 0 {
		Logger.Panic("maxRetry must greater than 0", zap.Int("maxRetry", maxRetry))
	}

	setMaxRetry := func(o *acquireLockOption) {
		o.maxRetry = maxRetry
	}
	return setMaxRetry
}
// AcquireLock asks the remote API for the lock named lockName.
//
// opts are applied over the package defaults. The lock name, the renewal flag and
// the duration in seconds (converted to graphql.Int) are sent as mutation variables.
//
// ok reports the boolean returned by the mutation. When the request itself fails,
// ok is false and err wraps the cause. If renewal is enabled and the lock was
// acquired, a goroutine is started that keeps renewing the lock with the same
// ctx, mutation and variables.
func (l *LaiskyRemoteLock) AcquireLock(ctx context.Context, lockName string, opts ...AcquireLockOptFunc) (ok bool, err error) {
	cfg := &acquireLockOption{
		renewalInterval: defaultLaiskyRemoteLockRenewalInterval,
		duration:        defaultLaiskyRemoteLockRenewalDuration,
		isRenewal:       defaultLaiskyRemoteLockIsRenewal,
		maxRetry:        defaultLaiskyRemoteLockMaxRetry,
	}
	for i := range opts {
		opts[i](cfg)
	}

	mutation := &acquireLockMutation{}
	variables := map[string]interface{}{
		"lock_name":    graphql.String(lockName),
		"is_renewal":   graphql.Boolean(cfg.isRenewal),
		"duration_sec": graphql.Int(cfg.duration.Seconds()),
	}

	if err = l.cli.Mutate(ctx, mutation, variables); err != nil {
		return false, errors.Wrap(err, "request graphql mutation")
	}

	ok = mutation.AcquireLock
	if ok && cfg.isRenewal {
		go l.renewalLock(ctx, mutation, variables, cfg)
	}
	return ok, nil
}
// renewalLock re-sends the lock mutation on every tick of opt.renewalInterval.
//
// It returns when ctx is done, or once opt.maxRetry consecutive renewal attempts
// have failed. A successful renewal resets the failure counter.
//
// query and vars are the mutation and variables to send; vars must hold a
// graphql.String under "lock_name", which is used for logging.
func (l *LaiskyRemoteLock) renewalLock(ctx context.Context, query *acquireLockMutation, vars map[string]interface{}, opt *acquireLockOption) {
	var (
		nRetry   = 0
		ticker   = time.NewTicker(opt.renewalInterval)
		lockName = string(vars["lock_name"].(graphql.String))
	)
	defer ticker.Stop()

	Logger.Debug("start to auto renewal lock", zap.String("lock_name", lockName))
	for nRetry < opt.maxRetry {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if err := l.cli.Mutate(ctx, query, vars); err != nil {
			Logger.Error("renewal lock", zap.Error(err), zap.Int("n_retry", nRetry), zap.String("lock_name", lockName))
			nRetry++

			// Back off before the next attempt, but stop waiting as soon as ctx is done.
			select {
			case <-ctx.Done():
				return
			case <-time.After(1 * time.Second):
			}

			// Skip the success path: a failure must neither reset the counter
			// nor be logged as a successful renewal.
			continue
		}

		nRetry = 0
		Logger.Debug("success renewal lock", zap.String("lock_name", lockName))
	}

	// The loop only falls through when the retry budget is spent; say so instead
	// of silently letting the lock lapse.
	Logger.Error("stop renewal lock, max retry exceeded", zap.Int("max_retry", opt.maxRetry), zap.String("lock_name", lockName))
}