-
Notifications
You must be signed in to change notification settings - Fork 16
/
subscriber.go
168 lines (145 loc) · 4.57 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package p2p
import (
"context"
"errors"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/celestiaorg/go-header"
)
// SubscriberOption is a functional option for the Subscriber.
type SubscriberOption func(*SubscriberParams)
// SubscriberParams defines the parameters for the Subscriber
// configurable with SubscriberOption.
type SubscriberParams struct {
networkID string
metrics bool
}
// Subscriber manages the lifecycle and relationship of header Module
// with the "header-sub" gossipsub topic.
type Subscriber[H header.Header[H]] struct {
pubsubTopicID string
metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
}
// WithSubscriberMetrics enables metrics collection for the Subscriber.
func WithSubscriberMetrics() SubscriberOption {
return func(params *SubscriberParams) {
params.metrics = true
}
}
// WithSubscriberNetworkID sets the network ID for the Subscriber.
func WithSubscriberNetworkID(networkID string) SubscriberOption {
return func(params *SubscriberParams) {
params.networkID = networkID
}
}
// NewSubscriber returns a Subscriber that manages the header Module's
// relationship with the "header-sub" gossipsub topic.
func NewSubscriber[H header.Header[H]](
ps *pubsub.PubSub,
msgID pubsub.MsgIdFunction,
opts ...SubscriberOption,
) (*Subscriber[H], error) {
var params SubscriberParams
for _, opt := range opts {
opt(¶ms)
}
var metrics *subscriberMetrics
if params.metrics {
var err error
metrics, err = newSubscriberMetrics()
if err != nil {
return nil, err
}
}
return &Subscriber[H]{
metrics: metrics,
pubsubTopicID: PubsubTopicID(params.networkID),
pubsub: ps,
msgID: msgID,
}, nil
}
// Start starts the Subscriber and joins the instance's topic. SetVerifier must
// be called separately to ensure a validator is mounted on the topic.
func (s *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", s.pubsubTopicID)
s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
return err
}
// Stop closes the topic and unregisters its validator.
func (s *Subscriber[H]) Stop(context.Context) error {
regErr := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID)
if regErr != nil {
// do not return this error as it is non-critical and usually
// means that a validator was not mounted.
log.Warnf("unregistering validator: %s", regErr)
}
err := s.topic.Close()
return errors.Join(err, s.metrics.Close())
}
// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
defer func() {
err := recover()
if err != nil {
log.Errorf("PANIC while unmarshalling or verifying header: %s", err)
res = pubsub.ValidationReject
}
}()
hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
if err != nil {
log.Errorw("unmarshalling header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
// ensure header validity
err = hdr.Validate()
if err != nil {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
var verErr *header.VerifyError
err = val(ctx, hdr)
switch {
case errors.As(err, &verErr) && verErr.SoftFailure:
s.metrics.ignore(ctx)
return pubsub.ValidationIgnore
case err != nil:
s.metrics.reject(ctx)
return pubsub.ValidationReject
default:
}
// keep the valid header in the msg so Subscriptions can access it without
// additional unmarshalling
msg.ValidatorData = hdr
s.metrics.accept(ctx, len(msg.Data))
return pubsub.ValidationAccept
}
return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval)
}
// Subscribe returns a new subscription to the Subscriber's
// topic.
func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
if s.topic == nil {
return nil, errors.New("header topic is not instantiated, service must be started before subscribing")
}
return newSubscription[H](s.topic, s.metrics)
}
// Broadcast broadcasts the given Header to the topic.
func (s *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error {
bin, err := header.MarshalBinary()
if err != nil {
return err
}
return s.topic.Publish(ctx, bin, opts...)
}