Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: add tracing for all components #304

Merged
merged 3 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

core.Wire(sched, fetch, consensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster)
core.Wire(sched, fetch, consensus, dutyDB, vapi,
parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster,
core.WithTracing())

err = wireValidatorMock(conf, pubshares, sched)
if err != nil {
Expand Down
45 changes: 36 additions & 9 deletions app/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package log

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/obolnetwork/charon/app/errors"
Expand Down Expand Up @@ -64,30 +67,38 @@ func metricsTopicFromCtx(ctx context.Context) string {
}

// Debug logs the message and fields (incl fields in the context) at Debug level.
// TODO(corver): Add indication of when debug should be used.
// Debug should be used for most logging.
func Debug(ctx context.Context, msg string, fields ...z.Field) {
logger.Debug(msg, unwrapDedup(ctx, fields...)...)
zfl := unwrapDedup(ctx, fields...)
// Attach the log entry to the span found in ctx as an event named after the
// level and message, carrying the deduplicated fields as attributes.
trace.SpanFromContext(ctx).AddEvent("log.Debug: "+msg, toAttributes(zfl))
logger.Debug(msg, zfl...)
}

// Info logs the message and fields (incl fields in the context) at Info level.
// TODO(corver): Add indication of when info should be used.
// Info should only be used for high level important events.
func Info(ctx context.Context, msg string, fields ...z.Field) {
logger.Info(msg, unwrapDedup(ctx, fields...)...)
zfl := unwrapDedup(ctx, fields...)
// Attach the log entry to the span found in ctx as an event named after the
// level and message, carrying the deduplicated fields as attributes.
trace.SpanFromContext(ctx).AddEvent("log.Info: "+msg, toAttributes(zfl))
logger.Info(msg, zfl...)
}

// Warn logs the message and fields (incl fields in the context) at Warn level.
// TODO(corver): Add indication of when warn should be used.
// Warn should only be used when a problem is encountered that *does not* require any action to be taken.
func Warn(ctx context.Context, msg string, fields ...z.Field) {
zfl := unwrapDedup(ctx, fields...)
// Attach the log entry to the span found in ctx as an event named after the
// level and message, carrying the deduplicated fields as attributes.
trace.SpanFromContext(ctx).AddEvent("log.Warn: "+msg, toAttributes(zfl))
logger.Warn(msg, zfl...)
incWarnCounter(ctx)
logger.Warn(msg, unwrapDedup(ctx, fields...)...)
}

// Error wraps err with msg and fields and logs it (incl fields in the context) at Error level.
// TODO(corver): Add indication of when error should be used.
// Error should only be used when a problem is encountered that *does* require action to be taken.
func Error(ctx context.Context, msg string, err error, fields ...z.Field) {
incErrorCounter(ctx)
// Wrap before logging so that msg and fields travel with the error itself.
err = errors.SkipWrap(err, msg, 2, fields...)
logger.Error(err.Error(), unwrapDedup(ctx, errFields(err))...)
zfl := unwrapDedup(ctx, errFields(err))
// Record the wrapped error on the span found in ctx, with a stack trace
// and the deduplicated fields as attributes.
trace.SpanFromContext(ctx).RecordError(err, trace.WithStackTrace(true), toAttributes(zfl))
logger.Error(err.Error(), zfl...)
incErrorCounter(ctx)
}

// unwrapDedup returns the wrapped zap fields from the slice and from the context. Duplicate fields are dropped.
Expand Down Expand Up @@ -137,3 +148,19 @@ func errFields(err error) z.Field {
}
}
}

// toAttributes converts zap fields into a tracing event option carrying the
// fields as span attributes.
//
// For each field the value is taken from the first populated slot, checked in
// the order Interface, String, Integer. Interface values are rendered with
// fmt.Sprint. A field whose three slots are all zero values (nil interface,
// empty string, integer 0) produces no attribute.
func toAttributes(fields []zap.Field) trace.EventOption {
	var attrs []attribute.KeyValue
	for _, f := range fields {
		switch {
		case f.Interface != nil:
			attrs = append(attrs, attribute.String(f.Key, fmt.Sprint(f.Interface)))
		case f.String != "":
			attrs = append(attrs, attribute.String(f.Key, f.String))
		case f.Integer != 0:
			attrs = append(attrs, attribute.Int64(f.Key, f.Integer))
		}
	}

	return trace.WithAttributes(attrs...)
}
85 changes: 72 additions & 13 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,38 @@ type Broadcaster interface {
Broadcast(context.Context, Duty, PubKey, AggSignedData) error
}

// wireFuncs defines the core workflow components as a list of input and output functions
// instead of as interfaces, since functions are easier to wrap than interfaces.
//
// Wire fills each field from the component method of the same name (for example
// SchedulerSubscribe from sched.Subscribe), applies the WireOptions to the struct,
// and then connects the functions to each other.
type wireFuncs struct {
// Scheduler -> Fetcher: Wire passes FetcherFetch to SchedulerSubscribe.
SchedulerSubscribe func(func(context.Context, Duty, FetchArgSet) error)
FetcherFetch func(context.Context, Duty, FetchArgSet) error
// Fetcher -> Consensus: Wire passes ConsensusPropose to FetcherSubscribe.
FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error)
ConsensusPropose func(context.Context, Duty, UnsignedDataSet) error
// Consensus -> DutyDB: Wire passes DutyDBStore to ConsensusSubscribe.
ConsensusSubscribe func(func(context.Context, Duty, UnsignedDataSet) error)
DutyDBStore func(context.Context, Duty, UnsignedDataSet) error
// DutyDB queries; Wire registers each with the matching VAPIRegister* function below.
DutyDBAwaitAttestation func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error)
DutyDBPubKeyByAttestation func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)
// VAPI registration hooks; Wire passes ParSigDBStoreInternal to VAPIRegisterParSigDB.
VAPIRegisterAwaitAttestation func(func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error))
VAPIRegisterPubKeyByAttestation func(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error))
VAPIRegisterParSigDB func(func(context.Context, Duty, ParSignedDataSet) error)
// ParSigDB: Wire passes ParSigExBroadcast to SubscribeInternal and SigAggAggregate to SubscribeThreshold.
ParSigDBStoreInternal func(context.Context, Duty, ParSignedDataSet) error
ParSigDBStoreExternal func(context.Context, Duty, ParSignedDataSet) error
ParSigDBSubscribeInternal func(func(context.Context, Duty, ParSignedDataSet) error)
ParSigDBSubscribeThreshold func(func(context.Context, Duty, PubKey, []ParSignedData) error)
// ParSigEx: Wire passes ParSigDBStoreExternal to ParSigExSubscribe.
ParSigExBroadcast func(context.Context, Duty, ParSignedDataSet) error
ParSigExSubscribe func(func(context.Context, Duty, ParSignedDataSet) error)
// SigAgg: Wire subscribes both AggSigDBStore and BroadcasterBroadcast via SigAggSubscribe.
SigAggAggregate func(context.Context, Duty, PubKey, []ParSignedData) error
SigAggSubscribe func(func(context.Context, Duty, PubKey, AggSignedData) error)
AggSigDBStore func(context.Context, Duty, PubKey, AggSignedData) error
// AggSigDBAwait is populated by Wire but is not connected to another function in the wiring visible there.
AggSigDBAwait func(context.Context, Duty, PubKey) (AggSignedData, error)
BroadcasterBroadcast func(context.Context, Duty, PubKey, AggSignedData) error
}

// WireOption defines a functional option to configure wiring.
// Wire calls each option with a pointer to the populated wireFuncs before it
// connects the functions, so an option can replace or wrap any of them.
type WireOption func(*wireFuncs)

// Wire wires the workflow components together.
func Wire(
sched Scheduler,
func Wire(sched Scheduler,
fetch Fetcher,
cons Consensus,
dutyDB DutyDB,
Expand All @@ -140,16 +169,46 @@ func Wire(
sigAgg SigAgg,
aggSigDB AggSigDB,
bcast Broadcaster,
opts ...WireOption,
) {
sched.Subscribe(fetch.Fetch)
fetch.Subscribe(cons.Propose)
cons.Subscribe(dutyDB.Store)
vapi.RegisterAwaitAttestation(dutyDB.AwaitAttestation)
vapi.RegisterPubKeyByAttestation(dutyDB.PubKeyByAttestation)
vapi.RegisterParSigDB(parSigDB.StoreInternal)
parSigDB.SubscribeInternal(parSigEx.Broadcast)
parSigEx.Subscribe(parSigDB.StoreExternal)
parSigDB.SubscribeThreshold(sigAgg.Aggregate)
sigAgg.Subscribe(aggSigDB.Store)
sigAgg.Subscribe(bcast.Broadcast)
w := wireFuncs{
SchedulerSubscribe: sched.Subscribe,
FetcherFetch: fetch.Fetch,
FetcherSubscribe: fetch.Subscribe,
ConsensusPropose: cons.Propose,
ConsensusSubscribe: cons.Subscribe,
DutyDBStore: dutyDB.Store,
DutyDBAwaitAttestation: dutyDB.AwaitAttestation,
DutyDBPubKeyByAttestation: dutyDB.PubKeyByAttestation,
VAPIRegisterAwaitAttestation: vapi.RegisterAwaitAttestation,
VAPIRegisterPubKeyByAttestation: vapi.RegisterPubKeyByAttestation,
VAPIRegisterParSigDB: vapi.RegisterParSigDB,
ParSigDBStoreInternal: parSigDB.StoreInternal,
ParSigDBStoreExternal: parSigDB.StoreExternal,
ParSigDBSubscribeInternal: parSigDB.SubscribeInternal,
ParSigDBSubscribeThreshold: parSigDB.SubscribeThreshold,
ParSigExBroadcast: parSigEx.Broadcast,
ParSigExSubscribe: parSigEx.Subscribe,
SigAggAggregate: sigAgg.Aggregate,
SigAggSubscribe: sigAgg.Subscribe,
AggSigDBStore: aggSigDB.Store,
AggSigDBAwait: aggSigDB.Await,
BroadcasterBroadcast: bcast.Broadcast,
}

for _, opt := range opts {
opt(&w)
}

w.SchedulerSubscribe(w.FetcherFetch)
w.FetcherSubscribe(w.ConsensusPropose)
w.ConsensusSubscribe(w.DutyDBStore)
w.VAPIRegisterAwaitAttestation(w.DutyDBAwaitAttestation)
w.VAPIRegisterPubKeyByAttestation(w.DutyDBPubKeyByAttestation)
w.VAPIRegisterParSigDB(w.ParSigDBStoreInternal)
w.ParSigDBSubscribeInternal(w.ParSigExBroadcast)
w.ParSigExSubscribe(w.ParSigDBStoreExternal)
w.ParSigDBSubscribeThreshold(w.SigAggAggregate)
w.SigAggSubscribe(w.AggSigDBStore)
w.SigAggSubscribe(w.BroadcasterBroadcast)
}
7 changes: 7 additions & 0 deletions core/leadercast/leadercast.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package leadercast
import (
"context"

"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
Expand Down Expand Up @@ -66,12 +68,17 @@ func (l *LeaderCast) Run(ctx context.Context) error {

log.Debug(ctx, "received duty from leader", z.Int("peer", source), z.Any("duty", duty))

var span trace.Span
ctx, span = core.StartDutyTrace(ctx, duty, "core/leadercast.Handle")

for _, sub := range l.subs {
if err := sub(ctx, duty, data); err != nil {
log.Error(ctx, "subscriber error", err)
continue
}
}

span.End()
}
}

Expand Down
5 changes: 5 additions & 0 deletions core/parsigex/parsigex.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
Expand Down Expand Up @@ -64,6 +65,10 @@ func (m *ParSigEx) handle(s network.Stream) {
return
}

var span trace.Span
ctx, span = core.StartDutyTrace(ctx, msg.Duty, "core/parsigex.Handle")
defer span.End()

for _, sub := range m.subs {
err := sub(ctx, msg.Duty, msg.Data)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
Expand Down Expand Up @@ -142,15 +143,20 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {

instrumentDuty(duty, argSet)

var span trace.Span
ctx, span = core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")

for _, sub := range s.subs {
err := sub(ctx, duty, argSet)
if err != nil {
// TODO(corver): Improve error handling; possibly call subscription async
// with backoff until duty expires.
span.End()
return err
}
}

span.End()
delete(s.duties, duty)
}

Expand Down
4 changes: 3 additions & 1 deletion core/sigagg/sigagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/coinbase/kryptology/pkg/signatures/bls/bls_sig"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/tbls"
"github.com/obolnetwork/charon/tbls/tblsconv"
Expand Down Expand Up @@ -66,7 +67,6 @@ func (a *Aggregator) Aggregate(ctx context.Context, duty core.Duty, pubkey core.
if err != nil {
return err
}

if i == 0 {
firstParSig = parSig
firstRoot = root
Expand All @@ -86,7 +86,9 @@ func (a *Aggregator) Aggregate(ctx context.Context, duty core.Duty, pubkey core.
}

// Aggregate signatures
_, span := tracer.Start(ctx, "tbls.Aggregate")
sig, err := tbls.Aggregate(blsSigs)
span.End()
if err != nil {
return err
}
Expand Down
Loading