-
Notifications
You must be signed in to change notification settings - Fork 13
/
broadcaster.go
139 lines (119 loc) · 3.34 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright (c) 2014-2017 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"github.com/bitmark-inc/bitmarkd/fault"
"github.com/bitmark-inc/bitmarkd/messagebus"
"github.com/bitmark-inc/bitmarkd/util"
"github.com/bitmark-inc/bitmarkd/zmqutil"
"github.com/bitmark-inc/logger"
zmq "github.com/pebbe/zmq4"
"time"
)
const (
broadcasterZapDomain = "broadcaster"
heartbeatInterval = 60 * time.Second
heartbeatTimeout = 2 * heartbeatInterval
)
type broadcaster struct {
log *logger.L
socket4 *zmq.Socket
socket6 *zmq.Socket
}
// initialise the broadcaster
func (brdc *broadcaster) initialise(privateKey []byte, publicKey []byte, broadcast []string) error {
log := logger.New("broadcaster")
if nil == log {
return fault.ErrInvalidLoggerChannel
}
brdc.log = log
log.Info("initialising…")
c, err := util.NewConnections(broadcast)
if nil != err {
log.Errorf("ip and port error: %v", err)
return err
}
// allocate IPv4 and IPv6 sockets
brdc.socket4, brdc.socket6, err = zmqutil.NewBind(log, zmq.PUB, broadcasterZapDomain, privateKey, publicKey, c)
if nil != err {
log.Errorf("bind error: %v", err)
return err
}
return nil
}
// broadcasting main loop
func (brdc *broadcaster) Run(args interface{}, shutdown <-chan struct{}) {
log := brdc.log
log.Info("starting…")
queue := messagebus.Bus.Broadcast.Chan()
loop:
for {
log.Info("waiting…")
select {
case <-shutdown:
break loop
case item := <-queue:
log.Infof("sending: %s data: %x", item.Command, item.Parameters)
if nil == brdc.socket4 && nil == brdc.socket6 {
log.Error("no IPv4 or IPv6 socket for broadcast")
}
if err := brdc.process(brdc.socket4, &item); nil != err {
log.Criticalf("IPv4 error: %s", err)
fault.Panicf("broadcaster: IPv4 error: %s", err)
}
if err := brdc.process(brdc.socket6, &item); nil != err {
log.Criticalf("IPv6 error: %s", err)
fault.Panicf("broadcaster: IPv6 error: %s", err)
}
case <-time.After(heartbeatInterval):
// this will only occur if so data was sent during the interval
beat := &messagebus.Message{
Command: "heart",
Parameters: [][]byte{[]byte("beat")},
}
log.Info("send heartbeat")
if nil == brdc.socket4 && nil == brdc.socket6 {
log.Error("no IPv4 or IPv6 socket for heartbeat")
}
if err := brdc.process(brdc.socket4, beat); nil != err {
log.Criticalf("IPv4 error: %s", err)
fault.Panicf("broadcaster: IPv4 error: %s", err)
}
if err := brdc.process(brdc.socket6, beat); nil != err {
log.Criticalf("IPv6 error: %s", err)
fault.Panicf("broadcaster: IPv6 error: %s", err)
}
}
}
log.Info("shutting down…")
if nil != brdc.socket4 {
brdc.socket4.Close()
}
if nil != brdc.socket6 {
brdc.socket6.Close()
}
log.Info("stopped")
}
// process some items into a block and publish it
func (brdc *broadcaster) process(socket *zmq.Socket, item *messagebus.Message) error {
if nil == socket {
return nil
}
_, err := socket.Send(item.Command, zmq.SNDMORE|zmq.DONTWAIT)
if nil != err {
return err
}
last := len(item.Parameters) - 1
for i, p := range item.Parameters {
if i == last {
_, err = socket.SendBytes(p, 0|zmq.DONTWAIT)
} else {
_, err = socket.SendBytes(p, zmq.SNDMORE|zmq.DONTWAIT)
}
if nil != err {
return err
}
}
return nil
}