diff --git a/app/app.go b/app/app.go index cb1eb6a15..188f62617 100644 --- a/app/app.go +++ b/app/app.go @@ -555,7 +555,7 @@ func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, } if conf.SimnetBMock { // Configure the beacon mock. - const dutyFactor = 100 // Duty factor spreads duties deterministicly in an epoch. + const dutyFactor = 100 // Duty factor spreads duties deterministically in an epoch. opts := []beaconmock.Option{ beaconmock.WithSlotDuration(time.Second), beaconmock.WithDeterministicAttesterDuties(dutyFactor), diff --git a/app/eth2wrap/eth2wrap_gen.go b/app/eth2wrap/eth2wrap_gen.go index d7d2208f3..b5c2327ff 100644 --- a/app/eth2wrap/eth2wrap_gen.go +++ b/app/eth2wrap/eth2wrap_gen.go @@ -42,6 +42,7 @@ type Client interface { eth2client.AttestationsSubmitter eth2client.AttesterDutiesProvider eth2client.BeaconBlockProposalProvider + eth2client.BeaconBlockRootProvider eth2client.BeaconBlockSubmitter eth2client.BeaconCommitteeSubscriptionsSubmitter eth2client.BlindedBeaconBlockProposalProvider @@ -367,6 +368,26 @@ func (m multi) BeaconBlockProposal(ctx context.Context, slot phase0.Slot, randao return res0, err } +// BeaconBlockRoot fetches a block's root given a block ID. +// Note this endpoint is cached in go-eth2-client. +func (m multi) BeaconBlockRoot(ctx context.Context, blockID string) (*phase0.Root, error) { + const label = "beacon_block_root" + + res0, err := provide(ctx, m.clients, + func(ctx context.Context, cl Client) (*phase0.Root, error) { + return cl.BeaconBlockRoot(ctx, blockID) + }, + nil, + ) + + if err != nil { + incError(label) + err = errors.Wrap(err, "eth2wrap") + } + + return res0, err +} + // SubmitBeaconBlock submits a beacon block. func (m multi) SubmitBeaconBlock(ctx context.Context, block *spec.VersionedSignedBeaconBlock) error { const label = "submit_beacon_block" diff --git a/app/eth2wrap/genwrap/genwrap.go b/app/eth2wrap/genwrap/genwrap.go index c20dc19d6..4d8382fa9 100644 --- a/app/eth2wrap/genwrap/genwrap.go +++ b/app/eth2wrap/genwrap/genwrap.go @@ -95,6 +95,7 @@ type Client interface { "AttestationsSubmitter": true, "AttesterDutiesProvider": true, "BeaconBlockProposalProvider": true, + "BeaconBlockRootProvider": false, "BeaconBlockSubmitter": true, "BeaconCommitteeSubscriptionsSubmitter": true, "BlindedBeaconBlockProposalProvider": true, diff --git a/app/simnet_test.go b/app/simnet_test.go index 8f6c3a2ca..7300e53d2 100644 --- a/app/simnet_test.go +++ b/app/simnet_test.go @@ -155,6 +155,15 @@ func TestSimnetNoNetwork_WithBuilderRegistrationMockVCs(t *testing.T) { testSimnet(t, args, expect) } +func TestSimnetNoNetwork_WithSyncCommitteeMockVCs(t *testing.T) { + args := newSimnetArgs(t) + args.BMockOpts = append(args.BMockOpts, beaconmock.WithSyncCommitteeDuties()) + args.BMockOpts = append(args.BMockOpts, beaconmock.WithNoAttesterDuties()) + args.BMockOpts = append(args.BMockOpts, beaconmock.WithNoProposerDuties()) + expect := newSimnetExpect(args.N, core.DutySyncMessage) + testSimnet(t, args, expect) +} + type simnetArgs struct { N int VMocks []bool diff --git a/app/vmock.go b/app/vmock.go index 5322a11ae..727549ec1 100644 --- a/app/vmock.go +++ b/app/vmock.go @@ -45,19 +45,49 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch return err } - // Prepare attestations when slots tick. + onStartup := true sched.SubscribeSlots(func(ctx context.Context, slot core.Slot) error { - vMockWrap(ctx, slot.Slot, func(ctx context.Context, state vMockState) error { + // Prepare attestations when slots tick. + vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { return state.Attester.Prepare(ctx) }) + // Prepare sync committee message when epoch tick. + if onStartup || slot.FirstInEpoch() { + vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + // Either call if it is first slot in epoch or on charon startup. + return state.SyncCommMember.PrepareEpoch(ctx) + }) + } + + onStartup = false + + // Prepare sync committee selections when slots tick. + vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + // Either call if it is first slot in epoch or on charon startup. + return state.SyncCommMember.PrepareSlot(ctx, eth2p0.Slot(slot.Slot)) + }) + + // Submit sync committee message 1/3 into the slot. + vMockWrap(ctx, slot.Slot, slot.Epoch(), func(ctx context.Context, state vMockState) error { + thirdDuration := slot.SlotDuration / 3 + thirdTime := slot.Time.Add(thirdDuration) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Until(thirdTime)): + return state.SyncCommMember.Message(ctx, eth2p0.Slot(slot.Slot)) + } + }) + return nil }) // Handle duties when triggered. sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { - vMockWrap(ctx, duty.Slot, func(ctx context.Context, state vMockState) error { - return handleVMockDuty(ctx, duty, state.Eth2Cl, state.SignFunc, pubshares, state.Attester) + vMockWrap(ctx, duty.Slot, 0, func(ctx context.Context, state vMockState) error { + return handleVMockDuty(ctx, duty, state.Eth2Cl, state.SignFunc, pubshares, state.Attester, state.SyncCommMember) }) return nil @@ -66,7 +96,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch go func() { // TODO(corver): Improve registrations to use lock file and trigger on epoch transitions. for registration := range conf.TestConfig.BuilderRegistration { - vMockWrap(context.Background(), 0, func(ctx context.Context, state vMockState) error { + vMockWrap(context.Background(), 0, 0, func(ctx context.Context, state vMockState) error { return validatormock.Register(ctx, state.Eth2Cl, state.SignFunc, registration, pubshares[0]) }) } @@ -77,16 +107,17 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch // vMockState is the current validator mock state. type vMockState struct { - Eth2Cl eth2wrap.Client - SignFunc validatormock.SignFunc - Attester *validatormock.SlotAttester // Changes every slot + Eth2Cl eth2wrap.Client + SignFunc validatormock.SignFunc + Attester *validatormock.SlotAttester // Changes every slot + SyncCommMember *validatormock.SyncCommMember } -// vMockCallback is a validator mock callback function that has access to latest state. +// vMockCallback is a validator mock callback function that has access to the latest state. type vMockCallback func(context.Context, vMockState) error // newVMockWrapper returns a stateful validator mock wrapper function. -func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(context.Context, int64, vMockCallback), error) { +func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(ctx context.Context, slot int64, epoch int64, callback vMockCallback), error) { // Immutable state and providers. signFunc, err := newVMockSigner(conf, pubshares) if err != nil { @@ -97,11 +128,12 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(context.Co // Mutable state var ( - mu sync.Mutex - attester = new(validatormock.SlotAttester) + mu sync.Mutex + attester = new(validatormock.SlotAttester) + syncCommMem = new(validatormock.SyncCommMember) ) - return func(ctx context.Context, slot int64, fn vMockCallback) { + return func(ctx context.Context, slot, epoch int64, fn vMockCallback) { mu.Lock() defer mu.Unlock() @@ -117,11 +149,15 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(context.Co if slot != 0 && attester.Slot() != eth2p0.Slot(slot) { attester = validatormock.NewSlotAttester(eth2Cl, eth2p0.Slot(slot), signFunc, pubshares) } + if epoch != 0 && syncCommMem.Epoch() != eth2p0.Epoch(epoch) { + syncCommMem = validatormock.NewSyncCommMember(eth2Cl, eth2p0.Epoch(epoch), signFunc, pubshares) + } state := vMockState{ - Eth2Cl: eth2Cl, - SignFunc: signFunc, - Attester: attester, + Eth2Cl: eth2Cl, + SignFunc: signFunc, + Attester: attester, + SyncCommMember: syncCommMem, } // Validator mock calls are async @@ -130,7 +166,7 @@ func newVMockWrapper(conf Config, pubshares []eth2p0.BLSPubKey) (func(context.Co defer cancel() err := fn(ctx2, state) - if err != nil && ctx.Err() == nil { // Only log if parrent context wasn't closed. + if err != nil && ctx.Err() == nil { // Only log if parent context wasn't closed. log.Error(ctx, "Validator mock error", err) return } @@ -212,6 +248,7 @@ func newVMockSigner(conf Config, pubshares []eth2p0.BLSPubKey) (validatormock.Si // handleVMockDuty calls appropriate validator mock function to attestation and block proposal. func handleVMockDuty(ctx context.Context, duty core.Duty, eth2Cl eth2wrap.Client, signer validatormock.SignFunc, pubshares []eth2p0.BLSPubKey, attester *validatormock.SlotAttester, + syncCommMember *validatormock.SyncCommMember, ) error { switch duty.Type { case core.DutyAttester: @@ -239,6 +276,12 @@ func handleVMockDuty(ctx context.Context, duty core.Duty, eth2Cl eth2wrap.Client return errors.Wrap(err, "mock builder proposal failed") } log.Info(ctx, "Mock blinded block proposal submitted to validatorapi", z.I64("slot", duty.Slot)) + case core.DutySyncContribution: + err := syncCommMember.Aggregate(ctx, eth2p0.Slot(duty.Slot)) + if err != nil { + return errors.Wrap(err, "mock sync contribution failed") + } + log.Info(ctx, "Mock sync contribution submitted to validatorapi", z.I64("slot", duty.Slot)) default: return errors.New("invalid duty type") } diff --git a/core/validatorapi/router.go b/core/validatorapi/router.go index a7d9e624a..779a848f3 100644 --- a/core/validatorapi/router.go +++ b/core/validatorapi/router.go @@ -350,6 +350,11 @@ func proposerDuties(p eth2client.ProposerDutiesProvider) handlerFunc { return nil, err } + // response.data cannot be nil, it leads to NullPointerException in teku. + if len(data) == 0 { + data = []*eth2v1.ProposerDuty{} + } + return proposerDutiesResponse{ DependentRoot: stubRoot(epoch), // TODO(corver): Fill this properly Data: data, @@ -375,6 +380,11 @@ func attesterDuties(p eth2client.AttesterDutiesProvider) handlerFunc { return nil, err } + // response.data cannot be nil, it leads to NullPointerException in teku. + if len(data) == 0 { + data = []*eth2v1.AttesterDuty{} + } + return attesterDutiesResponse{ DependentRoot: stubRoot(epoch), // TODO(corver): Fill this properly Data: data, @@ -400,6 +410,11 @@ func syncCommitteeDuties(p eth2client.SyncCommitteeDutiesProvider) handlerFunc { return nil, err } + // response.data cannot be nil, it leads to NullPointerException in teku. + if len(data) == 0 { + data = []*eth2v1.SyncCommitteeDuty{} + } + return syncCommitteeDutiesResponse{Data: data}, nil } } diff --git a/testutil/beaconmock/beaconmock.go b/testutil/beaconmock/beaconmock.go index 71db81d36..b6d42734f 100644 --- a/testutil/beaconmock/beaconmock.go +++ b/testutil/beaconmock/beaconmock.go @@ -127,6 +127,7 @@ type Mock struct { BlindedBeaconBlockProposalFunc func(ctx context.Context, slot eth2p0.Slot, randaoReveal eth2p0.BLSSignature, graffiti []byte) (*eth2api.VersionedBlindedBeaconBlock, error) BeaconCommitteesFunc func(ctx context.Context, stateID string) ([]*eth2v1.BeaconCommittee, error) BeaconBlockProposalFunc func(ctx context.Context, slot eth2p0.Slot, randaoReveal eth2p0.BLSSignature, graffiti []byte) (*spec.VersionedBeaconBlock, error) + BeaconBlockRootFunc func(ctx context.Context, blockID string) (*eth2p0.Root, error) ProposerDutiesFunc func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) SubmitAttestationsFunc func(context.Context, []*eth2p0.Attestation) error SubmitBeaconBlockFunc func(context.Context, *spec.VersionedSignedBeaconBlock) error @@ -179,6 +180,10 @@ func (m Mock) BeaconBlockProposal(ctx context.Context, slot eth2p0.Slot, randaoR return m.BeaconBlockProposalFunc(ctx, slot, randaoReveal, graffiti) } +func (m Mock) BeaconBlockRoot(ctx context.Context, blockID string) (*eth2p0.Root, error) { + return m.BeaconBlockRootFunc(ctx, blockID) +} + func (m Mock) BeaconCommittees(ctx context.Context, stateID string) ([]*eth2v1.BeaconCommittee, error) { return m.BeaconCommitteesFunc(ctx, stateID) } diff --git a/testutil/beaconmock/options.go b/testutil/beaconmock/options.go index d7729b5fd..ac7179cc2 100644 --- a/testutil/beaconmock/options.go +++ b/testutil/beaconmock/options.go @@ -348,6 +348,35 @@ func WithNoAttesterDuties() Option { } } +// WithSyncCommitteeDuties configures the mock to override SyncCommitteeDutiesFunc to return sync committee +// duties for epochs not divisible by 3. +func WithSyncCommitteeDuties() Option { + return func(mock *Mock) { + mock.SyncCommitteeDutiesFunc = func(ctx context.Context, epoch eth2p0.Epoch, indices []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + vals, err := mock.Validators(ctx, "", indices) + if err != nil { + return nil, err + } + + var resp []*eth2v1.SyncCommitteeDuty + for i, index := range indices { + val, ok := vals[index] + if !ok { + continue + } + + resp = append(resp, ð2v1.SyncCommitteeDuty{ + PubKey: val.Validator.PublicKey, + ValidatorIndex: index, + ValidatorSyncCommitteeIndices: []eth2p0.CommitteeIndex{eth2p0.CommitteeIndex(i)}, + }) + } + + return resp, nil + } + } +} + // WithClock configures the mock with the provided clock. func WithClock(clock clockwork.Clock) Option { return func(mock *Mock) { @@ -457,22 +486,22 @@ func defaultMock(httpMock HTTPMock, httpServer *http.Server, clock clockwork.Clo Data: attData, }, nil }, - ValidatorsFunc: func(ctx context.Context, stateID string, indices []eth2p0.ValidatorIndex) (map[eth2p0.ValidatorIndex]*eth2v1.Validator, error) { + ValidatorsFunc: func(context.Context, string, []eth2p0.ValidatorIndex) (map[eth2p0.ValidatorIndex]*eth2v1.Validator, error) { return nil, nil }, - ValidatorsByPubKeyFunc: func(ctx context.Context, stateID string, pubkeys []eth2p0.BLSPubKey) (map[eth2p0.ValidatorIndex]*eth2v1.Validator, error) { + ValidatorsByPubKeyFunc: func(context.Context, string, []eth2p0.BLSPubKey) (map[eth2p0.ValidatorIndex]*eth2v1.Validator, error) { return nil, nil }, - SubmitAttestationsFunc: func(ctx context.Context, atts []*eth2p0.Attestation) error { + SubmitAttestationsFunc: func(context.Context, []*eth2p0.Attestation) error { return nil }, - SubmitBeaconBlockFunc: func(ctx context.Context, block *spec.VersionedSignedBeaconBlock) error { + SubmitBeaconBlockFunc: func(context.Context, *spec.VersionedSignedBeaconBlock) error { return nil }, - SubmitBlindedBeaconBlockFunc: func(ctx context.Context, block *eth2api.VersionedSignedBlindedBeaconBlock) error { + SubmitBlindedBeaconBlockFunc: func(context.Context, *eth2api.VersionedSignedBlindedBeaconBlock) error { return nil }, - SubmitVoluntaryExitFunc: func(ctx context.Context, exit *eth2p0.SignedVoluntaryExit) error { + SubmitVoluntaryExitFunc: func(context.Context, *eth2p0.SignedVoluntaryExit) error { return nil }, GenesisTimeFunc: func(ctx context.Context) (time.Time, error) { @@ -508,12 +537,15 @@ func defaultMock(httpMock HTTPMock, httpServer *http.Server, clock clockwork.Clo SlotsPerEpochFunc: func(ctx context.Context) (uint64, error) { return httpMock.SlotsPerEpoch(ctx) }, - SubmitProposalPreparationsFunc: func(_ context.Context, _ []*eth2v1.ProposalPreparation) error { + SubmitProposalPreparationsFunc: func(context.Context, []*eth2v1.ProposalPreparation) error { return nil }, - SyncCommitteeDutiesFunc: func(ctx context.Context, epoch eth2p0.Epoch, validatorIndices []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + SyncCommitteeDutiesFunc: func(context.Context, eth2p0.Epoch, []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { return []*eth2v1.SyncCommitteeDuty{}, nil }, + SubmitSyncCommitteeMessagesFunc: func(context.Context, []*altair.SyncCommitteeMessage) error { + return nil + }, } } diff --git a/testutil/beaconmock/server.go b/testutil/beaconmock/server.go index f6c8ae002..2686d23a8 100644 --- a/testutil/beaconmock/server.go +++ b/testutil/beaconmock/server.go @@ -40,6 +40,7 @@ var staticJSON []byte // It serves all proxied endpoints not handled by charon's validatorapi. // Endpoints include static endpoints defined in static.json and a few stubbed paths. type HTTPMock interface { + eth2client.BeaconBlockRootProvider eth2client.DepositContractProvider eth2client.DomainProvider eth2client.ForkProvider @@ -51,7 +52,7 @@ type HTTPMock interface { eth2client.SlotDurationProvider eth2client.SlotsPerEpochProvider eth2client.SpecProvider - eth2client.SyncCommitteeDutiesProvider + eth2client.SyncCommitteeSubscriptionsSubmitter } // staticOverride defines a http server static override for a endpoint response value. @@ -75,9 +76,15 @@ func newHTTPServer(addr string, overrides ...staticOverride) (*http.Server, erro Handler: func(w http.ResponseWriter, r *http.Request) {}, }, { - Path: "/eth/v1/validator/duties/sync/{epoch}", + Path: "/eth/v1/validator/sync_committee_subscriptions", Handler: func(w http.ResponseWriter, r *http.Request) { - _, _ = w.Write([]byte(`{"data":[]}`)) + w.WriteHeader(http.StatusOK) + }, + }, + { + Path: "/eth/v1/beacon/blocks/head/root", + Handler: func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data": {"root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"}}`)) }, }, { diff --git a/testutil/validatormock/attest.go b/testutil/validatormock/attest.go index 6a9c17627..fdd9fe424 100644 --- a/testutil/validatormock/attest.go +++ b/testutil/validatormock/attest.go @@ -33,10 +33,10 @@ import ( // Type aliases for concise function signatures. type ( - validators map[eth2p0.ValidatorIndex]*eth2v1.Validator - duties []*eth2v1.AttesterDuty - selections []*eth2exp.BeaconCommitteeSubscriptionResponse - datas []*eth2p0.AttestationData + validators map[eth2p0.ValidatorIndex]*eth2v1.Validator + attDuties []*eth2v1.AttesterDuty + attSelections []*eth2exp.BeaconCommitteeSubscriptionResponse + attDatas []*eth2p0.AttestationData ) // NewSlotAttester returns a new SlotAttester. @@ -62,9 +62,9 @@ type SlotAttester struct { // Mutable fields vals validators - duties duties - selections selections - datas datas + duties attDuties + selections attSelections + datas attDatas dutiesOK chan struct{} selectionsOK chan struct{} datasOK chan struct{} @@ -77,8 +77,8 @@ func (a *SlotAttester) Slot() eth2p0.Slot { // Prepare should be called at the start of slot, it does the following: // - Filters active validators for the slot (this could be cached at start of epoch) -// - Fetches attester duties for the slot (this could be cached at start of epoch). -// - Prepares aggregation duties for slot attesters. +// - Fetches attester attDuties for the slot (this could be cached at start of epoch). +// - Prepares aggregation attDuties for slot attesters. // It panics if called more than once. func (a *SlotAttester) Prepare(ctx context.Context) error { var err error @@ -160,7 +160,7 @@ func activeValidators(ctx context.Context, eth2Cl eth2wrap.Client, // prepareAttesters returns the attesters (including duty and data) for the provided validators and slot. func prepareAttesters(ctx context.Context, eth2Cl eth2wrap.Client, vals validators, slot eth2p0.Slot, -) (duties, error) { +) (attDuties, error) { if len(vals) == 0 { return nil, nil } @@ -180,7 +180,7 @@ func prepareAttesters(ctx context.Context, eth2Cl eth2wrap.Client, vals validato return nil, err } - var duties duties + var duties attDuties for _, duty := range epochDuties { if duty.Slot != slot { continue @@ -195,8 +195,8 @@ func prepareAttesters(ctx context.Context, eth2Cl eth2wrap.Client, vals validato // prepareAggregators does beacon committee subscription selection for the provided attesters // and returns the selected aggregators. func prepareAggregators(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, - vals validators, duties duties, slot eth2p0.Slot, -) (selections, error) { + vals validators, duties attDuties, slot eth2p0.Slot, +) (attSelections, error) { if len(duties) == 0 { return nil, nil } @@ -242,7 +242,7 @@ func prepareAggregators(ctx context.Context, eth2Cl eth2wrap.Client, signFunc Si return nil, err } - var selections selections + var selections attSelections for _, selection := range allSelections { if !selection.IsAggregator { continue @@ -255,14 +255,14 @@ func prepareAggregators(ctx context.Context, eth2Cl eth2wrap.Client, signFunc Si return selections, nil } -// attest does attestations for the provided attesters and returns the attestation datas. -func attest(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, duties duties, -) (datas, error) { +// attest does attestations for the provided attesters and returns the attestation attDatas. +func attest(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, duties attDuties, +) (attDatas, error) { if len(duties) == 0 { return nil, nil } - // Group duties by committee. + // Group attDuties by committee. dutyByComm := make(map[eth2p0.CommitteeIndex][]*eth2v1.AttesterDuty) for _, duty := range duties { dutyByComm[duty.CommitteeIndex] = append(dutyByComm[duty.CommitteeIndex], duty) @@ -270,7 +270,7 @@ func attest(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot var ( atts []*eth2p0.Attestation - datas datas + datas attDatas ) for commIdx, duties := range dutyByComm { data, err := eth2Cl.AttestationData(ctx, slot, commIdx) @@ -314,10 +314,10 @@ func attest(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot return datas, nil } -// aggregate does attestation aggregation for the provided validators, selections and attestation datas and returns true. +// aggregate does attestation aggregation for the provided validators, attSelections and attestation attDatas and returns true. // It returns false if aggregation is not required. func aggregate(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot, - vals validators, selections selections, datas datas, + vals validators, selections attSelections, datas attDatas, ) (bool, error) { if len(selections) == 0 { return false, nil @@ -391,7 +391,7 @@ func aggregate(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, s } // getAggregateAttestation returns an aggregated attestation for the provided committee. -func getAggregateAttestation(ctx context.Context, eth2Cl eth2wrap.Client, datas datas, +func getAggregateAttestation(ctx context.Context, eth2Cl eth2wrap.Client, datas attDatas, commIdx eth2p0.CommitteeIndex, ) (*eth2p0.Attestation, error) { for _, data := range datas { diff --git a/testutil/validatormock/synccomm.go b/testutil/validatormock/synccomm.go new file mode 100644 index 000000000..b59e1c44f --- /dev/null +++ b/testutil/validatormock/synccomm.go @@ -0,0 +1,229 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package validatormock + +import ( + "context" + "sync" + + eth2v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/altair" + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + + "github.com/obolnetwork/charon/app/eth2wrap" + "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/eth2util/signing" +) + +type ( + syncDuties []*eth2v1.SyncCommitteeDuty + syncSelections []any +) + +func NewSyncCommMember(eth2Cl eth2wrap.Client, epoch eth2p0.Epoch, signFunc SignFunc, pubkeys []eth2p0.BLSPubKey) *SyncCommMember { + return &SyncCommMember{ + eth2Cl: eth2Cl, + epoch: epoch, + pubkeys: pubkeys, + signFunc: signFunc, + dutiesOK: make(chan struct{}), + selections: make(map[eth2p0.Slot]syncSelections), + selectionsOK: make(map[eth2p0.Slot]chan struct{}), + } +} + +// SyncCommMember is a stateful structure providing sync committee message and contribution APIs. +type SyncCommMember struct { + // Immutable state + eth2Cl eth2wrap.Client + epoch eth2p0.Epoch + pubkeys []eth2p0.BLSPubKey + signFunc SignFunc + + // Mutable state + mu sync.Mutex + vals validators + duties syncDuties + dutiesOK chan struct{} + selections map[eth2p0.Slot]syncSelections + selectionsOK map[eth2p0.Slot]chan struct{} +} + +func (s *SyncCommMember) Epoch() eth2p0.Epoch { + return s.epoch +} + +func (s *SyncCommMember) setSelections(slot eth2p0.Slot, selections syncSelections) { + s.mu.Lock() + defer s.mu.Unlock() + + s.selections[slot] = selections + + // Mark selections as done + ch, ok := s.selectionsOK[slot] + if !ok { + ch = make(chan struct{}) + s.selectionsOK[slot] = ch + } + close(ch) +} + +//nolint:unused +func (s *SyncCommMember) getSelections(slot eth2p0.Slot) syncSelections { + s.mu.Lock() + defer s.mu.Unlock() + + return s.selections[slot] +} + +func (s *SyncCommMember) getSelectionsOK(slot eth2p0.Slot) chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + ch, ok := s.selectionsOK[slot] + if !ok { + ch = make(chan struct{}) + s.selectionsOK[slot] = ch + } + + return ch +} + +// PrepareEpoch stores sync committee attDuties and submits sync committee subscriptions at the start of an epoch. +func (s *SyncCommMember) PrepareEpoch(ctx context.Context) error { + var err error + s.vals, err = activeValidators(ctx, s.eth2Cl, s.pubkeys) + if err != nil { + return err + } + + s.duties, err = prepareSyncCommDuties(ctx, s.eth2Cl, s.vals, s.epoch) + if err != nil { + return err + } + + err = subscribeSyncCommSubnets(ctx, s.eth2Cl, s.epoch, s.duties) + if err != nil { + return err + } + close(s.dutiesOK) + + return nil +} + +// PrepareSlot prepares selection proofs at the start of a slot. +func (s *SyncCommMember) PrepareSlot(ctx context.Context, slot eth2p0.Slot) error { + wait(ctx, s.dutiesOK) + selections, err := prepareSyncContributions(ctx, s.eth2Cl, s.signFunc, s.vals, s.duties, slot) + if err != nil { + return err + } + s.setSelections(slot, selections) + + return nil +} + +// Message submits Sync committee messages at desired i.e., 1/3rd into the slot. +func (s *SyncCommMember) Message(ctx context.Context, slot eth2p0.Slot) error { + wait(ctx, s.dutiesOK) + return submitSyncMessage(ctx, s.eth2Cl, slot, s.signFunc, s.duties) +} + +// Aggregate submits Sync committee messages at desired i.e., 2/3rd into the slot. +func (s *SyncCommMember) Aggregate(ctx context.Context, slot eth2p0.Slot) error { + wait(ctx, s.getSelectionsOK(slot)) + return nil +} + +func prepareSyncContributions(context.Context, eth2wrap.Client, SignFunc, + validators, syncDuties, eth2p0.Slot, +) (syncSelections, error) { + return nil, nil +} + +// subscribeSyncCommSubnets submits sync committee subscriptions at the start of an epoch until next epoch. +func subscribeSyncCommSubnets(ctx context.Context, eth2Cl eth2wrap.Client, epoch eth2p0.Epoch, duties syncDuties) error { + var subs []*eth2v1.SyncCommitteeSubscription + for _, duty := range duties { + subs = append(subs, ð2v1.SyncCommitteeSubscription{ + ValidatorIndex: duty.ValidatorIndex, + SyncCommitteeIndices: duty.ValidatorSyncCommitteeIndices, + UntilEpoch: epoch + 1, + }) + } + + err := eth2Cl.SubmitSyncCommitteeSubscriptions(ctx, subs) + if err != nil { + return err + } + + log.Info(ctx, "Mock sync committee subscription submitted") + + return nil +} + +// prepareSyncCommDuties returns sync committee duties for the epoch. +func prepareSyncCommDuties(ctx context.Context, eth2Cl eth2wrap.Client, vals validators, epoch eth2p0.Epoch) (syncDuties, error) { + if len(vals) == 0 { + return nil, nil + } + + var vIdxs []eth2p0.ValidatorIndex + for idx := range vals { + vIdxs = append(vIdxs, idx) + } + + return eth2Cl.SyncCommitteeDuties(ctx, epoch, vIdxs) +} + +// submitSyncMessage submits signed sync committee messages for desired slot. +func submitSyncMessage(ctx context.Context, eth2Cl eth2wrap.Client, slot eth2p0.Slot, signFunc SignFunc, duties syncDuties) error { + if len(duties) == 0 { + return nil + } + + blockRoot, err := eth2Cl.BeaconBlockRoot(ctx, "head") + if err != nil { + return err + } + + epoch, err := epochFromSlot(ctx, eth2Cl, slot) + if err != nil { + return err + } + + sigData, err := signing.GetDataRoot(ctx, eth2Cl, signing.DomainSyncCommittee, epoch, *blockRoot) + if err != nil { + return err + } + + var msgs []*altair.SyncCommitteeMessage + for _, duty := range duties { + sig, err := signFunc(duty.PubKey, sigData[:]) + if err != nil { + return err + } + + msgs = append(msgs, &altair.SyncCommitteeMessage{ + Slot: slot, + BeaconBlockRoot: *blockRoot, + ValidatorIndex: duty.ValidatorIndex, + Signature: sig, + }) + } + + return eth2Cl.SubmitSyncCommitteeMessages(ctx, msgs) +} diff --git a/testutil/validatormock/synccontributionflow_test.go b/testutil/validatormock/synccontributionflow_test.go index 2b25772d0..0a10fe38e 100644 --- a/testutil/validatormock/synccontributionflow_test.go +++ b/testutil/validatormock/synccontributionflow_test.go @@ -54,9 +54,9 @@ func PseudoSyncCommContributionFlow(t *testing.T, supportDVT bool) { startEpoch := (epoch / period) * period endEpoch := startEpoch + period - // Get the sync committee duties for this epoch. + // Get the sync committee attDuties for this epoch. - // One option is to fetch the sync committee duties for a subset of validators + // One option is to fetch the sync committee attDuties for a subset of validators duties, _ := eth2Cl.SyncCommitteeDuties(ctx, epoch, valIdxs) for _, duty := range duties { // Note SyncCommitteeDuty contains a public key which charon needs to intercept/swap. @@ -114,7 +114,7 @@ func PseudoSyncCommContributionFlow(t *testing.T, supportDVT bool) { _ = eth2Cl.SubmitSyncCommitteeMessages(ctx, msgs) // For each slot, some validators are also aggregators and need to submit contributions. - // This can be calculated at any time in the sync committee period after the duties have been fetched. + // This can be calculated at any time in the sync committee period after the attDuties have been fetched. syncCommSize, _ := spec["SYNC_COMMITTEE_SIZE"].(uint64) subnetCount, _ := spec["SYNC_COMMITTEE_SUBNET_COUNT"].(uint64) @@ -148,7 +148,7 @@ func PseudoSyncCommContributionFlow(t *testing.T, supportDVT bool) { continue } - // Add aggregator duties per slot. + // Add aggregator attDuties per slot. aggsPerSubComm[subcommittee] = append(aggsPerSubComm[subcommittee], aggregator{ ValidatorIndex: duty.ValidatorIndex, Pubkey: duty.PubKey, @@ -180,7 +180,7 @@ func PseudoSyncCommContributionFlow(t *testing.T, supportDVT bool) { // Fetch cached pubkey from selection.ValidatorIndex var pubkey eth2p0.BLSPubKey - // Add aggregator duties per slot. + // Add aggregator attDuties per slot. aggsPerSubComm[selection.Data.SubcommitteeIndex] = append(aggsPerSubComm[selection.Data.SubcommitteeIndex], aggregator{ ValidatorIndex: selection.ValidatorIndex, Pubkey: pubkey, @@ -257,8 +257,8 @@ type SyncCommitteeSelection struct { IsAggregator bool } -// SyncCommitteeSelections is the new proposed endpoint that returns aggregated sync committee selections -// for the provided partial selections. +// SyncCommitteeSelections is the new proposed endpoint that returns aggregated sync committee attSelections +// for the provided partial attSelections. // // Note endpoint MUST be called at the start of the slot, since all VCs in the cluster need to do it at the same time. // This is by convention, to ensure timely successful aggregation.