forked from prysmaticlabs/prysm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
proposer.go
435 lines (386 loc) · 16.4 KB
/
proposer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
package validator
import (
"context"
"fmt"
"math/big"
"math/rand"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
dbpb "github.com/prysmaticlabs/prysm/proto/beacon/db"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/trieutil"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// eth1DataNotification is a latch to stop flooding logs with the same warning.
// It is set to true by mockETH1DataVote and randomETH1DataVote right after they
// log their warning, and reset to false by eth1Data once the node is connected
// to ETH1 and real votes are being produced again.
// NOTE(review): this is a plain package-level bool with no synchronization
// visible in this file — confirm whether concurrent RPC calls can touch it.
var eth1DataNotification bool
// GetBlock is called by a proposer during its assigned slot to request a block to sign
// by passing in the slot and the signed randao reveal of the slot.
//
// The block is assembled on top of the current canonical head and contains the
// chosen eth1 data vote, pending deposits, attestations from the pool
// (aggregated first, then unaggregated if room is left), pending slashings and
// exits, and the caller's graffiti. The state root is computed by running the
// unsigned block through the state transition.
//
// It returns codes.Unavailable while the node is syncing and codes.Internal
// when any of the assembly steps fails.
func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.BeaconBlock, error) {
	ctx, span := trace.StartSpan(ctx, "ProposerServer.RequestBlock")
	defer span.End()
	span.AddAttributes(trace.Int64Attribute("slot", int64(req.Slot)))

	if vs.SyncChecker.Syncing() {
		return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
	}

	// Retrieve the parent block as the current head of the canonical chain.
	parentRoot, err := vs.HeadFetcher.HeadRoot(ctx)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err)
	}

	eth1Data, err := vs.eth1Data(ctx, req.Slot)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not get ETH1 data: %v", err)
	}

	// Pack ETH1 deposits which have not been included in the beacon chain.
	deposits, err := vs.deposits(ctx, eth1Data)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not get ETH1 deposits: %v", err)
	}

	// Pack aggregated attestations which have not been included in the beacon chain.
	atts := vs.AttPool.AggregatedAttestations()
	atts, err = vs.filterAttestationsForBlockInclusion(ctx, req.Slot, atts)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not filter attestations: %v", err)
	}

	// If there is any room left in the block, consider unaggregated attestations as well.
	maxAtts := int(params.BeaconConfig().MaxAttestations)
	if len(atts) < maxAtts {
		uAtts := vs.AttPool.UnaggregatedAttestations()
		uAtts, err = vs.filterAttestationsForBlockInclusion(ctx, req.Slot, uAtts)
		// This error used to be assigned and then ignored; surface it like the
		// aggregated case above instead of continuing with a failed filter.
		if err != nil {
			return nil, status.Errorf(codes.Internal, "Could not filter attestations: %v", err)
		}
		// Trim the unaggregated set so the total never exceeds the block limit.
		if len(uAtts)+len(atts) > maxAtts {
			uAtts = uAtts[:maxAtts-len(atts)]
		}
		atts = append(atts, uAtts...)
	}

	// Use zero hash as stub for state root to compute later.
	stateRoot := params.BeaconConfig().ZeroHash[:]

	graffiti := bytesutil.ToBytes32(req.Graffiti)

	head, err := vs.HeadFetcher.HeadState(ctx)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not get head state %v", err)
	}

	blk := &ethpb.BeaconBlock{
		Slot:       req.Slot,
		ParentRoot: parentRoot[:],
		StateRoot:  stateRoot,
		Body: &ethpb.BeaconBlockBody{
			Eth1Data:          eth1Data,
			Deposits:          deposits,
			Attestations:      atts,
			RandaoReveal:      req.RandaoReveal,
			ProposerSlashings: vs.SlashingsPool.PendingProposerSlashings(ctx),
			AttesterSlashings: vs.SlashingsPool.PendingAttesterSlashings(ctx),
			VoluntaryExits:    vs.ExitPool.PendingExits(head, req.Slot),
			Graffiti:          graffiti[:],
		},
	}

	// Compute state root with the newly constructed block. A 96-byte zeroed
	// signature is supplied as a placeholder since the block is not signed yet.
	stateRoot, err = vs.computeStateRoot(ctx, &ethpb.SignedBeaconBlock{Block: blk, Signature: make([]byte, 96)})
	if err != nil {
		// Hand the failing block to the interop writer, flagged as failed.
		interop.WriteBlockToDisk(&ethpb.SignedBeaconBlock{Block: blk}, true /*failed*/)
		return nil, status.Errorf(codes.Internal, "Could not compute state root: %v", err)
	}
	blk.StateRoot = stateRoot

	return blk, nil
}
// ProposeBlock is called by a proposer during its assigned slot to create a block in an attempt
// to get it processed by the beacon node as the canonical head.
//
// The signed block is announced on the block feed, handed to the block
// receiver for processing, and its attestations are then removed from the
// attestation pool. On success the response carries the hash tree root of the
// (unsigned) block. Any failure is reported as codes.Internal.
func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock) (*ethpb.ProposeResponse, error) {
	root, err := ssz.HashTreeRoot(blk.Block)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not tree hash block: %v", err)
	}
	log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
		"Block proposal received via RPC")

	// Notify feed subscribers before the block is processed.
	vs.BlockNotifier.BlockFeed().Send(&feed.Event{
		Type: blockfeed.ReceivedBlock,
		Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
	})

	if err := vs.BlockReceiver.ReceiveBlock(ctx, blk); err != nil {
		return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err)
	}

	// The block's attestations are now on chain; drop them from the pool so
	// they are not packed into a later proposal.
	if err := vs.deleteAttsInPool(blk.Block.Body.Attestations); err != nil {
		return nil, status.Errorf(codes.Internal, "Could not delete attestations in pool: %v", err)
	}

	return &ethpb.ProposeResponse{
		BlockRoot: root[:],
	}, nil
}
// eth1Data picks the eth1data vote to place in a block proposed at the given slot.
//
// When mock votes are enabled the vote comes from mockETH1DataVote, and when
// the node has no ETH1 connection it comes from randomETH1DataVote. Otherwise:
//  - Compute the timestamp of the first slot of the current eth1 voting period.
//  - Look up the most recent eth1 block number up to that timestamp.
//  - Hand that block number to defaultEth1DataResponse, which steps back by
//    ETH1_FOLLOW_DISTANCE and builds the vote from that ancestor.
func (vs *Server) eth1Data(ctx context.Context, slot uint64) (*ethpb.Eth1Data, error) {
	switch {
	case vs.MockEth1Votes:
		return vs.mockETH1DataVote(ctx, slot)
	case !vs.Eth1InfoFetcher.IsConnectedToETH1():
		return vs.randomETH1DataVote(ctx)
	}
	// Real votes are being produced again, so re-arm the warning latch.
	eth1DataNotification = false

	genesisTime, _ := vs.Eth1InfoFetcher.Eth2GenesisPowchainInfo()
	cfg := params.BeaconConfig()
	periodStartSlot := slot - (slot % cfg.SlotsPerEth1VotingPeriod)
	periodStartTime := genesisTime + periodStartSlot*cfg.SecondsPerSlot

	// Look up most recent block up to timestamp
	height, err := vs.Eth1BlockFetcher.BlockNumberByTimestamp(ctx, periodStartTime)
	if err != nil {
		return nil, errors.Wrap(err, "could not get block number from timestamp")
	}

	return vs.defaultEth1DataResponse(ctx, height)
}
// mockETH1DataVote builds a deterministic eth1data vote for the given slot
// without consulting an ETH1 chain. It logs a warning the first time it is
// used (guarded by eth1DataNotification) and returns an error if the head
// state cannot be fetched or the seed value cannot be SSZ-encoded.
func (vs *Server) mockETH1DataVote(ctx context.Context, slot uint64) (*ethpb.Eth1Data, error) {
	if !eth1DataNotification {
		log.Warn("Beacon Node is no longer connected to an ETH1 chain, so ETH1 data votes are now mocked.")
		eth1DataNotification = true
	}
	// If a mock eth1 data votes is specified, we use the following for the
	// eth1data we provide to every proposer based on https://github.com/ethereum/eth2.0-pm/issues/62:
	//
	// slot_in_voting_period = current_slot % SLOTS_PER_ETH1_VOTING_PERIOD
	// Eth1Data(
	//   DepositRoot = hash(current_epoch + slot_in_voting_period),
	//   DepositCount = state.eth1_deposit_index,
	//   BlockHash = hash(hash(current_epoch + slot_in_voting_period)),
	// )
	slotInVotingPeriod := slot % params.BeaconConfig().SlotsPerEth1VotingPeriod
	headState, err := vs.HeadFetcher.HeadState(ctx)
	if err != nil {
		return nil, err
	}
	enc, err := ssz.Marshal(helpers.SlotToEpoch(slot) + slotInVotingPeriod)
	if err != nil {
		return nil, err
	}
	depRoot := hashutil.Hash(enc)
	blockHash := hashutil.Hash(depRoot[:])
	return &ethpb.Eth1Data{
		DepositRoot:  depRoot[:],
		DepositCount: headState.Eth1DepositIndex(),
		BlockHash:    blockHash[:],
	}, nil
}
// randomETH1DataVote builds an eth1data vote with a random deposit root and
// block hash, used by eth1Data when the node is not connected to an ETH1
// chain. The deposit count is taken from the head state. It logs a warning the
// first time it is used (guarded by eth1DataNotification) and returns an error
// if the head state cannot be fetched.
func (vs *Server) randomETH1DataVote(ctx context.Context) (*ethpb.Eth1Data, error) {
	if !eth1DataNotification {
		log.Warn("Beacon Node is no longer connected to an ETH1 chain, so ETH1 data votes are now random.")
		eth1DataNotification = true
	}
	headState, err := vs.HeadFetcher.HeadState(ctx)
	if err != nil {
		return nil, err
	}
	// set random roots and block hashes to prevent a majority from being
	// built if the eth1 node is offline
	depRoot := hashutil.Hash(bytesutil.Bytes32(rand.Uint64()))
	blockHash := hashutil.Hash(bytesutil.Bytes32(rand.Uint64()))
	return &ethpb.Eth1Data{
		DepositRoot:  depRoot[:],
		DepositCount: headState.Eth1DepositIndex(),
		BlockHash:    blockHash[:],
	}, nil
}
// computeStateRoot loads the state at the block's parent root, runs the block
// through state.CalculateStateRoot, and returns the resulting state root bytes.
// The parent state is read from StateGen when the NewStateMgmt feature is
// enabled and from BeaconDB otherwise.
func (vs *Server) computeStateRoot(ctx context.Context, block *ethpb.SignedBeaconBlock) ([]byte, error) {
	var (
		parentState *stateTrie.BeaconState
		err         error
	)
	if featureconfig.Get().NewStateMgmt {
		parentState, err = vs.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(block.Block.ParentRoot))
	} else {
		parentState, err = vs.BeaconDB.State(ctx, bytesutil.ToBytes32(block.Block.ParentRoot))
	}
	// Both lookup paths report failure with the same wrapped message.
	if err != nil {
		return nil, errors.Wrap(err, "could not retrieve beacon state")
	}

	stateRoot, err := state.CalculateStateRoot(ctx, parentState, block)
	if err != nil {
		return nil, errors.Wrapf(err, "could not calculate state root at slot %d", parentState.Slot())
	}

	log.WithField("beaconStateRoot", fmt.Sprintf("%#x", stateRoot)).Debugf("Computed state root")
	return stateRoot[:], nil
}
// deposits collects the pending deposits that may be packed into the next
// beacon block, each with a freshly built Merkle proof, capped at MaxDeposits.
//
// Which deposits qualify depends on the canonical eth1data: the proposer's
// current vote if it has enough support once counted, otherwise the eth1data
// already in the head state. An empty list is returned when votes are mocked,
// when the node has no ETH1 connection, or when the canonical eth1 block is the
// genesis eth1 block.
func (vs *Server) deposits(ctx context.Context, currentVote *ethpb.Eth1Data) ([]*ethpb.Deposit, error) {
	if vs.MockEth1Votes || !vs.Eth1InfoFetcher.IsConnectedToETH1() {
		return []*ethpb.Deposit{}, nil
	}

	// The head state supplies the deposit index from which pending deposits start.
	headState, err := vs.HeadFetcher.HeadState(ctx)
	if err != nil {
		return nil, status.Error(codes.Internal, "Could not get head state")
	}

	canonicalEth1Data, latestEth1DataHeight, err := vs.canonicalEth1Data(ctx, headState, currentVote)
	if err != nil {
		return nil, err
	}

	_, genesisEth1Block := vs.Eth1InfoFetcher.Eth2GenesisPowchainInfo()
	if genesisEth1Block.Cmp(latestEth1DataHeight) == 0 {
		return []*ethpb.Deposit{}, nil
	}

	// Rebuild the deposit trie from every deposit known up to the canonical height.
	historical := vs.DepositFetcher.AllDeposits(ctx, latestEth1DataHeight)
	leaves := make([][]byte, 0, len(historical))
	for _, d := range historical {
		leaf, err := ssz.HashTreeRoot(d.Data)
		if err != nil {
			return nil, errors.Wrap(err, "could not hash deposit data")
		}
		leaves = append(leaves, leaf[:])
	}
	depositTrie, err := trieutil.GenerateTrieFromItems(leaves, int(params.BeaconConfig().DepositContractTreeDepth))
	if err != nil {
		return nil, errors.Wrap(err, "could not generate historical deposit trie from deposits")
	}

	// Keep only containers whose index lies in [state deposit index, canonical deposit count).
	// Deposits need to be received in order of merkle index root, so the
	// containers are expected to come back sorted from lowest to highest.
	var pendingDeps []*dbpb.DepositContainer
	for _, container := range vs.PendingDepositsFetcher.PendingContainers(ctx, latestEth1DataHeight) {
		idx := uint64(container.Index)
		if idx < headState.Eth1DepositIndex() || idx >= canonicalEth1Data.DepositCount {
			continue
		}
		pendingDeps = append(pendingDeps, container)
	}

	maxDeposits := params.BeaconConfig().MaxDeposits

	// Don't construct merkle proofs beyond the number of deposits allowed in a block.
	for i, container := range pendingDeps {
		if uint64(i) == maxDeposits {
			break
		}
		container.Deposit, err = constructMerkleProof(depositTrie, int(container.Index), container.Deposit)
		if err != nil {
			return nil, err
		}
	}

	// Limit the return of pending deposits to not be more than max deposits allowed in block.
	var pendingDeposits []*ethpb.Deposit
	for i, container := range pendingDeps {
		if i >= int(maxDeposits) {
			break
		}
		pendingDeposits = append(pendingDeposits, container.Deposit)
	}
	return pendingDeposits, nil
}
// canonicalEth1Data selects the eth1data, and the height of its eth1 block, that
// deposit selection should be based on. The current vote is appended to the
// given state's votes first; if it then has enough support it is chosen,
// otherwise the state's existing eth1data is used.
func (vs *Server) canonicalEth1Data(ctx context.Context, beaconState *stateTrie.BeaconState, currentVote *ethpb.Eth1Data) (*ethpb.Eth1Data, *big.Int, error) {
	// Add in current vote, to get accurate vote tally
	beaconState.AppendEth1DataVotes(currentVote)
	supported, err := blocks.Eth1DataHasEnoughSupport(beaconState, currentVote)
	if err != nil {
		return nil, nil, errors.Wrap(err, "could not determine if current eth1data vote has enough support")
	}

	// Default to the proposer's vote and fall back to the state's eth1data
	// when that vote is not yet supported.
	chosen := currentVote
	if !supported {
		chosen = beaconState.Eth1Data()
	}

	_, height, err := vs.Eth1BlockFetcher.BlockExists(ctx, bytesutil.ToBytes32(chosen.BlockHash))
	if err != nil {
		return nil, nil, errors.Wrap(err, "could not fetch eth1data height")
	}
	return chosen, height, nil
}
// defaultEth1DataResponse builds the eth1data vote from the eth1 block that is
// ETH1_FOLLOW_DISTANCE behind currentHeight: the vote carries that ancestor's
// block hash together with the deposit count and deposit root at that height.
// If no deposits exist at the ancestor height, the chain-start eth1data is
// returned instead.
func (vs *Server) defaultEth1DataResponse(ctx context.Context, currentHeight *big.Int) (*ethpb.Eth1Data, error) {
	eth1FollowDistance := int64(params.BeaconConfig().Eth1FollowDistance)
	ancestorHeight := big.NewInt(0).Sub(currentHeight, big.NewInt(eth1FollowDistance))
	blockHash, err := vs.Eth1BlockFetcher.BlockHashByHeight(ctx, ancestorHeight)
	if err != nil {
		return nil, errors.Wrap(err, "could not fetch ETH1_FOLLOW_DISTANCE ancestor")
	}
	// Fetch all historical deposits up to an ancestor height.
	depositsTillHeight, depositRoot := vs.DepositFetcher.DepositsNumberAndRootAtHeight(ctx, ancestorHeight)
	if depositsTillHeight == 0 {
		return vs.ChainStartFetcher.ChainStartEth1Data(), nil
	}
	return &ethpb.Eth1Data{
		DepositRoot:  depositRoot[:],
		BlockHash:    blockHash[:],
		DepositCount: depositsTillHeight,
	}, nil
}
// filterAttestationsForBlockInclusion filters the input attestations and returns
// the ones that can be packaged inside a beacon block at the given slot.
//
// The head state is advanced to slot if it is behind, and each attestation is
// then run through blocks.ProcessAttestation against that state. Attestations
// that fail are deleted from the pool; the rest are returned. Only the first
// MaxAttestations entries of atts are examined.
func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot uint64, atts []*ethpb.Attestation) ([]*ethpb.Attestation, error) {
	ctx, span := trace.StartSpan(ctx, "ProposerServer.filterAttestationsForBlockInclusion")
	defer span.End()

	validAtts := make([]*ethpb.Attestation, 0, len(atts))
	inValidAtts := make([]*ethpb.Attestation, 0, len(atts))

	bState, err := vs.HeadFetcher.HeadState(ctx)
	if err != nil {
		// Keep the underlying cause instead of replacing it with a fixed string.
		return nil, errors.Wrap(err, "could not get head state")
	}

	if bState.Slot() < slot {
		bState, err = state.ProcessSlots(ctx, bState, slot)
		if err != nil {
			return nil, errors.Wrapf(err, "could not process slots up to %d", slot)
		}
	}

	// TODO(3916): Insert optimizations to sort out the most profitable attestations
	for i, att := range atts {
		// The cap is applied to the input position, not to the number of valid
		// attestations collected so far.
		if i == int(params.BeaconConfig().MaxAttestations) {
			break
		}

		if _, err := blocks.ProcessAttestation(ctx, bState, att); err != nil {
			inValidAtts = append(inValidAtts, att)
			continue
		}
		validAtts = append(validAtts, att)
	}

	// Invalid attestations are removed so they are not retried by later proposals.
	if err := vs.deleteAttsInPool(inValidAtts); err != nil {
		return nil, err
	}

	return validAtts, nil
}
// deleteAttsInPool removes each given attestation from the attestation pool,
// using the aggregated or unaggregated delete depending on
// helpers.IsAggregated, so proposers don't include them in a future block.
// It stops and returns the first deletion error.
func (vs *Server) deleteAttsInPool(atts []*ethpb.Attestation) error {
	for _, a := range atts {
		var err error
		if helpers.IsAggregated(a) {
			err = vs.AttPool.DeleteAggregatedAttestation(a)
		} else {
			err = vs.AttPool.DeleteUnaggregatedAttestation(a)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// constructMerkleProof generates the Merkle proof for the leaf at index in the
// given trie, stores it on the deposit, and returns that same deposit. When the
// proof cannot be generated it returns nil and a wrapped error, and the deposit
// is left unmodified.
func constructMerkleProof(trie *trieutil.SparseMerkleTrie, index int, deposit *ethpb.Deposit) (*ethpb.Deposit, error) {
	branch, err := trie.MerkleProof(index)
	if err != nil {
		return nil, errors.Wrapf(err, "could not generate merkle proof for deposit at index %d", index)
	}
	// The proof is attached in place; the caller's deposit is mutated.
	deposit.Proof = branch
	return deposit, nil
}