-
Notifications
You must be signed in to change notification settings - Fork 13
/
broadcaster.go
147 lines (123 loc) · 3.48 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// SPDX-License-Identifier: ISC
// Copyright (c) 2014-2020 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package publish
import (
"time"
zmq "github.com/pebbe/zmq4"
"github.com/bitmark-inc/bitmarkd/messagebus"
"github.com/bitmark-inc/bitmarkd/mode"
"github.com/bitmark-inc/bitmarkd/util"
"github.com/bitmark-inc/bitmarkd/zmqutil"
"github.com/bitmark-inc/logger"
)
// broadcasterZapDomain is the ZAP domain name handed to zmqutil.NewBind
// when the publisher sockets are bound.
const broadcasterZapDomain = "broadcaster"

// heartbeatInterval is how long the broadcast queue may stay empty
// before a heartbeat message is sent.
const heartbeatInterval = time.Minute
// broadcaster holds the state needed to publish messages for one chain
// on an IPv4 socket and an IPv6 socket.
type broadcaster struct {
	// log is the logger used by every method of the broadcaster
	log *logger.L

	// chain is the chain name; it is sent as the first frame of every
	// published message
	chain string

	// socket4 and socket6 are the IPv4 and IPv6 sockets; a nil socket
	// is skipped when sending
	socket4, socket6 *zmq.Socket
}
// initialise prepares the broadcaster for use.
//
// It creates the "broadcaster" logger, records the current chain name,
// converts the broadcast address strings with util.NewConnections and
// binds the IPv4/IPv6 PUB sockets using the supplied key pair. Any
// error is logged and returned unchanged.
func (brdc *broadcaster) initialise(privateKey []byte, publicKey []byte, broadcast []string) error {
	brdc.log = logger.New("broadcaster")
	brdc.chain = mode.ChainName()

	brdc.log.Info("initialising…")

	connections, connErr := util.NewConnections(broadcast)
	if connErr != nil {
		brdc.log.Errorf("ip and port error: %s", connErr)
		return connErr
	}

	// bind one socket per IP version; the results are stored even when
	// the bind reports an error
	sock4, sock6, bindErr := zmqutil.NewBind(brdc.log, zmq.PUB, broadcasterZapDomain, privateKey, publicKey, connections)
	brdc.socket4, brdc.socket6 = sock4, sock6
	if bindErr != nil {
		brdc.log.Errorf("bind error: %s", bindErr)
		return bindErr
	}

	return nil
}
// Run is the broadcasting main loop.
//
// It receives messages from the broadcast queue of the message bus and
// publishes each one on the IPv4 and IPv6 sockets. Whenever the queue
// has been idle for heartbeatInterval a "heart"/"beat" message is
// published instead. The loop ends when shutdown becomes readable, after
// which both sockets are closed. The args parameter is not used.
//
// A send error is logged as critical and then passed to logger.Panicf.
func (brdc *broadcaster) Run(args interface{}, shutdown <-chan struct{}) {
	log := brdc.log

	log.Info("starting…")

	// use default queue size
	queue := messagebus.Bus.Broadcast.Chan(messagebus.Default)

	// one reusable idle timer: calling time.After inside the loop would
	// allocate a fresh timer for every message that is processed
	idle := time.NewTimer(heartbeatInterval)
	defer idle.Stop()

loop:
	for {
		log.Debug("waiting…")
		select {
		case <-shutdown:
			break loop

		case item := <-queue:
			log.Infof("sending: %s data: %x", item.Command, item.Parameters)
			brdc.sendToAll(&item, "broadcast")

			// restart the idle period; if the timer already fired while
			// the item was being handled, discard the stale tick so it
			// does not trigger an immediate heartbeat
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(heartbeatInterval)

		case <-idle.C: // timeout on queue empty
			// this will only occur if no data was sent during the interval
			beat := &messagebus.Message{
				Command:    "heart",
				Parameters: [][]byte{[]byte("beat")},
			}
			log.Info("send heartbeat")
			brdc.sendToAll(beat, "heartbeat")

			// the tick was consumed by this case, so a plain Reset is safe
			idle.Reset(heartbeatInterval)
		}
	}

	log.Info("shutting down…")
	if brdc.socket4 != nil {
		brdc.socket4.Close()
	}
	if brdc.socket6 != nil {
		brdc.socket6.Close()
	}
	log.Info("stopped")
}

// sendToAll publishes item on the IPv4 socket and then on the IPv6
// socket. purpose names the kind of message in the error that is logged
// when neither socket exists. A send error on either socket is logged
// as critical and then passed to logger.Panicf.
func (brdc *broadcaster) sendToAll(item *messagebus.Message, purpose string) {
	log := brdc.log

	if brdc.socket4 == nil && brdc.socket6 == nil {
		log.Error("no IPv4 or IPv6 socket for " + purpose)
	}

	if err := brdc.process(brdc.socket4, item); err != nil {
		log.Criticalf("IPv4 error: %s", err)
		logger.Panicf("broadcaster: IPv4 error: %s", err)
	}
	if err := brdc.process(brdc.socket6, item); err != nil {
		log.Criticalf("IPv6 error: %s", err)
		logger.Panicf("broadcaster: IPv6 error: %s", err)
	}
}
// process publishes one message on socket as a single multipart
// message: the chain name, then the command, then every parameter as
// its own frame. All frames are sent with the DONTWAIT flag and every
// frame except the last carries SNDMORE.
//
// A nil socket is silently skipped and nil is returned. Otherwise the
// first send error is returned and the remaining frames are not sent.
func (brdc *broadcaster) process(socket *zmq.Socket, item *messagebus.Message) error {
	if socket == nil {
		return nil
	}

	moreFrames := zmq.SNDMORE | zmq.DONTWAIT
	finalFrame := zmq.DONTWAIT

	if _, err := socket.Send(brdc.chain, moreFrames); err != nil {
		return err
	}

	params := item.Parameters

	// with no parameters the command is the last frame; sending it with
	// SNDMORE would leave the multipart message unterminated and the
	// next message's frames would be appended to it
	commandFlags := moreFrames
	if len(params) == 0 {
		commandFlags = finalFrame
	}
	if _, err := socket.Send(item.Command, commandFlags); err != nil {
		return err
	}

	last := len(params) - 1
	for i, p := range params {
		flags := moreFrames
		if i == last {
			flags = finalFrame
		}
		if _, err := socket.SendBytes(p, flags); err != nil {
			return err
		}
	}
	return nil
}