/
observation.go
319 lines (277 loc) · 12.2 KB
/
observation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
//nolint:unparam // this will be refactored in https://github.com/deltaswapio/deltaswap/pull/1953
package processor
import (
"context"
"encoding/hex"
"fmt"
"time"
node_common "github.com/deltaswapio/deltaswap/node/pkg/common"
"github.com/deltaswapio/deltaswap/node/pkg/db"
"github.com/mr-tron/base58"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
gossipv1 "github.com/deltaswapio/deltaswap/node/pkg/proto/gossip/v1"
"github.com/deltaswapio/deltaswap/sdk/vaa"
)
// Prometheus metrics describing how observations received from gossip are handled in this file.
var (
// observationsReceivedTotal is incremented by handleObservation for every observation that is not
// for an already-submitted digest, before the signature on it has been verified.
observationsReceivedTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "deltaswap_observations_received_total",
Help: "Total number of raw VAA observations received from gossip",
})
// observationsReceivedByPhylaxAddressTotal is incremented once an observation's signer has been
// verified and found in the phylax set in use. The "addr" label is the signer's hex address.
observationsReceivedByPhylaxAddressTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "deltaswap_observations_signed_by_phylax_total",
Help: "Total number of signed and verified VAA observations grouped by phylax address",
}, []string{"addr"})
// observationsFailedTotal counts dropped observations. The "cause" label values used in this file are
// "invalid_signature", "pubkey_mismatch", "uninitialized_phylax_set" and "unknown_phylax".
observationsFailedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "deltaswap_observations_verification_failures_total",
Help: "Total number of observations verification failure, grouped by failure reason",
}, []string{"cause"})
// observationsUnknownTotal is incremented when a verified observation arrives for a digest that has
// no aggregation state yet, i.e. just before a fresh state entry is created for it.
observationsUnknownTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "deltaswap_observations_unknown_total",
Help: "Total number of verified observations we haven't seen ourselves",
})
)
// signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature
// (VAA format) given a set of keys gsKeys.
//
// The returned signatures are ordered by the position of their signer in gsKeys, and each carries that
// position as its Index. Note that the position is narrowed to uint8. Entries of signatures whose address
// does not appear in gsKeys are ignored. The signature slice is nil when no key in gsKeys has a signature.
//
// It also returns a bool slice of len(gsKeys) indicating which key in gsKeys had a signature.
//
// The processor state format is used for efficiently storing signatures during aggregation while the VAA
// format is more efficient for on-chain verification.
//
// It panics if the signature stored for one of gsKeys is not exactly 65 bytes long.
func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) {
	// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
	agg := make([]bool, len(gsKeys))
	var sigs []*vaa.Signature

	for i, a := range gsKeys {
		sig, ok := signatures[a]
		agg[i] = ok
		if !ok {
			continue
		}

		var bs [65]byte
		// Validate the real length of the stored signature. The number of bytes reported by copy is
		// capped at len(bs), so checking it would let an oversized signature through, truncated.
		if len(sig) != len(bs) {
			panic(fmt.Sprintf("invalid sig len: %d", len(sig)))
		}
		copy(bs[:], sig)

		sigs = append(sigs, &vaa.Signature{
			Index:     uint8(i),
			Signature: bs,
		})
	}

	return sigs, agg
}
// handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
// and assembles and submits a valid VAA if possible.
//
// An observation that fails any check is logged, counted in observationsFailedTotal under the matching
// cause, and dropped. An observation for a digest that was already submitted is ignored silently.
func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) {
	// SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
	//
	// Note that observations are never tied to the (verified) p2p identity key - the p2p network
	// identity is completely decoupled from the phylax identity, p2p is just transport.
	msg := obs.Msg

	// []byte isn't hashable in a map, so the aggregation state is keyed by the hex-encoded digest.
	// Paying a small extra cost for encoding for easier debugging.
	digest := hex.EncodeToString(msg.Hash)

	st := p.state.signatures[digest]
	if st != nil && st.submitted {
		// Already submitted; additional signatures for it are ignored.
		return
	}

	if p.logger.Core().Enabled(zapcore.DebugLevel) {
		p.logger.Debug("received observation",
			zap.String("digest", digest),
			zap.String("signature", hex.EncodeToString(msg.Signature)),
			zap.String("addr", hex.EncodeToString(msg.Addr)),
			zap.String("txhash", hex.EncodeToString(msg.TxHash)),
			zap.String("txhash_b58", base58.Encode(msg.TxHash)),
			zap.String("message_id", msg.MessageId),
		)
	}

	observationsReceivedTotal.Inc()

	// Verify the Phylax's signature. This verifies that msg.Signature matches msg.Hash and recovers
	// the public key that was used to sign the payload.
	pubKey, err := crypto.Ecrecover(msg.Hash, msg.Signature)
	if err != nil {
		p.logger.Warn("failed to verify signature on observation",
			zap.String("digest", digest),
			zap.String("signature", hex.EncodeToString(msg.Signature)),
			zap.String("addr", hex.EncodeToString(msg.Addr)),
			zap.Error(err))
		observationsFailedTotal.WithLabelValues("invalid_signature").Inc()
		return
	}

	// Verify that msg.Addr matches the public key that signed msg.Hash. The signer's address is the
	// last 20 bytes of the Keccak-256 hash of the recovered key without its leading byte.
	theirAddr := common.BytesToAddress(msg.Addr)
	signerAddr := common.BytesToAddress(crypto.Keccak256(pubKey[1:])[12:])
	if signerAddr != theirAddr {
		p.logger.Info("invalid observation - address does not match pubkey",
			zap.String("digest", digest),
			zap.String("signature", hex.EncodeToString(msg.Signature)),
			zap.String("addr", hex.EncodeToString(msg.Addr)),
			zap.String("pk", signerAddr.Hex()))
		observationsFailedTotal.WithLabelValues("pubkey_mismatch").Inc()
		return
	}

	// Determine which phylax set to use. The following cases are possible:
	//
	//  - We have already seen the message and generated ourObservation. In this case, use the phylax set
	//    valid at the time, even if the phylax set was updated. Old phylax sets remain valid for longer than
	//    aggregation state, and the phylaxs in the old set stay online and observe and sign messages for the
	//    transition period.
	//
	//  - We have not yet seen the message. In this case, we assume the latest phylax set because that's what
	//    we will store once we do see the message.
	//
	// This ensures that during a phylax set update, a node which observed a given message with either the old
	// or the new phylax set can achieve consensus, since both the old and the new set would achieve consensus,
	// assuming that 2/3+ of the old and the new phylax set have seen the message and will periodically attempt
	// to retransmit their observations such that nodes who initially dropped the signature will get a 2nd chance.
	//
	// During an update, the state's signatures can contain signatures from *both* phylax sets.
	var phylaxSet *node_common.PhylaxSet
	switch {
	case st != nil && st.gs != nil:
		phylaxSet = st.gs
	default:
		phylaxSet = p.gs
	}

	// We haven't yet observed the trusted phylax set on Ethereum, and therefore, it's impossible to verify it.
	// May as well not have received it/been offline - drop it and wait for the phylax set.
	if phylaxSet == nil {
		p.logger.Warn("dropping observations since we haven't initialized our phylax set yet",
			zap.String("digest", digest),
			zap.String("their_addr", theirAddr.Hex()),
		)
		observationsFailedTotal.WithLabelValues("uninitialized_phylax_set").Inc()
		return
	}

	// Verify that msg.Addr is included in the phylax set. If it's not, drop the message. In case it's us
	// who have the outdated phylax set, we'll just wait for the message to be retransmitted eventually.
	if _, known := phylaxSet.KeyIndex(theirAddr); !known {
		p.logger.Debug("received observation by unknown phylax - is our phylax set outdated?",
			zap.String("digest", digest),
			zap.String("their_addr", theirAddr.Hex()),
			zap.Uint32("index", phylaxSet.Index),
			//zap.Any("keys", phylaxSet.KeysAsHexStrings()),
		)
		observationsFailedTotal.WithLabelValues("unknown_phylax").Inc()
		return
	}

	// Hooray! Now, we have verified all fields on SignedObservation and know that it includes
	// a valid signature by an active phylax. We still don't fully trust them, as they may be
	// byzantine, but now we know who we're dealing with.

	// We can now count events by phylax without worry about cardinality explosions:
	observationsReceivedByPhylaxAddressTotal.WithLabelValues(theirAddr.Hex()).Inc()

	if st == nil {
		// We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like.
		// However, we have established that a valid phylax has signed it, and therefore we can
		// already start aggregating signatures for it.
		//
		// A malicious phylax can potentially DoS this by creating fake observations at a faster rate than they decay,
		// leading to a slow out-of-memory crash. We do not attempt to automatically mitigate spam attacks with valid
		// signatures - such byzantine behavior would be plainly visible and would be dealt with by kicking them.
		observationsUnknownTotal.Inc()

		st = &state{
			firstObserved: time.Now(),
			nextRetry:     time.Now().Add(nextRetryDuration(0)),
			signatures:    map[common.Address][]byte{},
			source:        "unknown",
		}
		p.state.signatures[digest] = st
	}

	st.signatures[theirAddr] = msg.Signature

	if st.ourObservation == nil {
		p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / phylax
			zap.String("digest", digest))
	} else {
		// We have made this observation on chain!
		required := vaa.CalculateQuorum(len(phylaxSet.Keys))

		// Check if we have more signatures than required for quorum.
		// st.signatures may contain signatures from multiple phylax sets during phylax set updates.
		// Hence, if len(st.signatures) < required, then there is definitely no quorum and we can return early
		// to save additional computation, but if len(st.signatures) >= required, there is not necessarily
		// quorum for the active phylax set. Quorum is checked again below, after assembling the VAA
		// signatures for the particular phylax set.
		if len(st.signatures) < required {
			// No quorum yet, we're done here.
			p.logger.Debug("quorum not yet met",
				zap.String("digest", digest),
				zap.String("messageId", msg.MessageId),
			)
			return
		}

		// Now we *may* have quorum, depending on the phylax set in use.
		// Let's construct the VAA signatures and check if we actually have quorum.
		vaaSigs, present := signaturesToVaaFormat(st.signatures, phylaxSet.Keys)
		reached := len(vaaSigs) >= required

		if p.logger.Level().Enabled(zapcore.DebugLevel) {
			p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / phylax
				zap.String("digest", digest),
				zap.Any("set", phylaxSet.KeysAsHexStrings()),
				zap.Uint32("index", phylaxSet.Index),
				zap.Bools("aggregation", present),
				zap.Int("required_sigs", required),
				zap.Int("have_sigs", len(vaaSigs)),
				zap.Bool("quorum", reached),
			)
		}

		if reached && !st.submitted {
			// We have reached quorum *with the active phylax set*.
			st.ourObservation.HandleQuorum(vaaSigs, digest, p)
		} else {
			p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / phylax
				zap.String("digest", digest))
		}
	}

	// Record the time elapsed since the observation's timestamp, in microseconds. This is only
	// reached when none of the early returns above was taken.
	observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
}
// handleInboundSignedVAAWithQuorum handles a SignedVAAWithQuorum message: it decodes the contained VAA,
// ignores it if that VAA is already stored, verifies it against the node's current phylax set and,
// on success, stores it. Every failure is logged and the message is dropped; nothing is returned.
func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) {
	signed, decodeErr := vaa.Unmarshal(m.Vaa)
	if decodeErr != nil {
		p.logger.Warn("received invalid VAA in SignedVAAWithQuorum message",
			zap.Error(decodeErr), zap.Any("message", m))
		return
	}

	// digestHex renders the VAA's signing digest for log output. It is a closure so that the digest
	// is only computed on the paths that actually log it.
	digestHex := func() string {
		return hex.EncodeToString(signed.SigningDigest().Bytes())
	}

	// Check if we already store this VAA.
	if p.haveSignedVAA(*db.VaaIDFromVAA(signed)) {
		if p.logger.Level().Enabled(zapcore.DebugLevel) {
			p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored",
				zap.String("vaaID", string(db.VaaIDFromVAA(signed).Bytes())),
			)
		}
		return
	}

	// Without a usable phylax set there is nothing to verify the VAA against.
	switch {
	case p.gs == nil:
		p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our phylax set yet",
			zap.String("digest", digestHex()),
			zap.Any("message", m),
		)
		return
	case len(p.gs.Keys) == 0:
		// The phylax set exists but doesn't have any keys.
		p.logger.Warn("dropping SignedVAAWithQuorum message since we have a phylax set without keys",
			zap.String("digest", digestHex()),
			zap.Any("message", m),
		)
		return
	}

	if verifyErr := signed.Verify(p.gs.Keys); verifyErr != nil {
		// The error text is part of the log message itself rather than a structured field.
		p.logger.Warn("dropping SignedVAAWithQuorum message because it failed verification: " + verifyErr.Error())
		return
	}

	// We now established that:
	// - all signatures on the VAA are valid
	// - the signature's addresses match the node's current phylax set
	// - enough signatures are present for the VAA to reach quorum

	// Store signed VAA in database.
	if p.logger.Level().Enabled(zapcore.DebugLevel) {
		p.logger.Debug("storing inbound signed VAA with quorum",
			zap.String("digest", digestHex()),
			zap.Any("vaa", signed),
			zap.String("bytes", hex.EncodeToString(m.Vaa)),
			zap.String("message_id", signed.MessageID()))
	}

	if storeErr := p.storeSignedVAA(signed); storeErr != nil {
		p.logger.Error("failed to store signed VAA", zap.Error(storeErr))
	}
}