-
Notifications
You must be signed in to change notification settings - Fork 2
/
constrainedwaitgroup.go
127 lines (107 loc) · 2.27 KB
/
constrainedwaitgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package lib
import (
"context"
"sync"
"sync/atomic"
)
// ConstrainedWorkGroup pairs a sync.WaitGroup with a buffered-channel
// semaphore so callers can cap how many tasks hold a slot at the same time
// and still wait for all of them to finish.
type ConstrainedWorkGroup struct {
// wg is incremented by Wait/WaitWithContext and decremented by Done;
// WaitAllDone blocks on it.
wg sync.WaitGroup
// sem is the slot semaphore: one element is sent per acquired slot and
// received again in Done, so len(sem) is the number of held slots and
// cap(sem) is the limit.
sem chan struct{}
// cond is signalled by Done while cond.L is held; WaitForADone waits on it.
cond *sync.Cond
}
// NewConstrainedWorkGroup returns a ConstrainedWorkGroup whose semaphore has
// room for maxConcurrency slots and whose condition variable is backed by a
// fresh mutex.
//
// maxConcurrency is handed straight to make as the channel buffer size: a
// negative value panics there, and zero produces an unbuffered channel, on
// which every slot acquisition blocks until a matching receive happens.
func NewConstrainedWorkGroup(maxConcurrency int) *ConstrainedWorkGroup {
	group := new(ConstrainedWorkGroup)
	group.sem = make(chan struct{}, maxConcurrency)
	group.cond = sync.NewCond(new(sync.Mutex))
	return group
}
// Wait registers one task with the wait group and then blocks until a
// semaphore slot can be taken. Each successful Wait must be paired with
// exactly one Done.
func (cw *ConstrainedWorkGroup) Wait() {
	// Register before queueing for a slot, so WaitAllDone already accounts
	// for this task while it is still blocked on the semaphore.
	cw.wg.Add(1)

	var slot struct{}
	cw.sem <- slot
}
// Done releases the slot taken by a matching Wait or WaitWithContext: it
// decrements the wait group, removes one element from the semaphore and wakes
// every goroutine currently blocked in WaitForADone.
//
// The whole sequence runs under cond.L. WaitForADone checks RunningCount and
// calls cond.Wait under the same lock, so a release cannot slip in between
// that check and the wait and go unnoticed.
func (cw *ConstrainedWorkGroup) Done() {
	cw.cond.L.Lock()
	defer cw.cond.L.Unlock()

	cw.wg.Done()
	<-cw.sem

	// Broadcast, not Signal: Signal wakes a single waiter, so with several
	// goroutines in WaitForADone and only one task left running, all but
	// one of them would never be woken.
	cw.cond.Broadcast()
}
// WaitAllDone blocks until the internal wait group counter reaches zero,
// i.e. until every task registered through Wait or WaitWithContext has
// called Done.
func (cw *ConstrainedWorkGroup) WaitAllDone() {
	pending := &cw.wg
	pending.Wait()
}
// RunningCount reports how many semaphore slots are held right now, which is
// the number of elements currently buffered in sem.
func (cw *ConstrainedWorkGroup) RunningCount() int {
	held := len(cw.sem)
	return held
}
// Max reports the concurrency limit, which is the buffer capacity of the
// semaphore channel.
func (cw *ConstrainedWorkGroup) Max() int {
	limit := cap(cw.sem)
	return limit
}
// WaitWithContext tries to take a semaphore slot, giving up when ctx is done.
//
// It returns true once a slot has been acquired and the task has been added
// to the wait group; the caller must then call Done exactly once. It returns
// false, without touching the wait group or the semaphore, when ctx is done
// before a slot is obtained.
func (cw *ConstrainedWorkGroup) WaitWithContext(ctx context.Context) bool {
	// select chooses pseudo-randomly among ready cases, so without this
	// guard an already-cancelled context could still win a free slot.
	if ctx.Err() != nil {
		return false
	}

	select {
	case <-ctx.Done():
		return false
	case cw.sem <- struct{}{}:
		// Only count the task once the slot is really held.
		cw.wg.Add(1)
		return true
	}
}
// RemainingCapacity reports how many more slots could be taken right now:
// the semaphore's capacity minus the number of elements it currently holds.
func (cw *ConstrainedWorkGroup) RemainingCapacity() int {
	limit, held := cap(cw.sem), len(cw.sem)
	return limit - held
}
// NewSubWorker returns a SubWorker bound to this group. The SubWorker points
// back at cw as its parent and gets its own condition variable backed by a
// fresh mutex; its wait group and running counter start at their zero values.
func (cw *ConstrainedWorkGroup) NewSubWorker() ConcurrencyManager {
	sub := new(SubWorker)
	sub.cw = cw
	sub.cond = sync.NewCond(new(sync.Mutex))
	return sub
}
// WaitForADone blocks until a Done call wakes it, provided at least one slot
// is held when it is called.
//
// It returns false immediately when RunningCount is zero, and true after it
// has been woken. The check and the wait both happen under cond.L, the same
// lock Done holds while releasing a slot.
func (cw *ConstrainedWorkGroup) WaitForADone() bool {
	cw.cond.L.Lock()
	defer cw.cond.L.Unlock()

	if running := cw.RunningCount(); running != 0 {
		cw.cond.Wait()
		return true
	}
	return false
}
// SubWorker acquires and releases slots on a parent ConstrainedWorkGroup but
// keeps its own wait group, running counter and condition variable, so its
// WaitAllDone, WaitForADone and RunningCount only reflect tasks started
// through this SubWorker.
type SubWorker struct {
// cw is the parent group whose semaphore actually limits concurrency.
cw *ConstrainedWorkGroup
// wg counts tasks started through this SubWorker; WaitAllDone blocks on it.
wg sync.WaitGroup
// runningCount is the number of tasks started here and not yet Done; the
// methods in this file access it only through sync/atomic.
runningCount int32
// cond is signalled by Done while cond.L is held; WaitForADone waits on it.
cond *sync.Cond
}
// Wait blocks until the parent group grants a slot, then records the task in
// this SubWorker's own wait group and running counter. Each Wait must be
// paired with exactly one Done on the same SubWorker.
func (sw *SubWorker) Wait() {
	parent := sw.cw
	parent.Wait()

	// Local bookkeeping happens only after the parent slot is held.
	sw.wg.Add(1)
	counter := &sw.runningCount
	atomic.AddInt32(counter, 1)
}
// WaitWithContext asks the parent group for a slot, honouring ctx.
//
// When the parent reports success, the task is added to this SubWorker's wait
// group and running counter and true is returned; the caller must then call
// Done exactly once. When the parent reports failure, nothing is recorded and
// false is returned.
func (sw *SubWorker) WaitWithContext(ctx context.Context) bool {
	acquired := sw.cw.WaitWithContext(ctx)
	if !acquired {
		return false
	}

	sw.wg.Add(1)
	atomic.AddInt32(&sw.runningCount, 1)
	return true
}
// Done marks one task started through this SubWorker as finished: it
// decrements the running counter and the local wait group, releases the slot
// on the parent group, and wakes every goroutine blocked in this SubWorker's
// WaitForADone.
//
// Everything runs under sw.cond.L. WaitForADone checks RunningCount and calls
// cond.Wait under the same lock, so a completion cannot slip in between that
// check and the wait. The parent's Done is called while this lock is held and
// takes the parent's own lock internally.
func (sw *SubWorker) Done() {
	sw.cond.L.Lock()
	defer sw.cond.L.Unlock()

	atomic.AddInt32(&sw.runningCount, -1)
	sw.wg.Done()
	sw.cw.Done()

	// Broadcast, not Signal: Signal wakes a single waiter, so with several
	// goroutines in WaitForADone and only one task left running, all but
	// one of them would never be woken.
	sw.cond.Broadcast()
}
// WaitAllDone blocks until every task started through this SubWorker has
// called Done, i.e. until the SubWorker's own wait group counter is zero.
// Tasks started directly on the parent group are not waited for.
func (sw *SubWorker) WaitAllDone() {
	pending := &sw.wg
	pending.Wait()
}
// WaitForADone blocks until a Done call on this SubWorker wakes it, provided
// at least one of its tasks is running when it is called.
//
// It returns false immediately when RunningCount is zero, and true after it
// has been woken. The check and the wait both happen under cond.L, the same
// lock Done holds while it updates the counter.
func (sw *SubWorker) WaitForADone() bool {
	sw.cond.L.Lock()
	defer sw.cond.L.Unlock()

	if running := sw.RunningCount(); running != 0 {
		sw.cond.Wait()
		return true
	}
	return false
}
// RunningCount reports how many tasks started through this SubWorker have not
// yet called Done. The counter is read atomically.
func (sw *SubWorker) RunningCount() int {
	current := atomic.LoadInt32(&sw.runningCount)
	return int(current)
}