-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
chan_struct.go
executable file
·84 lines (77 loc) · 2.16 KB
/
chan_struct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package async
// UnboundedStructChan is a channel with an unbounded buffer for caching
// struct{} objects. This implementation is a specialized version that
// optimizes for struct{} objects than other types. A channel must be
// closed via Close method.
type UnboundedStructChan struct {
in, out, close chan struct{}
n uint64
}
// NewUnboundedStructChan returns a unbounded channel with unlimited capacity.
func NewUnboundedStructChan() *UnboundedStructChan {
ch := &UnboundedStructChan{
// The size of Struct is less than 16 bytes, we use 16 to fit
// a CPU cache line (L2, 256 Bytes), which may reduce cache misses.
in: make(chan struct{}, 16),
out: make(chan struct{}, 16),
close: make(chan struct{}),
}
go ch.processing()
return ch
}
// In returns a send-only channel that can be used to send values
// to the channel.
func (ch *UnboundedStructChan) In() chan<- struct{} { return ch.in }
// Out returns a receive-only channel that can be used to receive
// values from the channel.
func (ch *UnboundedStructChan) Out() <-chan struct{} { return ch.out }
// Close closes the channel.
func (ch *UnboundedStructChan) Close() { ch.close <- struct{}{} }
func (ch *UnboundedStructChan) processing() {
for {
select {
case _, ok := <-ch.in:
if !ok {
// We don't want the input channel be accidentally closed
// via close() instead of Close(). If that happens, it is
// a misuse, do a panic as warning.
panic("async: misuse of unbounded channel, In() was closed")
}
ch.n++
case <-ch.close:
ch.closed()
return
}
for ch.n > 0 {
select {
case ch.out <- struct{}{}:
ch.n--
case _, ok := <-ch.in:
if !ok {
// We don't want the input channel be accidentally closed
// via close() instead of Close(). If that happens, it is
// a misuse, do a panic as warning.
panic("async: misuse of unbounded channel, In() was closed")
}
ch.n++
case <-ch.close:
ch.closed()
return
}
}
}
}
func (ch *UnboundedStructChan) closed() {
close(ch.in)
for range ch.in {
ch.n++
}
for ; ch.n > 0; ch.n-- {
select {
case ch.out <- struct{}{}:
default:
}
}
close(ch.out)
close(ch.close)
}