-
Notifications
You must be signed in to change notification settings - Fork 405
/
wait_utils.go
144 lines (116 loc) · 3.02 KB
/
wait_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package common
import (
"fmt"
"math"
"sync"
)
/*
WaitGroup is used to manage and wait for a collection of
sessions. It is similar to sync.WaitGroup, but its
AddSession/DoneSession/CloseGroup operations are not only
thread safe but can also be executed in any order, unlike
those of sync.WaitGroup.

Once a shutdown is initiated via CloseGroup(), add/done
operations still function correctly, but AddSession
returns false. In this state, CloseGroup() blocks until
the sessions are drained via DoneSession() calls.

It is the caller's responsibility to make sure that the
AddSession and DoneSession math adds up to >= 0. In other
words, calling DoneSession() more times than the sum of
the AddSession() deltas causes a panic.

Example usage:

	group := NewWaitGroup()
	for _, item := range items {
		go func(item string) {
			if !group.AddSession(1) {
				// group may be closing or full
				return
			}
			defer group.DoneSession()
			// do stuff
		}(item)
	}
	// close the group and wait for active items.
	group.CloseGroup()
*/
type WaitGroup struct {
	// cond's lock (cond.L) guards isClosed and sessions; the condition
	// is broadcast when sessions drops to zero.
	cond *sync.Cond
	// closer is closed exactly once, when a close is first initiated.
	closer chan struct{}
	// isClosed is set once CloseGroup or CloseGroupNB has been called.
	isClosed bool
	// sessions is the number of sessions added and not yet marked done.
	sessions uint64
}
// NewWaitGroup returns an open WaitGroup with no sessions registered,
// its own mutex-backed condition variable and an unclosed closer channel.
func NewWaitGroup() *WaitGroup {
	group := new(WaitGroup)
	group.cond = sync.NewCond(&sync.Mutex{})
	group.closer = make(chan struct{})
	return group
}
// Closer exposes the channel that gets closed as soon as a close of
// the group has been initiated, so callers can select on it to notice
// that the WaitGroup is in closing state.
func (r *WaitGroup) Closer() chan struct{} {
	closing := r.closer
	return closing
}
// AddSession adds delta to the session counter and reports whether
// the sessions were registered.
//
// It returns false, leaving the counter untouched, when a close has
// already been initiated or when adding delta would bring the counter
// to math.MaxUint64 or beyond.
func (r *WaitGroup) AddSession(delta uint64) bool {
	r.cond.L.Lock()
	defer r.cond.L.Unlock()

	// we cannot add if we are being shutdown
	if r.isClosed {
		return false
	}

	// We have maxed out. The comparison must be >= rather than ==:
	// with delta > 1 the counter can jump past the limit without ever
	// being exactly equal to it, and the unsigned addition below would
	// then silently wrap around.
	if r.sessions >= math.MaxUint64-delta {
		return false
	}

	r.sessions += delta
	return true
}
// DoneSession marks one session as finished by decrementing the
// session counter, and wakes every goroutine waiting on the group
// when the counter reaches zero.
//
// It panics when the counter is already zero: keeping the number of
// DoneSession calls within the total added is the caller's
// responsibility.
func (r *WaitGroup) DoneSession() {
	r.cond.L.Lock()
	defer r.cond.L.Unlock()

	switch r.sessions {
	case 0:
		// more sessions finished than were ever added
		panic(fmt.Sprintf("common.WaitGroup misuse isClosed=%v", r.isClosed))
	case 1:
		// last outstanding session: release the waiters
		r.sessions = 0
		r.cond.Broadcast()
	default:
		r.sessions--
	}
}
// CloseGroup puts the group into closing state (closing the closer
// channel the first time this happens) and then blocks until the
// session counter has dropped to zero.
func (r *WaitGroup) CloseGroup() {
	lock := r.cond.L
	lock.Lock()

	// only the first close may close the channel
	if alreadyClosing := r.isClosed; !alreadyClosing {
		r.isClosed = true
		close(r.closer)
	}

	// Wait releases the lock while parked and re-acquires it on wake-up
	for r.sessions > 0 {
		r.cond.Wait()
	}

	lock.Unlock()
}
// CloseGroupNB is the non-blocking counterpart of CloseGroup. The
// group is put into closing state before the call returns; the
// returned channel is closed once a background CloseGroup call has
// returned, i.e. after the session counter has reached zero.
func (r *WaitGroup) CloseGroupNB() chan struct{} {
	// flip to closing state synchronously, before returning
	lock := r.cond.L
	lock.Lock()
	if alreadyClosing := r.isClosed; !alreadyClosing {
		r.isClosed = true
		close(r.closer)
	}
	lock.Unlock()

	drained := make(chan struct{})
	waitForDrain := func() {
		defer close(drained)
		r.CloseGroup()
	}
	go waitForDrain()

	return drained
}