Skip to content

Commit

Permalink
fix(store): properly update heightSub height
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 25, 2024
1 parent 0a52c6e commit 9e25e4a
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 7 deletions.
35 changes: 28 additions & 7 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ type heightSub[H header.Header[H]] struct {
height atomic.Uint64
heightReqsLk sync.Mutex
heightReqs map[uint64]map[chan H]struct{}
knownHeights map[uint64]struct{}
}

// newHeightSub instantiates new heightSub.
func newHeightSub[H header.Header[H]]() *heightSub[H] {
return &heightSub[H]{
heightReqs: make(map[uint64]map[chan H]struct{}),
heightReqs: make(map[uint64]map[chan H]struct{}),
knownHeights: map[uint64]struct{}{},
}
}

Expand Down Expand Up @@ -89,17 +91,13 @@ func (hs *heightSub[H]) Pub(headers ...H) {
return
}

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
}
hs.SetHeight(to)

hs.heightReqsLk.Lock()
defer hs.heightReqsLk.Unlock()

hs.tryAdvanceHeight(headers...)

// there is a common case where we Pub only header
// in this case, we shouldn't loop over each heightReqs
// and instead read from the map directly
Expand Down Expand Up @@ -128,3 +126,26 @@ func (hs *heightSub[H]) Pub(headers ...H) {
}
}
}

func (hs *heightSub[H]) tryAdvanceHeight(headers ...H) {
curr := hs.Height()

// collect all new heights.
for i := range headers {
h := headers[i].Height()
if h > curr {
hs.knownHeights[h] = struct{}{}
}
}

// try advance heightSub.Height if we saw a relevant height before.
for len(hs.knownHeights) > 0 {
_, ok := hs.knownHeights[curr+1]
if !ok {
break
}
delete(hs.knownHeights, curr+1)
curr++
}
hs.SetHeight(curr)
}
31 changes: 31 additions & 0 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ func TestHeightSub(t *testing.T) {
}
}

func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.SetHeight(99)
hs.Pub(h)
}

{
go func() {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}()

h, err := hs.Sub(ctx, 200)
assert.NoError(t, err)
assert.NotNil(t, h)
}
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
52 changes: 52 additions & 0 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,58 @@ func TestStore_Append_BadHeader(t *testing.T) {
require.Error(t, err)
}

func TestStore_Append_stableHeadWhenGaps(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

suite := headertest.NewTestSuite(t)

ds := sync.MutexWrap(datastore.NewMapDatastore())
store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4))

head, err := store.Head(ctx)
require.NoError(t, err)
assert.Equal(t, head.Hash(), suite.Head().Hash())

in := suite.GenDummyHeaders(5)

err = store.Append(ctx, in...)
require.NoError(t, err)
// wait for batch to be written.
time.Sleep(100 * time.Millisecond)

wantHead := in[4] // last header from incomming headers.

head, err = store.Head(ctx)
require.NoError(t, err)
assert.Equal(t, head.Hash(), wantHead.Hash())

in = suite.GenDummyHeaders(10)
// make a gap
missedHeaders, in := in[:5], in[5:]
latestHead := in[len(in)-1]

err = store.Append(ctx, in...)
require.NoError(t, err)
// wait for batch to be written.
time.Sleep(100 * time.Millisecond)

// head is not advanced due to a gap.
head, err = store.Head(ctx)
require.NoError(t, err)
assert.Equal(t, head.Hash(), wantHead.Hash())

err = store.Append(ctx, missedHeaders...)
require.NoError(t, err)
// wait for batch to be written.
time.Sleep(100 * time.Millisecond)

// after appending missing headers we're on a latest header.
head, err = store.Head(ctx)
require.NoError(t, err)
assert.Equal(t, head.Hash(), latestHead.Hash())
}

// TestStore_GetRange tests possible combinations of requests and ensures that
// the store can handle them adequately (even malformed requests)
func TestStore_GetRange(t *testing.T) {
Expand Down

0 comments on commit 9e25e4a

Please sign in to comment.