/
processor.go
329 lines (288 loc) · 10.3 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package processor
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/governor"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"go.uber.org/zap"
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/gwrelayer"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
dto "github.com/prometheus/client_model/go"
)
// Timing knobs for the Processor.Run loop. They are variables rather than
// constants, so they can be reassigned (presumably by tests — confirm).
var (
	// GovInterval is the delay used to arm, and later re-arm, the timer that
	// makes Run check the governor for pending messages.
	GovInterval = time.Minute

	// CleanupInterval is the period of the ticker that makes Run invoke handleCleanup.
	CleanupInterval = 30 * time.Second
)
// Observation is the interface implemented by every kind of event the guardian observes.
type Observation interface {
	// GetEmitterChain reports the ID of the chain on which the event was observed.
	GetEmitterChain() vaa.ChainID
	// MessageID returns the emitter_chain/emitter_address/sequence tuple in human-readable form.
	MessageID() string
	// SigningDigest returns the digest of the observation's signing body. It is the
	// value used for signature generation and verification.
	SigningDigest() ethcommon.Hash
	// IsReliable reports whether the message is considered reliable, i.e. whether it can be reobserved.
	IsReliable() bool
	// IsReobservation reports whether the message was produced by a reobservation request.
	IsReobservation() bool
	// HandleQuorum completes processing of the observation once a quorum of
	// signatures has been collected for it.
	HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor)
}

// state is the node's local view of a single observation.
type state struct {
	// firstObserved is when this digest was first seen, which may be before the
	// node observed the message itself.
	firstObserved time.Time
	// nextRetry is the earliest time at which a re-observation request may be sent.
	nextRetry time.Time
	// retryCtr counts the re-observation requests sent so far.
	retryCtr uint
	// ourObservation is the node's own copy of the observation; nil if it has none.
	ourObservation Observation
	// signatures maps a guardian address to the signature seen from it. While a
	// guardian set update is in progress it may hold signatures from both the old
	// and the new set.
	signatures map[ethcommon.Address][]byte
	// submitted is set once quorum was reached and the VAA was submitted.
	submitted bool
	// settled is set by the cleanup service after the settlement timeout expired
	// and misses were counted.
	settled bool
	// source is a human-readable description of the VAA's origin, used for metrics.
	source string
	// ourMsg holds the bytes the node submitted (ourObservation, signed and
	// serialized); kept for retransmissions.
	ourMsg []byte
	// txHash is the hash of the transaction in which the observation was made;
	// used for re-observation requests.
	txHash []byte
	// gs is the guardian set that was valid at observation/injection time.
	gs *common.GuardianSet
}

// observationMap indexes per-observation state by a string key.
type observationMap map[string]*state

// aggregationState is the node's aggregation of guardian signatures.
type aggregationState struct {
	signatures observationMap
}
// LoggingID returns a label for this state object that is suitable for log
// messages only; it must not be relied on to uniquely identify an observation.
// When the node has no observation of its own, the hex-encoded transaction hash
// is used; otherwise the observation's message ID is returned.
func (s *state) LoggingID() string {
	if s.ourObservation == nil {
		return hex.EncodeToString(s.txHash)
	}
	return s.ourObservation.MessageID()
}
// PythNetVaaEntry is the value type of Processor.pythnetVaas: a VAA together
// with the time at which it was recorded.
type PythNetVaaEntry struct {
// v is the stored VAA.
v *vaa.VAA
updateTime time.Time // Used for determining when to delete entries
}
// Processor bundles the channels, key material, and runtime state that the
// Run loop operates on. Instances are built by NewProcessor.
type Processor struct {
// msgC is a channel of observed emitted messages
msgC <-chan *common.MessagePublication
// setC is a channel of guardian set updates
setC <-chan *common.GuardianSet
// gossipSendC is a channel of outbound messages to broadcast on p2p
gossipSendC chan<- []byte
// obsvC is a channel of inbound decoded observations from p2p
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p
obsvReqSendC chan<- *gossipv1.ObservationRequest
// signedInC is a channel of inbound signed VAA observations from p2p
signedInC <-chan *gossipv1.SignedVAAWithQuorum
// gk is the node's guardian private key
gk *ecdsa.PrivateKey
// logger is the logger used by the processor; NewProcessor obtains it from the supervisor context.
logger *zap.Logger
// db is the database used to store and look up signed VAAs (PythNet VAAs are kept in pythnetVaas instead).
db *db.Database
// Runtime state
// gs is the currently valid guardian set
gs *common.GuardianSet
// gst is managed by the processor and allows concurrent access to the
// guardian set by other components.
gst *common.GuardianSetState
// state is the current runtime VAA view
state *aggregationState
// gk pk as eth address
ourAddr ethcommon.Address
// governor is optional; when nil, Run skips all governor checks.
governor *governor.ChainGovernor
// acct is optional; when nil, Run skips accountant submission.
acct *accountant.Accountant
// acctReadC is a channel of messages published by the accountant. Run treats
// receiving on it while acct is nil as an error.
acctReadC <-chan *common.MessagePublication
// pythnetVaas holds PythNet VAAs in memory, keyed by "emitterAddress/sequence".
pythnetVaas map[string]PythNetVaaEntry
// gatewayRelayer is stored by NewProcessor; it is not used in the code visible here.
gatewayRelayer *gwrelayer.GatewayRelayer
}
// Prometheus histograms for signed-observation latency. Both are registered
// through promauto when the package is initialized, and Run logs their contents
// when its context is cancelled.
var (
// observationChanDelay records, in microseconds, the time between an
// observation's Timestamp and the moment Run receives it from obsvC.
observationChanDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_channel_delay_us",
Help: "Latency histogram for delay of signed observations in channel",
// Bucket upper bounds; microseconds, per the _us suffix of the metric name.
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
})
// observationTotalDelay tracks the total time taken to process a signed
// observation. It is not updated in the code visible here.
observationTotalDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_total_delay_us",
Help: "Latency histogram for total time to process signed observations",
// Bucket upper bounds; microseconds, per the _us suffix of the metric name.
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
})
)
// NewProcessor builds a Processor from the supplied channels and collaborators.
//
// The logger is taken from the supervisor context in ctx, the node's own address
// is derived from the public half of gk (so gk must not be nil), and the
// aggregation state and PythNet VAA cache start out empty. g, acct, and
// gatewayRelayer are stored as given; Run treats a nil g or acct as "not configured".
func NewProcessor(
	ctx context.Context,
	db *db.Database,
	msgC <-chan *common.MessagePublication,
	setC <-chan *common.GuardianSet,
	gossipSendC chan<- []byte,
	obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
	obsvReqSendC chan<- *gossipv1.ObservationRequest,
	signedInC <-chan *gossipv1.SignedVAAWithQuorum,
	gk *ecdsa.PrivateKey,
	gst *common.GuardianSetState,
	g *governor.ChainGovernor,
	acct *accountant.Accountant,
	acctReadC <-chan *common.MessagePublication,
	gatewayRelayer *gwrelayer.GatewayRelayer,
) *Processor {
	// Derived values first, in the same order the original literal evaluated them.
	logger := supervisor.Logger(ctx)
	ourAddr := crypto.PubkeyToAddress(gk.PublicKey)

	proc := &Processor{
		// Channels.
		msgC:         msgC,
		setC:         setC,
		gossipSendC:  gossipSendC,
		obsvC:        obsvC,
		obsvReqSendC: obsvReqSendC,
		signedInC:    signedInC,
		acctReadC:    acctReadC,

		// Identity.
		gk:      gk,
		ourAddr: ourAddr,

		// Collaborators.
		db:             db,
		logger:         logger,
		gst:            gst,
		governor:       g,
		acct:           acct,
		gatewayRelayer: gatewayRelayer,

		// Fresh, empty runtime state.
		state:       &aggregationState{signatures: make(observationMap)},
		pythnetVaas: make(map[string]PythNetVaaEntry),
	}
	return proc
}
// Run is the processor's event loop. It blocks until ctx is cancelled or a
// fatal error occurs, dispatching on:
//
//   - setC: guardian set updates, which replace p.gs and are published via p.gst;
//   - msgC: observed messages, which must pass the governor and the accountant
//     (when configured) before being handled;
//   - acctReadC: messages released by the accountant;
//   - obsvC / signedInC: inbound gossip observations and signed VAAs;
//   - the cleanup ticker and the governor timer.
//
// On cancellation it closes the accountant (if any), logs the observation
// latency histograms, and returns ctx.Err(). Any other return value is an error
// from the governor or accountant, or a failed defense-in-depth check.
func (p *Processor) Run(ctx context.Context) error {
	cleanup := time.NewTicker(CleanupInterval)
	// Release the ticker on every return path instead of leaving it running.
	defer cleanup.Stop()

	// Always create the timer so the select below never touches a nil timer. It is
	// only re-armed when the governor or the accountant is configured.
	govTimer := time.NewTimer(GovInterval)
	defer govTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			if p.acct != nil {
				p.acct.Close()
			}

			// Log these as warnings so they show up in the benchmark logs.
			// Errors from Write are deliberately ignored: this is best-effort diagnostics.
			metric := &dto.Metric{}
			_ = observationChanDelay.Write(metric)
			p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationChannelDelay", metric.String()))

			metric = &dto.Metric{}
			_ = observationTotalDelay.Write(metric)
			p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String()))

			return ctx.Err()

		case p.gs = <-p.setC:
			p.logger.Info("guardian set updated",
				zap.Strings("set", p.gs.KeysAsHexStrings()),
				zap.Uint32("index", p.gs.Index))
			p.gst.Set(p.gs)

		case k := <-p.msgC:
			// The governor may hold the message back; it resurfaces via CheckPending below.
			if p.governor != nil && !p.governor.ProcessMsg(k) {
				continue
			}
			if p.acct != nil {
				shouldPub, err := p.acct.SubmitObservation(k)
				if err != nil {
					return fmt.Errorf("failed to process message `%s`: %w", k.MessageIDString(), err)
				}
				if !shouldPub {
					continue
				}
			}
			p.handleMessage(k)

		case k := <-p.acctReadC:
			if p.acct == nil {
				return fmt.Errorf("received an accountant event when accountant is not configured")
			}
			// SECURITY defense-in-depth: Make sure the accountant did not generate an unexpected message.
			if !p.acct.IsMessageCoveredByAccountant(k) {
				return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
			}
			p.handleMessage(k)

		case m := <-p.obsvC:
			observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
			p.handleObservation(ctx, m)

		case m := <-p.signedInC:
			p.handleInboundSignedVAAWithQuorum(ctx, m)

		case <-cleanup.C:
			p.handleCleanup(ctx)

		case <-govTimer.C:
			if p.governor != nil {
				toBePublished, err := p.governor.CheckPending()
				if err != nil {
					return err
				}
				// Ranging over an empty result is a no-op, so no length check is needed.
				for _, k := range toBePublished {
					// SECURITY defense-in-depth: Make sure the governor did not generate an unexpected message.
					msgIsGoverned, err := p.governor.IsGovernedMsg(k)
					if err != nil {
						return fmt.Errorf("governor failed to determine if message should be governed: `%s`: %w", k.MessageIDString(), err)
					}
					if !msgIsGoverned {
						return fmt.Errorf("governor published a message that should not be governed: `%s`", k.MessageIDString())
					}

					if p.acct != nil {
						shouldPub, err := p.acct.SubmitObservation(k)
						if err != nil {
							return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err)
						}
						if !shouldPub {
							// Skip only this message; keep draining the rest of the batch.
							continue
						}
					}

					p.handleMessage(k)
				}
			}

			// The timer has fired and its channel was drained above, so Reset is safe here.
			if p.governor != nil || p.acct != nil {
				govTimer.Reset(GovInterval)
			}
		}
	}
}
// storeSignedVAA persists a signed VAA.
//
// PythNet VAAs are kept only in the in-memory pythnetVaas map, keyed by
// "emitterAddress/sequence" and stamped with the current time; all other VAAs
// are written to the database.
//
// The map is created on first use and a missing database is reported as an
// error, mirroring the nil handling in haveSignedVAA, so that a partially
// initialized Processor does not panic here.
func (p *Processor) storeSignedVAA(v *vaa.VAA) error {
	if v.EmitterChain == vaa.ChainIDPythNet {
		if p.pythnetVaas == nil {
			// Writing to a nil map panics; allocate lazily.
			p.pythnetVaas = make(map[string]PythNetVaaEntry)
		}
		key := fmt.Sprintf("%v/%v", v.EmitterAddress, v.Sequence)
		p.pythnetVaas[key] = PythNetVaaEntry{v: v, updateTime: time.Now()}
		return nil
	}

	if p.db == nil {
		return fmt.Errorf("cannot store signed VAA %v/%v/%v: no database configured",
			v.EmitterChain, v.EmitterAddress, v.Sequence)
	}
	return p.db.StoreSignedVAA(v)
}
// haveSignedVAA reports whether a signed VAA with the given ID is already known.
//
// PythNet IDs are looked up in the in-memory pythnetVaas map; every other chain
// is looked up in the database. A missing map, a missing database, or a failed
// database lookup (which is logged) all yield false.
func (p *Processor) haveSignedVAA(id db.VAAID) bool {
	if id.EmitterChain != vaa.ChainIDPythNet {
		if p.db == nil {
			return false
		}

		found, err := p.db.HasVAA(id)
		if err == nil {
			return found
		}

		p.logger.Error("failed to look up VAA in database",
			zap.String("vaaID", string(id.Bytes())),
			zap.Error(err),
		)
		return false
	}

	// PythNet VAAs never reach the database; they live in the in-memory cache only.
	if p.pythnetVaas == nil {
		return false
	}
	_, cached := p.pythnetVaas[fmt.Sprintf("%v/%v", id.EmitterAddress, id.Sequence)]
	return cached
}