-
Notifications
You must be signed in to change notification settings - Fork 47
/
sendmany.go
78 lines (64 loc) · 2.13 KB
/
sendmany.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this
// file, you can obtain one at https://opensource.org/licenses/MIT.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
package writer
import (
"context"
"errors"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus"
"github.com/dusk-network/dusk-protobuf/autogen/go/rusk"
)
// SendToMany collects topics.KadcastSendToMany event to distribute a single
// message to multiple nodes via rusk.NetworkClient Send call.
type SendToMany struct {
Base
}
// NewSendToMany ...
func NewSendToMany(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient) ring.Writer {
w := &SendToMany{
Base: Base{
subscriber: s,
gossip: g,
client: rusk,
ctx: ctx,
topic: topics.KadcastSendToMany,
},
}
w.Subscribe()
return w
}
// Subscribe subscribes to eventbus Kadcast messages.
func (w *SendToMany) Subscribe() {
// KadcastPoint subs
w.subscriptionID = w.subscriber.Subscribe(w.topic, eventbus.NewStreamListener(w))
}
// Write ...
func (w *SendToMany) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) {
if err := w.sendToMany(data, metadata, priority); err != nil {
log.WithError(err).Warn("write failed")
}
return 0, nil
}
// sendToMany sends a message to N random endpoints returned by AliveNodes.
func (w *SendToMany) sendToMany(data []byte, metadata *message.Metadata, _ byte) error {
if metadata == nil {
return errors.New("empty message metadata")
}
// get N active nodes
req := &rusk.AliveNodesRequest{MaxNodes: uint32(metadata.NumNodes)}
resp, err := w.client.AliveNodes(w.ctx, req)
if err != nil {
log.WithError(err).Warn("get alive nodes failed")
return err
}
for _, addr := range resp.Address {
_ = w.Send(data, addr)
}
return nil
}