/
message.go
167 lines (145 loc) · 5.9 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package processor
import (
"context"
"encoding/hex"
"github.com/alephium/wormhole-fork/node/pkg/db"
"github.com/mr-tron/base58"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
"github.com/alephium/wormhole-fork/node/pkg/common"
"github.com/alephium/wormhole-fork/node/pkg/reporter"
"github.com/alephium/wormhole-fork/node/pkg/supervisor"
"github.com/alephium/wormhole-fork/node/pkg/vaa"
)
// messagesObservedTotal is the counter vector registered as
// "wormhole_message_observations_total". It carries a single label,
// "emitter_chain".
//
// SECURITY: label values are derived from untrusted chain identifiers.
// NOTE(review): the previous note spoke of source_chain/target_chain uint8
// pairs (at most 255**2 label pairs); only emitter_chain is a label here, so
// the real bound depends on the chain ID type - confirm it is still acceptable.
var messagesObservedTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "wormhole_message_observations_total",
		Help: "Total number of messages observed",
	},
	[]string{"emitter_chain"},
)

// messagesSignedTotal is the counter vector registered as
// "wormhole_message_observations_signed_total". It uses the same single
// "emitter_chain" label as messagesObservedTotal.
var messagesSignedTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "wormhole_message_observations_signed_total",
		Help: "Total number of message observations that were successfully signed",
	},
	[]string{"emitter_chain"},
)
// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
// event may be received multiple times and must be handled in an idempotent fashion.
//
// In order, it:
//   - drops the observation (with a warning) while no guardian set is loaded,
//   - counts the observation and builds the unsigned VAA from the publication,
//   - refuses any message whose emitter matches the governance emitter,
//   - skips observations for which a signed VAA is already stored and whose
//     timestamp is more than settlementTime after the stored VAA's timestamp,
//   - signs the VAA digest with the guardian key, reports the publication and
//     broadcasts the signature.
//
// It panics if a stored VAA cannot be unmarshalled or if signing fails.
func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublication) {
	// Fields identifying the publication; shared by several log statements below.
	pubFields := []zap.Field{
		zap.Stringer("emitter_chain", k.EmitterChain),
		zap.Stringer("target_chain", k.TargetChain),
		zap.Stringer("emitter_address", k.EmitterAddress),
		zap.Uint32("nonce", k.Nonce),
		zap.Stringer("txhash", k.TxHash),
		zap.Time("timestamp", k.Timestamp),
	}

	if p.gs == nil {
		p.logger.Warn("dropping observation since we haven't initialized our guardian set yet", pubFields...)
		return
	}

	supervisor.Logger(ctx).Info("message publication confirmed", pubFields...)

	messagesObservedTotal.With(prometheus.Labels{
		"emitter_chain": k.EmitterChain.String(),
	}).Inc()

	// All nodes will create the exact same VAA and sign its digest.
	// Consensus is established on this digest.
	v := &vaa.VAA{
		Version:          vaa.SupportedVAAVersion,
		GuardianSetIndex: p.gs.Index,
		Signatures:       nil,
		Timestamp:        k.Timestamp,
		Nonce:            k.Nonce,
		EmitterChain:     k.EmitterChain,
		TargetChain:      k.TargetChain,
		EmitterAddress:   k.EmitterAddress,
		Payload:          k.Payload,
		Sequence:         k.Sequence,
		ConsistencyLevel: k.ConsistencyLevel,
	}

	// A governance message should never be emitted on-chain.
	// The check is against the configured governance chain, so the log text
	// does not name a specific chain; the chain is reported as a field.
	if v.EmitterAddress == p.governanceEmitterAddress && v.EmitterChain == p.governanceChainId {
		supervisor.Logger(ctx).Error(
			"EMERGENCY: PLEASE REPORT THIS IMMEDIATELY! A message was emitted from the governance emitter. This should never be possible.",
			zap.Stringer("emitter_chain", k.EmitterChain),
			zap.Stringer("emitter_address", k.EmitterAddress),
			zap.Uint32("nonce", k.Nonce),
			zap.Stringer("txhash", k.TxHash),
			zap.Time("timestamp", k.Timestamp))
		return
	}

	// Ignore incoming observations when our database already has a quorum VAA for it.
	// This can occur when we're receiving late observations due to node catchup, and
	// processing those won't do us any good.
	//
	// Exception: if an observation is made within the settlement time (30s), we'll
	// process it so other nodes won't consider it a miss.
	if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)); err == nil {
		existing, uerr := vaa.Unmarshal(vb)
		if uerr != nil {
			// Logger.Panic logs the fields and then panics with the message,
			// so the underlying decode error is no longer lost.
			p.logger.Panic("failed to unmarshal VAA from db",
				zap.String("message_id", v.MessageID()),
				zap.Error(uerr),
			)
		}

		if k.Timestamp.Sub(existing.Timestamp) > settlementTime {
			p.logger.Info("ignoring observation since we already have a quorum VAA for it",
				zap.Stringer("emitter_chain", k.EmitterChain),
				zap.Stringer("target_chain", k.TargetChain),
				zap.Stringer("emitter_address", k.EmitterAddress),
				zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
				zap.Uint32("nonce", k.Nonce),
				zap.Stringer("txhash", k.TxHash),
				zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
				zap.Time("timestamp", k.Timestamp),
				zap.String("message_id", v.MessageID()),
				zap.Duration("settlement_time", settlementTime),
			)
			return
		}
	} else if err != db.ErrVAANotFound {
		// A lookup failure is logged but does not stop processing: the
		// observation is still signed and broadcast below.
		// pubFields has len == cap, so append copies instead of aliasing it.
		p.logger.Error("failed to get VAA from db", append(pubFields, zap.Error(err))...)
	}

	// Generate digest of the unsigned VAA.
	digest := v.SigningMsg()

	// Sign the digest using our node's guardian key.
	s, err := p.guardianSigner.Sign(digest.Bytes())
	if err != nil {
		panic(err)
	}

	p.logger.Info("observed and signed confirmed message publication",
		zap.Stringer("source_chain", k.EmitterChain),
		zap.Stringer("target_chain", k.TargetChain),
		zap.Stringer("txhash", k.TxHash),
		zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
		zap.String("digest", hex.EncodeToString(digest.Bytes())),
		zap.Uint32("nonce", k.Nonce),
		zap.Uint64("sequence", k.Sequence),
		zap.Stringer("emitter_chain", k.EmitterChain),
		zap.Stringer("emitter_address", k.EmitterAddress),
		zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
		zap.Uint8("consistency_level", k.ConsistencyLevel),
		zap.String("message_id", v.MessageID()),
		zap.String("signature", hex.EncodeToString(s)))

	messagesSignedTotal.With(prometheus.Labels{
		"emitter_chain": k.EmitterChain.String(),
	}).Inc()

	p.attestationEvents.ReportMessagePublication(&reporter.MessagePublication{VAA: *v, InitiatingTxID: k.TxHash})

	p.broadcastSignature(v, s, k.TxHash.Bytes())
}