
Commit

daser persist retry count after restart
walldiss committed May 10, 2023
1 parent e632a47 commit c346d81
Showing 5 changed files with 91 additions and 14 deletions.
5 changes: 5 additions & 0 deletions das/checkpoint.go
@@ -21,8 +21,13 @@ type workerCheckpoint struct {
}

func newCheckpoint(stats SamplingStats) checkpoint {
// make a list of workers to resume after restart
workers := make([]workerCheckpoint, 0, len(stats.Workers))
for _, w := range stats.Workers {
// no need to resume retry jobs, since they will kick off from aggregated failed map
if w.JobType == retryJob {
continue
}
workers = append(workers, workerCheckpoint{
From: w.Curr,
To: w.To,
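With this change, a retry worker that is in flight at shutdown is no longer persisted as a worker entry; its height and attempt count are carried only by the checkpoint's Failed map and picked back up on restart. The following is a minimal, self-contained sketch of that round trip — the types are simplified stand-ins modelling only the fields this diff touches, not the real das definitions.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the das types; only the fields used in this diff are
// modelled, so details like JobType == "retry" are illustrative.
type workerStats struct {
	Curr, To uint64
	JobType  string
}

type samplingStats struct {
	Failed  map[uint64]int
	Workers []workerStats
}

type workerCheckpoint struct {
	From, To uint64
}

type checkpoint struct {
	Failed  map[uint64]int
	Workers []workerCheckpoint
}

type retryAttempt struct {
	count int
	after time.Time
}

// newCheckpoint mirrors the hunk above: retry workers are skipped, because the
// heights they were resampling already live in the aggregated Failed map.
func newCheckpoint(stats samplingStats) checkpoint {
	workers := make([]workerCheckpoint, 0, len(stats.Workers))
	for _, w := range stats.Workers {
		if w.JobType == "retry" {
			continue
		}
		workers = append(workers, workerCheckpoint{From: w.Curr, To: w.To})
	}
	return checkpoint{Failed: stats.Failed, Workers: workers}
}

// resumeFromCheckpoint mirrors das/state.go below: attempt counts are restored
// as-is and every failed height is immediately eligible for retry.
func resumeFromCheckpoint(c checkpoint) map[uint64]retryAttempt {
	failed := make(map[uint64]retryAttempt, len(c.Failed))
	for h, count := range c.Failed {
		failed[h] = retryAttempt{count: count, after: time.Now()}
	}
	return failed
}

func main() {
	stats := samplingStats{
		Failed:  map[uint64]int{10: 3, 11: 1},
		Workers: []workerStats{{Curr: 10, To: 10, JobType: "retry"}},
	}
	cp := newCheckpoint(stats)
	fmt.Println(len(cp.Workers), cp.Failed)         // 0 map[10:3 11:1]
	fmt.Println(resumeFromCheckpoint(cp)[10].count) // 3: the count survives the restart
}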
47 changes: 47 additions & 0 deletions das/coordinator_test.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/header"
@@ -252,6 +253,52 @@ func TestCoordinator(t *testing.T) {
expectedState.Failed = make(map[uint64]int)
assert.Equal(t, expectedState, newCheckpoint(coordinator.state.unsafeStats()))
})

t.Run("persist retry count after on restart", func(t *testing.T) {
testParams := defaultTestParams()
testParams.dasParams.ConcurrencyLimit = 5
ctx, cancel := context.WithTimeout(context.Background(), testParams.timeoutDelay)

ch := checkpoint{
SampleFrom: testParams.sampleFrom,
NetworkHead: testParams.networkHead,
Failed: map[uint64]int{1: 1, 2: 2, 3: 3, 4: 4, 5: 5},
Workers: []workerCheckpoint{},
}

waitCh := make(chan struct{})
var wg sync.WaitGroup
wg.Add(testParams.dasParams.ConcurrencyLimit)
sampleFn := func(ctx context.Context, h *header.ExtendedHeader) error {
wg.Done()
select {
case <-ctx.Done():
return ctx.Err()
case <-waitCh:
return nil
}
}

coordinator := newSamplingCoordinator(
testParams.dasParams,
getterStub{},
sampleFn,
newBroadcastMock(1),
)

go coordinator.run(ctx, ch)
cancel()
wg.Wait()
close(waitCh)

stopCtx, cancel := context.WithTimeout(context.Background(), testParams.timeoutDelay)
defer cancel()
assert.NoError(t, coordinator.wait(stopCtx))

st := coordinator.state.unsafeStats()
fmt.Println(st)
require.Equal(t, ch, newCheckpoint(st))
})
}

func BenchmarkCoordinator(b *testing.B) {
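The new test relies on a common synchronization trick: sampleFn blocks every sampler until it is either cancelled or explicitly released, and a WaitGroup confirms that ConcurrencyLimit samplers actually started before the test moves on to shutdown assertions. Below is a stripped-down, standalone version of that pattern with hypothetical names, and with the ordering tightened so cancellation only happens once every sampler is known to be in flight.

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	const concurrency = 5

	ctx, cancel := context.WithCancel(context.Background())

	var started sync.WaitGroup
	started.Add(concurrency)
	release := make(chan struct{})

	// Each sampler reports that it is in flight, then blocks until it is either
	// cancelled or explicitly released — mirroring sampleFn in the test above.
	sample := func(ctx context.Context) error {
		started.Done()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-release:
			return nil
		}
	}

	var done sync.WaitGroup
	results := make([]error, concurrency)
	for i := 0; i < concurrency; i++ {
		done.Add(1)
		go func(i int) {
			defer done.Done()
			results[i] = sample(ctx)
		}(i)
	}

	started.Wait() // every sampler is now mid-sample
	cancel()       // simulate a shutdown while they are in flight
	done.Wait()
	close(release)

	fmt.Println(results) // all samplers observed context.Canceled
}

In the actual test, the final require.Equal asserts that newCheckpoint over the coordinator's stats reproduces the original checkpoint, i.e. the per-height retry counts 1 through 5 are not reset by the simulated restart.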
45 changes: 35 additions & 10 deletions das/state.go
@@ -2,6 +2,7 @@ package das

import (
"context"
"fmt"
"sync/atomic"
"time"

@@ -71,15 +72,28 @@ func (s *coordinatorState) resumeFromCheckpoint(c checkpoint) {
s.networkHead = c.NetworkHead

for h, count := range c.Failed {
// resumed retries should start without backoff delay
s.failed[h] = retryAttempt{
count: count,
after: time.Now(),
}
}
}

func (s *coordinatorState) handleResult(res result) {
delete(s.inProgress, res.id)

switch res.jobType {
case recentJob, catchupJob:
s.handleRecentOrCatchupResult(res)
case retryJob:
s.handleRetryResult(res)
}

s.checkDone()
}

func (s *coordinatorState) handleRecentOrCatchupResult(res result) {
// check if the worker retried any of the previously failed heights
for h := range s.failed {
if h < res.from || h > res.to {
@@ -93,16 +107,16 @@ func (s *coordinatorState) handleResult(res result) {

// update failed heights
for h := range res.failed {
// if job was already in retry and failed again, carry over attempt count
lastRetry, ok := s.inRetry[h]
if ok {
if res.job.jobType != retryJob {
// retry job has been already created by another worker (recent or catchup)
continue
}
delete(s.inRetry, h)
}
nextRetry, _ := s.retryStrategy.nextRetry(retryAttempt{}, time.Now())
s.failed[h] = nextRetry
}
}

func (s *coordinatorState) handleRetryResult(res result) {
// handle heights that failed again
for h := range res.failed {
lastRetry := s.inRetry[h]
// height will be retried after backoff
nextRetry, retryExceeded := s.retryStrategy.nextRetry(lastRetry, time.Now())
if retryExceeded {
log.Warnw("header exceeded maximum amount of sampling attempts",
@@ -111,7 +125,11 @@
}
s.failed[h] = nextRetry
}
s.checkDone()

// cleanup retry from processed heights
for h := res.from; h <= res.to; h++ {
delete(s.inRetry, h)
}
}

func (s *coordinatorState) isNewHead(newHead int64) bool {
@@ -230,6 +248,7 @@ func (s *coordinatorState) unsafeStats() SamplingStats {
})

for h := range wstats.failed {
fmt.Println("from worker", h)
failed[h]++
if h < lowestFailedOrInProgress {
lowestFailedOrInProgress = h
@@ -243,12 +262,18 @@

// set lowestFailedOrInProgress to minimum failed - 1
for h, retry := range s.failed {
fmt.Println("from global", h)
failed[h] += retry.count
if h < lowestFailedOrInProgress {
lowestFailedOrInProgress = h
}
}

for h, retry := range s.inRetry {
fmt.Println("from retry", h)
failed[h] += retry.count
}

return SamplingStats{
SampledChainHead: lowestFailedOrInProgress - 1,
CatchupHead: s.next - 1,
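In das/state.go, resumeFromCheckpoint restores each count with after set to time.Now(), so resumed heights are retried immediately, while handleRetryResult keeps feeding the previous attempt back into retryStrategy.nextRetry to grow the count and backoff, and handleRecentOrCatchupResult starts failures from a fresh retryAttempt{}. The concrete strategy is not part of this diff; the sketch below is an assumed exponential-backoff implementation that only matches the call shape nextRetry(lastRetry, now) (retryAttempt, bool) used here.

package main

import (
	"fmt"
	"time"
)

type retryAttempt struct {
	count int
	after time.Time
}

// retryStrategy here is an assumption for illustration: the real backoff curve
// and retry cap live elsewhere in the das package and may differ.
type retryStrategy struct {
	base       time.Duration
	maxRetries int
}

// nextRetry bumps the attempt count and schedules the next attempt after an
// exponentially growing delay; the bool reports whether maxRetries was exceeded.
func (r retryStrategy) nextRetry(last retryAttempt, now time.Time) (retryAttempt, bool) {
	next := retryAttempt{
		count: last.count + 1,
		after: now.Add(r.base << uint(last.count)),
	}
	return next, r.maxRetries > 0 && next.count > r.maxRetries
}

func main() {
	rs := retryStrategy{base: time.Second, maxRetries: 4}
	now := time.Now()

	attempt := retryAttempt{} // a fresh failure, as in handleRecentOrCatchupResult
	for i := 0; i < 5; i++ {
		var exceeded bool
		attempt, exceeded = rs.nextRetry(attempt, now)
		fmt.Println(attempt.count, attempt.after.Sub(now), exceeded)
	}
	// Delays grow 1s, 2s, 4s, 8s, 16s; the fifth attempt exceeds maxRetries.
}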
5 changes: 2 additions & 3 deletions das/worker.go
@@ -77,14 +77,13 @@ func (w *worker) run(ctx context.Context, timeout time.Duration, resultCh chan<-

for curr := w.state.from; curr <= w.state.to; curr++ {
err := w.sample(ctx, timeout, curr)
w.setResult(curr, err)
if errors.Is(err, context.Canceled) {
// sampling worker will resume upon restart
break
return
}
w.setResult(curr, err)
}

log.With()
log.Infow(
"finished sampling headers",
"from", w.state.from,
3 changes: 2 additions & 1 deletion share/p2p/metrics.go
@@ -30,7 +30,8 @@ type Metrics struct {
totalRequestCounter syncint64.Counter
}

// ObserveRequests increments the total number of requests sent with the given status as an attribute.
// ObserveRequests increments the total number of requests sent with the given status as an
// attribute.
func (m *Metrics) ObserveRequests(count int64, status status) {
if m == nil {
return
