-
Notifications
You must be signed in to change notification settings - Fork 47
/
sendone.go
63 lines (52 loc) · 1.78 KB
/
sendone.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this
// file, you can obtain one at https://opensource.org/licenses/MIT.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
package writer
import (
"context"
"errors"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus"
"github.com/dusk-network/dusk-protobuf/autogen/go/rusk"
)
// SendToOne is a ring.Writer that is subscribed to topics.KadcastSendToOne
// events and forwards each received message to a single node, whose address
// is taken from the Source field of the message metadata.
//
// Delivery is delegated to the Send method of the embedded Base. Base is
// declared outside this file; presumably it issues the rusk.NetworkClient
// Send call using the client passed to NewSendToOne.
type SendToOne struct {
Base
}
// NewSendToOne builds a SendToOne writer bound to the given event bus
// subscriber, gossip protocol and rusk network client, with its topic fixed
// to topics.KadcastSendToOne. The writer is subscribed to the event bus before
// being returned, so it may start receiving messages immediately.
//
// ctx is stored on the embedded Base; how it is used is up to Base.
func NewSendToOne(ctx context.Context, s eventbus.Subscriber, g *protocol.Gossip, rusk rusk.NetworkClient) ring.Writer {
	writer := new(SendToOne)
	writer.Base = Base{
		ctx:        ctx,
		topic:      topics.KadcastSendToOne,
		subscriber: s,
		gossip:     g,
		client:     rusk,
	}

	// Register on the event bus right away; callers get a live writer.
	writer.Subscribe()

	return writer
}
// Subscribe registers the writer on the event bus for its configured topic,
// wrapping it in a stream listener, and records the subscription ID returned
// by the subscriber.
func (w *SendToOne) Subscribe() {
	listener := eventbus.NewStreamListener(w)
	w.subscriptionID = w.subscriber.Subscribe(w.topic, listener)
}
// Write implements ring.Writer. It attempts to send data to the single node
// described by metadata.
//
// A failed send is logged as a warning and is not propagated: Write always
// returns (0, nil).
// NOTE(review): presumably this keeps one failed delivery from stopping the
// consumer that drives this writer — confirm against the ring consumer.
func (w *SendToOne) Write(data []byte, metadata *message.Metadata, priority byte) (int, error) {
	err := w.sendToOne(data, metadata, priority)
	if err != nil {
		log.WithError(err).Warn("write failed")
	}

	return 0, nil
}
// sendToOne delivers data to the node named by metadata.Source via the
// embedded Base's Send method. The priority argument is ignored.
//
// It returns an error when metadata is nil, since there is then no
// destination to send to; otherwise it returns whatever Send returns.
func (w *SendToOne) sendToOne(data []byte, metadata *message.Metadata, _ byte) error {
	if metadata == nil {
		return errors.New("empty message metadata")
	}

	// The metadata's Source field carries the destination for this message.
	dest := metadata.Source

	return w.Send(data, dest)
}