Skip to content

Commit

Permalink
app/retry: implement async retryer (#346)
Browse files Browse the repository at this point in the history
Package retry provides a generic async slot function executor with retries for robustness against network failures.
Functions are linked to a slot and executed asynchronously; network or context errors are retried with backoff
until duties related to a slot have elapsed (5 slots later).

category: feature
ticket: #354
  • Loading branch information
corverroos committed Apr 5, 2022
1 parent a2d294d commit 07a2e54
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 16 deletions.
17 changes: 15 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/retry"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
Expand Down Expand Up @@ -315,9 +316,16 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

retryer, err := retry.New(ctx, eth2Cl)
if err != nil {
return err
}

core.Wire(sched, fetch, consensus, dutyDB, vapi,
parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster,
core.WithTracing())
core.WithTracing(),
core.WithAsyncRetry(retryer),
)

err = wireValidatorMock(conf, pubshares, sched)
if err != nil {
Expand All @@ -332,6 +340,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartScheduler, lifecycle.HookFuncErr(sched.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartAggSigDB, lifecycle.HookFuncCtx(aggSigDB.Run))
life.RegisterStop(lifecycle.StopScheduler, lifecycle.HookFuncMin(sched.Stop))
life.RegisterStop(lifecycle.StopRetryer, lifecycle.HookFuncCtx(retryer.Shutdown))

return nil
}
Expand Down Expand Up @@ -458,7 +467,11 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch
ctx = log.WithTopic(ctx, "vmock")
go func() {
addr := "http://" + conf.ValidatorAPIAddr
cl, err := eth2http.New(ctx, eth2http.WithLogLevel(1), eth2http.WithAddress(addr))
cl, err := eth2http.New(ctx,
eth2http.WithLogLevel(1),
eth2http.WithAddress(addr),
eth2http.WithTimeout(time.Second*10), // Allow sufficient time to block while fetching duties.
)
if err != nil {
log.Warn(ctx, "Cannot connect to validatorapi", z.Err(err))
return
Expand Down
1 change: 1 addition & 0 deletions app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ const (
StopMonitoringAPI
StopBeaconMock // Need to close this before validator API, since it can hold long lived connections.
StopValidatorAPI
StopRetryer
)
5 changes: 5 additions & 0 deletions app/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func WithCtx(ctx context.Context, fields ...z.Field) context.Context {
return context.WithValue(ctx, ctxKey{}, append(fields, fieldsFromCtx(ctx)...))
}

// CopyFields returns a copy of the target with which the logging fields of the source context are associated.
func CopyFields(target context.Context, source context.Context) context.Context {
return context.WithValue(target, ctxKey{}, fieldsFromCtx(source))
}

// WithTopic is a convenience function that adds the topic
// contextual logging field to the returned child context.
func WithTopic(ctx context.Context, component string) context.Context {
Expand Down
18 changes: 18 additions & 0 deletions app/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"

Expand Down Expand Up @@ -76,6 +77,23 @@ func TestErrorWrapOther(t *testing.T) {
testutil.RequireGoldenBytes(t, buf.Bytes())
}

func TestCopyFields(t *testing.T) {
buf := setup(t)

ctx1, cancel := context.WithCancel(context.Background())
ctx1 = log.WithCtx(ctx1, z.Str("source", "source"))
ctx2 := log.CopyFields(context.Background(), ctx1) // ctx2 is derived from Background, not from ctx1.

cancel()
require.Error(t, ctx1.Err())   // The source context is cancelled.
require.NoError(t, ctx2.Err()) // The copy is unaffected by cancelling the source.

log.Info(ctx1, "see source")
log.Info(ctx2, "also source") // Expected to log the same "source" field as ctx1.

testutil.RequireGoldenBytes(t, buf.Bytes()) // Golden output includes caller line numbers, so adding or removing lines above changes it.
}

// setup returns a buffer that logs are written to and stubs non-deterministic logging fields.
func setup(t *testing.T) *bytes.Buffer {
t.Helper()
Expand Down
2 changes: 2 additions & 0 deletions app/log/testdata/TestCopyFields
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
00:00 INFO see source {"source": "source", "caller": "log_test.go:91"}
00:00 INFO also source {"source": "source", "caller": "log_test.go:92"}
12 changes: 6 additions & 6 deletions app/log/testdata/TestErrorWrap
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
00:00 ERRO err1: first {"1": 1, "caller": "log_test.go:59"}
app/log/log_test.go:54 .TestErrorWrap
00:00 ERRO err2: second: first {"2": 2, "1": 1, "caller": "log_test.go:60"}
app/log/log_test.go:54 .TestErrorWrap
00:00 ERRO err3: third: second: first {"3": 3, "2": 2, "1": 1, "caller": "log_test.go:61"}
app/log/log_test.go:54 .TestErrorWrap
00:00 ERRO err1: first {"1": 1, "caller": "log_test.go:60"}
app/log/log_test.go:55 .TestErrorWrap
00:00 ERRO err2: second: first {"2": 2, "1": 1, "caller": "log_test.go:61"}
app/log/log_test.go:55 .TestErrorWrap
00:00 ERRO err3: third: second: first {"3": 3, "2": 2, "1": 1, "caller": "log_test.go:62"}
app/log/log_test.go:55 .TestErrorWrap
8 changes: 4 additions & 4 deletions app/log/testdata/TestErrorWrapOther
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
00:00 ERRO err1: EOF {"caller": "log_test.go:73"}
app/log/log_test.go:73 .TestErrorWrapOther
00:00 ERRO err2: wrap: EOF {"caller": "log_test.go:74"}
app/log/log_test.go:70 .TestErrorWrapOther
00:00 ERRO err1: EOF {"caller": "log_test.go:74"}
app/log/log_test.go:74 .TestErrorWrapOther
00:00 ERRO err2: wrap: EOF {"caller": "log_test.go:75"}
app/log/log_test.go:71 .TestErrorWrapOther
8 changes: 4 additions & 4 deletions app/log/testdata/TestWithContext
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
00:00 DEBG msg1 {"ctx1": 1, "caller": "log_test.go:43"}
00:00 INFO msg2 {"ctx2": 2, "wrap2": 2, "caller": "log_test.go:44"}
00:00 WARN msg3a {"wrap3": "a", "wrap2": 2, "caller": "log_test.go:45"}
00:00 WARN msg3b {"wrap3": "b", "wrap2": 2, "caller": "log_test.go:46"}
00:00 DEBG msg1 {"ctx1": 1, "caller": "log_test.go:44"}
00:00 INFO msg2 {"ctx2": 2, "wrap2": 2, "caller": "log_test.go:45"}
00:00 WARN msg3a {"wrap3": "a", "wrap2": 2, "caller": "log_test.go:46"}
00:00 WARN msg3b {"wrap3": "b", "wrap2": 2, "caller": "log_test.go:47"}
211 changes: 211 additions & 0 deletions app/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright © 2021 Obol Technologies Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package retry provides a generic async slot function executor with retries for robustness against network failures.
// Functions are linked to a slot and executed asynchronously; network or context errors are retried with backoff
// until duties related to a slot have elapsed (5 slots later).
package retry

import (
"context"
"net"
"strings"
"sync"
"testing"
"time"

eth2client "github.com/attestantio/go-eth2-client"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/z"
)

// lateFactor defines the number of slots duties may be late.
// New adds this many slot durations to a slot's start time to obtain the
// deadline after which retries for that slot stop.
// See https://pintail.xyz/posts/modelling-the-impact-of-altair/#proposer-and-delay-rewards.
const lateFactor = 5

// slotTimeProvider defines eth2client interface for resolving slot start times.
// New type-asserts the provided eth2 service to this interface and queries
// the genesis time and slot duration from it.
type slotTimeProvider interface {
eth2client.GenesisTimeProvider
eth2client.SlotDurationProvider
}

// New returns a Retryer whose per-slot deadlines are derived from the genesis
// time and slot duration reported by the provided eth2 service.
//
// It returns an error if the service does not implement the required providers
// or if querying either value fails.
func New(ctx context.Context, eth2Svc eth2client.Service) (*Retryer, error) {
	provider, ok := eth2Svc.(slotTimeProvider)
	if !ok {
		return nil, errors.New("invalid eth2 service")
	}

	genesisTime, err := provider.GenesisTime(ctx)
	if err != nil {
		return nil, err
	}

	slotDuration, err := provider.SlotDuration(ctx)
	if err != nil {
		return nil, err
	}

	// slotDeadlineCtx derives a context that expires lateFactor slots after the start of the given slot.
	slotDeadlineCtx := func(ctx context.Context, slot int64) (context.Context, context.CancelFunc) {
		slotStart := genesisTime.Add(time.Duration(slot) * slotDuration)
		deadline := slotStart.Add(time.Duration(lateFactor) * slotDuration)

		return context.WithTimeout(ctx, time.Until(deadline))
	}

	// constantBackoff is a naive provider: every wait it hands out lasts one second.
	constantBackoff := func() func() <-chan time.Time {
		const backoff = time.Second

		return func() <-chan time.Time {
			return time.After(backoff)
		}
	}

	retryer := &Retryer{
		shutdown:        make(chan struct{}),
		ctxTimeoutFunc:  slotDeadlineCtx,
		backoffProvider: constantBackoff,
	}

	return retryer, nil
}

// NewForT returns a Retryer for use in tests. The caller injects both the
// per-slot context timeout function and the backoff provider, which allows a
// custom clock. The testing.T argument is unused and only marks this
// constructor as test-only.
func NewForT(
	_ *testing.T,
	ctxTimeoutFunc func(ctx context.Context, slot int64) (context.Context, context.CancelFunc),
	backoffProvider func() func() <-chan time.Time,
) (*Retryer, error) {
	r := new(Retryer)
	r.shutdown = make(chan struct{})
	r.ctxTimeoutFunc = ctxTimeoutFunc
	r.backoffProvider = backoffProvider

	return r, nil
}

// Retryer provides execution of functions asynchronously with retry adding robustness to network errors.
// Instances are created via New or NewForT, which initialise the shutdown channel.
type Retryer struct {
// shutdown is closed by Shutdown; DoAsync refuses new work and aborts backoff waits once it is closed.
shutdown chan struct{}
// ctxTimeoutFunc derives the context (and its cancel function) that bounds all attempts for a slot.
ctxTimeoutFunc func(ctx context.Context, slot int64) (context.Context, context.CancelFunc)
// backoffProvider returns a fresh backoff function per DoAsync call; each invocation of that
// function yields a channel that is received from to end a single backoff wait.
backoffProvider func() func() <-chan time.Time

// wg tracks in-flight DoAsync calls so that Shutdown can wait for them.
wg sync.WaitGroup
}

// DoAsync runs fn and keeps retrying it, with backoff, while it fails with a
// network error, a context error or a temporary beacon node error. It gives up
// when fn succeeds, when fn fails with any other error, when the slot's
// deadline context expires, or when the Retryer is shut down during a backoff.
//
// The call blocks until one of those happens, so it is intended to be started
// in its own goroutine:
//
//	go retryer.DoAsync(ctx, duty.Slot, "foo", fn)
//
// The name is only used in log messages and as a span attribute.
//
// NOTE(review): the shutdown check and wg.Add are separate steps, so a
// concurrent Shutdown may begin waiting in between - confirm this is acceptable.
func (r *Retryer) DoAsync(parent context.Context, slot int64, name string, fn func(context.Context) error) {
	if r.isShutdown() {
		return
	}

	r.wg.Add(1)
	defer r.wg.Done()

	nextBackoff := r.backoffProvider()

	// Detach from the parent context, since it may be closed while this runs
	// asynchronously; only its logging fields and trace span are carried over.
	detached := log.CopyFields(context.Background(), parent)
	detached = log.WithTopic(detached, "retry")
	detached = trace.ContextWithSpan(detached, trace.SpanFromContext(parent))

	ctx, cancel := r.ctxTimeoutFunc(detached, slot)
	defer cancel()

	ctx, span := tracer.Start(ctx, "app/retry.DoAsync")
	defer span.End()
	span.SetAttributes(attribute.String("name", name))

	for attempt := 0; ; attempt++ {
		span.AddEvent("retry.attempt.start", trace.WithAttributes(attribute.Int("i", attempt)))

		err := fn(ctx)
		if err == nil {
			return
		}

		// Classify the failure. The local context is deliberately not consulted
		// here, since downstream timeouts are what matter.
		var netErr net.Error
		retryable := errors.As(err, &netErr) ||
			isTemporaryBeaconErr(err) ||
			errors.Is(err, context.Canceled) ||
			errors.Is(err, context.DeadlineExceeded)
		if !retryable {
			log.Error(ctx, "Permanent failure calling "+name, err)
			return
		}

		// The slot deadline already passed: no point in backing off.
		if ctx.Err() != nil {
			log.Error(ctx, "Timeout retrying "+name, ctx.Err())
			return
		}

		log.Warn(ctx, "Temporary failure (will retry) calling "+name, z.Err(err))
		span.AddEvent("retry.backoff.start")
		select {
		case <-r.shutdown:
			return
		case <-ctx.Done():
		case <-nextBackoff():
		}
		span.AddEvent("retry.backoff.done")

		// The slot deadline may have passed while backing off.
		if ctx.Err() != nil {
			log.Error(ctx, "Timeout retrying "+name, ctx.Err())
			return
		}
	}
}

// isTemporaryBeaconErr returns true if the error is a temporary beacon node error.
// eth2http doesn't return structured errors or error sentinels, so this is brittle:
// classification is done by substring matching on the error message.
// A nil error is never temporary.
func isTemporaryBeaconErr(err error) bool {
	// Guard against nil so that calling err.Error() below cannot panic.
	if err == nil {
		return false
	}

	// Check for timing errors like:
	//  - Proposer duties were requested for a future epoch.
	//  - Cannot create attestation for future slot.
	if strings.Contains(err.Error(), "future") { //nolint:gosimple // More checks will be added below.
		return true
	}

	// TODO(corver): Add more checks here.

	return false
}

// isShutdown reports whether the shutdown channel has been closed, i.e. whether
// Shutdown has been called. It never blocks.
func (r *Retryer) isShutdown() bool {
	select {
	case <-r.shutdown:
		return true
	default:
	}

	return false
}

// Shutdown triggers graceful shutdown and waits for all active functions to complete or timeout.
// It returns as soon as either all in-flight DoAsync calls have finished or ctx is done,
// whichever happens first.
//
// Shutdown must be called at most once: it closes the shutdown channel, and
// closing an already closed channel panics.
func (r *Retryer) Shutdown(ctx context.Context) {
	close(r.shutdown)

	done := make(chan struct{})
	go func() {
		r.wg.Wait()
		// Close rather than send: if ctx expired first nobody receives from done,
		// and a blocking send would leak this goroutine forever.
		close(done)
	}()

	select {
	case <-ctx.Done():
	case <-done:
	}
}
Loading

0 comments on commit 07a2e54

Please sign in to comment.