Skip to content

Commit

Permalink
eth/downloader: fix beacon sync start/stop thrashing data race
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Jan 11, 2022
1 parent 4f75778 commit 5cf0685
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
32 changes: 26 additions & 6 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import (
// the genesis block or an existing header in the database. Its operation is fully
// directed by the skeleton sync's head/tail events.
type beaconBackfiller struct {
downloader *Downloader // Downloader to direct via this callback implementation
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
lock sync.Mutex // Mutex protecting the sync lock
downloader *Downloader // Downloader to direct via this callback implementation
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
started chan struct{} // Channel recreated on every resume and closed by synchronise once startup is reached (or on early exit)
lock sync.Mutex // Mutex guarding the mutable backfiller state (filling, started, syncMode)
}

// newBeaconBackfiller is a helper method to create the backfiller.
Expand All @@ -49,6 +50,24 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {

// suspend aborts the background backfilling, provided one is in flight. When
// nothing is being filled the call is a no-op.
func (b *beaconBackfiller) suspend() {
	// Snapshot the shared state under the mutex, then release it so that the
	// wait below never happens while holding the lock.
	b.lock.Lock()
	running, ready := b.filling, b.started
	b.lock.Unlock()

	// Only an active backfill needs to be torn down.
	if running {
		// The filler runs on its own goroutine and may not have reached its
		// startup point yet: a burst of beacon head announcements can cause
		// start/stop thrashing. Block until startup has been signalled. The
		// channel is closed rather than written to, so receiving from it any
		// number of times is fine.
		<-ready

		// Startup has been signalled, so cancelling can no longer race with
		// the downloader's initialisation.
		b.downloader.Cancel()
	}
}

Expand All @@ -62,6 +81,7 @@ func (b *beaconBackfiller) resume() {
return
}
b.filling = true
b.started = make(chan struct{})
mode := b.syncMode
b.lock.Unlock()

Expand All @@ -76,7 +96,7 @@ func (b *beaconBackfiller) resume() {
}()
// If the downloader fails, report an error as in beacon chain mode there
// should be no errors as long as the chain we're syncing to is valid.
if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true); err != nil {
if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true, b.started); err != nil {
log.Error("Beacon backfilling failed", "err", err)
return
}
Expand Down
21 changes: 19 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
// LegacySync tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) LegacySync(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode, false)
err := d.synchronise(id, head, td, mode, false, nil)

switch err {
case nil, errBusy, errCanceled:
Expand All @@ -351,7 +351,21 @@ func (d *Downloader) LegacySync(id string, head common.Hash, td *big.Int, mode S
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode, beaconMode bool) error {
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode, beaconMode bool, beaconPing chan struct{}) error {
// The beacon header syncer is async. It will start this synchronization and
// will continue doing other tasks. However, if synchronization needs to be
// cancelled, the syncer needs to know if we reached the startup point (and
// inited the cancel channel) or not yet. Make sure that we'll signal even in
// case of a failure.
if beaconPing != nil {
defer func() {
select {
case <-beaconPing: // already notified
default:
close(beaconPing) // weird exit condition, notify that it's safe to cancel (the nothing)
}
}()
}
// Mock out the synchronisation if testing
if d.synchroniseMock != nil {
return d.synchroniseMock(id, hash)
Expand Down Expand Up @@ -413,6 +427,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
return errUnknownPeer
}
}
if beaconPing != nil {
close(beaconPing)
}
return d.syncWithPeer(p, hash, td, beaconMode)
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64())
}
// Synchronise with the chosen peer and ensure proper cleanup afterwards
err := dl.downloader.synchronise(id, head.Hash(), td, mode, false)
err := dl.downloader.synchronise(id, head.Hash(), td, mode, false, nil)
select {
case <-dl.downloader.cancelCh:
// Ok, downloader fully cancelled after sync cycle
Expand Down

0 comments on commit 5cf0685

Please sign in to comment.