-
Notifications
You must be signed in to change notification settings - Fork 1
/
quit.go
186 lines (170 loc) · 4.01 KB
/
quit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package qu
import (
"strings"
"sync"
"time"
"github.com/cybriq/p9/pkg/log"
"go.uber.org/atomic"
)
// C is a signalling channel that carries empty structs. Its methods (Q, Signal and Wait) wrap the close, send and
// receive operations and report them through the package's optional logging.
type C chan struct{}
var (
// createdList holds, for every tracked channel, the creation-site message obtained from log.Caller in T or Ts.
createdList []string
// createdChannels holds the tracked channels themselves; entries are paired by index with createdList.
createdChannels []C
// mx is locked around every read and write of createdList and createdChannels in this file.
mx sync.Mutex
// logEnabled gates the output of l; it is initialised to false and changed through SetLogging.
logEnabled = atomic.NewBool(false)
)
// SetLogging turns the package's channel activity logging on (on == true) or off (on == false) by storing the flag
// that l checks before it prints anything.
func SetLogging(on bool) {
logEnabled.Store(on)
}
// l passes its arguments on to D.Ln, but only while logging has been switched on with SetLogging; otherwise the
// arguments are discarded.
func l(a ...interface{}) {
	if !logEnabled.Load() {
		return
	}
	D.Ln(a...)
}
// T returns a new unbuffered signalling channel, for use as a trigger (momentary switch) or as a quit signal (breaker
// switch). The channel and the message describing where it was created are recorded in the package registry so that
// later log output can name the channel.
func T() C {
	mx.Lock()
	defer mx.Unlock()
	origin := log.Caller("chan from", 1)
	l("created", origin)
	createdList = append(createdList, origin)
	ch := make(C)
	createdChannels = append(createdChannels, ch)
	return ch
}
// Ts returns a new buffered signalling channel with room for n signals, intended for signalling without blocking the
// sender. A buffer of one is the usual choice, though a larger buffer can be used where the channel should accept
// more signals before a send blocks.
//
// The channel and the message describing where it was created are recorded in the package registry so that later log
// output can name the channel. A negative n makes the underlying make call panic; in that case nothing is added to
// the registry.
func Ts(n int) C {
	mx.Lock()
	defer mx.Unlock()
	msg := log.Caller("buffered chan from", 1)
	l("created", msg)
	// Allocate before touching the registry: make panics on a negative n, and createdList and createdChannels are
	// paired by index, so they must only ever grow together.
	o := make(C, n)
	createdList = append(createdList, msg)
	createdChannels = append(createdChannels, o)
	return o
}
// Q closes the channel unless testChanIsClosed already reports it as closed. Once closed, every receive on the channel
// returns immediately, which is what releases everything selecting on it. Either outcome (closed now, or was already
// closed) is reported through the package logger together with the channel's creation site.
func (c C) Q() {
	// The message is built inside a closure so that log.Caller is invoked at the same call depth in both branches.
	msg := func() string {
		origin := getLocForChan(c)
		indent := "\n" + strings.Repeat(" ", 48)
		mx.Lock()
		defer mx.Unlock()
		if testChanIsClosed(c) {
			return "from" + log.Caller("", 1) + indent + "channel " + origin + " was already closed"
		}
		close(c)
		return "closing chan from " + origin + log.Caller(indent+"from", 1)
	}()
	l(msg)
}
// Signal sends one struct{}{} on the channel, making it act as a momentary switch; a pair of such channels is handy
// for stop/start control. The send is an ordinary channel send, so it blocks until it can be delivered or buffered
// and panics if the channel has been closed.
func (c C) Signal() {
	l("signalling " + getLocForChan(c))
	c <- struct{}{}
}
// Wait returns the channel as a receive-only channel after logging that it is being waited on. Use it with a receive
// operator, typically as a select case: `case <-c.Wait():`.
func (c C) Wait() <-chan struct{} {
	// Built in a closure so that log.Caller runs at the call depth the log format expects.
	msg := func() string {
		return "waiting on " + getLocForChan(c) + log.Caller("at", 1)
	}()
	l(msg)
	return c
}
// testChanIsClosed reports whether ch has been closed, so callers can avoid the panic caused by closing a channel
// twice. It uses a non-blocking receive and therefore never stalls the caller.
//
// A nil channel is reported as closed, which keeps callers from trying to close it. A receive that yields a real
// value (a buffered signal, or one handed over by a sender blocked in Signal) proves the channel is still open, so
// only a receive that reports the channel as drained and closed counts.
//
// NOTE: when a pending signal is received by this probe it is consumed and will not reach any other receiver.
func testChanIsClosed(ch C) (o bool) {
	if ch == nil {
		return true
	}
	select {
	case _, open := <-ch:
		// open is false only when the receive succeeded because the channel is closed and empty.
		o = !open
	default:
		// Nothing to receive right now: the channel is open and idle.
	}
	return o
}
// getLocForChan looks the channel up in the package registry and returns the creation-site message recorded for it,
// or "not found" when the channel is not registered. Only index positions present in both registry slices are
// examined, and if the channel appears more than once the last matching entry is returned.
func getLocForChan(c C) (s string) {
	s = "not found"
	mx.Lock()
	defer mx.Unlock()
	limit := len(createdList)
	if len(createdChannels) < limit {
		limit = len(createdChannels)
	}
	for i := 0; i < limit; i++ {
		if createdChannels[i] == c {
			s = createdList[i]
		}
	}
	return s
}
// init starts a background goroutine that wakes once a minute and rebuilds the channel registry, keeping only the
// channels that testChanIsClosed does not report as closed, so closed channels no longer accumulate. The goroutine
// runs for the life of the process.
func init() {
	go func() {
		for {
			time.Sleep(time.Minute)
			D.Ln("cleaning up closed channels")
			var (
				openChans []C
				openLocs  []string
			)
			mx.Lock()
			for i, ch := range createdChannels {
				// Stop at the first channel that has no matching creation-site entry.
				if i >= len(createdList) {
					break
				}
				if testChanIsClosed(ch) {
					continue
				}
				openChans = append(openChans, ch)
				openLocs = append(openLocs, createdList[i])
			}
			createdChannels, createdList = openChans, openLocs
			mx.Unlock()
		}
	}()
}
// PrintChanState logs one line per registered channel, marking it as closed or open and giving its creation site.
// It is a debugging aid for the programmer.
func PrintChanState() {
	mx.Lock()
	for i, ch := range createdChannels {
		// Stop at the first channel that has no matching creation-site entry.
		if i >= len(createdList) {
			break
		}
		state := "<<< open"
		if testChanIsClosed(ch) {
			state = ">>> closed"
		}
		_T.Ln(state, createdList[i])
	}
	mx.Unlock()
}
// GetOpenChanCount returns the number of registered qu channels that testChanIsClosed does not report as closed.
// The registry lock is held for the duration of the count.
// todo: this needs to only apply to unbuffered type
func GetOpenChanCount() (o int) {
	mx.Lock()
	defer mx.Unlock()
	for _, ch := range createdChannels {
		if !testChanIsClosed(ch) {
			o++
		}
	}
	return o
}