This repository has been archived by the owner on Mar 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
broadcaster.go
122 lines (111 loc) · 2 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package broadcaster
import (
"context"
"sync"
)
// Broadcaster fans out every value received on C to all current
// subscriptions. Values are only forwarded while Start is running.
type Broadcaster[T any] struct {
// lock guards consumers and closed. Start also holds it while it delivers
// a value to the subscriptions.
lock sync.Mutex
// consumers is the set of open subscriptions. Subscribe adds to it and
// closing a subscription removes the entry.
consumers map[*Subscription[T]]struct{}
// done is closed when Start returns; Shutdown waits on it.
done chan struct{}
// C is the input channel: values sent here are forwarded by Start to every
// subscription. Close and Shutdown close it, so senders must stop sending
// before either of those is called.
C chan T
// closed is true once Close or Shutdown has closed C.
closed bool
}
// New returns a Broadcaster with no subscriptions and an unbuffered input
// channel C. Nothing is forwarded to subscribers until Start is running.
func New[T any]() *Broadcaster[T] {
	b := new(Broadcaster[T])
	b.C = make(chan T)
	b.done = make(chan struct{})
	b.consumers = make(map[*Subscription[T]]struct{})
	return b
}
// Start runs the broadcast loop and blocks until the broadcaster stops.
//
// Every value received on b.C is sent to each current subscription. The loop
// ends when ctx is cancelled or when b.C is closed (Close and Shutdown both
// close it); in either case all subscriptions are closed before Start
// returns, and b.done is closed on return so that Shutdown can stop waiting.
//
// Delivery to a subscription blocks while that subscription's buffer is full.
// The wait is abandoned as soon as ctx is cancelled, so a subscriber that has
// stopped reading cannot keep Start (and the lock it holds while delivering)
// stuck forever. A value that is in flight when ctx is cancelled may
// therefore reach only some of the subscriptions.
func (b *Broadcaster[T]) Start(ctx context.Context) {
	// Tell Shutdown (or anything else waiting on done) that the loop has exited.
	defer close(b.done)

	for {
		select {
		case <-ctx.Done():
			b.Close()
			return

		case msg, ok := <-b.C:
			if !ok {
				// The input channel has been closed: close whatever
				// subscriptions are still registered and stop.
				b.lock.Lock()
				for sub := range b.consumers {
					sub.close(false)
				}
				b.lock.Unlock()
				return
			}

			b.lock.Lock()
			cancelled := false
			for sub := range b.consumers {
				select {
				case sub.C <- msg:
				case <-ctx.Done():
					cancelled = true
				}
				if cancelled {
					break
				}
			}
			b.lock.Unlock()

			if cancelled {
				// Close takes the lock itself, so it must run after the
				// unlock above.
				b.Close()
				return
			}
		}
	}
}
// Shutdown closes the input channel C and then waits for Start to return.
// Start closes the remaining subscriptions once it observes that C is closed.
//
// If the broadcaster is already closed, Shutdown returns immediately without
// waiting. Otherwise it blocks on b.done, which is closed only when Start
// returns, so it does not return unless a Start call finishes.
func (b *Broadcaster[T]) Shutdown() {
	b.lock.Lock()
	alreadyClosed := b.closed
	if !alreadyClosed {
		b.closed = true
		close(b.C)
	}
	b.lock.Unlock()

	if alreadyClosed {
		return
	}
	<-b.done
}
// Close closes every subscription and then the input channel C, all while
// holding the broadcaster lock. Unlike Shutdown it does not wait for Start to
// return. Calling Close on an already closed broadcaster does nothing.
func (b *Broadcaster[T]) Close() {
	b.lock.Lock()
	defer b.lock.Unlock()

	if !b.closed {
		// The lock is already held, so use the non-locking close path.
		for sub := range b.consumers {
			sub.close(false)
		}
		b.closed = true
		close(b.C)
	}
}
// Subscribe returns a new Subscription whose channel C has a buffer of one.
//
// If the broadcaster is still open the subscription is registered and will
// receive subsequent broadcasts. If the broadcaster is already closed the
// subscription is returned in the closed state: its channel is closed and it
// is not registered.
func (b *Broadcaster[T]) Subscribe() *Subscription[T] {
	b.lock.Lock()
	defer b.lock.Unlock()

	sub := &Subscription[T]{
		C:           make(chan T, 1),
		broadcaster: b,
		closed:      b.closed,
	}
	if sub.closed {
		close(sub.C)
		return sub
	}

	b.consumers[sub] = struct{}{}
	return sub
}
// Subscription is one consumer's view of a Broadcaster, as returned by
// Broadcaster.Subscribe.
type Subscription[T any] struct {
// C delivers the broadcast values. Subscribe creates it with a buffer of
// one; it is closed when the subscription is closed.
C chan T
// broadcaster is the Broadcaster that created this subscription; its lock
// guards closed.
broadcaster *Broadcaster[T]
// closed is true once C has been closed.
closed bool
}
// Close removes s from its broadcaster and closes s.C. It takes the
// broadcaster lock itself, so callers must not already hold it. Closing an
// already closed subscription does nothing.
func (s *Subscription[T]) Close() {
	const acquireLock = true
	s.close(acquireLock)
}
// close removes s from its broadcaster's consumer set and closes s.C. It does
// nothing beyond the locking if s is already closed.
//
// When lock is true the broadcaster lock is acquired here and released on
// return. When lock is false the caller must already hold that lock.
func (s *Subscription[T]) close(lock bool) {
	if lock {
		// The broadcaster may be holding its lock while blocked on a send to
		// s.C. Drain s.C in the background so that send can complete and the
		// lock below can be acquired; the drain stops once s.C is closed.
		go func(ch <-chan T) {
			for {
				if _, open := <-ch; !open {
					return
				}
			}
		}(s.C)

		mu := &s.broadcaster.lock
		mu.Lock()
		defer mu.Unlock()
	}

	if !s.closed {
		delete(s.broadcaster.consumers, s)
		close(s.C)
		s.closed = true
	}
}