/
broadcaster.go
106 lines (91 loc) · 2.37 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package api
import (
"encoding/json"
"sync"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/async/event"
"go.uber.org/zap"
)
// Broadcaster fans stream messages out to every registered connection.
type Broadcaster interface {
	// FromFeed subscribes to feed and broadcasts each message received from it.
	// It blocks until the subscription reports on its error channel and returns
	// the value received there.
	FromFeed(logger *zap.Logger, feed *event.Feed) error
	// Broadcast sends msg to all currently registered connections.
	// It returns an error if msg cannot be encoded.
	Broadcast(msg Message) error
	// Register adds conn to the set of broadcast targets, keyed by conn.ID().
	// It reports false when a connection with the same ID is already registered.
	Register(conn broadcasted) bool
	// Deregister removes the connection registered under conn.ID().
	// It reports false when no such connection is registered.
	Deregister(conn broadcasted) bool
}
// broadcasted is the minimal view of a connection that the broadcaster needs.
type broadcasted interface {
	// ID returns the key under which the connection is stored in the
	// broadcaster's connections map.
	ID() string
	// Send hands the connection the JSON-encoded bytes of a broadcast message.
	// It returns nothing, so the broadcaster cannot observe delivery failures.
	Send([]byte)
}
// broadcaster is the Broadcaster implementation returned by newBroadcaster.
type broadcaster struct {
	// mut guards every read and write of connections.
	mut sync.Mutex
	// connections holds the registered connections keyed by their ID().
	connections map[string]broadcasted
}
// newBroadcaster returns a Broadcaster with no registered connections.
func newBroadcaster() Broadcaster {
	// The mutex is left at its zero value, which is an unlocked mutex; only the
	// map needs explicit initialization before it can be written to.
	return &broadcaster{
		connections: make(map[string]broadcasted),
	}
}
// FromFeed subscribes to msgFeed and broadcasts every message it delivers.
//
// Messages are received through a channel buffered to 512 entries, and each one
// is broadcast on its own goroutine, so a slow Broadcast does not hold up
// reading from the feed (and the relative order of broadcasts is not enforced
// here). Broadcast failures are logged, not returned.
//
// The call blocks until the subscription's error channel yields a value; that
// value is logged and returned. The subscription is released on return.
func (b *broadcaster) FromFeed(logger *zap.Logger, msgFeed *event.Feed) error {
	incoming := make(chan Message, 512)
	subscription := msgFeed.Subscribe(incoming)
	defer subscription.Unsubscribe()
	defer logger.Debug("done reading from feed")

	// deliver broadcasts a single message and logs a failure instead of
	// propagating it, since it runs detached from the read loop.
	deliver := func(m Message) {
		if err := b.Broadcast(m); err != nil {
			logger.Error("could not broadcast message", zap.Error(err))
		}
	}

	for {
		select {
		case err := <-subscription.Err():
			logger.Warn("could not read messages from msgFeed", zap.Error(err))
			return err
		case m := <-incoming:
			go deliver(m)
		}
	}
}
// Broadcast encodes msg as JSON once and sends the encoded bytes to every
// connection that is registered at the time of the call.
//
// It returns a wrapped error when msg cannot be marshalled, in which case
// nothing is sent; otherwise it returns nil. Every connection receives the same
// byte slice.
func (b *broadcaster) Broadcast(msg Message) error {
	data, err := json.Marshal(&msg)
	if err != nil {
		return errors.Wrap(err, "could not marshal msg")
	}

	// Snapshot the connections while holding the lock, then release it before
	// sending so Send calls never run inside the critical section. The snapshot
	// is sized up front so it is allocated once instead of growing through
	// repeated appends while the lock is held.
	b.mut.Lock()
	conns := make([]broadcasted, 0, len(b.connections))
	for _, c := range b.connections {
		conns = append(conns, c)
	}
	b.mut.Unlock()

	// send to all connections captured in the snapshot
	for _, c := range conns {
		c.Send(data)
	}
	return nil
}
// Register adds conn to the set of broadcast targets under conn.ID().
//
// It returns true when the connection was added, and false when a connection
// with the same ID is already registered (the existing entry is kept).
func (b *broadcaster) Register(conn broadcasted) bool {
	b.mut.Lock()
	defer b.mut.Unlock()

	key := conn.ID()
	if _, exists := b.connections[key]; exists {
		return false
	}
	b.connections[key] = conn
	return true
}
// Deregister removes the connection stored under conn.ID() from the set of
// broadcast targets.
//
// It returns true when an entry was removed, and false when no connection with
// that ID is registered.
func (b *broadcaster) Deregister(conn broadcasted) bool {
	b.mut.Lock()
	defer b.mut.Unlock()

	key := conn.ID()
	if _, registered := b.connections[key]; !registered {
		return false
	}
	delete(b.connections, key)
	return true
}