diff --git a/store/heightsub.go b/store/heightsub.go index 2335001..002505a 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -19,12 +19,14 @@ type heightSub[H header.Header[H]] struct { height atomic.Uint64 heightReqsLk sync.Mutex heightReqs map[uint64]map[chan H]struct{} + knownHeights map[uint64]struct{} } // newHeightSub instantiates new heightSub. func newHeightSub[H header.Header[H]]() *heightSub[H] { return &heightSub[H]{ - heightReqs: make(map[uint64]map[chan H]struct{}), + heightReqs: make(map[uint64]map[chan H]struct{}), + knownHeights: map[uint64]struct{}{}, } } @@ -89,17 +91,13 @@ func (hs *heightSub[H]) Pub(headers ...H) { return } - height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() - if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) - return - } - hs.SetHeight(to) hs.heightReqsLk.Lock() defer hs.heightReqsLk.Unlock() + hs.tryAdvanceHeight(headers...) + // there is a common case where we Pub only header // in this case, we shouldn't loop over each heightReqs // and instead read from the map directly @@ -128,3 +126,26 @@ func (hs *heightSub[H]) Pub(headers ...H) { } } } + +func (hs *heightSub[H]) tryAdvanceHeight(headers ...H) { + curr := hs.Height() + + // collect all new heights. + for i := range headers { + h := headers[i].Height() + if h > curr { + hs.knownHeights[h] = struct{}{} + } + } + + // try advance heightSub.Height if we saw a relevant height before. + for len(hs.knownHeights) > 0 { + _, ok := hs.knownHeights[curr+1] + if !ok { + break + } + delete(hs.knownHeights, curr+1) + curr++ + } + hs.SetHeight(curr) +} diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 3a48d95..64ef180 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -47,6 +47,37 @@ func TestHeightSub(t *testing.T) { } } +func TestHeightSubNonAdjacement(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + hs := newHeightSub[*headertest.DummyHeader]() + + { + h := headertest.RandDummyHeader(t) + h.HeightI = 100 + hs.SetHeight(99) + hs.Pub(h) + } + + { + go func() { + // fixes flakiness on CI + time.Sleep(time.Millisecond) + + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 200 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 300 + hs.Pub(h1, h2) + }() + + h, err := hs.Sub(ctx, 200) + assert.NoError(t, err) + assert.NotNil(t, h) + } +} + func TestHeightSubCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/store/store_test.go b/store/store_test.go index 53d40d5..e52ddb6 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -141,6 +141,58 @@ func TestStore_Append_BadHeader(t *testing.T) { require.Error(t, err) } +func TestStore_Append_stableHeadWhenGaps(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4)) + + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), suite.Head().Hash()) + + in := suite.GenDummyHeaders(5) + + err = store.Append(ctx, in...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + wantHead := in[4] // last header from incomming headers. + + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + + in = suite.GenDummyHeaders(10) + // make a gap + missedHeaders, in := in[:5], in[5:] + latestHead := in[len(in)-1] + + err = store.Append(ctx, in...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // head is not advanced due to a gap. + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + + err = store.Append(ctx, missedHeaders...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // after appending missing headers we're on a latest header. + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), latestHead.Hash()) +} + // TestStore_GetRange tests possible combinations of requests and ensures that // the store can handle them adequately (even malformed requests) func TestStore_GetRange(t *testing.T) {