diff --git a/app/app.go b/app/app.go index 2ba87e141..1cc880466 100644 --- a/app/app.go +++ b/app/app.go @@ -322,7 +322,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } - core.Wire(sched, fetch, consensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster) + core.Wire(sched, fetch, consensus, dutyDB, vapi, + parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, + core.WithTracing()) err = wireValidatorMock(conf, pubshares, sched) if err != nil { diff --git a/app/log/log.go b/app/log/log.go index 07eaa6fa2..142037a4b 100644 --- a/app/log/log.go +++ b/app/log/log.go @@ -19,7 +19,10 @@ package log import ( "context" + "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/obolnetwork/charon/app/errors" @@ -64,30 +67,38 @@ func metricsTopicFromCtx(ctx context.Context) string { } // Debug logs the message and fields (incl fields in the context) at Debug level. -// TODO(corver): Add indication of when debug should be used. +// Debug should be used for most logging. func Debug(ctx context.Context, msg string, fields ...z.Field) { - logger.Debug(msg, unwrapDedup(ctx, fields...)...) + zfl := unwrapDedup(ctx, fields...) + trace.SpanFromContext(ctx).AddEvent("log.Debug: "+msg, toAttributes(zfl)) + logger.Debug(msg, zfl...) } // Info logs the message and fields (incl fields in the context) at Info level. -// TODO(corver): Add indication of when info should be used. +// Info should only be used for high level important events. func Info(ctx context.Context, msg string, fields ...z.Field) { - logger.Info(msg, unwrapDedup(ctx, fields...)...) + zfl := unwrapDedup(ctx, fields...) + trace.SpanFromContext(ctx).AddEvent("log.Info: "+msg, toAttributes(zfl)) + logger.Info(msg, zfl...) } // Warn logs the message and fields (incl fields in the context) at Warn level. -// TODO(corver): Add indication of when warn should be used. +// Warn should only be used when a problem is encountered that *does not* require any action to be taken. func Warn(ctx context.Context, msg string, fields ...z.Field) { + zfl := unwrapDedup(ctx, fields...) + trace.SpanFromContext(ctx).AddEvent("log.Warn: "+msg, toAttributes(zfl)) + logger.Warn(msg, zfl...) incWarnCounter(ctx) - logger.Warn(msg, unwrapDedup(ctx, fields...)...) } // Error wraps err with msg and fields and logs it (incl fields in the context) at Error level. -// TODO(corver): Add indication of when error should be used. +// Error should only be used when a problem is encountered that *does* require action to be taken. func Error(ctx context.Context, msg string, err error, fields ...z.Field) { - incErrorCounter(ctx) err = errors.SkipWrap(err, msg, 2, fields...) - logger.Error(err.Error(), unwrapDedup(ctx, errFields(err))...) + zfl := unwrapDedup(ctx, errFields(err)) + trace.SpanFromContext(ctx).RecordError(err, trace.WithStackTrace(true), toAttributes(zfl)) + logger.Error(err.Error(), zfl...) + incErrorCounter(ctx) } // unwrapDedup returns the wrapped zap fields from the slice and from the context. Duplicate fields are dropped. @@ -137,3 +148,19 @@ func errFields(err error) z.Field { } } } + +// toAttributes returns the zap fields as tracing event attributes. +func toAttributes(fields []zap.Field) trace.EventOption { + var kvs []attribute.KeyValue + for _, field := range fields { + if field.Interface != nil { + kvs = append(kvs, attribute.String(field.Key, fmt.Sprint(field.Interface))) + } else if field.String != "" { + kvs = append(kvs, attribute.String(field.Key, field.String)) + } else if field.Integer != 0 { + kvs = append(kvs, attribute.Int64(field.Key, field.Integer)) + } + } + + return trace.WithAttributes(kvs...) +} diff --git a/core/interfaces.go b/core/interfaces.go index eb44e6c98..6ab89b32e 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -128,9 +128,38 @@ type Broadcaster interface { Broadcast(context.Context, Duty, PubKey, AggSignedData) error } +// wireFuncs defines the core workflow components as a list input and output functions +// instead as interfaces, since functions are easier to wrap than interfaces. +type wireFuncs struct { + SchedulerSubscribe func(func(context.Context, Duty, FetchArgSet) error) + FetcherFetch func(context.Context, Duty, FetchArgSet) error + FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error) + ConsensusPropose func(context.Context, Duty, UnsignedDataSet) error + ConsensusSubscribe func(func(context.Context, Duty, UnsignedDataSet) error) + DutyDBStore func(context.Context, Duty, UnsignedDataSet) error + DutyDBAwaitAttestation func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error) + DutyDBPubKeyByAttestation func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error) + VAPIRegisterAwaitAttestation func(func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error)) + VAPIRegisterPubKeyByAttestation func(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)) + VAPIRegisterParSigDB func(func(context.Context, Duty, ParSignedDataSet) error) + ParSigDBStoreInternal func(context.Context, Duty, ParSignedDataSet) error + ParSigDBStoreExternal func(context.Context, Duty, ParSignedDataSet) error + ParSigDBSubscribeInternal func(func(context.Context, Duty, ParSignedDataSet) error) + ParSigDBSubscribeThreshold func(func(context.Context, Duty, PubKey, []ParSignedData) error) + ParSigExBroadcast func(context.Context, Duty, ParSignedDataSet) error + ParSigExSubscribe func(func(context.Context, Duty, ParSignedDataSet) error) + SigAggAggregate func(context.Context, Duty, PubKey, []ParSignedData) error + SigAggSubscribe func(func(context.Context, Duty, PubKey, AggSignedData) error) + AggSigDBStore func(context.Context, Duty, PubKey, AggSignedData) error + AggSigDBAwait func(context.Context, Duty, PubKey) (AggSignedData, error) + BroadcasterBroadcast func(context.Context, Duty, PubKey, AggSignedData) error +} + +// WireOption defines a functional option to configure wiring. +type WireOption func(*wireFuncs) + // Wire wires the workflow components together. -func Wire( - sched Scheduler, +func Wire(sched Scheduler, fetch Fetcher, cons Consensus, dutyDB DutyDB, @@ -140,16 +169,46 @@ func Wire( sigAgg SigAgg, aggSigDB AggSigDB, bcast Broadcaster, + opts ...WireOption, ) { - sched.Subscribe(fetch.Fetch) - fetch.Subscribe(cons.Propose) - cons.Subscribe(dutyDB.Store) - vapi.RegisterAwaitAttestation(dutyDB.AwaitAttestation) - vapi.RegisterPubKeyByAttestation(dutyDB.PubKeyByAttestation) - vapi.RegisterParSigDB(parSigDB.StoreInternal) - parSigDB.SubscribeInternal(parSigEx.Broadcast) - parSigEx.Subscribe(parSigDB.StoreExternal) - parSigDB.SubscribeThreshold(sigAgg.Aggregate) - sigAgg.Subscribe(aggSigDB.Store) - sigAgg.Subscribe(bcast.Broadcast) + w := wireFuncs{ + SchedulerSubscribe: sched.Subscribe, + FetcherFetch: fetch.Fetch, + FetcherSubscribe: fetch.Subscribe, + ConsensusPropose: cons.Propose, + ConsensusSubscribe: cons.Subscribe, + DutyDBStore: dutyDB.Store, + DutyDBAwaitAttestation: dutyDB.AwaitAttestation, + DutyDBPubKeyByAttestation: dutyDB.PubKeyByAttestation, + VAPIRegisterAwaitAttestation: vapi.RegisterAwaitAttestation, + VAPIRegisterPubKeyByAttestation: vapi.RegisterPubKeyByAttestation, + VAPIRegisterParSigDB: vapi.RegisterParSigDB, + ParSigDBStoreInternal: parSigDB.StoreInternal, + ParSigDBStoreExternal: parSigDB.StoreExternal, + ParSigDBSubscribeInternal: parSigDB.SubscribeInternal, + ParSigDBSubscribeThreshold: parSigDB.SubscribeThreshold, + ParSigExBroadcast: parSigEx.Broadcast, + ParSigExSubscribe: parSigEx.Subscribe, + SigAggAggregate: sigAgg.Aggregate, + SigAggSubscribe: sigAgg.Subscribe, + AggSigDBStore: aggSigDB.Store, + AggSigDBAwait: aggSigDB.Await, + BroadcasterBroadcast: bcast.Broadcast, + } + + for _, opt := range opts { + opt(&w) + } + + w.SchedulerSubscribe(w.FetcherFetch) + w.FetcherSubscribe(w.ConsensusPropose) + w.ConsensusSubscribe(w.DutyDBStore) + w.VAPIRegisterAwaitAttestation(w.DutyDBAwaitAttestation) + w.VAPIRegisterPubKeyByAttestation(w.DutyDBPubKeyByAttestation) + w.VAPIRegisterParSigDB(w.ParSigDBStoreInternal) + w.ParSigDBSubscribeInternal(w.ParSigExBroadcast) + w.ParSigExSubscribe(w.ParSigDBStoreExternal) + w.ParSigDBSubscribeThreshold(w.SigAggAggregate) + w.SigAggSubscribe(w.AggSigDBStore) + w.SigAggSubscribe(w.BroadcasterBroadcast) } diff --git a/core/leadercast/leadercast.go b/core/leadercast/leadercast.go index e6d7503a7..86e4d7e21 100644 --- a/core/leadercast/leadercast.go +++ b/core/leadercast/leadercast.go @@ -17,6 +17,8 @@ package leadercast import ( "context" + "go.opentelemetry.io/otel/trace" + "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" @@ -66,12 +68,17 @@ func (l *LeaderCast) Run(ctx context.Context) error { log.Debug(ctx, "received duty from leader", z.Int("peer", source), z.Any("duty", duty)) + var span trace.Span + ctx, span = core.StartDutyTrace(ctx, duty, "core/leadercast.Handle") + for _, sub := range l.subs { if err := sub(ctx, duty, data); err != nil { log.Error(ctx, "subscriber error", err) continue } } + + span.End() } } diff --git a/core/parsigex/parsigex.go b/core/parsigex/parsigex.go index b682f03d0..a116218de 100644 --- a/core/parsigex/parsigex.go +++ b/core/parsigex/parsigex.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" @@ -64,6 +65,10 @@ func (m *ParSigEx) handle(s network.Stream) { return } + var span trace.Span + ctx, span = core.StartDutyTrace(ctx, msg.Duty, "core/parsigex.Handle") + defer span.End() + for _, sub := range m.subs { err := sub(ctx, msg.Duty, msg.Data) if err != nil { diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 739c2b82a..af44cd377 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -24,6 +24,7 @@ import ( eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" @@ -142,15 +143,20 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error { instrumentDuty(duty, argSet) + var span trace.Span + ctx, span = core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot") + for _, sub := range s.subs { err := sub(ctx, duty, argSet) if err != nil { // TODO(corver): Improve error handling; possibly call subscription async // with backoff until duty expires. + span.End() return err } } + span.End() delete(s.duties, duty) } diff --git a/core/sigagg/sigagg.go b/core/sigagg/sigagg.go index 0f2738406..14eb7ae5b 100644 --- a/core/sigagg/sigagg.go +++ b/core/sigagg/sigagg.go @@ -25,6 +25,7 @@ import ( "github.com/coinbase/kryptology/pkg/signatures/bls/bls_sig" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/tracer" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/tbls" "github.com/obolnetwork/charon/tbls/tblsconv" @@ -66,7 +67,6 @@ func (a *Aggregator) Aggregate(ctx context.Context, duty core.Duty, pubkey core. if err != nil { return err } - if i == 0 { firstParSig = parSig firstRoot = root @@ -86,7 +86,9 @@ func (a *Aggregator) Aggregate(ctx context.Context, duty core.Duty, pubkey core. } // Aggregate signatures + _, span := tracer.Start(ctx, "tbls.Aggregate") sig, err := tbls.Aggregate(blsSigs) + span.End() if err != nil { return err } diff --git a/core/tracing.go b/core/tracing.go new file mode 100644 index 000000000..e59f0a0d4 --- /dev/null +++ b/core/tracing.go @@ -0,0 +1,141 @@ +// Copyright © 2021 Obol Technologies Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "context" + "fmt" + "hash/fnv" + "strings" + + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/obolnetwork/charon/app/tracer" +) + +// StartDutyTrace returns a context and span rooted to the duty traceID and wrapped in a duty span. +// This creates a new trace root and should generally only be called when a new duty is scheduled +// or when a duty is received from the VC or peer. +func StartDutyTrace(ctx context.Context, duty Duty, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + h := fnv.New128a() + _, _ = h.Write([]byte(duty.String())) + + var traceID trace.TraceID + copy(traceID[:], h.Sum(nil)) + + var outerSpan, innerSpan trace.Span + ctx, outerSpan = tracer.Start(tracer.RootedCtx(ctx, traceID), fmt.Sprintf("core/duty.%s", strings.Title(duty.Type.String()))) + ctx, innerSpan = tracer.Start(ctx, spanName, opts...) + + outerSpan.SetAttributes(attribute.Int64("slot", duty.Slot)) + + return ctx, withEndSpan{ + Span: innerSpan, + endFunc: func() { outerSpan.End() }, + } +} + +// withEndSpan wraps a trace span and calls endFunc when End is called. +type withEndSpan struct { + trace.Span + endFunc func() +} + +func (s withEndSpan) End(options ...trace.SpanEndOption) { + s.Span.End(options...) + s.endFunc() +} + +// WithTracing wraps component input functions with tracing spans. +func WithTracing() WireOption { + return func(w *wireFuncs) { + clone := *w + + w.FetcherFetch = func(parent context.Context, duty Duty, set FetchArgSet) error { + ctx, span := tracer.Start(parent, "core/fetcher.Fetch") + defer span.End() + + return clone.FetcherFetch(ctx, duty, set) + } + w.ConsensusPropose = func(parent context.Context, duty Duty, set UnsignedDataSet) error { + ctx, span := tracer.Start(parent, "core/consensus.Propose") + defer span.End() + + return clone.ConsensusPropose(ctx, duty, set) + } + w.DutyDBStore = func(parent context.Context, duty Duty, set UnsignedDataSet) error { + ctx, span := tracer.Start(parent, "core/dutydb.Store") + defer span.End() + + return clone.DutyDBStore(ctx, duty, set) + } + w.DutyDBAwaitAttestation = func(parent context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error) { + ctx, span := tracer.Start(parent, "core/dutydb.AwaitAttestation") + defer span.End() + + return clone.DutyDBAwaitAttestation(ctx, slot, commIdx) + } + w.DutyDBPubKeyByAttestation = func(parent context.Context, slot, commIdx, valCommIdx int64) (PubKey, error) { + ctx, span := tracer.Start(parent, "core/dutydb.PubKeyByAttestation") + defer span.End() + + return clone.DutyDBPubKeyByAttestation(ctx, slot, commIdx, valCommIdx) + } + w.ParSigDBStoreInternal = func(parent context.Context, duty Duty, set ParSignedDataSet) error { + ctx, span := tracer.Start(parent, "core/parsigdb.StoreInternal") + defer span.End() + + return clone.ParSigDBStoreInternal(ctx, duty, set) + } + w.ParSigDBStoreExternal = func(parent context.Context, duty Duty, set ParSignedDataSet) error { + ctx, span := tracer.Start(parent, "core/parsigdb.StoreExternal") + defer span.End() + + return clone.ParSigDBStoreExternal(ctx, duty, set) + } + w.ParSigExBroadcast = func(parent context.Context, duty Duty, set ParSignedDataSet) error { + ctx, span := tracer.Start(parent, "core/parsigex.Broadcast") + defer span.End() + + return clone.ParSigExBroadcast(ctx, duty, set) + } + w.SigAggAggregate = func(parent context.Context, duty Duty, key PubKey, data []ParSignedData) error { + ctx, span := tracer.Start(parent, "core/sigagg.Aggregate") + defer span.End() + + return clone.SigAggAggregate(ctx, duty, key, data) + } + w.AggSigDBStore = func(parent context.Context, duty Duty, key PubKey, data AggSignedData) error { + ctx, span := tracer.Start(parent, "core/aggsigdb.Store") + defer span.End() + + return clone.AggSigDBStore(ctx, duty, key, data) + } + w.AggSigDBAwait = func(parent context.Context, duty Duty, key PubKey) (AggSignedData, error) { + ctx, span := tracer.Start(parent, "core/aggsigdb.Await") + defer span.End() + + return clone.AggSigDBAwait(ctx, duty, key) + } + w.BroadcasterBroadcast = func(parent context.Context, duty Duty, key PubKey, data AggSignedData) error { + ctx, span := tracer.Start(parent, "core/broadcaster.Broadcast") + defer span.End() + + return clone.BroadcasterBroadcast(ctx, duty, key, data) + } + } +} diff --git a/core/types.go b/core/types.go index da158fd59..ae9f61adb 100644 --- a/core/types.go +++ b/core/types.go @@ -16,17 +16,13 @@ package core import ( "bytes" - "context" "encoding/hex" "fmt" - "hash/fnv" eth2v1 "github.com/attestantio/go-eth2-client/api/v1" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" - "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" - "github.com/obolnetwork/charon/app/tracer" ) // DutyType enumerates the different types of duties. @@ -207,15 +203,3 @@ type AggSignedData struct { func (a AggSignedData) Equal(b AggSignedData) bool { return bytes.Equal(a.Data, b.Data) && bytes.Equal(a.Signature, b.Signature) } - -// DutyTraceRoot returns a copy of the parent context containing a tracing span context rooted -// to the duty. -func DutyTraceRoot(ctx context.Context, duty Duty) context.Context { - h := fnv.New128a() - _, _ = h.Write([]byte(duty.String())) - - var traceID trace.TraceID - copy(traceID[:], h.Sum(nil)) - - return tracer.RootedCtx(ctx, traceID) -} diff --git a/core/types_test.go b/core/types_test.go index 9a11dba24..f20a2bd12 100644 --- a/core/types_test.go +++ b/core/types_test.go @@ -63,11 +63,11 @@ func TestWithDutySpanCtx(t *testing.T) { stop, err := tracer.Init(tracer.WithStdOut(io.Discard)) require.NoError(t, err) defer func() { - require.NoError(t, stop(context.Background())) + require.NoError(t, stop(ctx)) }() - _, span1 := tracer.Start(core.DutyTraceRoot(ctx, core.Duty{}), "span1") - _, span2 := tracer.Start(core.DutyTraceRoot(ctx, core.Duty{}), "span2") + _, span1 := core.StartDutyTrace(ctx, core.Duty{}, "span1") + _, span2 := core.StartDutyTrace(ctx, core.Duty{}, "span2") require.Equal(t, "7d0b160d5b04eac85dd1eaf0585c5b82", span1.SpanContext().TraceID().String()) require.Equal(t, span1.SpanContext().TraceID(), span2.SpanContext().TraceID()) diff --git a/core/validatorapi/router.go b/core/validatorapi/router.go index 78be85729..c407ea14a 100644 --- a/core/validatorapi/router.go +++ b/core/validatorapi/router.go @@ -158,12 +158,12 @@ func wrap(endpoint string, handler handlerFunc) http.Handler { writeResponse(ctx, w, endpoint, res) } - return trace(endpoint, wrap) + return wrapTrace(endpoint, wrap) } -// trace wraps the passed handler in a OpenTelemetry tracing span. -func trace(endpoint string, handler http.HandlerFunc) http.Handler { - return otelhttp.NewHandler(handler, "validator."+endpoint) +// wrapTrace wraps the passed handler in a OpenTelemetry tracing span. +func wrapTrace(endpoint string, handler http.HandlerFunc) http.Handler { + return otelhttp.NewHandler(handler, "core/validatorapi."+endpoint) } // getValidator returns a handler function for the get validators by pubkey or index endpoint. diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index bc9bad302..58b372885 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -22,9 +22,11 @@ import ( eth2v1 "github.com/attestantio/go-eth2-client/api/v1" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/coinbase/kryptology/pkg/signatures/bls/bls_sig" + "go.opentelemetry.io/otel/trace" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/tracer" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/tbls" @@ -167,14 +169,24 @@ func (c *Component) RegisterParSigDB(fn func(context.Context, core.Duty, core.Pa } // AttestationData implements the eth2client.AttesterDutiesProvider for the router. -func (c Component) AttestationData(ctx context.Context, slot eth2p0.Slot, committeeIndex eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) { +func (c Component) AttestationData(parent context.Context, slot eth2p0.Slot, committeeIndex eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) { + ctx, span := core.StartDutyTrace(parent, core.NewAttesterDuty(int64(slot)), "core/validatorapi.AttestationData") + defer span.End() + return c.awaitAttFunc(ctx, int64(slot), int64(committeeIndex)) } // SubmitAttestations implements the eth2client.AttestationsSubmitter for the router. func (c Component) SubmitAttestations(ctx context.Context, attestations []*eth2p0.Attestation) error { - setsBySlot := make(map[int64]core.ParSignedDataSet) + if len(attestations) > 0 { + // Pick the first attestation slot to use as trace root. + duty := core.NewAttesterDuty(int64(attestations[0].Data.Slot)) + var span trace.Span + ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.SubmitAttestations") + defer span.End() + } + setsBySlot := make(map[int64]core.ParSignedDataSet) for _, att := range attestations { slot := int64(att.Data.Slot) @@ -217,10 +229,7 @@ func (c Component) SubmitAttestations(ctx context.Context, attestations []*eth2p // Send sets to subscriptions. for slot, set := range setsBySlot { - duty := core.Duty{ - Slot: slot, - Type: core.DutyAttester, - } + duty := core.NewAttesterDuty(slot) log.Debug(ctx, "Attestation submitted by VC", z.I64("slot", slot)) @@ -236,12 +245,14 @@ func (c Component) SubmitAttestations(ctx context.Context, attestations []*eth2p } // verifyParSig verifies the partial signature against the root and validator. -func (c Component) verifyParSig(ctx context.Context, typ core.DutyType, epoch eth2p0.Epoch, +func (c Component) verifyParSig(parent context.Context, typ core.DutyType, epoch eth2p0.Epoch, pubkey core.PubKey, sigRoot eth2p0.Root, sig eth2p0.BLSSignature, ) error { if c.skipVerify { return nil } + ctx, span := tracer.Start(parent, "core/validatorapi.VerifyParSig") + defer span.End() // Wrap the signing root with the domain and serialise it. sigData, err := prepSigningData(ctx, c.eth2Cl, typ, epoch, sigRoot)