Skip to content

Commit

Permalink
Cached head root retrieve from DB on miss (prysmaticlabs#4552)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored and cryptomental committed Feb 28, 2020
1 parent 437afc5 commit 87a4a97
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 29 deletions.
22 changes: 18 additions & 4 deletions beacon-chain/blockchain/chain_info.go
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand All @@ -29,7 +30,7 @@ type GenesisTimeFetcher interface {
// directly retrieves head related data.
type HeadFetcher interface {
HeadSlot() uint64
HeadRoot() []byte
HeadRoot(ctx context.Context) ([]byte, error)
HeadBlock() *ethpb.SignedBeaconBlock
HeadState(ctx context.Context) (*pb.BeaconState, error)
HeadValidatorsIndices(epoch uint64) ([]uint64, error)
Expand Down Expand Up @@ -109,16 +110,29 @@ func (s *Service) HeadSlot() uint64 {
}

// HeadRoot returns the root of the head of the chain.
func (s *Service) HeadRoot() []byte {
func (s *Service) HeadRoot(ctx context.Context) ([]byte, error) {
s.headLock.RLock()
defer s.headLock.RUnlock()

root := s.canonicalRoots[s.headSlot]
if len(root) != 0 {
return root
return root, nil
}

return params.BeaconConfig().ZeroHash[:]
b, err := s.beaconDB.HeadBlock(ctx)
if err != nil {
return nil, err
}
if b == nil {
return params.BeaconConfig().ZeroHash[:], nil
}

r, err := ssz.HashTreeRoot(b.Block)
if err != nil {
return nil, err
}

return r[:], nil
}

// HeadBlock returns the head block of the chain.
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/chain_info_norace_test.go
Expand Up @@ -39,7 +39,9 @@ func TestHeadRoot_DataRace(t *testing.T) {
[32]byte{},
)
}()
s.HeadRoot()
if _, err := s.HeadRoot(context.Background()); err != nil {
t.Fatal(err)
}
}

func TestHeadBlock_DataRace(t *testing.T) {
Expand Down
14 changes: 11 additions & 3 deletions beacon-chain/blockchain/chain_info_test.go
Expand Up @@ -33,7 +33,11 @@ func TestHeadRoot_Nil(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
c := setupBeaconChain(t, db)
if !bytes.Equal(c.HeadRoot(), params.BeaconConfig().ZeroHash[:]) {
headRoot, err := c.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(headRoot, params.BeaconConfig().ZeroHash[:]) {
t.Error("Incorrect pre chain start value")
}
}
Expand Down Expand Up @@ -131,8 +135,12 @@ func TestHeadRoot_CanRetrieve(t *testing.T) {
c := &Service{canonicalRoots: make(map[uint64][]byte)}
c.headSlot = 100
c.canonicalRoots[c.headSlot] = []byte{'A'}
if !bytes.Equal([]byte{'A'}, c.HeadRoot()) {
t.Errorf("Wanted head root: %v, got: %d", []byte{'A'}, c.HeadRoot())
headRoot, err := c.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal([]byte{'A'}, headRoot) {
t.Errorf("Wanted head root: %v, got: %d", []byte{'A'}, headRoot)
}
}

Expand Down
5 changes: 0 additions & 5 deletions beacon-chain/blockchain/metrics.go
Expand Up @@ -15,10 +15,6 @@ var (
Name: "beacon_head_slot",
Help: "Slot of the head block of the beacon chain",
})
beaconHeadRoot = promauto.NewGauge(prometheus.GaugeOpts{
Name: "beacon_head_root",
Help: "Root of the head block of the beacon chain, it returns the lowest 8 bytes interpreted as little endian",
})
competingAtts = promauto.NewCounter(prometheus.CounterOpts{
Name: "competing_attestations",
Help: "The # of attestations received and processed from a competing chain",
Expand Down Expand Up @@ -60,7 +56,6 @@ var (
func (s *Service) reportSlotMetrics(currentSlot uint64) {
beaconSlot.Set(float64(currentSlot))
beaconHeadSlot.Set(float64(s.HeadSlot()))
beaconHeadRoot.Set(float64(bytesutil.ToLowInt64(s.HeadRoot())))
if s.headState != nil {
headFinalizedEpoch.Set(float64(s.headState.FinalizedCheckpoint.Epoch))
headFinalizedRoot.Set(float64(bytesutil.ToLowInt64(s.headState.FinalizedCheckpoint.Root)))
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/blockchain/receive_attestation.go
Expand Up @@ -42,7 +42,11 @@ func (s *Service) ReceiveAttestationNoPubsub(ctx context.Context, att *ethpb.Att
return errors.Wrap(err, "could not get head from fork choice service")
}
// Only save head if it's different than the current head.
if !bytes.Equal(headRoot, s.HeadRoot()) {
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(headRoot, cachedHeadRoot) {
signed, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(headRoot))
if err != nil {
return errors.Wrap(err, "could not compute state from block head")
Expand Down
22 changes: 17 additions & 5 deletions beacon-chain/blockchain/receive_block.go
Expand Up @@ -87,7 +87,11 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
}

// Only save head if it's different than the current head.
if !bytes.Equal(headRoot, s.HeadRoot()) {
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(headRoot, cachedHeadRoot) {
signedHeadBlock, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(headRoot))
if err != nil {
return errors.Wrap(err, "could not compute state from block head")
Expand Down Expand Up @@ -152,8 +156,11 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
if err != nil {
return errors.Wrap(err, "could not get signing root on received block")
}

if !bytes.Equal(root[:], s.HeadRoot()) {
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHead(ctx, blockCopy, root); err != nil {
return errors.Wrap(err, "could not save head")
}
Expand Down Expand Up @@ -199,16 +206,21 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
return errors.Wrap(err, "could not get signing root on received blockCopy")
}

cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}

if featureconfig.Get().InitSyncCacheState {
if !bytes.Equal(root[:], s.HeadRoot()) {
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil {
err := errors.Wrap(err, "could not save head")
traceutil.AnnotateError(span, err)
return err
}
}
} else {
if !bytes.Equal(root[:], s.HeadRoot()) {
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHead(ctx, blockCopy, root); err != nil {
err := errors.Wrap(err, "could not save head")
traceutil.AnnotateError(span, err)
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/blockchain/receive_block_test.go
Expand Up @@ -97,7 +97,11 @@ func TestReceiveReceiveBlockNoPubsub_CanSaveHeadInfo(t *testing.T) {
t.Fatal(err)
}

if !bytes.Equal(r[:], chainService.HeadRoot()) {
headRoot, err := chainService.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(r[:], headRoot) {
t.Error("Incorrect head root saved")
}

Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/blockchain/service_test.go
Expand Up @@ -332,7 +332,11 @@ func TestChainService_InitializeChainInfo(t *testing.T) {
if headBlock.Block.Slot != c.HeadSlot() {
t.Error("head slot incorrect")
}
if !bytes.Equal(headRoot[:], c.HeadRoot()) {
r, err := c.HeadRoot(context.Background())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(headRoot[:], r) {
t.Error("head slot incorrect")
}
if c.genesisRoot != genesisRoot {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/testing/mock.go
Expand Up @@ -128,8 +128,8 @@ func (ms *ChainService) HeadSlot() uint64 {
}

// HeadRoot mocks HeadRoot method in chain service.
func (ms *ChainService) HeadRoot() []byte {
return ms.Root
func (ms *ChainService) HeadRoot(ctx context.Context) ([]byte, error) {
return ms.Root, nil

}

Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/rpc/validator/attester.go
Expand Up @@ -60,7 +60,10 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve head state: %v", err)
}
headRoot := vs.HeadFetcher.HeadRoot()
headRoot, err := vs.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err)
}

if helpers.CurrentEpoch(headState) < helpers.SlotToEpoch(req.Slot) {
headState, err = state.ProcessSlots(ctx, headState, helpers.StartSlot(helpers.SlotToEpoch(req.Slot)))
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/rpc/validator/proposer.go
Expand Up @@ -36,8 +36,10 @@ func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb
}

// Retrieve the parent block as the current head of the canonical chain.
parentRoot := vs.HeadFetcher.HeadRoot()

parentRoot, err := vs.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve head root: %v", err)
}
eth1Data, err := vs.eth1Data(ctx, req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get ETH1 data: %v", err)
Expand Down
16 changes: 13 additions & 3 deletions beacon-chain/sync/rpc_status.go
Expand Up @@ -57,11 +57,16 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

headRoot, err := r.chain.HeadRoot(ctx)
if err != nil {
return err
}

resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: r.chain.HeadRoot(),
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}
stream, err := r.p2p.Send(ctx, resp, id)
Expand Down Expand Up @@ -130,18 +135,23 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
}
r.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), m)

headRoot, err := r.chain.HeadRoot(ctx)
if err != nil {
return err
}

resp := &pb.Status{
HeadForkVersion: r.chain.CurrentFork().CurrentVersion,
FinalizedRoot: r.chain.FinalizedCheckpt().Root,
FinalizedEpoch: r.chain.FinalizedCheckpt().Epoch,
HeadRoot: r.chain.HeadRoot(),
HeadRoot: headRoot,
HeadSlot: r.chain.HeadSlot(),
}

if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
log.WithError(err).Error("Failed to write to stream")
}
_, err := r.p2p.Encoding().EncodeWithLength(stream, resp)
_, err = r.p2p.Encoding().EncodeWithLength(stream, resp)

return err
}
Expand Down

0 comments on commit 87a4a97

Please sign in to comment.