/
lock.go
332 lines (274 loc) · 10.3 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package store
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
)
// Lock is an interprocess mutex lock.
//
// See also (Lucene Java API this was ported from): Directory.obtainLock(String).
type Lock interface {
// Closer releases exclusive access to the lock through its Close method.
//
// An error returned from Close may require human intervention: it may mean
// that the lock was no longer valid, that file system permissions prevented
// removal of the lock file, or that the release failed for another reason.
//
// The intended contract is that closing an already closed lock has no
// effect, and that an implementation relinquishes its resources and marks
// itself as closed before reporting a failure.
io.Closer
// EnsureValid is a best-effort check that this lock is still valid. Locks
// can be invalidated externally for a number of reasons, for example when a
// user deletes the lock file manually or when a network file system is in
// use.
//
// It returns a non-nil error if the lock is no longer valid.
EnsureValid() error
}
// LockFactory is the abstraction a Directory uses to implement locking.
//
// Upstream Lucene uses NativeFSLockFactory by default for FSDirectory-based
// index directories (NOTE(review): the default used by this port is not
// visible in this file).
//
// Special care needs to be taken if you change the locking implementation:
// first be certain that no writer is in fact writing to the index, otherwise
// you can easily corrupt your index. Be sure to do the LockFactory change on
// all instances and clean up all leftover lock files before starting the new
// configuration for the first time. Different implementations cannot work
// together!
//
// If you suspect that some LockFactory implementation is not working properly
// in your environment, upstream Lucene provides VerifyingLockFactory,
// LockVerifyServer and LockStressTest to test it.
//
// See also (Lucene): LockVerifyServer, LockStressTest, VerifyingLockFactory.
type LockFactory interface {
// ObtainLock returns a newly obtained Lock identified by lockName.
//
// dir is the Directory the lock belongs to and lockName is the name of the
// lock to be created.
//
// It returns a non-nil error if the lock could not be obtained because it
// is currently held elsewhere, or if an I/O error occurs while attempting
// to gain the lock.
ObtainLock(dir Directory, lockName string) (Lock, error)
}
// FSLockFactory is the interface of file system based locking
// implementations. It combines the general LockFactory entry point with the
// FSDirectory-specific ObtainFSLock method from FSLockFactoryInner.
type FSLockFactory interface {
LockFactory
// FSLockFactoryInner contributes ObtainFSLock, which implementations
// provide to obtain a lock for an FSDirectory instance. ObtainFSLock
// returns a non-nil error if the lock could not be obtained.
FSLockFactoryInner
}
// FSLockFactoryInner is the FSDirectory-specific part of a file system lock
// factory. FSLockFactoryBase delegates to it after checking that the
// Directory it was given is an FSDirectory.
type FSLockFactoryInner interface {
// ObtainFSLock obtains the lock named lockName for the given FSDirectory.
ObtainFSLock(dir FSDirectory, lockName string) (Lock, error)
}
// FSLockFactoryBase is the shared base of file system lock factories. Its
// ObtainLock method verifies that the Directory is an FSDirectory and then
// forwards the request to locker.
type FSLockFactoryBase struct {
// locker is the concrete implementation that actually obtains the lock.
locker FSLockFactoryInner
}
// NewFSLockFactoryBase returns a base factory whose lock requests are
// forwarded to inner.
func NewFSLockFactoryBase(inner FSLockFactoryInner) *FSLockFactoryBase {
	base := new(FSLockFactoryBase)
	base.locker = inner
	return base
}
// ObtainLock obtains the lock named lockName for dir by delegating to the
// wrapped FSLockFactoryInner.
//
// dir must be an FSDirectory; any other Directory implementation is rejected
// with an error before the wrapped implementation is consulted.
func (f *FSLockFactoryBase) ObtainLock(dir Directory, lockName string) (Lock, error) {
	if fsDir, isFS := dir.(FSDirectory); isFS {
		return f.locker.ObtainFSLock(fsDir, lockName)
	}
	return nil, errors.New("can only be used with FSDirectory subclasses")
}
// Compile-time check that *SimpleFSLockFactory implements FSLockFactory.
var _ FSLockFactory = &SimpleFSLockFactory{}
// SimpleFSLockFactory implements LockFactory by creating a lock file inside
// the directory being locked.
//
// The main downside of this approach is that the write lock may not be
// released when the process exits abnormally. When that happens, obtaining
// the lock fails the next time a writer is created, and you may need to
// explicitly clear the lock by manually removing the file. But first be
// certain that no writer is in fact writing to the index, otherwise you can
// easily corrupt your index.
//
// Special care needs to be taken if you change the locking implementation:
// first be certain that no writer is in fact writing to the index, otherwise
// you can easily corrupt your index. Be sure to do the LockFactory change on
// all instances and clean up all leftover lock files before starting the new
// configuration for the first time. Different implementations cannot work
// together!
//
// If you suspect that this or any other LockFactory is not working properly
// in your environment, upstream Lucene provides VerifyingLockFactory,
// LockVerifyServer and LockStressTest to test it.
//
// Instances are created with NewSimpleFSLockFactory, which wires the embedded
// FSLockFactoryBase back to the new factory.
//
// See also: LockFactory.
type SimpleFSLockFactory struct {
*FSLockFactoryBase
}
// NewSimpleFSLockFactory returns a SimpleFSLockFactory whose embedded base
// delegates back to the factory itself, so that ObtainLock ends up in
// ObtainFSLock.
func NewSimpleFSLockFactory() *SimpleFSLockFactory {
	var self SimpleFSLockFactory
	// The base needs the factory as its inner implementation, so it can only
	// be attached once the factory value exists.
	self.FSLockFactoryBase = NewFSLockFactoryBase(&self)
	return &self
}
// ObtainFSLock obtains the lock named lockName by exclusively creating a file
// of that name inside the directory reported by dir.GetDirectory.
//
// The lock directory is not created here; it must already exist.
//
// It returns a "lock held elsewhere" error when the lock file already exists,
// and otherwise the underlying error when the directory cannot be resolved or
// the lock file cannot be created, closed or inspected.
func (s *SimpleFSLockFactory) ObtainFSLock(dir FSDirectory, lockName string) (Lock, error) {
	lockDir, err := dir.GetDirectory()
	if err != nil {
		return nil, err
	}
	lockFile := filepath.Join(lockDir, lockName)

	// O_EXCL turns "check that the file is absent" and "create it" into one
	// atomic step, so two contenders can never both obtain the lock.
	f, err := os.OpenFile(lockFile, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		if errors.Is(err, os.ErrExist) {
			return nil, fmt.Errorf("lock held elsewhere: %s", lockFile)
		}
		return nil, err
	}
	// The lock is the existence of the file; the open handle is not needed
	// and must not be leaked.
	if err := f.Close(); err != nil {
		// Best effort: do not leave behind a lock nobody holds. The removal
		// error is dropped because the Close error is the one to report.
		_ = os.Remove(lockFile)
		return nil, err
	}

	info, err := os.Stat(lockFile)
	if err != nil {
		// Same best-effort cleanup as above; the Stat error is reported.
		_ = os.Remove(lockFile)
		return nil, err
	}
	// Only the second result of FileTime is used; it is remembered so that
	// EnsureValid can later detect that the file was replaced.
	_, ctime, _ := FileTime(info)
	return NewSimpleFSLock(lockFile, ctime), nil
}
// Compile-time check that *SimpleFSLock implements Lock.
var _ Lock = &SimpleFSLock{}
// SimpleFSLock is a lock represented by the existence of a file on disk.
type SimpleFSLock struct {
	// path is the location of the lock file.
	path string
	// creationTime is the timestamp recorded when the lock was obtained; it
	// is compared against the file's current timestamp to detect changes.
	creationTime time.Time
	// closed reports whether Close has already been called.
	closed bool
}

// NewSimpleFSLock returns a lock for the file at path whose timestamp at
// acquisition time was creationTime.
func NewSimpleFSLock(path string, creationTime time.Time) *SimpleFSLock {
	return &SimpleFSLock{path: path, creationTime: creationTime}
}

// Close releases the lock by removing the lock file.
//
// Close is idempotent: only the first call touches the file system, later
// calls return nil. If removing the file fails, the error is returned and the
// lock file may still exist, in which case manual intervention may be needed.
func (s *SimpleFSLock) Close() error {
	if s.closed {
		return nil
	}
	// Mark the lock as released before attempting the removal so that it is
	// considered closed even when the removal fails.
	s.closed = true
	return os.Remove(s.path)
}
// EnsureValid is a best-effort check that the lock is still held.
//
// It returns an error when the lock has been marked closed, when the lock
// file can no longer be inspected, or when the timestamp reported by FileTime
// differs from the one recorded when the lock was created.
func (s *SimpleFSLock) EnsureValid() error {
	if s.closed {
		return fmt.Errorf("lock instance already released: %s", s.path)
	}

	stat, statErr := os.Stat(s.path)
	if statErr != nil {
		return statErr
	}

	// Only the second result of FileTime is compared with the stored value.
	_, current, _ := FileTime(stat)
	if s.creationTime.Equal(current) {
		return nil
	}
	return fmt.Errorf(
		"underlying file changed by an external force at %s, (lock=%s)",
		current.String(), s.path)
}
// Compile-time check that *SingleInstanceLockFactory implements LockFactory.
var _ LockFactory = &SingleInstanceLockFactory{}
// SingleInstanceLockFactory implements LockFactory for a single in-process
// instance, meaning all locking takes place through this one instance. Only
// use this LockFactory when you are certain that all writers for a given index
// are running against a single shared in-process Directory instance.
//
// In upstream Lucene this is the default locking for RAMDirectory
// (NOTE(review): whether that also holds for this port is not visible here).
//
// See also: LockFactory.
type SingleInstanceLockFactory struct {
// RWMutex guards locks.
sync.RWMutex
// locks is the set of lock names that are currently held.
locks map[string]struct{}
}
// NewSingleInstanceLockFactory returns a factory that holds no locks yet.
func NewSingleInstanceLockFactory() *SingleInstanceLockFactory {
	factory := new(SingleInstanceLockFactory)
	// The embedded mutex is ready to use in its zero state; only the set of
	// held lock names has to be allocated.
	factory.locks = map[string]struct{}{}
	return factory
}
// ObtainLock obtains the in-process lock named lockName. The Directory
// argument is ignored.
//
// It returns an error if a lock with the same name is already held through
// this factory.
func (s *SingleInstanceLockFactory) ObtainLock(_ Directory, lockName string) (Lock, error) {
	s.Lock()
	defer s.Unlock()

	_, taken := s.locks[lockName]
	if taken {
		return nil, fmt.Errorf("lock instance already obtained: (lockName=%s)", lockName)
	}

	// Record the name as held before handing out the lock object.
	s.locks[lockName] = struct{}{}
	return NewSingleInstanceLock(s, lockName), nil
}
// Compile-time check that *SingleInstanceLock implements Lock.
var _ Lock = &SingleInstanceLock{}
// SingleInstanceLock is a lock handed out by a SingleInstanceLockFactory. It
// is held for as long as its name is present in the factory's set of locks.
type SingleInstanceLock struct {
// SingleInstanceLockFactory is the factory that issued this lock; its
// mutex and its set of held lock names are used by Close and EnsureValid.
*SingleInstanceLockFactory
// lockName is the name under which the lock is registered in the factory.
lockName string
// closed is set by Close.
closed bool
}
// NewSingleInstanceLock returns a lock object for lockName that is bound to
// factory. It does not register the name with the factory.
func NewSingleInstanceLock(factory *SingleInstanceLockFactory, lockName string) *SingleInstanceLock {
	lock := new(SingleInstanceLock)
	lock.SingleInstanceLockFactory = factory
	lock.lockName = lockName
	return lock
}
// Close releases the lock by removing its name from the issuing factory.
//
// Close is idempotent: only the first call does any work, later calls return
// nil. It returns an error if the lock name was no longer registered with the
// factory when the lock was released; the lock is marked as closed in that
// case as well.
func (s *SingleInstanceLock) Close() error {
	// The closed flag is read and written under the factory mutex so that
	// concurrent Close and EnsureValid calls do not race on it.
	s.Lock()
	defer s.Unlock()

	if s.closed {
		return nil
	}
	// Mark the lock as closed regardless of the outcome below, so that a
	// failed release is reported once instead of on every later call.
	s.closed = true

	if _, ok := s.locks[s.lockName]; !ok {
		return fmt.Errorf("lock was already released: %s", s.lockName)
	}
	delete(s.locks, s.lockName)
	return nil
}
// EnsureValid checks that the lock is still held.
//
// It returns an error if the lock has already been closed or if its name is
// no longer registered with the issuing factory.
func (s *SingleInstanceLock) EnsureValid() error {
	// A read lock is sufficient because nothing is modified here. Holding it
	// while reading the closed flag keeps the read synchronized with Close,
	// which sets the flag under the write lock.
	s.RLock()
	defer s.RUnlock()

	if s.closed {
		return fmt.Errorf("lock instance already released: %s", s.lockName)
	}
	if _, ok := s.locks[s.lockName]; !ok {
		return fmt.Errorf("lock instance was invalidated from map: %s", s.lockName)
	}
	return nil
}
/*
const (
MSG_LOCK_RELEASED = 0
MSG_LOCK_ACQUIRED = 1
)
var _ LockFactory = &VerifyingLockFactory{}
// VerifyingLockFactory A LockFactory that wraps another LockFactory and verifies that each lock
// obtain/release is "correct" (never results in two processes holding the lock at the same time).
// It does this by contacting an external server (LockVerifyServer) to assert that at most one process
// holds the lock at a time. To use this, you should also run LockVerifyServer on the host and port
// matching what you pass to the constructor.
//
// See Also: LockVerifyServer,
//
// LockStressTest
type VerifyingLockFactory struct {
lf LockFactory
in io.ByteReader
out io.ByteWriter
}
// NewVerifyingLockFactory
// lf: the LockFactory that we are testing
// in: the socket's input to LockVerifyServer
// out: the socket's output to LockVerifyServer
func NewVerifyingLockFactory(lf LockFactory, in io.ByteReader, out io.ByteWriter) *VerifyingLockFactory {
return &VerifyingLockFactory{lf: lf, in: in, out: out}
}
func (v *VerifyingLockFactory) ObtainLock(dir Directory, lockName string) (Lock, error) {
lock, err := v.lf.ObtainLock(dir, lockName)
if err != nil {
return nil, err
}
return NewCheckedLock(v, lock), nil
}
var _ Lock = &CheckedLock{}
type CheckedLock struct {
*VerifyingLockFactory
lock Lock
buff []byte
}
func NewCheckedLock(factory *VerifyingLockFactory, lock Lock) *CheckedLock {
return &CheckedLock{
VerifyingLockFactory: factory,
lock: lock,
buff: make([]byte, 1),
}
}
func (c *CheckedLock) Close() error {
if err := c.lock.EnsureValid(); err != nil {
return err
}
return c.verify(MSG_LOCK_RELEASED)
}
func (c *CheckedLock) EnsureValid() error {
return c.lock.EnsureValid()
}
func (c *CheckedLock) verify(message byte) error {
err := c.out.WriteByte(message)
if err != nil {
return err
}
ret, err := c.in.ReadByte()
if err != nil {
return err
}
if ret < 0 {
return errors.New("lock server died because of locking error")
}
if ret != message {
return errors.New("protocol violation")
}
return nil
}
*/