-
Notifications
You must be signed in to change notification settings - Fork 8
/
lockfile.go
245 lines (224 loc) · 7.98 KB
/
lockfile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
/*
* Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
// Distributed lock using lock files (see https://fileinfo.com/extension/lock).
package filesystem
import (
	"context"
	"errors"
	"fmt"
	"path/filepath"
	"strings"
	"time"

	retry "github.com/avast/retry-go"

	"github.com/ARM-software/golang-utils/utils/collection"
	"github.com/ARM-software/golang-utils/utils/commonerrors"
	"github.com/ARM-software/golang-utils/utils/parallelisation"
)
// LockFilePrefix is the default prefix of the lock directory name used by locks created
// with NewGenericRemoteLockFile: a lock with ID <id> is materialised by a directory named
// "<LockFilePrefix>-<id>".
const LockFilePrefix = "lockfile"
// RemoteLockFile describes a distributed lock using only the file system.
// The locking mechanism is performed using directories and the atomic function `mkdir`.
// A major issue of distributed locks is the presence of stale locks due to many factors such as the loss of the holder of a lock for various reasons.
// To mitigate this problem, a "heart beat" file is modified regularly by the lock holder in order to specify the holder is still alive and the lock still valid.
type RemoteLockFile struct {
	// id identifies the lock; it is part of both the lock directory name and the heart beat file name.
	id string
	// prefix is prepended (followed by "-") to id to form the lock directory name.
	prefix string
	// path is the directory in which the lock directory is created.
	path string
	// timeBetweenLockTries is how long Lock waits between two acquisition attempts.
	timeBetweenLockTries time.Duration
	// fs is the file system on which the lock directory and heart beat file live.
	fs *VFS
	// lockHeartBeatPeriod is the heart beat interval; a lock whose heart beat is older than
	// twice this period is considered stale.
	lockHeartBeatPeriod time.Duration
	// cancelStore holds cancel functions, including those stopping the heart beats started by
	// TryLock; it is cancelled by Unlock and MakeStale.
	cancelStore *parallelisation.CancelFunctionStore
	// overrideStaleLock makes TryLock release and retake a stale lock instead of returning
	// commonerrors.ErrStaleLock.
	overrideStaleLock bool
}
// NewGenericRemoteLockFile creates a new remote lock using the file system.
//
// fs is the file system holding the lock, lockID identifies the lock, dirPath is the
// directory in which the lock directory is created and overrideStaleLock controls whether
// TryLock releases and retakes a stale lock. Acquisition attempts made by Lock are spaced
// by 10ms and the heart beat period is 50ms.
func NewGenericRemoteLockFile(fs *VFS, lockID string, dirPath string, overrideStaleLock bool) ILock {
	const (
		retryInterval   = 10 * time.Millisecond
		heartBeatPeriod = 50 * time.Millisecond
	)
	lock := new(RemoteLockFile)
	lock.id = lockID
	lock.prefix = LockFilePrefix
	lock.path = dirPath
	lock.timeBetweenLockTries = retryInterval
	lock.fs = fs
	lock.lockHeartBeatPeriod = heartBeatPeriod
	lock.cancelStore = parallelisation.NewCancelFunctionsStore()
	lock.overrideStaleLock = overrideStaleLock
	return lock
}
// NewRemoteLockFile creates a new remote lock using the file system. Stale locks are
// reported rather than overridden.
//   - lockID: identifier of the lock.
//   - dirPath: directory in which the lock is created.
func NewRemoteLockFile(fs *VFS, lockID string, dirPath string) ILock {
	const overrideStaleLocks = false
	return NewGenericRemoteLockFile(fs, lockID, dirPath, overrideStaleLocks)
}
// heartBeat repeatedly rewrites the heart beat file at heartBeatPath (content and
// modification time) until ctx is done, signalling that the lock holder is still alive.
//
// The wait between two beats is period minus one millisecond, floored at one millisecond
// so that very small (or zero) periods do not turn the loop into a busy spin.
func heartBeat(ctx context.Context, fs FS, period time.Duration, heartBeatPath string) {
	wait := period - time.Millisecond
	if wait < time.Millisecond {
		wait = time.Millisecond
	}
	for {
		if err := parallelisation.DetermineContextError(ctx); err != nil {
			return
		}
		now := time.Now()
		// Errors are deliberately ignored: a missed beat is retried on the next iteration,
		// and persistent failures surface as a stale lock.
		_ = fs.WriteFile(heartBeatPath, []byte(fmt.Sprintf("alive @ %v", now)), 0775)
		// FIXME: this is to overcome the problem found with different filesystems which do not update modTime on file change.
		// e.g. https://github.com/spf13/afero/issues/297
		_ = fs.Chtimes(heartBeatPath, now, now)
		// sleeping until next heart beat (SleepWithContext is expected to wake up early on cancellation)
		parallelisation.SleepWithContext(ctx, wait)
	}
}
// lockPath returns the directory whose existence materialises the lock:
// <path>/<prefix>-<id>, with surrounding whitespace trimmed from prefix and id.
func (l *RemoteLockFile) lockPath() string {
	dirName := strings.TrimSpace(l.prefix) + "-" + strings.TrimSpace(l.id)
	return filepath.Join(l.path, dirName)
}
// IsStale reports whether the lock is stale, i.e. no recent heart beat was detected.
// It returns false when the lock directory cannot be listed or inspected.
func (l *RemoteLockFile) IsStale() bool {
	dir := l.lockPath()
	entries, err := l.fs.Ls(dir)
	switch {
	case err != nil:
		return false
	case len(entries) > 0:
		return areHeartBeatFilesAllStale(l.fs, dir, entries, l.lockHeartBeatPeriod)
	}
	// The directory exists but holds no heart beat file yet: it may have just been created
	// by a holder that has not written its first beat. Judge staleness from the directory's
	// own modification time instead.
	dirTimes, statErr := l.fs.StatTimes(dir)
	if statErr != nil {
		return false
	}
	return isStale(dirTimes, l.lockHeartBeatPeriod)
}
// areHeartBeatFilesAllStale reports whether every heart beat file listed in heartBeatFiles
// (names relative to lockPath) is stale according to isStale. A file whose times cannot
// be read counts as not stale. The per-file flags are combined with collection.All.
func areHeartBeatFilesAllStale(fs *VFS, lockPath string, heartBeatFiles []string, lockHeartBeatPeriod time.Duration) bool {
	flags := make([]bool, 0, len(heartBeatFiles))
	// In practice the lock directory should only contain a single heart beat file.
	for _, name := range heartBeatFiles {
		beatFile := filepath.Join(lockPath, name)
		// A beat modified within the staleness window means the lock is alive.
		stale := false
		if times, err := fs.StatTimes(beatFile); err == nil {
			stale = isStale(times, lockHeartBeatPeriod)
		}
		flags = append(flags, stale)
	}
	return collection.All(flags)
}
// isStale reports whether filetime's modification time lies more than two beat periods in
// the past. A nil filetime is never considered stale.
//
// Durations are compared at full precision rather than truncated to whole milliseconds:
// truncation collapses any sub-millisecond beat period to zero, which would flag a file
// that is only about a millisecond old as stale.
func isStale(filetime FileTimeInfo, beatPeriod time.Duration) bool {
	if filetime == nil {
		return false
	}
	return time.Since(filetime.ModTime()) > 2*beatPeriod
}
// ReleaseIfStale unlocks the lock when IsStale reports it as stale; otherwise it leaves
// the lock untouched and returns nil.
func (l *RemoteLockFile) ReleaseIfStale(ctx context.Context) error {
	if !l.IsStale() {
		return nil
	}
	return l.Unlock(ctx)
}
// TryLock attempts to acquire the lock straight away, without waiting.
//
// The lock is taken by creating the lock directory. If that directory already exists:
//   - if the lock is not stale, commonerrors.ErrLocked is returned;
//   - if it is stale and overrideStaleLock is false, commonerrors.ErrStaleLock is returned;
//   - if it is stale and overrideStaleLock is true, the stale lock is released and the
//     acquisition is attempted again. If the release fails, commonerrors.ErrStaleLock is
//     returned rather than retrying without end.
//
// On success a heart beat goroutine is started. It stops when the cancel store is
// cancelled (Unlock, MakeStale) or when ctx is done.
func (l *RemoteLockFile) TryLock(ctx context.Context) (err error) {
	if err = parallelisation.DetermineContextError(ctx); err != nil {
		return err
	}
	lockPath := l.lockPath()
	// create directory as lock
	err = l.fs.vfs.Mkdir(lockPath, 0755)
	if commonerrors.Any(ConvertFileSytemError(err), commonerrors.ErrExists) {
		if !l.IsStale() {
			return commonerrors.ErrLocked
		}
		if !l.overrideStaleLock {
			return commonerrors.ErrStaleLock
		}
		// NOTE(review): ReleaseIfStale goes through Unlock, which calls l.cancelStore.Cancel().
		// If the store also holds the cancel function of ctx (e.g. when reached via
		// LockWithTimeout), the retry below may observe a cancelled context — confirm.
		if releaseErr := l.ReleaseIfStale(ctx); releaseErr != nil {
			// The stale lock is still there: retrying now would recurse indefinitely.
			return commonerrors.ErrStaleLock
		}
		return l.TryLock(ctx)
	}
	if err != nil {
		return err
	}
	// FIXME: the following is to overcome the problem found with different filesystems which do not update modTime on directory creation.
	// e.g. https://github.com/spf13/afero/issues/297
	now := time.Now()
	_ = l.fs.Chtimes(lockPath, now, now)
	// Keep the lock alive with a heart beat file, updated until the registered cancel
	// function is called (Unlock) or ctx is done.
	heartBeatFilePath := l.heartBeatFile(lockPath)
	subctx, cancelFunc := context.WithCancel(ctx)
	l.cancelStore.RegisterCancelFunction(cancelFunc)
	go heartBeat(subctx, l.fs, l.lockHeartBeatPeriod, heartBeatFilePath)
	return nil
}
// heartBeatFile returns the path of the heart beat file, "<id>.lock", inside lockPath.
func (l *RemoteLockFile) heartBeatFile(lockPath string) string {
	fileName := l.id + ".lock"
	return filepath.Join(lockPath, fileName)
}
// Lock locks the lock. This call blocks until the lock is available or ctx is done.
//
// While TryLock reports commonerrors.ErrLocked (matched with errors.Is so wrapped errors
// are recognised too), a new attempt is made every timeBetweenLockTries. Any other error
// from TryLock, including commonerrors.ErrStaleLock, is returned immediately, as is the
// context error once ctx is done.
func (l *RemoteLockFile) Lock(ctx context.Context) error {
	for {
		if err := parallelisation.DetermineContextError(ctx); err != nil {
			return err
		}
		err := l.TryLock(ctx)
		if !errors.Is(err, commonerrors.ErrLocked) {
			// either success (nil) or a non-retryable error
			return err
		}
		// wait before the next attempt; SleepWithContext is expected to return early if ctx is cancelled
		parallelisation.SleepWithContext(ctx, l.timeBetweenLockTries)
	}
}
// LockWithTimeout tries to acquire the lock until it succeeds or the timeout expires.
// If ctx is already done, its error is returned without any attempt being made.
func (l *RemoteLockFile) LockWithTimeout(ctx context.Context, timeout time.Duration) error {
	err := parallelisation.DetermineContextError(ctx)
	if err == nil {
		err = parallelisation.RunActionWithTimeoutAndCancelStore(ctx, timeout, l.cancelStore, l.Lock)
	}
	return err
}
// Unlock unlocks the lock.
//
// It cancels everything registered in the lock's cancel store (which stops the heart
// beat started by TryLock), then removes the lock directory. Removal is retried up to
// 10 times with random delays (max jitter 25ms) until the directory no longer exists.
// The context argument is currently unused.
func (l *RemoteLockFile) Unlock(context.Context) error {
	l.cancelStore.Cancel()
	dir := l.lockPath()
	removeLockDir := func() error {
		if rmErr := l.fs.Rm(dir); rmErr != nil {
			return fmt.Errorf("cannot unlock lock [%v]: %w", l.id, rmErr)
		}
		if !l.fs.Exists(dir) {
			return nil
		}
		return fmt.Errorf("cannot unlock lock [%v]: %w", l.id, commonerrors.ErrLocked)
	}
	return retry.Do(
		removeLockDir,
		retry.Attempts(10),
		retry.DelayType(retry.RandomDelay),
		retry.MaxJitter(25*time.Millisecond),
	)
}
// MakeStale is mostly useful for testing purposes and tries to mock locks going stale.
//
// It cancels the lock's heart beat and waits slightly longer than one heart beat period so
// that an in-flight beat can finish. It then back-dates the heart beat file, or the lock
// directory when no heart beat file exists, until IsStale reports true. The attempt is
// retried up to 10 times. Nothing is done if the lock directory does not exist.
func (l *RemoteLockFile) MakeStale(ctx context.Context) error {
	l.cancelStore.Cancel()
	parallelisation.SleepWithContext(ctx, l.lockHeartBeatPeriod+time.Millisecond)
	lockPath := l.lockPath()
	filePath := l.heartBeatFile(lockPath)
	// A lock only counts as stale once its modification time is more than two heart beat
	// periods old (see isStale), so back-date beyond that threshold.
	staleTime := time.Now().Add(-(2*l.lockHeartBeatPeriod + time.Millisecond))
	return retry.Do(
		func() error {
			if !l.fs.Exists(lockPath) {
				return nil
			}
			// Chtimes errors are ignored: the IsStale check below decides whether this worked.
			if l.fs.Exists(filePath) {
				_ = l.fs.Chtimes(filePath, staleTime, staleTime)
			} else {
				_ = l.fs.Chtimes(lockPath, staleTime, staleTime)
			}
			if !l.IsStale() {
				return fmt.Errorf("cannot make lock [%v] stale", l.id)
			}
			return nil
		},
		retry.MaxJitter(l.lockHeartBeatPeriod),
		retry.DelayType(retry.RandomDelay),
		retry.Attempts(10),
	)
}