Skip to content

Commit

Permalink
fix(sync): fix two synchronization issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Jul 4, 2023
1 parent 6ca6be6 commit 83d0baa
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 15 deletions.
3 changes: 1 addition & 2 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/go-header"
"github.com/celestiaorg/go-header/headertest"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

const networkID = "private"
Expand Down
2 changes: 2 additions & 0 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Syncer[H header.Header] struct {
triggerSync chan struct{}
// pending keeps ranges of valid new network headers awaiting to be appended to store
pending ranges[H]
// incomingMu ensures only one incoming network head candidate is processed at the time
incomingMu sync.Mutex

// controls lifecycle for syncLoop
ctx context.Context
Expand Down
47 changes: 34 additions & 13 deletions sync/sync_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,50 @@ package sync
import (
"context"
"sync"
"sync/atomic"

"github.com/celestiaorg/go-header"
)

// syncGetter is a Getter wrapper that ensure only one Head call happens at the time
type syncGetter[H header.Header] struct {
getterLk sync.RWMutex
isGetterLk atomic.Bool
header.Getter[H]

headLk sync.RWMutex
headErr error
head H
}

func (se *syncGetter[H]) Head(ctx context.Context) (H, error) {
// the lock construction here ensures only one routine calling Head at a time
// Lock locks the getter for single user.
// Reports 'true' if the lock was held by the current routine.
// Does not require unlocking on 'false'.
func (se *syncGetter[H]) Lock() bool {
// the lock construction here ensures only one routine is freed at a time
// while others wait via Rlock
if !se.headLk.TryLock() {
se.headLk.RLock()
defer se.headLk.RUnlock()
return se.head, se.headErr
locked := se.getterLk.TryLock()
if !locked {
se.getterLk.RLock()
defer se.getterLk.RUnlock()
return false
}
defer se.headLk.Unlock()
se.isGetterLk.Store(locked)
return locked
}

// Unlock unlocks the getter.
func (se *syncGetter[H]) Unlock() {
se.checkLock("Unlock without preceding Lock on syncGetter")
se.getterLk.Unlock()
se.isGetterLk.Store(false)
}

se.head, se.headErr = se.Getter.Head(ctx)
return se.head, se.headErr
// Head must be called with held Lock.
func (se *syncGetter[H]) Head(ctx context.Context) (H, error) {
se.checkLock("Head without preceding Lock on syncGetter")
return se.Getter.Head(ctx)
}

// checkLock ensures api safety
func (se *syncGetter[H]) checkLock(msg string) {
if !se.isGetterLk.Load() {
panic(msg)
}
}
4 changes: 4 additions & 0 deletions sync/sync_getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func TestSyncGetterHead(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
if !sex.Lock() {
return
}
defer sex.Unlock()
h, err := sex.Head(ctx)
if h != nil || err != errFakeHead {
t.Fail()
Expand Down
20 changes: 20 additions & 0 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ func (s *Syncer[H]) Head(ctx context.Context) (H, error) {
// * If now >= TNH && now <= TNH + (THP) header propagation time
// * Wait for header to arrive instead of requesting it
// * This way we don't request as we know the new network header arrives exactly
//
// single-flight protection
// ensure only one Head is requested at the time
if !s.getter.Lock() {
// means that other routine held the lock and set the subjective head for us,
// so just recursively get it
return s.Head(ctx)
}
defer s.getter.Unlock()
netHead, err := s.getter.Head(ctx)
if err != nil {
return netHead, err
Expand Down Expand Up @@ -68,6 +77,14 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
}
// otherwise, request head from a trusted peer
log.Infow("stored head header expired", "height", storeHead.Height())
// single-flight protection
// ensure only one Head is requested at the time
if !s.getter.Lock() {
// means that other routine held the lock and set the subjective head for us,
// so just recursively get it
return s.subjectiveHead(ctx)
}
defer s.getter.Unlock()
trustHead, err := s.getter.Head(ctx)
if err != nil {
return trustHead, err
Expand Down Expand Up @@ -119,6 +136,9 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
// incomingNetworkHead processes new potential network headers.
// If the header valid, sets as new subjective header.
func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, netHead H) pubsub.ValidationResult {
// ensure there is no racing between network head candidates
s.incomingMu.Lock()
defer s.incomingMu.Unlock()
// first of all, check the validity of the netHead
res := s.validateHead(ctx, netHead)
if res == pubsub.ValidationAccept {
Expand Down
46 changes: 46 additions & 0 deletions sync/sync_head_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package sync

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/go-header/headertest"
)

func TestSyncer_incomingNetworkHeadRaces(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

suite := headertest.NewTestSuite(t)
store := headertest.NewStore[*headertest.DummyHeader](t, suite, 1)
syncer, err := NewSyncer[*headertest.DummyHeader](
store,
store,
headertest.NewDummySubscriber(),
)
require.NoError(t, err)

incoming := suite.NextHeader()

var hits atomic.Uint32
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if syncer.incomingNetworkHead(ctx, incoming) == pubsub.ValidationAccept {
hits.Add(1)
}
}()
}

wg.Wait()
assert.EqualValues(t, 1, hits.Load())
}

0 comments on commit 83d0baa

Please sign in to comment.