/
validate_blob.go
251 lines (224 loc) · 9.16 KB
/
validate_blob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package sync
import (
"context"
"fmt"
"strings"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
"github.com/prysmaticlabs/prysm/v4/crypto/rand"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/network/forks"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
)
func (s *Service) handleBlobParentStatus(ctx context.Context, root [32]byte) pubsub.ValidationResult {
if s.cfg.chain.HasBlock(ctx, root) {
// the parent will not be kept if it's invalid
return pubsub.ValidationAccept
}
if s.hasBadBlock(root) {
// [REJECT] The sidecar's block's parent (defined by sidecar.block_parent_root) passes validation.
return pubsub.ValidationReject
}
// [IGNORE] The sidecar's block's parent (defined by sidecar.block_parent_root) has been seen (via both gossip and non-gossip sources)
return pubsub.ValidationIgnore
}
func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
receivedTime := prysmTime.Now()
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}
if s.cfg.initialSync.Syncing() {
return pubsub.ValidationIgnore, nil
}
if msg.Topic == nil {
return pubsub.ValidationReject, errInvalidTopic
}
m, err := s.decodePubsubMessage(msg)
if err != nil {
log.WithError(err).Error("Failed to decode message")
return pubsub.ValidationReject, err
}
sBlob, ok := m.(*eth.SignedBlobSidecar)
if !ok {
log.WithField("message", m).Error("Message is not of type *eth.SignedBlobSidecar")
return pubsub.ValidationReject, errWrongMessage
}
blob := sBlob.Message
// [REJECT] The sidecar's index is consistent with `MAX_BLOBS_PER_BLOCK` -- i.e. `sidecar.index < MAX_BLOBS_PER_BLOCK`
if blob.Index >= fieldparams.MaxBlobsPerBlock {
log.WithFields(blobFields(blob)).Debug("Sidecar index > MAX_BLOBS_PER_BLOCK")
return pubsub.ValidationReject, errors.New("incorrect blob sidecar index")
}
// [REJECT] The sidecar is for the correct subnet -- i.e. compute_subnet_for_blob_sidecar(sidecar.index) == subnet_id.
want := fmt.Sprintf("blob_sidecar_%d", computeSubnetForBlobSidecar(blob.Index))
if !strings.Contains(*msg.Topic, want) {
log.WithFields(blobFields(blob)).Debug("Sidecar index does not match topic")
return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic)
}
// [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) --
// i.e. validate that sidecar.slot <= current_slot (a client MAY queue future blocks for processing at the appropriate slot).
genesisTime := uint64(s.cfg.chain.GenesisTime().Unix())
if err := slots.VerifyTime(genesisTime, blob.Slot, earlyBlockProcessingTolerance); err != nil {
log.WithError(err).WithFields(blobFields(blob)).Debug("Ignored blob: too far into future")
return pubsub.ValidationIgnore, errors.Wrap(err, "blob too far into future")
}
// [IGNORE] The sidecar is from a slot greater than the latest finalized slot --
// i.e. validate that sidecar.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)
startSlot, err := slots.EpochStart(s.cfg.chain.FinalizedCheckpt().Epoch)
if err != nil {
return pubsub.ValidationIgnore, err
}
if startSlot >= blob.Slot {
err := fmt.Errorf("finalized slot %d greater or equal to blob slot %d", startSlot, blob.Slot)
log.WithFields(blobFields(blob)).Debug(err)
return pubsub.ValidationIgnore, err
}
// Handle the parent status (not seen or invalid cases)
parentRoot := bytesutil.ToBytes32(blob.BlockParentRoot)
switch parentStatus := s.handleBlobParentStatus(ctx, parentRoot); parentStatus {
case pubsub.ValidationIgnore:
log.WithFields(blobFields(blob)).Debug("Parent block not found - saving blob to cache")
go func() {
if err := s.sendBatchRootRequest(context.Background(), [][32]byte{parentRoot}, rand.NewGenerator()); err != nil {
log.WithError(err).WithFields(blobFields(blob)).Debug("Failed to send batch root request")
}
}()
missingParentBlobSidecarCount.Inc()
return pubsub.ValidationIgnore, nil
case pubsub.ValidationReject:
log.WithFields(blobFields(blob)).Warning("Rejected blob: parent block is invalid")
return pubsub.ValidationReject, nil
default:
}
pubsubResult, err := s.validateBlobPostSeenParent(ctx, sBlob)
if err != nil {
return pubsubResult, err
}
if pubsubResult != pubsub.ValidationAccept {
return pubsubResult, nil
}
startTime, err := slots.ToTime(genesisTime, blob.Slot)
if err != nil {
return pubsub.ValidationIgnore, err
}
fields := blobFields(blob)
sinceSlotStartTime := receivedTime.Sub(startTime)
fields["sinceSlotStartTime"] = sinceSlotStartTime
fields["validationTime"] = s.cfg.clock.Now().Sub(receivedTime)
log.WithFields(fields).Debug("Received blob sidecar gossip")
blobSidecarArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
msg.ValidatorData = sBlob
return pubsub.ValidationAccept, nil
}
func (s *Service) validateBlobPostSeenParent(ctx context.Context, sBlob *eth.SignedBlobSidecar) (pubsub.ValidationResult, error) {
blob := sBlob.Message
parentRoot := bytesutil.ToBytes32(blob.BlockParentRoot)
// [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by sidecar.block_parent_root).
parentSlot, err := s.cfg.chain.RecentBlockSlot(parentRoot)
if err != nil {
return pubsub.ValidationIgnore, err
}
if parentSlot >= blob.Slot {
err := fmt.Errorf("parent block slot %d greater or equal to blob slot %d", parentSlot, blob.Slot)
log.WithFields(blobFields(blob)).Debug(err)
return pubsub.ValidationReject, err
}
// [REJECT] The proposer signature, signed_blob_sidecar.signature,
// is valid with respect to the sidecar.proposer_index pubkey.
parentState, err := s.cfg.stateGen.StateByRoot(ctx, parentRoot)
if err != nil {
return pubsub.ValidationIgnore, err
}
if err := verifyBlobSignature(parentState, sBlob); err != nil {
log.WithError(err).WithFields(blobFields(blob)).Debug("Failed to verify blob signature")
return pubsub.ValidationReject, err
}
// [IGNORE] The sidecar is the only sidecar with valid signature received for the tuple (sidecar.block_root, sidecar.index).
if s.hasSeenBlobIndex(blob.BlockRoot, blob.Index) {
return pubsub.ValidationIgnore, nil
}
// [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current shuffling (defined by block_parent_root/slot)
parentState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot[:], blob.Slot)
if err != nil {
return pubsub.ValidationIgnore, err
}
idx, err := helpers.BeaconProposerIndex(ctx, parentState)
if err != nil {
return pubsub.ValidationIgnore, err
}
if blob.ProposerIndex != idx {
err := fmt.Errorf("expected proposer index %d, got %d", idx, blob.ProposerIndex)
log.WithFields(blobFields(blob)).Debug(err)
return pubsub.ValidationReject, err
}
return pubsub.ValidationAccept, nil
}
func verifyBlobSignature(st state.BeaconState, blob *eth.SignedBlobSidecar) error {
currentEpoch := slots.ToEpoch(blob.Message.Slot)
fork, err := forks.Fork(currentEpoch)
if err != nil {
return err
}
domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBlobSidecar, st.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := st.ValidatorAtIndex(blob.Message.ProposerIndex)
if err != nil {
return err
}
pb, err := bls.PublicKeyFromBytes(proposer.PublicKey)
if err != nil {
return err
}
sig, err := bls.SignatureFromBytes(blob.Signature)
if err != nil {
return err
}
sr, err := signing.ComputeSigningRoot(blob.Message, domain)
if err != nil {
return err
}
if !sig.Verify(pb, sr[:]) {
return signing.ErrSigFailedToVerify
}
return nil
}
// Returns true if the blob with the same root and index has been seen before.
func (s *Service) hasSeenBlobIndex(root []byte, index uint64) bool {
s.seenBlobLock.RLock()
defer s.seenBlobLock.RUnlock()
b := append(root, bytesutil.Bytes32(index)...)
_, seen := s.seenBlobCache.Get(string(b))
return seen
}
// Set blob index and root as seen.
func (s *Service) setSeenBlobIndex(root []byte, index uint64) {
s.seenBlobLock.Lock()
defer s.seenBlobLock.Unlock()
b := append(root, bytesutil.Bytes32(index)...)
s.seenBlobCache.Add(string(b), true)
}
func blobFields(b *eth.DeprecatedBlobSidecar) logrus.Fields {
return logrus.Fields{
"slot": b.Slot,
"proposerIndex": b.ProposerIndex,
"blockRoot": fmt.Sprintf("%#x", b.BlockRoot),
"kzgCommitment": fmt.Sprintf("%#x", b.KzgCommitment),
"index": b.Index,
}
}
func computeSubnetForBlobSidecar(index uint64) uint64 {
return index % params.BeaconConfig().BlobsidecarSubnetCount
}