Skip to content

Commit

Permalink
Improvements of streamsync to deploy on mainnet (#4493)
Browse files Browse the repository at this point in the history
* add faultRecoveryThreshold to reset stream failures

* increase MaxStreamFailures so that streams stay in the list longer

* set Concurrency to 2 for devnet to be same as MinStreams, otherwise it will rewrite MinStreams

* stream sync loop checks for ErrNotEnoughStreams and, when there are not enough connected streams in the list, waits until enough streams are available

* fix fault recovery issue

* improve checkPrerequisites to be able to continue with minimum streams

* refactor fixValues function, put priority on MinStreams rather than Concurrency

* drop remote peer if it sends an empty blocks array

* goimports to fix build issue

* fix getReceipts array assignments

* fix getReceipts and add tests for it
  • Loading branch information
GheisMohammadi committed Sep 29, 2023
1 parent d8f1225 commit fa99cd1
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 23 deletions.
7 changes: 2 additions & 5 deletions api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ const (
// no more request will be assigned to workers to wait for InsertChain to finish.
SoftQueueCap int = 100

// DefaultConcurrency is the default settings for concurrency
DefaultConcurrency int = 4

// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
// to restart automatically when stuck in `getBlockHashes`
ShortRangeTimeout time.Duration = 1 * time.Minute
Expand Down Expand Up @@ -74,10 +71,10 @@ type (

func (c *Config) fixValues() {
if c.Concurrency == 0 {
c.Concurrency = DefaultConcurrency
c.Concurrency = c.MinStreams
}
if c.Concurrency > c.MinStreams {
c.MinStreams = c.Concurrency
c.Concurrency = c.MinStreams
}
if c.MinStreams > c.InitStreams {
c.InitStreams = c.MinStreams
Expand Down
21 changes: 17 additions & 4 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscript
// waitForBootFinish blocks until the stream manager has finished the initial
// discovery and at least config.InitStreams streams are connected, which is the
// precondition for starting the downloader. When the wait ends without reaching
// that number (waitForEnoughStreams reports false), it returns silently;
// otherwise it prints the shard ID and the number of connected streams.
func (d *Downloader) waitForBootFinish() {
	ready, connected := d.waitForEnoughStreams(d.config.InitStreams)
	if !ready {
		return
	}
	fmt.Printf("boot completed for shard %d ( %d streams are connected )\n",
		d.bc.ShardID(), connected)
}

func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) {
d.logger.Info().Int("requiredStreams", requiredStreams).
Msg("waiting for enough stream connections to continue syncing")

evtCh := make(chan streammanager.EvtStreamAdded, 1)
sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh)
defer sub.Unsubscribe()
Expand All @@ -177,12 +188,11 @@ func (d *Downloader) waitForBootFinish() {
trigger()

case <-checkCh:
if d.syncProtocol.NumStreams() >= d.config.InitStreams {
fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", d.bc.ShardID(), d.syncProtocol.NumStreams())
return
if d.syncProtocol.NumStreams() >= requiredStreams {
return true, d.syncProtocol.NumStreams()
}
case <-d.closeC:
return
return false, d.syncProtocol.NumStreams()
}
}
}
Expand Down Expand Up @@ -212,6 +222,9 @@ func (d *Downloader) loop() {
case <-d.downloadC:
bnBeforeSync := d.bc.CurrentBlock().NumberU64()
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
if err == ErrNotEnoughStreams {
d.waitForEnoughStreams(d.config.MinStreams)
}
if err != nil {
//TODO: if there is a bad block which can't be resolved
if d.stagedSyncInstance.invalidBlock.Active {
Expand Down
2 changes: 1 addition & 1 deletion api/service/stagedstreamsync/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result")
ErrUnexpectedBlockHashes = WrapStagedSyncError("unexpected get block hashes result delivered")
ErrNilBlock = WrapStagedSyncError("nil block found")
ErrNotEnoughStreams = WrapStagedSyncError("not enough streams")
ErrNotEnoughStreams = WrapStagedSyncError("number of streams smaller than minimum required")
ErrParseCommitSigAndBitmapFail = WrapStagedSyncError("parse commitSigAndBitmap failed")
ErrVerifyHeaderFail = WrapStagedSyncError("verify header failed")
ErrInsertChainFail = WrapStagedSyncError("insert to chain failed")
Expand Down
5 changes: 5 additions & 0 deletions api/service/stagedstreamsync/short_range_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash,

func (sh *srHelper) checkPrerequisites() error {
if sh.syncProtocol.NumStreams() < sh.config.Concurrency {
utils.Logger().Info().
Int("available streams", sh.syncProtocol.NumStreams()).
Interface("concurrency", sh.config.Concurrency).
Msg("not enough streams to do concurrent processes")
return ErrNotEnoughStreams
}
return nil
Expand Down
13 changes: 11 additions & 2 deletions api/service/stagedstreamsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,22 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
Msg(WrapStagedSyncMsg("downloadRawBlocks failed"))
err = errors.Wrap(err, "request error")
gbm.HandleRequestError(batch, err, stid)
} else if blockBytes == nil || len(blockBytes) == 0 {
} else if blockBytes == nil {
utils.Logger().Warn().
Str("stream", string(stid)).
Interface("block numbers", batch).
Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes"))
Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received invalid (nil) blockBytes"))
err := errors.New("downloadRawBlocks received invalid (nil) blockBytes")
gbm.HandleRequestError(batch, err, stid)
b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed")
} else if len(blockBytes) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Interface("block numbers", batch).
Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes, remote peer is not fully synced"))
err := errors.New("downloadRawBlocks received empty blockBytes")
gbm.HandleRequestError(batch, err, stid)
b.configs.protocol.RemoveStream(stid)
} else {
if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
panic(ErrSaveBlocksToDbFailed)
Expand Down
7 changes: 6 additions & 1 deletion api/service/stagedstreamsync/stage_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage
}

if err := sh.checkPrerequisites(); err != nil {
return 0, errors.Wrap(err, "prerequisite")
// if error is ErrNotEnoughStreams but still some streams available,
// it can continue syncing, otherwise return error
// here we are not doing concurrent processes, so even 1 stream should be enough
if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() == 0 {
return 0, errors.Wrap(err, "prerequisite")
}
}
curBN := s.state.bc.CurrentBlock().NumberU64()
bns := make([]uint64, 0, BlocksPerRequest)
Expand Down
7 changes: 6 additions & 1 deletion api/service/stagedstreamsync/stage_short_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
}

if err := sh.checkPrerequisites(); err != nil {
return 0, errors.Wrap(err, "prerequisite")
// if error is ErrNotEnoughStreams but still two streams available,
// it can continue syncing, otherwise return error
// at least 2 streams are needed to do concurrent processes
if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() < 2 {
return 0, errors.Wrap(err, "prerequisite")
}
}
curBN := sr.configs.bc.CurrentBlock().NumberU64()
blkNums := sh.prepareBlockHashNumbers(curBN)
Expand Down
3 changes: 2 additions & 1 deletion api/service/stagedstreamsync/staged_stream_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ func (s *StagedStreamSync) promLabels() prometheus.Labels {
func (s *StagedStreamSync) checkHaveEnoughStreams() error {
numStreams := s.protocol.NumStreams()
if numStreams < s.config.MinStreams {
return fmt.Errorf("number of streams smaller than minimum: %v < %v",
s.logger.Debug().Msgf("number of streams smaller than minimum: %v < %v",
numStreams, s.config.MinStreams)
return ErrNotEnoughStreams
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/harmony/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ var (
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
Concurrency: 2,
MinPeers: 2,
InitStreams: 2,
MaxAdvertiseWaitTime: 2, //minutes
Expand Down
3 changes: 2 additions & 1 deletion p2p/stream/common/requestmanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (st *testStream) FailedTimes() int {
return 0
}

func (st *testStream) AddFailedTimes() {
func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
return
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/stream/common/streammanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"

sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (st *testStream) FailedTimes() int {
return 0
}

func (st *testStream) AddFailedTimes() {
func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
return
}

Expand Down
15 changes: 15 additions & 0 deletions p2p/stream/protocols/sync/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,18 @@ func checkBlocksByHashesResult(b []byte, hs []common.Hash) error {
}
return nil
}

// checkGetReceiptsResult decodes b as a sync protocol message, extracts its
// receipts response, and verifies that the number of receipt entries equals
// the number of hashes in hs. It returns the decode/extract error as is, or an
// "unexpected size" error when the counts differ.
func checkGetReceiptsResult(b []byte, hs []common.Hash) error {
	msg := &syncpb.Message{}
	if err := protobuf.Unmarshal(b, msg); err != nil {
		return err
	}

	receiptsResp, err := msg.GetReceiptsResponse()
	if err != nil {
		return err
	}

	if len(receiptsResp.Receipts) != len(hs) {
		return errors.New("unexpected size")
	}
	return nil
}
6 changes: 5 additions & 1 deletion p2p/stream/protocols/sync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ const (
GetReceiptsCap = 10

// MaxStreamFailures is the maximum allowed failures before stream gets removed
MaxStreamFailures = 3
MaxStreamFailures = 5

// FaultRecoveryThreshold is the minimum duration before it resets the previous failures
// So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it
FaultRecoveryThreshold = 30 * time.Minute

// minAdvertiseInterval is the minimum advertise interval
minAdvertiseInterval = 1 * time.Minute
Expand Down
5 changes: 4 additions & 1 deletion p2p/stream/protocols/sync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,16 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) {
st.Close()
// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
p.logger.Info().
Str("stream ID", string(stID)).
Msg("stream removed")
}
}

func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
st, exist := p.sm.GetStreamByID(stID)
if exist && st != nil {
st.AddFailedTimes()
st.AddFailedTimes(FaultRecoveryThreshold)
p.logger.Info().
Str("stream ID", string(st.ID())).
Int("num failures", st.FailedTimes()).
Expand Down
15 changes: 12 additions & 3 deletions p2p/stream/types/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"io"
"sync"
"time"

libp2p_network "github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
Expand All @@ -22,7 +23,7 @@ type Stream interface {
Close() error
CloseOnExit() error
FailedTimes() int
AddFailedTimes()
AddFailedTimes(faultRecoveryThreshold time.Duration)
ResetFailedTimes()
}

Expand All @@ -38,7 +39,8 @@ type BaseStream struct {
specErr error
specOnce sync.Once

failedTimes int
failedTimes int
lastFailureTime time.Time
}

// NewBaseStream creates BaseStream as the wrapper of libp2p Stream
Expand Down Expand Up @@ -82,8 +84,15 @@ func (st *BaseStream) FailedTimes() int {
return st.failedTimes
}

func (st *BaseStream) AddFailedTimes() {
func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
if st.failedTimes > 0 {
durationSinceLastFailure := time.Now().Sub(st.lastFailureTime)
if durationSinceLastFailure >= faultRecoveryThreshold {
st.ResetFailedTimes()
}
}
st.failedTimes++
st.lastFailureTime = time.Now()
}

func (st *BaseStream) ResetFailedTimes() {
Expand Down

0 comments on commit fa99cd1

Please sign in to comment.