Skip to content

Commit

Permalink
Migrate SubmitAggregateAndProof (prysmaticlabs#4951)
Browse files Browse the repository at this point in the history
* Remove unused services, mark everything as deprecated, regen pb.go
* remove some code from cluster pk manager, gazelle
* goimports
* remove mocks
* Update WORKSPACE, deprecate old method, stub new method
* Move implementation to ethereumapis definition
* gofmt
* Add TODO for prysmaticlabs#4952
* Merge branch 'master' into migrate-submitaggregateandproof
* Update validator client to use new submit aggregate and proof method
* Merge branch 'migrate-submitaggregateandproof' of github.com:prysmaticlabs/prysm into migrate-submitaggregateandproof
* gaz
* rename
* rename
* Merge refs/heads/master into migrate-submitaggregateandproof
* Merge refs/heads/master into migrate-submitaggregateandproof
* Merge refs/heads/master into migrate-submitaggregateandproof
* Merge refs/heads/master into migrate-submitaggregateandproof
* Merge refs/heads/master into migrate-submitaggregateandproof
* fix tests
* Merge branch 'migrate-submitaggregateandproof' of github.com:prysmaticlabs/prysm into migrate-submitaggregateandproof
  • Loading branch information
prestonvanloon authored and cryptomental committed Feb 28, 2020
1 parent daa2637 commit 398cd64
Show file tree
Hide file tree
Showing 16 changed files with 886 additions and 519 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Expand Up @@ -1297,7 +1297,7 @@ go_repository(
go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "53ccc146f7f488c5c7634530057f4aedf510a9ac",
commit = "fca4d6f69bedb8615c2fc916d1a68f2692285caa",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
36 changes: 2 additions & 34 deletions beacon-chain/rpc/aggregator/BUILD.bazel
@@ -1,46 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["server.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/aggregator",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/rpc/validator:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["server_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/beacon/rpc/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/bls:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
],
)
89 changes: 13 additions & 76 deletions beacon-chain/rpc/aggregator/server.go
Expand Up @@ -4,100 +4,37 @@ import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc/validator"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var log logrus.FieldLogger

func init() {
log = logrus.WithField("prefix", "rpc/aggregator")
}

// Server defines a server implementation of the gRPC aggregator service.
// Deprecated: Do not use.
type Server struct {
BeaconDB db.ReadOnlyDatabase
HeadFetcher blockchain.HeadFetcher
SyncChecker sync.Checker
AttPool attestations.Pool
P2p p2p.Broadcaster
ValidatorServer *validator.Server
}

// SubmitAggregateAndProof is called by a validator when its assigned to be an aggregator.
// The beacon node will broadcast aggregated attestation and proof on the aggregator's behavior.
// Deprecated: Use github.com/prysmaticlabs/prysm/beacon-chain/rpc/validator.SubmitAggregateAndProof.
// TODO(4952): Delete this method.
func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *pb.AggregationRequest) (*pb.AggregationResponse, error) {
ctx, span := trace.StartSpan(ctx, "AggregatorServer.SubmitAggregation")
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(req.Slot)))

if as.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}

validatorIndex, exists, err := as.BeaconDB.ValidatorIndex(ctx, req.PublicKey)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get validator index from DB: %v", err)
}
if !exists {
return nil, status.Error(codes.Internal, "Could not locate validator index in DB")
request := &ethpb.AggregationRequest{
Slot: req.Slot,
CommitteeIndex: req.CommitteeIndex,
PublicKey: req.PublicKey,
SlotSignature: req.SlotSignature,
}

epoch := helpers.SlotToEpoch(req.Slot)
activeValidatorIndices, err := as.HeadFetcher.HeadValidatorsIndices(epoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get validators: %v", err)
}
seed, err := as.HeadFetcher.HeadSeed(epoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get seed: %v", err)
}
committee, err := helpers.BeaconCommittee(activeValidatorIndices, seed, req.Slot, req.CommitteeIndex)
// Passthrough request to non-deprecated method.
res, err := as.ValidatorServer.SubmitAggregateAndProof(ctx, request)
if err != nil {
return nil, err
}

// Check if the validator is an aggregator
isAggregator, err := helpers.IsAggregator(uint64(len(committee)), req.SlotSignature)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get aggregator status: %v", err)
}
if !isAggregator {
return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator")
}

// Retrieve the unaggregated attestation from pool.
aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)

for _, aggregatedAtt := range aggregatedAtts {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if helpers.IsAggregated(aggregatedAtt) {
if err := as.P2p.Broadcast(ctx, &ethpb.AggregateAttestationAndProof{
AggregatorIndex: validatorIndex,
SelectionProof: req.SlotSignature,
Aggregate: aggregatedAtt,
}); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast aggregated attestation: %v", err)
}

log.WithFields(logrus.Fields{
"slot": req.Slot,
"committeeIndex": req.CommitteeIndex,
"validatorIndex": validatorIndex,
"aggregatedCount": aggregatedAtt.AggregationBits.Count(),
}).Debug("Broadcasting aggregated attestation and proof")
}
}

return &pb.AggregationResponse{}, nil
return &pb.AggregationResponse{Root: res.AttestationDataRoot}, nil
}
8 changes: 1 addition & 7 deletions beacon-chain/rpc/service.go
Expand Up @@ -258,13 +258,7 @@ func (s *Service) Start() {
BlockNotifier: s.blockNotifier,
AttestationNotifier: s.operationNotifier,
}
aggregatorServer := &aggregator.Server{
BeaconDB: s.beaconDB,
HeadFetcher: s.headFetcher,
SyncChecker: s.syncService,
AttPool: s.attestationsPool,
P2p: s.p2p,
}
aggregatorServer := &aggregator.Server{ValidatorServer: validatorServer}
pb.RegisterAggregatorServiceServer(s.grpcServer, aggregatorServer)
ethpb.RegisterNodeServer(s.grpcServer, nodeServer)
ethpb.RegisterBeaconChainServer(s.grpcServer, beaconChainServer)
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/validator/BUILD.bazel
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"aggregator.go",
"assignments.go",
"attester.go",
"exit.go",
Expand Down Expand Up @@ -57,6 +58,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"aggregator_test.go",
"assignments_test.go",
"attester_test.go",
"exit_test.go",
Expand Down
82 changes: 82 additions & 0 deletions beacon-chain/rpc/validator/aggregator.go
@@ -0,0 +1,82 @@
package validator

import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// SubmitAggregateAndProof is called by a validator when its assigned to be an aggregator.
// The beacon node will broadcast aggregated attestation and proof on the aggregator's behavior.
func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *ethpb.AggregationRequest) (*ethpb.AggregationResponse, error) {
ctx, span := trace.StartSpan(ctx, "AggregatorServer.SubmitAggregation")
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(req.Slot)))

if as.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}

validatorIndex, exists, err := as.BeaconDB.ValidatorIndex(ctx, req.PublicKey)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get validator index from DB: %v", err)
}
if !exists {
return nil, status.Error(codes.Internal, "Could not locate validator index in DB")
}

epoch := helpers.SlotToEpoch(req.Slot)
activeValidatorIndices, err := as.HeadFetcher.HeadValidatorsIndices(epoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get validators: %v", err)
}
seed, err := as.HeadFetcher.HeadSeed(epoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get seed: %v", err)
}
committee, err := helpers.BeaconCommittee(activeValidatorIndices, seed, req.Slot, req.CommitteeIndex)
if err != nil {
return nil, err
}

// Check if the validator is an aggregator
isAggregator, err := helpers.IsAggregator(uint64(len(committee)), req.SlotSignature)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get aggregator status: %v", err)
}
if !isAggregator {
return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator")
}

// Retrieve the unaggregated attestation from pool.
aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)

for _, aggregatedAtt := range aggregatedAtts {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if helpers.IsAggregated(aggregatedAtt) {
if err := as.P2P.Broadcast(ctx, &ethpb.AggregateAttestationAndProof{
AggregatorIndex: validatorIndex,
SelectionProof: req.SlotSignature,
Aggregate: aggregatedAtt,
}); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast aggregated attestation: %v", err)
}

log.WithFields(logrus.Fields{
"slot": req.Slot,
"committeeIndex": req.CommitteeIndex,
"validatorIndex": validatorIndex,
"aggregatedCount": aggregatedAtt.AggregationBits.Count(),
}).Debug("Broadcasting aggregated attestation and proof")
}
}

return &ethpb.AggregationResponse{}, nil
}

0 comments on commit 398cd64

Please sign in to comment.