/
channel.go
69 lines (60 loc) · 1.68 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package core
import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/sirupsen/logrus"
)
// Names of the pubsub topics used by this package. Each constant's value is
// identical to its identifier.
const (
	// PENDING_TX_TOPIC is the topic name for pending transactions.
	PENDING_TX_TOPIC = "PENDING_TX_TOPIC"
	// NEW_BLOCKS_TOPIC is the topic name for new blocks.
	NEW_BLOCKS_TOPIC = "NEW_BLOCKS_TOPIC"
)
// MessageTransport is the envelope carried on the channels that feed
// PubSubWrapper.Publish. It wraps a single raw payload.
type MessageTransport struct {
	// Data is the raw payload; Publish passes it to the topic unchanged.
	Data []byte
}
// PubSubWrapper bundles the handles needed to work with a single PubSub
// topic: the topic itself, a subscription to it, and the name it was joined
// under. Outgoing messages are sent with Publish; incoming messages are
// delivered to a callback by ReadLoop.
// TODO this name is bad... but works for now..
type PubSubWrapper struct {
	// Data is a channel of outgoing messages (originally described as signed
	// pending transactions to send to peers). ReadLoop closes it when reading
	// from the subscription fails.
	Data chan MessageTransport
	// TopicName is the name used to look up peers in ListPeers.
	TopicName string
	// Context is not read by any method visible in this file.
	// NOTE(review): presumably the context the wrapper was created with — confirm.
	Context context.Context
	// PubSub is not read by any method visible in this file; ListPeers takes
	// its PubSub handle as an argument instead.
	PubSub *pubsub.PubSub
	// Topic is the handle Publish sends messages through.
	Topic *pubsub.Topic
	// Sub is the subscription ReadLoop pulls incoming messages from.
	Sub *pubsub.Subscription
	// Self is a peer ID; presumably the local node's, as it is only referenced
	// by a disabled sender check in ReadLoop — confirm.
	Self peer.ID
}
// Publish forwards every message received on msgChan to the pubsub topic.
//
// It blocks, publishing messages as they arrive, until one of the following
// happens:
//   - ctx is cancelled: ctx.Err() is returned;
//   - msgChan is closed: nil is returned;
//   - publishing a message fails: that error is returned unchanged.
func (cr *PubSubWrapper) Publish(ctx context.Context, msgChan chan MessageTransport) error {
	for {
		select {
		case <-ctx.Done():
			// Without this case the loop could never be stopped while idle.
			return ctx.Err()
		case m, ok := <-msgChan:
			if !ok {
				// A closed channel yields zero values forever; stop here
				// instead of publishing empty payloads in a busy loop.
				return nil
			}
			if err := cr.Topic.Publish(ctx, m.Data); err != nil {
				return err
			}
		}
	}
}
// ListPeers returns the peer IDs that ps reports for this wrapper's
// TopicName. The lookup goes through the ps argument, not cr.PubSub.
func (cr *PubSubWrapper) ListPeers(ps *pubsub.PubSub) []peer.ID {
	topicPeers := ps.ListPeers(cr.TopicName)
	return topicPeers
}
// Callback signatures used with the pubsub wrapper.
type (
	// MessageHandler is called by ReadLoop with each message received from
	// the subscription.
	MessageHandler func(data *pubsub.Message)

	// PublishHandler is a callback that accepts a raw message payload. It is
	// not referenced by the other declarations visible in this file.
	PublishHandler func(msg []byte)
)
// ReadLoop pulls messages from the topic subscription and passes each one to
// onMessage, one at a time, in the order the subscription yields them.
//
// The loop stops the first time the subscription returns an error
// (presumably including cancellation of ctx — confirm against the pubsub
// library). The error is logged, cr.Data is closed, and ReadLoop returns.
//
// Every message is delivered regardless of sender: the check that would skip
// messages whose ReceivedFrom equals cr.Self is not applied.
func (cr *PubSubWrapper) ReadLoop(ctx context.Context, onMessage MessageHandler) {
	for {
		received, readErr := cr.Sub.Next(ctx)
		if readErr == nil {
			onMessage(received)
			continue
		}

		// The subscription is unusable from here on: report, signal, stop.
		logrus.Errorln(readErr)
		close(cr.Data)
		return
	}
}