-
Notifications
You must be signed in to change notification settings - Fork 893
/
pubsub.go
136 lines (115 loc) · 3.8 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package shrexsub
import (
"context"
"fmt"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/celestiaorg/celestia-node/share"
pb "github.com/celestiaorg/celestia-node/share/p2p/shrexsub/pb"
)
// log is the package-level logger, created under the "shrex-sub" name.
var log = logging.Logger("shrex-sub")
// pubsubTopicID builds the ID of the EDS pubsub topic for the given networkID
// by appending the fixed "/eds-sub/v0.1.0" suffix to it.
func pubsubTopicID(networkID string) string {
	const topicFormat = "%s/eds-sub/v0.1.0"

	id := fmt.Sprintf(topicFormat, networkID)
	return id
}
// ValidatorFn is an injectable func and governs EDS notification msg validity.
// It receives the notification and sender peer and expects the validation result.
// ValidatorFn is allowed to be blocking for an indefinite time or until the context is canceled.
//
// When registered through PubSub.AddValidator, it is only invoked for messages
// that decode successfully and whose DataHash is not the empty root; all other
// messages are rejected before reaching it.
type ValidatorFn func(context.Context, peer.ID, Notification) pubsub.ValidationResult
// BroadcastFn is the signature of a function that broadcasts a Notification.
// It matches the signature of PubSub.Broadcast and returns an error when the
// broadcast fails.
type BroadcastFn func(context.Context, Notification) error
// Notification is the format of message sent by Broadcaster.
// It is converted to and from pb.RecentEDSNotification when crossing the wire.
type Notification struct {
// DataHash is the data hash of the EDS being announced.
DataHash share.DataHash
// Height accompanies the DataHash; presumably the block height — TODO confirm.
Height uint64
}
// PubSub manages receiving and propagating the EDS from/to the network
// over "eds-sub" subscription.
type PubSub struct {
// pubSub is the underlying libp2p pubsub instance created in NewPubSub.
pubSub *pubsub.PubSub
// topic is the joined topic handle; it is nil until Start succeeds.
topic *pubsub.Topic
// pubsubTopic is the topic ID derived from the network ID via pubsubTopicID.
pubsubTopic string
// cancelRelay cancels the relay enabled by Start; it is nil until Start succeeds.
cancelRelay pubsub.RelayCancelFunc
}
// NewPubSub constructs a PubSub wrapper around a new FloodSub instance created
// for host h. The topic ID is derived from networkID; the topic itself is only
// joined later, by Start.
//
// It returns the error from pubsub.NewFloodSub unchanged when construction fails.
func NewPubSub(ctx context.Context, h host.Host, networkID string) (*PubSub, error) {
	// Named floodSub (not pubsub) so the imported package is not shadowed.
	floodSub, err := pubsub.NewFloodSub(ctx, h)
	if err != nil {
		return nil, err
	}

	ps := &PubSub{
		pubSub:      floodSub,
		pubsubTopic: pubsubTopicID(networkID),
	}
	return ps, nil
}
// Start joins the configured topic and enables relaying on it.
// On success the topic handle and the relay cancel func are stored on s for
// later use by Subscribe, Broadcast and Stop.
//
// The context parameter is currently unused.
//
// It returns the error from joining the topic or from enabling the relay.
// If enabling the relay fails, the just-joined topic is closed again so that
// its handle is not leaked.
func (s *PubSub) Start(context.Context) error {
	topic, err := s.pubSub.Join(s.pubsubTopic)
	if err != nil {
		return err
	}

	cancel, err := topic.Relay()
	if err != nil {
		// The topic handle is about to go out of scope; close it here, since
		// nothing else would be able to once we return.
		if closeErr := topic.Close(); closeErr != nil {
			log.Warnw("closing topic after relay failure", "err", closeErr)
		}
		return err
	}

	s.cancelRelay = cancel
	s.topic = topic
	return nil
}
// Stop completely stops the PubSub:
//   - Cancels the relay enabled by Start
//   - Unregisters the validator registered for the topic
//   - Closes the topic
//
// Stop is safe to call when Start was never called or did not succeed: the
// relay cancel and topic close steps are skipped if they were never set up.
// A failure to unregister the validator is only logged, not returned.
//
// The context parameter is currently unused. The returned error is the one
// from closing the topic, if any.
func (s *PubSub) Stop(context.Context) error {
	// cancelRelay is only assigned by a successful Start; calling a nil func panics.
	if s.cancelRelay != nil {
		s.cancelRelay()
	}

	if err := s.pubSub.UnregisterTopicValidator(s.pubsubTopic); err != nil {
		log.Warnw("unregistering topic", "err", err)
	}

	// topic is only assigned by a successful Start; nothing to close otherwise.
	if s.topic == nil {
		return nil
	}
	return s.topic.Close()
}
// AddValidator registers the given ValidatorFn as a validator for the EDS
// notification topic. The function is wrapped so that it receives an already
// decoded Notification instead of the raw pubsub message.
//
// It returns the error from the underlying RegisterTopicValidator call.
//
// NOTE(review): the underlying pubsub may reject a second validator for the
// same topic — confirm before relying on registering more than one.
func (s *PubSub) AddValidator(v ValidatorFn) error {
	wrapped := v.validate
	err := s.pubSub.RegisterTopicValidator(s.pubsubTopic, wrapped)
	return err
}
// validate adapts v to the raw pubsub validator signature.
// It decodes msg.Data into a Notification and delegates the verdict to v.
// Messages that fail to decode, or that carry an empty-root DataHash, are
// rejected without calling v.
func (v ValidatorFn) validate(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
	decoded := new(pb.RecentEDSNotification)
	err := decoded.Unmarshal(msg.Data)
	if err != nil {
		log.Debugw("validator: unmarshal error", "err", err)
		return pubsub.ValidationReject
	}

	notification := Notification{
		Height:   decoded.Height,
		DataHash: decoded.DataHash,
	}

	switch {
	case notification.DataHash.IsEmptyRoot():
		// Empty EDS data hashes are never broadcast by us, so receiving one
		// is treated as a hard reject.
		return pubsub.ValidationReject
	default:
		return v(ctx, p, notification)
	}
}
// Subscribe returns a new Subscription for EDS notifications on the joined topic.
// It fails with a "topic is not started" error when the topic has not been
// set up yet, i.e. before a successful Start.
func (s *PubSub) Subscribe() (*Subscription, error) {
	if topic := s.topic; topic != nil {
		return newSubscription(topic)
	}
	return nil, fmt.Errorf("shrex-sub: topic is not started")
}
// Broadcast publishes the EDS notification (DataHash and Height) on the topic.
//
// Notifications whose DataHash is the empty root are silently skipped and nil
// is returned. Otherwise the notification is marshaled to its protobuf form
// and published.
//
// It returns an error when the topic has not been started yet (consistent
// with Subscribe), when marshaling fails, or when publishing fails.
func (s *PubSub) Broadcast(ctx context.Context, notification Notification) error {
	if notification.DataHash.IsEmptyRoot() {
		// no need to broadcast datahash of an empty block EDS
		return nil
	}

	// topic is only set by a successful Start; publishing on a nil topic
	// would dereference a nil pointer.
	if s.topic == nil {
		return fmt.Errorf("shrex-sub: topic is not started")
	}

	msg := pb.RecentEDSNotification{
		Height:   notification.Height,
		DataHash: notification.DataHash,
	}
	data, err := msg.Marshal()
	if err != nil {
		return fmt.Errorf("shrex-sub: marshal notification, %w", err)
	}
	return s.topic.Publish(ctx, data)
}