forked from ShiningRush/fastflow
/
mutex.go
129 lines (114 loc) · 2.95 KB
/
mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package mongo
import (
"context"
"fmt"
"time"
"github.com/igxm/fastflow/pkg/mod"
"github.com/igxm/fastflow/pkg/utils/data"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
type LockDetail struct {
Key string `bson:"_id"`
ExpiredAt time.Time `bson:"expiredAt"`
Identity string `bson:"identity"`
}
type MongoMutex struct {
key string
clsName string
mongoDb *mongo.Database
lockDetail *LockDetail
}
func (m *MongoMutex) Lock(ctx context.Context, ops ...mod.LockOptionOp) error {
opt := mod.NewLockOption(ops)
if err := m.spinLock(ctx, opt); err != nil {
return err
}
// already keep lock
if m.lockDetail != nil {
return nil
}
// when get lock failed, loop to get it
ticker := time.NewTicker(opt.SpinInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := m.spinLock(ctx, opt); err != nil {
return nil
}
// already keep lock
if m.lockDetail != nil {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (m *MongoMutex) spinLock(ctx context.Context, opt *mod.LockOption) error {
detail := LockDetail{}
err := m.mongoDb.Collection(m.clsName).FindOne(ctx, bson.M{"_id": m.key}).Decode(&detail)
if err != nil && err != mongo.ErrNoDocuments {
return fmt.Errorf("get lock detail failed: %w", err)
}
// no lock
if err == mongo.ErrNoDocuments {
d := &LockDetail{
Key: m.key,
ExpiredAt: time.Now().Add(opt.TTL),
Identity: opt.ReentrantIdentity,
}
_, err := m.mongoDb.Collection(m.clsName).InsertOne(ctx, d)
if err != nil {
if mongo.IsDuplicateKeyError(err) {
// race lock failed, ready to get lock next time
return nil
}
return fmt.Errorf("insert lock detail failed: %w", err)
}
m.lockDetail = d
return nil
}
// lock existed, we should check it is expired
if detail.ExpiredAt.Before(time.Now()) {
exp := time.Now().Add(opt.TTL)
ret, err := m.mongoDb.Collection(m.clsName).UpdateOne(ctx, bson.M{"_id": m.key, "expiredAt": detail.ExpiredAt}, bson.M{
"$set": bson.M{
"expiredAt": time.Now().Add(opt.TTL),
"identity": opt.ReentrantIdentity,
},
})
if err != nil {
return fmt.Errorf("get lock failed: %w", err)
}
// lock is keep by others
if ret.ModifiedCount == 0 {
return nil
}
detail.ExpiredAt = exp
m.lockDetail = &detail
return nil
}
// lock existed, we should check it is reentrant
if opt.ReentrantIdentity != "" && detail.Identity == opt.ReentrantIdentity {
m.lockDetail = &detail
return nil
}
// lock is keep by others, return to loop
return nil
}
func (m *MongoMutex) Unlock(ctx context.Context) error {
if m.lockDetail == nil {
return fmt.Errorf("the mutex is not locked")
}
ret, err := m.mongoDb.Collection(m.clsName).DeleteOne(ctx, bson.M{"_id": m.key, "expiredAt": m.lockDetail.ExpiredAt})
if err != nil {
return fmt.Errorf("delete lock detail failed: %w", err)
}
if ret.DeletedCount == 0 {
return data.ErrMutexAlreadyUnlock
}
m.lockDetail = nil
return nil
}