From fc9ac09704cf66de5a82cbccf7c10d3707f66ee5 Mon Sep 17 00:00:00 2001 From: Esad Akar Date: Thu, 10 Jun 2021 15:29:49 +0300 Subject: [PATCH] feat: warmup time for push/pull protocols --- cmd/bee/cmd/cmd.go | 3 +++ cmd/bee/cmd/start.go | 1 + pkg/node/node.go | 13 ++++++++++--- pkg/puller/puller.go | 16 +++++++++++++--- pkg/puller/puller_test.go | 2 +- pkg/pusher/pusher.go | 16 +++++++++++++--- pkg/pusher/pusher_test.go | 2 +- pkg/pushsync/pushsync.go | 11 ++++++++++- pkg/pushsync/pushsync_test.go | 2 +- 9 files changed, 53 insertions(+), 13 deletions(-) diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 7c626604f25..e1bd5a826a5 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/swarm" @@ -64,6 +65,7 @@ const ( optionNameFullNode = "full-node" optionNamePostageContractAddress = "postage-stamp-address" optionNameBlockTime = "block-time" + optionWarmUpTime = "warmup-time" ) func init() { @@ -236,6 +238,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().String(optionNameTransactionHash, "", "proof-of-identity transaction hash") cmd.Flags().Uint64(optionNameBlockTime, 15, "chain block time") cmd.Flags().String(optionNameSwapDeploymentGasPrice, "", "gas price in wei to use for deployment and funding") + cmd.Flags().Duration(optionWarmUpTime, time.Minute*10, "time to warmup the node before pull/push protocols can be kicked off.") } func newLogger(cmd *cobra.Command, verbosity string) (logging.Logger, error) { diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index a80793793fe..f92001dfab6 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -154,6 +154,7 @@ Welcome to the Swarm.... Bzzz Bzzzz Bzzzz PostageContractAddress: c.config.GetString(optionNamePostageContractAddress), BlockTime: c.config.GetUint64(optionNameBlockTime), DeployGasPrice: c.config.GetString(optionNameSwapDeploymentGasPrice), + WarmupTime: c.config.GetDuration(optionWarmUpTime), }) if err != nil { return err diff --git a/pkg/node/node.go b/pkg/node/node.go index 3e227b80d25..0030b7e3115 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -146,6 +146,7 @@ type Options struct { PriceOracleAddress string BlockTime uint64 DeployGasPrice string + WarmupTime time.Duration } const ( @@ -173,6 +174,12 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, } }() + // light nodes have zero warmup time for pull/pushsync protocols + warmupTime := o.WarmupTime + if !o.FullNodeMode { + warmupTime = 0 + } + b = &Bee{ p2pCancel: p2pCancel, errorLogWriter: logger.WriterLevel(logrus.ErrorLevel), @@ -559,7 +566,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, pinningService := pinning.NewService(storer, stateStore, traversalService) - pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, o.FullNodeMode, pssService.TryUnwrap, validStamp, logger, acc, pricer, signer, tracer) + pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, o.FullNodeMode, pssService.TryUnwrap, validStamp, logger, acc, pricer, signer, tracer, warmupTime) // set the pushSyncer in the PSS pssService.SetPushSyncer(pushSyncProtocol) @@ -570,7 +577,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler) } - pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer) + pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer, warmupTime) b.pusherCloser = pusherService pullStorage := pullstorage.New(storer) @@ -580,7 +587,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, var pullerService *puller.Puller if o.FullNodeMode { - pullerService := puller.New(stateStore, kad, pullSyncProtocol, logger, puller.Options{}) + pullerService := puller.New(stateStore, kad, pullSyncProtocol, logger, puller.Options{}, warmupTime) b.pullerCloser = pullerService } diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 7e2b72e4ba8..222d7866ddb 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -50,7 +50,7 @@ type Puller struct { bins uint8 // how many bins do we support } -func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pullsync.Interface, logger logging.Logger, o Options) *Puller { +func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pullsync.Interface, logger logging.Logger, o Options, warmupTime time.Duration) *Puller { var ( bins uint8 = swarm.MaxBins ) @@ -77,7 +77,7 @@ func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pull p.syncPeers[i] = make(map[string]*syncPeer) } p.wg.Add(1) - go p.manage() + go p.manage(warmupTime) return p } @@ -86,7 +86,7 @@ type peer struct { po uint8 } -func (p *Puller) manage() { +func (p *Puller) manage(warmupTime time.Duration) { defer p.wg.Done() c, unsubscribe := p.topology.SubscribePeersChange() defer unsubscribe() @@ -96,6 +96,16 @@ func (p *Puller) manage() { <-p.quit cancel() }() + + // wait for warmup duration to complete + select { + case <-time.After(warmupTime): + case <-p.quit: + return + } + + p.logger.Info("puller: warmup period complete, worker starting.") + for { select { case <-c: diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index e77c4cfd62d..3079850d514 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -597,7 +597,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc o := puller.Options{ Bins: ops.bins, } - return puller.New(s, kad, ps, logger, o), s, kad, ps + return puller.New(s, kad, ps, logger, o, 0), s, kad, ps } type c struct { diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index d857e7eccbe..05a3b1e9905 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -49,7 +49,7 @@ var ( var ErrInvalidAddress = errors.New("invalid address") -func New(networkID uint64, storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service { +func New(networkID uint64, storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service { service := &Service{ networkID: networkID, storer: storer, @@ -61,13 +61,13 @@ func New(networkID uint64, storer storage.Storer, peerSuggester topology.Closest quit: make(chan struct{}), chunksWorkerQuitC: make(chan struct{}), } - go service.chunksWorker() + go service.chunksWorker(warmupTime) return service } // chunksWorker is a loop that keeps looking for chunks that are locally uploaded ( by monitoring pushIndex ) // and pushes them to the closest peer and get a receipt. -func (s *Service) chunksWorker() { +func (s *Service) chunksWorker(warmupTime time.Duration) { var ( chunks <-chan swarm.Chunk unsubscribe func() @@ -81,6 +81,7 @@ func (s *Service) chunksWorker() { span opentracing.Span logger *logrus.Entry ) + defer timer.Stop() defer close(s.chunksWorkerQuitC) go func() { @@ -88,6 +89,15 @@ func (s *Service) chunksWorker() { cancel() }() + // wait for warmup duration to complete + select { + case <-time.After(warmupTime): + case <-s.quit: + return + } + + s.logger.Info("pusher: warmup period complete, worker starting.") + LOOP: for { select { diff --git a/pkg/pusher/pusher_test.go b/pkg/pusher/pusher_test.go index 9891ab3a3d5..82ea6f25fe5 100644 --- a/pkg/pusher/pusher_test.go +++ b/pkg/pusher/pusher_test.go @@ -379,7 +379,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus } peerSuggester := mock.NewTopologyDriver(mockOpts...) - pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil) + pusherService := pusher.New(1, pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil, 0) return mtags, pusherService, pusherStorer } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 18332aeba57..3d330b5d5ba 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -45,6 +45,7 @@ const ( var ( ErrOutOfDepthReplication = errors.New("replication outside of the neighborhood") ErrNoPush = errors.New("could not push chunk") + ErrWarmup = errors.New("node warmup time not complete") ) type PushSyncer interface { @@ -72,13 +73,14 @@ type PushSync struct { signer crypto.Signer isFullNode bool failedRequests *failedRequestCache + warmupPeriod time.Time } var defaultTTL = 20 * time.Second // request time to live var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream -func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storage.Putter, topology topology.Driver, tagger *tags.Tags, isFullNode bool, unwrap func(swarm.Chunk), validStamp func(swarm.Chunk, []byte) (swarm.Chunk, error), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer) *PushSync { +func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storage.Putter, topology topology.Driver, tagger *tags.Tags, isFullNode bool, unwrap func(swarm.Chunk), validStamp func(swarm.Chunk, []byte) (swarm.Chunk, error), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer, warmupTime time.Duration) *PushSync { ps := &PushSync{ address: address, streamer: streamer, @@ -95,6 +97,7 @@ func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storag validStamp: validStamp, signer: signer, failedRequests: newFailedRequestCache(), + warmupPeriod: time.Now().Add(warmupTime), } return ps } @@ -197,6 +200,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) receipt, err := ps.pushToClosest(ctx, chunk, false) if err != nil { if errors.Is(err, topology.ErrWantSelf) { + + if time.Now().Before(ps.warmupPeriod) { + err = ErrWarmup + return + } + if !storedChunk { _, err = ps.storer.Put(ctx, storage.ModePutSync, chunk) if err != nil { diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 0e7d696ba8f..2e8561faca5 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -804,7 +804,7 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p return ch.WithStamp(postage.NewStamp(nil, nil)), nil } - return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil), storer, mtag + return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag } func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {