diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index 5c735d764b..048b5d812d 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -23,9 +23,6 @@ const ( // no more request will be assigned to workers to wait for InsertChain to finish. SoftQueueCap int = 100 - // DefaultConcurrency is the default settings for concurrency - DefaultConcurrency int = 4 - // ShortRangeTimeout is the timeout for each short range sync, which allow short range sync // to restart automatically when stuck in `getBlockHashes` ShortRangeTimeout time.Duration = 1 * time.Minute @@ -74,10 +71,10 @@ type ( func (c *Config) fixValues() { if c.Concurrency == 0 { - c.Concurrency = DefaultConcurrency + c.Concurrency = c.MinStreams } if c.Concurrency > c.MinStreams { - c.MinStreams = c.Concurrency + c.Concurrency = c.MinStreams } if c.MinStreams > c.InitStreams { c.InitStreams = c.MinStreams diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 96add97fd3..3711048955 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -153,6 +153,17 @@ func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscript // waitForBootFinish waits for stream manager to finish the initial discovery and have // enough peers to start downloader func (d *Downloader) waitForBootFinish() { + bootCompleted, numStreams := d.waitForEnoughStreams(d.config.InitStreams) + if bootCompleted { + fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", + d.bc.ShardID(), numStreams) + } +} + +func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) { + d.logger.Info().Int("requiredStreams", requiredStreams). + Msg("waiting for enough stream connections to continue syncing") + evtCh := make(chan streammanager.EvtStreamAdded, 1) sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh) defer sub.Unsubscribe() @@ -177,12 +188,11 @@ func (d *Downloader) waitForBootFinish() { trigger() case <-checkCh: - if d.syncProtocol.NumStreams() >= d.config.InitStreams { - fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", d.bc.ShardID(), d.syncProtocol.NumStreams()) - return + if d.syncProtocol.NumStreams() >= requiredStreams { + return true, d.syncProtocol.NumStreams() } case <-d.closeC: - return + return false, d.syncProtocol.NumStreams() } } } @@ -212,6 +222,9 @@ func (d *Downloader) loop() { case <-d.downloadC: bnBeforeSync := d.bc.CurrentBlock().NumberU64() estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync) + if err == ErrNotEnoughStreams { + d.waitForEnoughStreams(d.config.MinStreams) + } if err != nil { //TODO: if there is a bad block which can't be resolved if d.stagedSyncInstance.invalidBlock.Active { diff --git a/api/service/stagedstreamsync/errors.go b/api/service/stagedstreamsync/errors.go index d18020dd06..9f1e1eb60b 100644 --- a/api/service/stagedstreamsync/errors.go +++ b/api/service/stagedstreamsync/errors.go @@ -14,7 +14,7 @@ var ( ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result") ErrUnexpectedBlockHashes = WrapStagedSyncError("unexpected get block hashes result delivered") ErrNilBlock = WrapStagedSyncError("nil block found") - ErrNotEnoughStreams = WrapStagedSyncError("not enough streams") + ErrNotEnoughStreams = WrapStagedSyncError("number of streams smaller than minimum required") ErrParseCommitSigAndBitmapFail = WrapStagedSyncError("parse commitSigAndBitmap failed") ErrVerifyHeaderFail = WrapStagedSyncError("verify header failed") ErrInsertChainFail = WrapStagedSyncError("insert to chain failed") diff --git a/api/service/stagedstreamsync/short_range_helper.go b/api/service/stagedstreamsync/short_range_helper.go index 42327c78df..aa6b785120 100644 --- a/api/service/stagedstreamsync/short_range_helper.go +++ b/api/service/stagedstreamsync/short_range_helper.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/utils" syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" @@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash, func (sh *srHelper) checkPrerequisites() error { if sh.syncProtocol.NumStreams() < sh.config.Concurrency { + utils.Logger().Info(). + Int("available streams", sh.syncProtocol.NumStreams()). + Interface("concurrency", sh.config.Concurrency). + Msg("not enough streams to do concurrent processes") return ErrNotEnoughStreams } return nil diff --git a/api/service/stagedstreamsync/stage_bodies.go b/api/service/stagedstreamsync/stage_bodies.go index 4996ea78b7..b5d92e3a1a 100644 --- a/api/service/stagedstreamsync/stage_bodies.go +++ b/api/service/stagedstreamsync/stage_bodies.go @@ -167,13 +167,22 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload Msg(WrapStagedSyncMsg("downloadRawBlocks failed")) err = errors.Wrap(err, "request error") gbm.HandleRequestError(batch, err, stid) - } else if blockBytes == nil || len(blockBytes) == 0 { + } else if blockBytes == nil { utils.Logger().Warn(). Str("stream", string(stid)). Interface("block numbers", batch). - Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes")) + Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received invalid (nil) blockBytes")) + err := errors.New("downloadRawBlocks received invalid (nil) blockBytes") + gbm.HandleRequestError(batch, err, stid) + b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed") + } else if len(blockBytes) == 0 { + utils.Logger().Warn(). + Str("stream", string(stid)). + Interface("block numbers", batch). + Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes, remote peer is not fully synced")) err := errors.New("downloadRawBlocks received empty blockBytes") gbm.HandleRequestError(batch, err, stid) + b.configs.protocol.RemoveStream(stid) } else { if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil { panic(ErrSaveBlocksToDbFailed) diff --git a/api/service/stagedstreamsync/stage_epoch.go b/api/service/stagedstreamsync/stage_epoch.go index 2c51aa1f94..e84b74f340 100644 --- a/api/service/stagedstreamsync/stage_epoch.go +++ b/api/service/stagedstreamsync/stage_epoch.go @@ -92,7 +92,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage } if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") + // if error is ErrNotEnoughStreams but still some streams available, + // it can continue syncing, otherwise return error + // here we are not doing concurrent processes, so even 1 stream should be enough + if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() == 0 { + return 0, errors.Wrap(err, "prerequisite") + } } curBN := s.state.bc.CurrentBlock().NumberU64() bns := make([]uint64, 0, BlocksPerRequest) diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index 54534bfbb1..ce6cdf36bc 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -97,7 +97,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) } if err := sh.checkPrerequisites(); err != nil { - return 0, errors.Wrap(err, "prerequisite") + // if error is ErrNotEnoughStreams but still two streams available, + // it can continue syncing, otherwise return error + // at least 2 streams are needed to do concurrent processes + if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() < 2 { + return 0, errors.Wrap(err, "prerequisite") + } } curBN := sr.configs.bc.CurrentBlock().NumberU64() blkNums := sh.prepareBlockHashNumbers(curBN) diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 3cd8756604..1592186b52 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -337,8 +337,9 @@ func (s *StagedStreamSync) promLabels() prometheus.Labels { func (s *StagedStreamSync) checkHaveEnoughStreams() error { numStreams := s.protocol.NumStreams() if numStreams < s.config.MinStreams { - return fmt.Errorf("number of streams smaller than minimum: %v < %v", + s.logger.Debug().Msgf("number of streams smaller than minimum: %v < %v", numStreams, s.config.MinStreams) + return ErrNotEnoughStreams } return nil } diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 4cc20cfdf4..50c3815f4b 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -232,7 +232,7 @@ var ( Downloader: true, StagedSync: false, StagedSyncCfg: defaultStagedSyncConfig, - Concurrency: 4, + Concurrency: 2, MinPeers: 2, InitStreams: 2, MaxAdvertiseWaitTime: 2, //minutes diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index c51303ccba..dd9c772fb8 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" @@ -118,7 +119,7 @@ func (st *testStream) FailedTimes() int { return 0 } -func (st *testStream) AddFailedTimes() { +func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { return } diff --git a/p2p/stream/common/streammanager/interface_test.go b/p2p/stream/common/streammanager/interface_test.go index 5a9bb44366..6933615f9a 100644 --- a/p2p/stream/common/streammanager/interface_test.go +++ b/p2p/stream/common/streammanager/interface_test.go @@ -6,6 +6,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/libp2p/go-libp2p/core/network" @@ -74,7 +75,7 @@ func (st *testStream) FailedTimes() int { return 0 } -func (st *testStream) AddFailedTimes() { +func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { return } diff --git a/p2p/stream/protocols/sync/chain_test.go b/p2p/stream/protocols/sync/chain_test.go index 3f3f68b889..ac3d292add 100644 --- a/p2p/stream/protocols/sync/chain_test.go +++ b/p2p/stream/protocols/sync/chain_test.go @@ -215,3 +215,18 @@ func checkBlocksByHashesResult(b []byte, hs []common.Hash) error { } return nil } + +func checkGetReceiptsResult(b []byte, hs []common.Hash) error { + var msg = &syncpb.Message{} + if err := protobuf.Unmarshal(b, msg); err != nil { + return err + } + bhResp, err := msg.GetReceiptsResponse() + if err != nil { + return err + } + if len(hs) != len(bhResp.Receipts) { + return errors.New("unexpected size") + } + return nil +} diff --git a/p2p/stream/protocols/sync/const.go b/p2p/stream/protocols/sync/const.go index 745ed35ba9..afcd21fea7 100644 --- a/p2p/stream/protocols/sync/const.go +++ b/p2p/stream/protocols/sync/const.go @@ -26,7 +26,11 @@ const ( GetReceiptsCap = 10 // MaxStreamFailures is the maximum allowed failures before stream gets removed - MaxStreamFailures = 3 + MaxStreamFailures = 5 + + // FaultRecoveryThreshold is the minimum duration before it resets the previous failures + // So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it + FaultRecoveryThreshold = 30 * time.Minute // minAdvertiseInterval is the minimum advertise interval minAdvertiseInterval = 1 * time.Minute diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index ca4590c972..538b2b9b8a 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -272,13 +272,16 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) { st.Close() // stream manager removes this stream from the list and triggers discovery if number of streams are not enough p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed + p.logger.Info(). + Str("stream ID", string(stID)). + Msg("stream removed") } } func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) { st, exist := p.sm.GetStreamByID(stID) if exist && st != nil { - st.AddFailedTimes() + st.AddFailedTimes(FaultRecoveryThreshold) p.logger.Info(). Str("stream ID", string(st.ID())). Int("num failures", st.FailedTimes()). diff --git a/p2p/stream/types/stream.go b/p2p/stream/types/stream.go index 18b47f6158..12581b7de0 100644 --- a/p2p/stream/types/stream.go +++ b/p2p/stream/types/stream.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "io" "sync" + "time" libp2p_network "github.com/libp2p/go-libp2p/core/network" "github.com/pkg/errors" @@ -22,7 +23,7 @@ type Stream interface { Close() error CloseOnExit() error FailedTimes() int - AddFailedTimes() + AddFailedTimes(faultRecoveryThreshold time.Duration) ResetFailedTimes() } @@ -38,7 +39,8 @@ type BaseStream struct { specErr error specOnce sync.Once - failedTimes int + failedTimes int + lastFailureTime time.Time } // NewBaseStream creates BaseStream as the wrapper of libp2p Stream @@ -82,8 +84,15 @@ func (st *BaseStream) FailedTimes() int { return st.failedTimes } -func (st *BaseStream) AddFailedTimes() { +func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) { + if st.failedTimes > 0 { + durationSinceLastFailure := time.Now().Sub(st.lastFailureTime) + if durationSinceLastFailure >= faultRecoveryThreshold { + st.ResetFailedTimes() + } + } st.failedTimes++ + st.lastFailureTime = time.Now() } func (st *BaseStream) ResetFailedTimes() {