forked from juju/utils
/
fslock.go
394 lines (356 loc) · 11.3 KB
/
fslock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
// Copyright 2013 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.
// Package fslock provides an on-disk mutex protecting a resource.
//
// A lock is represented on disk by a directory of a particular name,
// containing an information file. Taking a lock is done by renaming a
// temporary directory into place. We use temporary directories because for
// all filesystems we believe that exactly one attempt to claim the lock will
// succeed and the others will fail.
package fslock
import (
"fmt"
"io/ioutil"
"os"
"path"
"regexp"
"runtime"
"sync"
"time"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/utils"
"github.com/juju/utils/clock"
goyaml "gopkg.in/yaml.v2"
)
const (
	// NameRegexp specifies the regular expression used to identify valid lock names.
	NameRegexp = "^[a-z]+[a-z0-9.-]*$"
	// heldFilename is the name of the file, written inside the lock
	// directory when the lock is acquired, that stores the serialised
	// lock information.
	heldFilename = "held"
)
var (
	// logger is the logger used by this package.
	logger = loggo.GetLogger("juju.utils.fslock")
	// ErrLockNotHeld is returned by Unlock if the lock file is not held by this lock.
	ErrLockNotHeld = errors.New("lock not held")
	// ErrTimeout is returned by LockWithTimeout if the lock could not be obtained before the given deadline.
	ErrTimeout = errors.New("lock timeout exceeded")
	// validName is the compiled form of NameRegexp; NewLock uses it to
	// validate lock names.
	validName = regexp.MustCompile(NameRegexp)
)
// LockConfig defines the configuration of the new lock. Sensible defaults can be
// obtained from Defaults().
type LockConfig struct {
	// Clock is used to generate the delay between acquisition attempts and
	// to compute the deadline in LockWithTimeout.
	Clock clock.Clock
	// WaitDelay is how long to wait after trying to acquire a lock before
	// trying again. The alive file of a held lock is refreshed every
	// 5 * WaitDelay.
	WaitDelay time.Duration
	// LividityTimeout is how old a lock holder's alive file can be without
	// us considering its parent process dead.
	LividityTimeout time.Duration
	// ReadRetryTimeout is how long to wait after trying to examine a lock
	// and not finding it before trying again.
	ReadRetryTimeout time.Duration
}
// Defaults returns a LockConfig populated with the package's standard
// settings: the wall clock, a one second delay between acquisition attempts,
// a thirty second lividity timeout and a ten millisecond read retry timeout.
func Defaults() LockConfig {
	var cfg LockConfig
	cfg.Clock = clock.WallClock
	cfg.WaitDelay = time.Second
	cfg.LividityTimeout = 30 * time.Second
	cfg.ReadRetryTimeout = 10 * time.Millisecond
	return cfg
}
// Lock is a file system lock. It is represented on disk by the directory
// <parent>/<name>, which contains a "held" file describing the holder and an
// alive.<PID> file whose modification time is refreshed while the lock is held.
type Lock struct {
	// name is the lock name; it is also the name of the lock directory.
	name string
	// parent is the directory in which the lock directory is created.
	parent string
	// clock provides the delay between acquisition attempts and the
	// current time for timeouts.
	clock clock.Clock
	// nonce is a UUID generated by NewLock; it is stored in the held file
	// and used to tell whether this particular Lock value holds the lock.
	nonce string
	// PID is the process id recorded when the Lock was created.
	PID int
	// stopWritingAliveFile signals the goroutine started by
	// createAliveFile to stop refreshing the alive file.
	stopWritingAliveFile chan struct{}
	// createAliveFileRunning tracks the goroutine started by
	// createAliveFile so that declareDead can wait for it to exit.
	createAliveFileRunning sync.WaitGroup
	// waitDelay is the delay between acquisition attempts.
	waitDelay time.Duration
	// lividityTimeout is the maximum age of another holder's alive file
	// before that holder is considered dead.
	lividityTimeout time.Duration
	// readRetryTimeout is the pause between attempts to stat an alive file.
	readRetryTimeout time.Duration
	// sanityCheck is closed by createAliveFile and replaced with a fresh
	// channel by declareDead.
	// NOTE(review): this looks like a guard against starting a second
	// alive-file goroutine without an intervening declareDead (closing an
	// already closed channel panics) — confirm intent.
	sanityCheck chan struct{}
}
// onDisk is the lock information that acquire serialises into the held file
// inside the lock directory and that readLock parses back.
type onDisk struct {
	// Nonce is the nonce of the Lock value that took the lock.
	Nonce string
	// PID is the process id of the holder.
	PID int
	// Message is the caller-supplied message saved when the lock was taken.
	Message string
}
// NewLock builds a lock called name that lives inside lockDir. The lock is
// not acquired. name must match NameRegexp, otherwise an error is returned.
//
// As a side effect the parent directory lockDir is created if it is missing,
// and any alive file left behind for the current process id is removed.
func NewLock(lockDir, name string, cfg LockConfig) (*Lock, error) {
	if ok := validName.MatchString(name); !ok {
		return nil, fmt.Errorf("Invalid lock name %q. Names must match %q", name, NameRegexp)
	}
	nonce, err := utils.NewUUID()
	if err != nil {
		return nil, err
	}

	l := new(Lock)
	l.name = name
	l.parent = lockDir
	l.clock = cfg.Clock
	l.nonce = nonce.String()
	l.PID = os.Getpid()
	l.stopWritingAliveFile = make(chan struct{}, 1)
	l.waitDelay = cfg.WaitDelay
	l.lividityTimeout = cfg.LividityTimeout
	l.readRetryTimeout = cfg.ReadRetryTimeout
	l.sanityCheck = make(chan struct{})

	// The parent directory has to exist before anything can be locked in it.
	if err = os.MkdirAll(l.parent, 0755); err != nil {
		return nil, err
	}
	// Clear out a stale alive file for our own PID. RemoveAll is silent
	// when there is nothing to remove, so any error here is unexpected.
	if err = os.RemoveAll(l.aliveFile(l.PID)); err != nil {
		return nil, err
	}
	return l, nil
}
// lockDir returns the path of the directory that represents this lock on
// disk: the lock name joined onto the parent directory.
func (lock *Lock) lockDir() string {
	dir := path.Join(lock.parent, lock.name)
	return dir
}
// heldFile returns the path of the "held" file inside the lock directory.
func (lock *Lock) heldFile() string {
	dir := lock.lockDir()
	return path.Join(dir, "held")
}
// aliveFile returns the path of the proof-of-life file, alive.<PID>, that the
// process with the given PID keeps inside the lock directory.
func (lock *Lock) aliveFile(PID int) string {
	fileName := fmt.Sprintf("alive.%d", PID)
	return path.Join(lock.lockDir(), fileName)
}
// isAlive reports whether the process with the given PID should be treated
// as alive. The current process's own PID is always alive. For any other PID
// the file alive.<PID> in the lock directory is examined: as soon as it can
// be stat'ed, the result is whether its modification time is less than
// lividityTimeout in the past. The stat is attempted up to ten times, with a
// sleep of readRetryTimeout after each failure; if the file is never found
// the process is considered dead.
func (lock *Lock) isAlive(PID int) bool {
	if PID == lock.PID {
		return true
	}
	const maxAttempts = 10
	for attempt := 0; attempt < maxAttempts; attempt++ {
		info, err := os.Lstat(lock.aliveFile(PID))
		if err != nil {
			// Not there (yet); give the holder a moment and look again.
			time.Sleep(lock.readRetryTimeout)
			continue
		}
		now := time.Now()
		expiry := info.ModTime().Add(lock.lividityTimeout)
		return now.Before(expiry)
	}
	return false
}
// createAliveFile kicks off a goroutine that creates a proof of life file
// (alive.<PID> inside the lock directory) and keeps its timestamp current,
// touching it every 5 * waitDelay until a value arrives on
// stopWritingAliveFile or touching the file fails.
//
// The goroutine is registered with createAliveFileRunning so that
// declareDead can wait for it to exit. sanityCheck is closed here, so calling
// createAliveFile a second time without declareDead having replaced the
// channel in between panics.
func (lock *Lock) createAliveFile() {
	lock.createAliveFileRunning.Add(1)
	close(lock.sanityCheck)
	go func() {
		defer lock.createAliveFileRunning.Done()

		aliveFile := lock.aliveFile(lock.PID)
		// The mode must be the octal literal 0644. A decimal 644 is octal
		// 01204, whose permission bits (0204) leave the file unreadable by
		// its owner.
		if err := ioutil.WriteFile(aliveFile, []byte{}, 0644); err != nil {
			return
		}

		refreshInterval := 5 * lock.waitDelay
		for {
			select {
			case <-time.After(refreshInterval):
				// Touch the file so other processes see us as alive.
				now := time.Now()
				if err := os.Chtimes(aliveFile, now, now); err != nil {
					return
				}
			case <-lock.stopWritingAliveFile:
				return
			}
		}
	}()
}
// declareDead stops the goroutine started by createAliveFile, if one is
// running, waits for it to exit, and replaces sanityCheck so that
// createAliveFile may be called again. Unlock and BreakLock call it before
// removing the lock directory.
func (lock *Lock) declareDead() {
	// Non-blocking send: the channel has a one-slot buffer, so this never
	// blocks even when no goroutine is listening.
	select {
	case lock.stopWritingAliveFile <- struct{}{}:
	default:
	}
	lock.createAliveFileRunning.Wait()
	// If no goroutine was running (or it had already exited on its own) the
	// stop signal is still sitting in the buffer. Left there, it would be
	// consumed by the next createAliveFile goroutine as soon as it started,
	// which would then never refresh the alive file and other processes
	// would eventually consider the holder dead. Drain it.
	select {
	case <-lock.stopWritingAliveFile:
	default:
	}
	lock.sanityCheck = make(chan struct{}) // refresh sanity check
}
// clean inspects the lock on disk and breaks it if its holder appears to be
// dead. The held file is read to find the holder's PID; if isAlive reports
// that PID as alive the lock is left in place, otherwise the lock is removed
// with BreakLock. An error reading the lock information (for instance
// because no lock exists) is returned unchanged.
func (lock *Lock) clean() error {
	info, err := lock.readLock()
	if err != nil {
		return err
	}
	if !lock.isAlive(info.PID) {
		// The holder is gone; clear the stale lock.
		logger.Debugf("Lock dead")
		return lock.BreakLock()
	}
	// The lock is current, nothing to do.
	logger.Debugf("Lock alive")
	return nil
}
// acquire makes a single attempt to take the lock. It returns true if the
// lock was taken, false with a nil error if the lock is held by someone else
// (or another contender won the race), and false with a non-nil error if the
// attempt itself failed.
//
// If message is set, it will write the message to the lock directory as the
// lock is taken. On success the alive-file goroutine is started.
func (lock *Lock) acquire(message string) (bool, error) {
	// If the lockDir exists, then the lock is held by someone else.
	_, err := os.Stat(lock.lockDir())
	if err == nil {
		return false, nil
	}
	if !os.IsNotExist(err) {
		return false, err
	}

	// Serialise the lock information first so that a failure here cannot
	// leave a temporary directory behind.
	l := onDisk{
		PID:     lock.PID,
		Nonce:   lock.nonce,
		Message: message,
	}
	lockInfo, err := goyaml.Marshal(&l)
	if err != nil {
		return false, err // this shouldn't really fail...
	}

	// Create a temporary directory (in the parent dir), and then move it to
	// the right name. Using the same directory to make sure the directories
	// are on the same filesystem. Use a directory name starting with "." as
	// it isn't a valid lock name.
	tempLockName := fmt.Sprintf(".%s", lock.nonce)
	tempDirName, err := ioutil.TempDir(lock.parent, tempLockName)
	if err != nil {
		return false, err // this shouldn't fail either...
	}

	// Write the lock information into the temp dir.
	err = ioutil.WriteFile(path.Join(tempDirName, heldFilename), lockInfo, 0664)
	if err != nil {
		// Best-effort cleanup so a failed attempt doesn't leak the
		// temporary directory; the write error is what gets reported.
		_ = os.RemoveAll(tempDirName)
		return false, err
	}

	// Now move the temp directory to the lock directory.
	err = utils.ReplaceFile(tempDirName, lock.lockDir())
	if err != nil {
		// Any error on rename means we failed.
		// Beaten to it, clean up temporary directory (best effort).
		_ = os.RemoveAll(tempDirName)
		return false, nil
	}

	// We now have the lock.
	lock.createAliveFile()
	return true, nil
}
// lockLoop repeatedly attempts to take the lock until it succeeds or
// something fails. After every unsuccessful attempt continueFunc is
// consulted; a non-nil result aborts the loop and is returned. The message
// of the current holder is logged whenever it differs from the one last
// seen, and the loop then waits for waitDelay (as measured by the lock's
// clock) before trying again.
func (lock *Lock) lockLoop(message string, continueFunc func() error) error {
	lastSeen := ""
	for {
		ok, err := lock.acquire(message)
		switch {
		case err != nil:
			return errors.Trace(err)
		case ok:
			return nil
		}

		if err := continueFunc(); err != nil {
			return errors.Trace(err)
		}

		holder, err := lock.Message()
		if err != nil {
			return errors.Trace(err)
		}
		// Only report the holder when it changes, to avoid flooding the log.
		if holder != lastSeen {
			logger.Infof("attempted lock failed %q, %s, currently held: %s", lock.name, message, holder)
			lastSeen = holder
		}

		<-lock.clock.After(lock.waitDelay)
	}
}
// Lock blocks until the lock has been acquired. Because the lock is shared
// through the filesystem, callers are encouraged to supply a message; it is
// stored with the lock, shows up in debugging output, and can be read by any
// other Lock that uses the same lock name and lock directory.
//
// Before waiting, an existing lock whose holder appears to be dead is
// cleaned up.
func (lock *Lock) Lock(message string) error {
	// Best effort: clean fails whenever there is no lock to inspect, which
	// is the normal case, so its result is deliberately discarded.
	_ = lock.clean()
	// Never give up: keep looping until the lock is acquired.
	return lock.lockLoop(message, func() error { return nil })
}
// LockWithTimeout attempts to acquire the lock, giving up with ErrTimeout
// once the given duration (measured with the lock's clock) has elapsed
// without success. See `Lock` for information about the message.
func (lock *Lock) LockWithTimeout(duration time.Duration, message string) error {
	expiry := lock.clock.Now().Add(duration)
	return lock.lockLoop(message, func() error {
		// Checked after every failed attempt to take the lock.
		if now := lock.clock.Now(); now.After(expiry) {
			return ErrTimeout
		}
		return nil
	})
}
// LockWithFunc blocks until the lock has been acquired. Each time an attempt
// to take the lock fails, continueFunc is invoked before sleeping; if it
// returns an error the wait is abandoned and that error is returned from
// LockWithFunc.
func (lock *Lock) LockWithFunc(message string, continueFunc func() error) error {
	err := lock.lockLoop(message, continueFunc)
	return err
}
// readLock loads the held file from the lock directory and decodes it. If
// the file cannot be read the zero onDisk value is returned together with the
// read error; otherwise the result of decoding is returned as is.
func (lock *Lock) readLock() (lockInfo onDisk, err error) {
	data, readErr := ioutil.ReadFile(lock.heldFile())
	if readErr != nil {
		return onDisk{}, readErr
	}
	err = goyaml.Unmarshal(data, &lockInfo)
	return lockInfo, err
}
// isLockHeld reports whether the lock on disk belongs to the receiver, that
// is, whether the held file can be read and records the receiver's nonce.
func (lock *Lock) isLockHeld() bool {
	if info, err := lock.readLock(); err == nil {
		return info.Nonce == lock.nonce
	}
	return false
}
// Unlock releases the lock. ErrLockNotHeld is returned when the lock on disk
// does not belong to the receiver.
//
// The alive-file goroutine is stopped first. The lock directory is then
// renamed to a hidden temporary name, which is what actually releases the
// lock, and the renamed directory is deleted. A failing rename is retried;
// after the retries are exhausted the last rename error is returned.
func (lock *Lock) Unlock() error {
	if held := lock.isLockHeld(); !held {
		return ErrLockNotHeld
	}
	lock.declareDead()

	// Rename to a temporary name and delete that, so that the release
	// itself is a single rename.
	graveyard := path.Join(lock.parent, fmt.Sprintf(".%s.%s", lock.name, lock.nonce))

	const maxRetries = 100
	for attempt := 0; ; attempt++ {
		err := utils.ReplaceFile(lock.lockDir(), graveyard)
		if err == nil {
			break
		}
		if attempt == maxRetries {
			logger.Debugf("Failed to replace lock, giving up: (%s)", err)
			return err
		}
		logger.Debugf("Failed to replace lock, retrying: (%s)", err)
		runtime.Gosched()
	}

	// The lock is released; what remains is tidying up.
	err := os.RemoveAll(graveyard)
	if err != nil {
		logger.Debugf("Failed to remove lock: %s", err)
	}
	return err
}
// IsLocked reports whether the lock is currently held by anyone, judged by
// whether the held file inside the lock directory can be stat'ed.
func (lock *Lock) IsLocked() bool {
	if _, err := os.Stat(lock.heldFile()); err != nil {
		return false
	}
	return true
}
// BreakLock forcibly breaks the lock, whoever holds it: the receiver's own
// alive-file goroutine (if any) is stopped and the lock directory is removed
// together with its contents.
func (lock *Lock) BreakLock() error {
	lock.declareDead()
	dir := lock.lockDir()
	return os.RemoveAll(dir)
}
// Message returns the message saved with the lock. The empty string is
// returned when no message was saved or when the lock information does not
// exist; any other failure to read the lock information is returned as an
// error.
func (lock *Lock) Message() (string, error) {
	info, err := lock.readLock()
	switch {
	case err == nil, os.IsNotExist(err):
		return info.Message, nil
	default:
		return "", errors.Trace(err)
	}
}