Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Mar 30, 2022
1 parent 9555c26 commit a472001
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 26 deletions.
45 changes: 36 additions & 9 deletions app/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package log

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/obolnetwork/charon/app/errors"
Expand Down Expand Up @@ -64,30 +67,38 @@ func metricsTopicFromCtx(ctx context.Context) string {
}

// Debug logs the message and fields (incl fields in the context) at Debug level.
// TODO(corver): Add indication of when debug should be used.
// Debug should be used for most logging.
func Debug(ctx context.Context, msg string, fields ...z.Field) {
logger.Debug(msg, unwrapDedup(ctx, fields...)...)
zfl := unwrapDedup(ctx, fields...)
trace.SpanFromContext(ctx).AddEvent("log.Debug: "+msg, toAttributes(zfl))
logger.Debug(msg, zfl...)
}

// Info logs the message and fields (incl fields in the context) at Info level.
// TODO(corver): Add indication of when info should be used.
// Info should only be used for high level important events.
func Info(ctx context.Context, msg string, fields ...z.Field) {
logger.Info(msg, unwrapDedup(ctx, fields...)...)
zfl := unwrapDedup(ctx, fields...)
trace.SpanFromContext(ctx).AddEvent("log.Info: "+msg, toAttributes(zfl))
logger.Info(msg, zfl...)
}

// Warn logs the message and fields (incl fields in the context) at Warn level.
// TODO(corver): Add indication of when warn should be used.
// Warn should only be used when a problem is encountered that *does not* require any action to be taken.
func Warn(ctx context.Context, msg string, fields ...z.Field) {
zfl := unwrapDedup(ctx, fields...)
trace.SpanFromContext(ctx).AddEvent("log.Warn: "+msg, toAttributes(zfl))
logger.Warn(msg, zfl...)
incWarnCounter(ctx)
logger.Warn(msg, unwrapDedup(ctx, fields...)...)
}

// Error wraps err with msg and fields and logs it (incl fields in the context) at Error level.
// TODO(corver): Add indication of when error should be used.
// Error should only be used when a problem is encountered that *does* require action to be taken.
func Error(ctx context.Context, msg string, err error, fields ...z.Field) {
incErrorCounter(ctx)
err = errors.SkipWrap(err, msg, 2, fields...)
logger.Error(err.Error(), unwrapDedup(ctx, errFields(err))...)
zfl := unwrapDedup(ctx, errFields(err))
trace.SpanFromContext(ctx).RecordError(err, trace.WithStackTrace(true), toAttributes(zfl))
logger.Error(err.Error(), zfl...)
incErrorCounter(ctx)
}

// unwrapDedup returns the wrapped zap fields from the slice and from the context. Duplicate fields are dropped.
Expand Down Expand Up @@ -137,3 +148,19 @@ func errFields(err error) z.Field {
}
}
}

// toAttributes returns the zap fields as tracing event attributes.
func toAttributes(fields []zap.Field) trace.EventOption {
var kvs []attribute.KeyValue
for _, field := range fields {
if field.Interface != nil {
kvs = append(kvs, attribute.String(field.Key, fmt.Sprint(field.Interface)))
} else if field.String != "" {
kvs = append(kvs, attribute.String(field.Key, field.String))
} else if field.Integer != 0 {
kvs = append(kvs, attribute.Int64(field.Key, field.Integer))
}
}

return trace.WithAttributes(kvs...)
}
3 changes: 1 addition & 2 deletions core/leadercast/leadercast.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
)
Expand Down Expand Up @@ -70,7 +69,7 @@ func (l *LeaderCast) Run(ctx context.Context) error {
log.Debug(ctx, "received duty from leader", z.Int("peer", source), z.Any("duty", duty))

var span trace.Span
ctx, span = tracer.Start(core.DutyTraceRoot(ctx, duty), "core/leadercast.Handle")
ctx, span = core.StartDutyTrace(ctx, duty, "core/leadercast.Handle")

for _, sub := range l.subs {
if err := sub(ctx, duty, data); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions core/parsigex/parsigex.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func (m *ParSigEx) handle(s network.Stream) {
}

var span trace.Span
ctx, span = tracer.Start(core.DutyTraceRoot(ctx, msg.Duty), "core/parsigex.Handle")
ctx, span = core.StartDutyTrace(ctx, msg.Duty, "core/parsigex.Handle")
defer span.End()

for _, sub := range m.subs {
Expand Down
3 changes: 1 addition & 2 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
)
Expand Down Expand Up @@ -145,7 +144,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
instrumentDuty(duty, argSet)

var span trace.Span
ctx, span = tracer.Start(core.DutyTraceRoot(ctx, duty), "core/scheduler.scheduleSlot")
ctx, span = core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")

for _, sub := range s.subs {
err := sub(ctx, duty, argSet)
Expand Down
32 changes: 28 additions & 4 deletions core/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,48 @@ package core

import (
"context"
"fmt"
"hash/fnv"
"strings"

eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/tracer"
)

// DutyTraceRoot returns a copy of the parent context containing a tracing span context rooted
// to the duty.
func DutyTraceRoot(ctx context.Context, duty Duty) context.Context {
// StartDutyTrace returns a context and span rooted to the duty traceID and wrapped in a duty span.
// This creates a new trace root and should generally only be called when a new duty is scheduled
// or when a duty is received from the VC or peer.
func StartDutyTrace(ctx context.Context, duty Duty, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
h := fnv.New128a()
_, _ = h.Write([]byte(duty.String()))

var traceID trace.TraceID
copy(traceID[:], h.Sum(nil))

return tracer.RootedCtx(ctx, traceID)
var outerSpan, innerSpan trace.Span
ctx, outerSpan = tracer.Start(tracer.RootedCtx(ctx, traceID), fmt.Sprintf("core/duty.%s", strings.Title(duty.Type.String())))
ctx, innerSpan = tracer.Start(ctx, spanName, opts...)

outerSpan.SetAttributes(attribute.Int64("slot", duty.Slot))

return ctx, withEndSpan{
Span: innerSpan,
endFunc: func() { outerSpan.End() },
}
}

// withEndSpan wraps a trace span and calls endFunc when End is called.
type withEndSpan struct {
trace.Span
endFunc func()
}

func (s withEndSpan) End(options ...trace.SpanEndOption) {
s.Span.End(options...)
s.endFunc()
}

// WithTracing wraps component input functions with tracing spans.
Expand Down
4 changes: 2 additions & 2 deletions core/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestWithDutySpanCtx(t *testing.T) {
require.NoError(t, stop(ctx))
}()

_, span1 := tracer.Start(core.DutyTraceRoot(ctx, core.Duty{}), "span1")
_, span2 := tracer.Start(core.DutyTraceRoot(ctx, core.Duty{}), "span2")
_, span1 := core.StartDutyTrace(ctx, core.Duty{}, "span1")
_, span2 := core.StartDutyTrace(ctx, core.Duty{}, "span2")

require.Equal(t, "7d0b160d5b04eac85dd1eaf0585c5b82", span1.SpanContext().TraceID().String())
require.Equal(t, span1.SpanContext().TraceID(), span2.SpanContext().TraceID())
Expand Down
8 changes: 3 additions & 5 deletions core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,8 @@ func (c *Component) RegisterParSigDB(fn func(context.Context, core.Duty, core.Pa
}

// AttestationData implements the eth2client.AttesterDutiesProvider for the router.
func (c Component) AttestationData(ctx context.Context, slot eth2p0.Slot, committeeIndex eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) {
duty := core.NewAttesterDuty(int64(slot))
var span trace.Span
ctx, span = tracer.Start(core.DutyTraceRoot(ctx, duty), "core/validatorapi.AttestationData")
func (c Component) AttestationData(parent context.Context, slot eth2p0.Slot, committeeIndex eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) {
ctx, span := core.StartDutyTrace(parent, core.NewAttesterDuty(int64(slot)), "core/validatorapi.AttestationData")
defer span.End()

return c.awaitAttFunc(ctx, int64(slot), int64(committeeIndex))
Expand All @@ -184,7 +182,7 @@ func (c Component) SubmitAttestations(ctx context.Context, attestations []*eth2p
// Pick the first attestation slot to use as trace root.
duty := core.NewAttesterDuty(int64(attestations[0].Data.Slot))
var span trace.Span
ctx, span = tracer.Start(core.DutyTraceRoot(ctx, duty), "core/validatorapi.SubmitAttestations")
ctx, span = core.StartDutyTrace(ctx, duty, "core/validatorapi.SubmitAttestations")
defer span.End()
}

Expand Down

0 comments on commit a472001

Please sign in to comment.