/
semaphore.go
195 lines (168 loc) · 4.8 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package sync
import (
"fmt"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
sema "golang.org/x/sync/semaphore"
)
type PrioritySemaphore struct {
name string
limit int
pending *priorityQueue
semaphore *sema.Weighted
lockHolder map[string]bool
lock *sync.Mutex
nextWorkflow NextWorkflow
log *log.Entry
}
var _ Semaphore = &PrioritySemaphore{}
func NewSemaphore(name string, limit int, nextWorkflow NextWorkflow, lockType string) *PrioritySemaphore {
return &PrioritySemaphore{
name: name,
limit: limit,
pending: &priorityQueue{itemByKey: make(map[string]*item)},
semaphore: sema.NewWeighted(int64(limit)),
lockHolder: make(map[string]bool),
lock: &sync.Mutex{},
nextWorkflow: nextWorkflow,
log: log.WithFields(log.Fields{
lockType: name,
}),
}
}
func (s *PrioritySemaphore) getName() string {
return s.name
}
func (s *PrioritySemaphore) getLimit() int {
return s.limit
}
func (s *PrioritySemaphore) getCurrentPending() []string {
var keys []string
for _, item := range s.pending.items {
keys = append(keys, item.key)
}
return keys
}
func (s *PrioritySemaphore) getCurrentHolders() []string {
var keys []string
for k := range s.lockHolder {
keys = append(keys, k)
}
return keys
}
func (s *PrioritySemaphore) resize(n int) bool {
s.lock.Lock()
defer s.lock.Unlock()
cur := len(s.lockHolder)
// downward case, acquired n locks
if cur > n {
cur = n
}
semaphore := sema.NewWeighted(int64(n))
status := semaphore.TryAcquire(int64(cur))
if status {
s.log.Infof("%s semaphore resized from %d to %d", s.name, cur, n)
s.semaphore = semaphore
s.limit = n
}
return status
}
func (s *PrioritySemaphore) release(key string) bool {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.lockHolder[key]; ok {
delete(s.lockHolder, key)
// When semaphore resized downward
// Remove the excess holders from map once the done.
if len(s.lockHolder) >= s.limit {
return true
}
s.semaphore.Release(1)
availableLocks := s.limit - len(s.lockHolder)
s.log.Infof("Lock has been released by %s. Available locks: %d", key, availableLocks)
if s.pending.Len() > 0 {
triggerCount := availableLocks
if s.pending.Len() < triggerCount {
triggerCount = s.pending.Len()
}
for idx := 0; idx < triggerCount; idx++ {
item := s.pending.items[idx]
keyStr := fmt.Sprint(item.key)
items := strings.Split(keyStr, "/")
workflowKey := keyStr
if len(items) == 3 {
workflowKey = fmt.Sprintf("%s/%s", items[0], items[1])
}
s.log.Debugf("Enqueue the workflow %s", workflowKey)
s.nextWorkflow(workflowKey)
}
}
}
return true
}
// addToQueue adds the holderkey into priority queue that maintains the priority order to acquire the lock.
func (s *PrioritySemaphore) addToQueue(holderKey string, priority int32, creationTime time.Time) {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.lockHolder[holderKey]; ok {
s.log.Debugf("Lock is already acquired by %s", holderKey)
return
}
s.pending.add(holderKey, priority, creationTime)
s.log.Debugf("Added into queue: %s", holderKey)
}
func (s *PrioritySemaphore) removeFromQueue(holderKey string) {
s.lock.Lock()
defer s.lock.Unlock()
s.pending.remove(holderKey)
s.log.Debugf("Removed from queue: %s", holderKey)
}
func (s *PrioritySemaphore) acquire(holderKey string) bool {
if s.semaphore.TryAcquire(1) {
s.lockHolder[holderKey] = true
return true
}
return false
}
func isSameWorkflowNodeKeys(firstKey, secondKey string) bool {
firstItems := strings.Split(firstKey, "/")
secondItems := strings.Split(secondKey, "/")
if len(firstItems) != len(secondItems) {
return false
}
// compare workflow name
return firstItems[1] == secondItems[1]
}
func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.lockHolder[holderKey]; ok {
s.log.Debugf("%s is already holding a lock", holderKey)
return true, ""
}
var nextKey string
waitingMsg := fmt.Sprintf("Waiting for %s lock. Lock status: %d/%d ", s.name, s.limit-len(s.lockHolder), s.limit)
// Check whether requested holdkey is in front of priority queue.
// If it is in front position, it will allow to acquire lock.
// If it is not a front key, it needs to wait for its turn.
if s.pending.Len() > 0 {
item := s.pending.peek()
nextKey = fmt.Sprintf("%v", item.key)
if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, nextKey) {
// Enqueue the front workflow if lock is available
if len(s.lockHolder) < s.limit {
s.nextWorkflow(nextKey)
}
return false, waitingMsg
}
}
if s.acquire(holderKey) {
s.pending.pop()
s.log.Infof("%s acquired by %s ", s.name, nextKey)
return true, ""
}
s.log.Debugf("Current semaphore Holders. %v", s.lockHolder)
return false, waitingMsg
}