-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
semaphore.go
106 lines (93 loc) · 2.78 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Package semaphore implements a simple semaphore that is based on
// golang.org/x/sync/semaphore but doesn't support weights. Its advantage over
// a simple buffered chan is that the capacity of the semaphore (i.e. the number
// of slots available) can be changed dynamically at runtime without waiting for
// all existing work to stop. This makes it easier to implement e.g. concurrency
// limits on certain operations that can be reconfigured at runtime.
package semaphore
import (
"container/list"
"context"
"sync"
)
// Dynamic is a counting semaphore whose capacity may be adjusted while it is
// in use.
type Dynamic struct {
	// mu serializes access to all of the fields below.
	mu sync.Mutex

	// size is the configured capacity: the number of slots that may be held
	// at once before Acquire starts queueing callers.
	size int64

	// cur is the number of slots currently held.
	cur int64

	// waiters queues blocked Acquire callers in arrival order. Each element's
	// value is the chan struct{} that is closed to grant that caller a slot.
	waiters list.List
}
// NewDynamic constructs a dynamic semaphore whose initial capacity is n.
//
// The constructor exists for convenience and for parity with
// golang.org/x/sync/semaphore; a zero-value Dynamic works just as well, as
// long as SetSize is called before it is used.
func NewDynamic(n int64) *Dynamic {
	sem := new(Dynamic)
	sem.size = n
	return sem
}
// SetSize dynamically updates the number of available slots to n.
//
// If more than n slots are currently acquired, no further acquires will
// succeed until sufficient have been released to take the total outstanding
// below n again; slots that are already held are never revoked.
//
// If n leaves room for more holders than there currently are, callers already
// blocked in Acquire are granted the newly available slots immediately, in
// FIFO order, instead of staying blocked until some later Release.
//
// The returned error is always nil.
func (s *Dynamic) SetSize(n int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.size = n

	// Growing the capacity may have opened slots for goroutines that are
	// already queued. Nothing else would wake them until the next Release
	// (which may never come, e.g. when growing from zero with nothing held),
	// so hand out the new slots here.
	for s.cur < s.size {
		next := s.waiters.Front()
		if next == nil {
			break
		}
		s.waiters.Remove(next)
		// The waiter now owns a slot; account for it before signalling so the
		// count is already correct when the waiter resumes.
		s.cur++
		close(next.Value.(chan struct{}))
	}
	return nil
}
// Acquire tries to take one "slot" in the semaphore, blocking until a slot is
// granted or ctx is Done, whichever happens first. It returns nil on success.
// On failure it returns ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Dynamic) Acquire(ctx context.Context) error {
	s.mu.Lock()

	// Fast path: a slot is free, so take it without queueing.
	if s.cur < s.size {
		s.cur++
		s.mu.Unlock()
		return nil
	}

	// Slow path: join the back of the waiter queue. The channel is closed by
	// whoever grants this caller a slot.
	granted := make(chan struct{})
	entry := s.waiters.PushBack(granted)
	s.mu.Unlock()

	select {
	case <-granted:
		return nil
	case <-ctx.Done():
	}

	// ctx finished first. Record its error, then re-check under the lock
	// whether a slot was granted in the meantime.
	cancelErr := ctx.Err()

	s.mu.Lock()
	defer s.mu.Unlock()

	select {
	case <-granted:
		// The slot was handed over after cancellation was observed. Rather
		// than trying to fix up the queue, report success as if the
		// cancellation had not been noticed.
		return nil
	default:
		// Still queued: withdraw so nobody hands a slot to a caller that has
		// already given up.
		s.waiters.Remove(entry)
		return cancelErr
	}
}
// Release releases one slot of the semaphore. It panics if called on an empty
// semaphore, i.e. when no slots are currently held.
//
// The freed slot is passed on to queued Acquire callers, oldest first, only
// while the number of held slots is below the current capacity. If the
// capacity has been reduced below the number of slots still outstanding, the
// slot is simply retired and waiters stay blocked until the outstanding count
// drops below the capacity again.
func (s *Dynamic) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.cur < 1 {
		panic("semaphore: bad release")
	}

	// Always give the slot back first so that a reduced capacity is honoured
	// before anyone else is admitted.
	s.cur--

	// Admit waiters while there is room. In the steady state this wakes
	// exactly one waiter and leaves the count where it was (a direct hand-off
	// of the slot).
	for s.cur < s.size {
		next := s.waiters.Front()
		if next == nil {
			return
		}
		s.waiters.Remove(next)
		// Account for the waiter's slot and trigger its chan before the lock
		// is released, so the count is correct by the time it resumes.
		s.cur++
		close(next.Value.(chan struct{}))
	}
}