diff --git a/cmd/access/main.go b/cmd/access/main.go index b794e4922c5..942e7796ca7 100644 --- a/cmd/access/main.go +++ b/cmd/access/main.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/consensus" "github.com/onflow/flow-go/consensus/hotstuff/committees" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/consensus/hotstuff/verification" recovery "github.com/onflow/flow-go/consensus/recovery/protocol" "github.com/onflow/flow-go/engine" @@ -59,6 +60,7 @@ func main() { syncCore *synchronization.Core rpcConf rpc.Config rpcEng *rpc.Engine + finalizationDistributor *pubsub.FinalizationDistributor collectionRPC access.AccessAPIClient executionNodeAddress string // deprecated historicalAccessRPCs []access.AccessAPIClient @@ -275,9 +277,12 @@ func main() { return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err) } + finalizationDistributor = pubsub.NewFinalizationDistributor() + finalizationDistributor.AddConsumer(ingestEng) + // creates a consensus follower with ingestEngine as the notifier // so that it gets notified upon each new finalized block - followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, ingestEng, node.RootBlock.Header, node.RootQC, finalized, pending) + followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, finalizationDistributor, node.RootBlock.Header, node.RootQC, finalized, pending) if err != nil { return nil, fmt.Errorf("could not initialize follower core: %w", err) } @@ -316,6 +321,9 @@ func main() { if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) } + + finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock) + return sync, nil }). 
Component("ping engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) { diff --git a/cmd/collection/main.go b/cmd/collection/main.go index c3b36cc2567..96505656a31 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -9,7 +9,7 @@ import ( "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/consensus" "github.com/onflow/flow-go/consensus/hotstuff/committees" - "github.com/onflow/flow-go/consensus/hotstuff/notifications" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/consensus/hotstuff/pacemaker/timeout" "github.com/onflow/flow-go/consensus/hotstuff/verification" recovery "github.com/onflow/flow-go/consensus/recovery/protocol" @@ -64,8 +64,9 @@ func main() { ingestConf ingest.Config ingressConf ingress.Config - pools *epochpool.TransactionPools // epoch-scoped transaction pools - followerBuffer *buffer.PendingBlocks // pending block cache for follower + pools *epochpool.TransactionPools // epoch-scoped transaction pools + followerBuffer *buffer.PendingBlocks // pending block cache for follower + finalizationDistributor *pubsub.FinalizationDistributor push *pusher.Engine ing *ingest.Engine @@ -184,8 +185,7 @@ func main() { // initialize the verifier for the protocol consensus verifier := verification.NewCombinedVerifier(mainConsensusCommittee, staking, beacon, merger) - // use proper engine for notifier to follower - notifier := notifications.NewNoopConsumer() + finalizationDistributor = pubsub.NewFinalizationDistributor() finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers) if err != nil { @@ -199,7 +199,7 @@ func main() { node.Storage.Headers, finalizer, verifier, - notifier, + finalizationDistributor, node.RootBlock.Header, node.RootQC, finalized, @@ -246,6 +246,8 @@ func main() { return nil, fmt.Errorf("could not create synchronization engine: %w", err) } + finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock) + return sync, nil }). Component("ingestion engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) { diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index 7f8ff6461be..84dda91cd40 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -494,6 +494,8 @@ func main() { return nil, fmt.Errorf("could not initialize synchronization engine: %w", err) } + finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock) + return sync, nil }). 
Component("receipt requester engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) { diff --git a/cmd/execution/main.go b/cmd/execution/main.go index 5dbd8d2758f..fc75c73ac49 100644 --- a/cmd/execution/main.go +++ b/cmd/execution/main.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/consensus" "github.com/onflow/flow-go/consensus/hotstuff/committees" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/consensus/hotstuff/verification" recovery "github.com/onflow/flow-go/consensus/recovery/protocol" "github.com/onflow/flow-go/engine" @@ -70,6 +71,7 @@ func main() { computationManager *computation.Manager collectionRequester *requester.Engine ingestionEng *ingestion.Engine + finalizationDistributor *pubsub.FinalizationDistributor rpcConf rpc.Config err error executionState state.ExecutionState @@ -380,9 +382,12 @@ func main() { return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err) } + finalizationDistributor = pubsub.NewFinalizationDistributor() + finalizationDistributor.AddConsumer(checkerEng) + // creates a consensus follower with ingestEngine as the notifier // so that it gets notified upon each new finalized block - followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, checkerEng, node.RootBlock.Header, node.RootQC, finalized, pending) + followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, finalizationDistributor, node.RootBlock.Header, node.RootQC, finalized, pending) if err != nil { return nil, fmt.Errorf("could not create follower core logic: %w", err) } @@ -427,7 +432,7 @@ func main() { ) return eng, err }). - Component("sychronization engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) { + Component("synchronization engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) { // initialize the synchronization engine syncEngine, err = synchronization.New( node.Logger, @@ -443,6 +448,8 @@ func main() { return nil, fmt.Errorf("could not initialize synchronization engine: %w", err) } + finalizationDistributor.AddOnBlockFinalizedConsumer(syncEngine.OnFinalizedBlock) + return syncEngine, nil }). 
Component("grpc server", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) { diff --git a/cmd/verification/main.go b/cmd/verification/main.go index a61cac5337d..5d9380efba3 100644 --- a/cmd/verification/main.go +++ b/cmd/verification/main.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/cmd" "github.com/onflow/flow-go/consensus" "github.com/onflow/flow-go/consensus/hotstuff/committees" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/consensus/hotstuff/verification" recovery "github.com/onflow/flow-go/consensus/recovery/protocol" followereng "github.com/onflow/flow-go/engine/common/follower" @@ -60,14 +61,15 @@ func main() { processedBlockHeight *storage.ConsumerProgress // used in block consumer chunkQueue *storage.ChunksQueue // used in chunk consumer - syncCore *synchronization.Core // used in follower engine - pendingBlocks *buffer.PendingBlocks // used in follower engine - assignerEngine *assigner.Engine // the assigner engine - fetcherEngine *fetcher.Engine // the fetcher engine - requesterEngine *vereq.Engine // the requester engine - verifierEng *verifier.Engine // the verifier engine - chunkConsumer *chunkconsumer.ChunkConsumer - blockConsumer *blockconsumer.BlockConsumer + syncCore *synchronization.Core // used in follower engine + pendingBlocks *buffer.PendingBlocks // used in follower engine + assignerEngine *assigner.Engine // the assigner engine + fetcherEngine *fetcher.Engine // the fetcher engine + requesterEngine *vereq.Engine // the requester engine + verifierEng *verifier.Engine // the verifier engine + chunkConsumer *chunkconsumer.ChunkConsumer + blockConsumer *blockconsumer.BlockConsumer + finalizationDistributor *pubsub.FinalizationDistributor followerEng *followereng.Engine // the follower engine collector module.VerificationMetrics // used to collect metrics of all engines @@ -297,9 +299,12 @@ func main() { return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err) } + finalizationDistributor = pubsub.NewFinalizationDistributor() + finalizationDistributor.AddConsumer(blockConsumer) + // creates a consensus follower with ingestEngine as the notifier // so that it gets notified upon each new finalized block - followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, blockConsumer, node.RootBlock.Header, + followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, finalizationDistributor, node.RootBlock.Header, node.RootQC, finalized, pending) if err != nil { return nil, fmt.Errorf("could not create follower core logic: %w", err) @@ -339,6 +344,9 @@ func main() { if err != nil { return nil, fmt.Errorf("could not create synchronization engine: %w", err) } + + finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock) + return sync, nil }). 
Run() diff --git a/consensus/hotstuff/notifications/pubsub/finalization_distributor.go b/consensus/hotstuff/notifications/pubsub/finalization_distributor.go index 113b959d5b1..2169aff6f20 100644 --- a/consensus/hotstuff/notifications/pubsub/finalization_distributor.go +++ b/consensus/hotstuff/notifications/pubsub/finalization_distributor.go @@ -1,6 +1,7 @@ package pubsub import ( + "github.com/onflow/flow-go/consensus/hotstuff" "sync" "github.com/onflow/flow-go/consensus/hotstuff/model" @@ -12,9 +13,10 @@ type OnBlockIncorporatedConsumer = func(incorporatedBlockID flow.Identifier) // FinalizationDistributor subscribes for finalization events from hotstuff and distributes it to subscribers type FinalizationDistributor struct { - blockFinalizedConsumers []OnBlockFinalizedConsumer - blockIncorporatedConsumers []OnBlockIncorporatedConsumer - lock sync.RWMutex + blockFinalizedConsumers []OnBlockFinalizedConsumer + blockIncorporatedConsumers []OnBlockIncorporatedConsumer + hotStuffFinalizationConsumers []hotstuff.FinalizationConsumer + lock sync.RWMutex } func NewFinalizationDistributor() *FinalizationDistributor { @@ -30,12 +32,19 @@ func (p *FinalizationDistributor) AddOnBlockFinalizedConsumer(consumer OnBlockFi defer p.lock.Unlock() p.blockFinalizedConsumers = append(p.blockFinalizedConsumers, consumer) } + func (p *FinalizationDistributor) AddOnBlockIncorporatedConsumer(consumer OnBlockIncorporatedConsumer) { p.lock.Lock() defer p.lock.Unlock() p.blockIncorporatedConsumers = append(p.blockIncorporatedConsumers, consumer) } +func (p *FinalizationDistributor) AddConsumer(consumer hotstuff.FinalizationConsumer) { + p.lock.Lock() + defer p.lock.Unlock() + p.hotStuffFinalizationConsumers = append(p.hotStuffFinalizationConsumers, consumer) +} + func (p *FinalizationDistributor) OnEventProcessed() {} func (p *FinalizationDistributor) OnBlockIncorporated(block *model.Block) { @@ -44,6 +53,9 @@ func (p *FinalizationDistributor) OnBlockIncorporated(block *model.Block) { for _, consumer := range p.blockIncorporatedConsumers { consumer(block.BlockID) } + for _, consumer := range p.hotStuffFinalizationConsumers { + consumer.OnBlockIncorporated(block) + } } func (p *FinalizationDistributor) OnFinalizedBlock(block *model.Block) { @@ -52,9 +64,18 @@ func (p *FinalizationDistributor) OnFinalizedBlock(block *model.Block) { for _, consumer := range p.blockFinalizedConsumers { consumer(block.BlockID) } + for _, consumer := range p.hotStuffFinalizationConsumers { + consumer.OnFinalizedBlock(block) + } } -func (p *FinalizationDistributor) OnDoubleProposeDetected(*model.Block, *model.Block) {} +func (p *FinalizationDistributor) OnDoubleProposeDetected(block1, block2 *model.Block) { + p.lock.RLock() + defer p.lock.RUnlock() + for _, consumer := range p.hotStuffFinalizationConsumers { + consumer.OnDoubleProposeDetected(block1, block2) + } +} func (p *FinalizationDistributor) OnReceiveVote(uint64, *model.Vote) {} diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index 1ad2a9def4c..792eae1012a 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -12,6 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/model/events" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/filter" @@ -24,6 +25,31 @@ import ( "github.com/onflow/flow-go/storage" ) +// defaultSyncRequestQueueCapacity maximum capacity of 
sync requests queue +const defaultSyncRequestQueueCapacity = 500 + +// defaultRangeRequestQueueCapacity maximum capacity of range requests queue +const defaultRangeRequestQueueCapacity = 500 + +// defaultBatchRequestQueueCapacity maximum capacity of batch requests queue +const defaultBatchRequestQueueCapacity = 500 + +// defaultSyncResponseQueueCapacity maximum capacity of sync responses queue +const defaultSyncResponseQueueCapacity = 500 + +// defaultBlockResponseQueueCapacity maximum capacity of block responses queue +const defaultBlockResponseQueueCapacity = 500 + +// defaultEngineRequestsWorkers number of workers to dispatch events for requests +const defaultEngineRequestsWorkers = 8 + +// finalizedSnapshot is a helper structure which caches the latest finalized header and the +// identities of the consensus participants; the Engine uses it to access the latest valid data +type finalizedSnapshot struct { + head *flow.Header + participants flow.IdentityList +} + // Engine is the synchronization engine, responsible for synchronizing chain state. type Engine struct { unit *engine.Unit @@ -35,9 +61,21 @@ type Engine struct { blocks storage.Blocks comp network.Engine // compliance layer engine - pollInterval time.Duration - scanInterval time.Duration - core module.SyncCore + pollInterval time.Duration + scanInterval time.Duration + core module.SyncCore + lastFinalizedSnapshot *finalizedSnapshot // last finalized snapshot of header and consensus participants + + pendingSyncRequests engine.MessageStore // message store for *messages.SyncRequest + pendingBatchRequests engine.MessageStore // message store for *messages.BatchRequest + pendingRangeRequests engine.MessageStore // message store for *messages.RangeRequest + requestMessageHandler *engine.MessageHandler // message handler responsible for request processing + + finalizationEventNotifier engine.Notifier // notifier for finalization events + + pendingSyncResponses engine.MessageStore // message store for *messages.SyncResponse + pendingBlockResponses engine.MessageStore // message store for *messages.BlockResponse + responseMessageHandler *engine.MessageHandler // message handler responsible for response processing } // New creates a new main chain synchronization engine. 
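The new fields above define the engine's inbound-message pipeline: Process classifies each incoming message, a MessageHandler stores it in the matching MessageStore, and a notifier wakes a worker loop that drains the store (the constructor and the loops appear in the hunks below). The following sketch re-creates that store/notify/drain pattern with plain channels and a slice-backed store; it is a simplified, hypothetical stand-in for illustration only and does not use the flow-go engine package or its types.

package main

import (
	"fmt"
	"sync"
)

// store is a slice-backed stand-in for a message store: put appends, get pops.
type store struct {
	mu      sync.Mutex
	pending []string
}

func (s *store) put(p string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending = append(s.pending, p)
}

func (s *store) get() (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.pending) == 0 {
		return "", false
	}
	p := s.pending[0]
	s.pending = s.pending[1:]
	return p, true
}

func main() {
	requests := &store{}
	notifier := make(chan struct{}, 1) // capacity 1: repeated notifications coalesce
	done := make(chan struct{})

	// worker loop: woken by the notifier, drains the store, then waits again
	go func() {
		defer close(done)
		for range notifier {
			for {
				msg, ok := requests.get()
				if !ok {
					break
				}
				fmt.Println("processing", msg)
			}
		}
	}()

	// the "Process" side: store the message, then notify without blocking
	for _, msg := range []string{"SyncRequest", "RangeRequest", "BatchRequest"} {
		requests.put(msg)
		select {
		case notifier <- struct{}{}:
		default:
		}
	}

	close(notifier)
	<-done
}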
@@ -58,20 +96,32 @@ func New( f(opt) } + if comp == nil { + panic("must initialize synchronization engine with comp engine") + } + // initialize the propagation engine with its dependencies e := &Engine{ - unit: engine.NewUnit(), - log: log.With().Str("engine", "synchronization").Logger(), - metrics: metrics, - me: me, - state: state, - blocks: blocks, - comp: comp, - core: core, - pollInterval: opt.pollInterval, - scanInterval: opt.scanInterval, + unit: engine.NewUnit(), + log: log.With().Str("engine", "synchronization").Logger(), + metrics: metrics, + me: me, + state: state, + blocks: blocks, + comp: comp, + core: core, + pollInterval: opt.pollInterval, + scanInterval: opt.scanInterval, + finalizationEventNotifier: engine.NewNotifier(), + } + + err := e.setupResponseMessageHandler() + if err != nil { + return nil, fmt.Errorf("could not setup message handler: %w", err) } + e.setupRequestMessageHandler() + // register the engine with the network layer and store the conduit con, err := net.Register(engine.SyncCommittee, e) if err != nil { @@ -79,17 +129,156 @@ func New( } e.con = con + err = e.onFinalizedBlock() + if err != nil { + return nil, fmt.Errorf("could not apply last finalized state: %w", err) + } + return e, nil } +// finalSnapshot returns the last locally stored snapshot, which contains the finalized header +// and the list of consensus participants +func (e *Engine) finalSnapshot() *finalizedSnapshot { + e.unit.Lock() + defer e.unit.Unlock() + return e.lastFinalizedSnapshot +} + +// onFinalizedBlock updates the latest locally cached finalized snapshot +func (e *Engine) onFinalizedBlock() error { + finalSnapshot := e.state.Final() + head, err := finalSnapshot.Head() + if err != nil { + return fmt.Errorf("could not get last finalized header: %w", err) + } + + // get all of the consensus nodes from the state + participants, err := finalSnapshot.Identities(filter.And( + filter.HasRole(flow.RoleConsensus), + filter.Not(filter.HasNodeID(e.me.NodeID())), + )) + if err != nil { + return fmt.Errorf("could not get consensus participants at latest finalized block: %w", err) + } + + e.unit.Lock() + defer e.unit.Unlock() + if e.lastFinalizedSnapshot != nil && e.lastFinalizedSnapshot.head.Height >= head.Height { + return nil + } + e.lastFinalizedSnapshot = &finalizedSnapshot{ + head: head, + participants: participants, + } + return nil +} + +// setupRequestMessageHandler initializes the inbound queues and the MessageHandler for UNTRUSTED requests. +func (e *Engine) setupRequestMessageHandler() { + // RequestHeap deduplicates requests by keeping only one sync request for each requester. 
+ e.pendingSyncRequests = NewRequestHeap(defaultSyncRequestQueueCapacity) + e.pendingRangeRequests = NewRequestHeap(defaultRangeRequestQueueCapacity) + e.pendingBatchRequests = NewRequestHeap(defaultBatchRequestQueueCapacity) + + // define message queueing behaviour + e.requestMessageHandler = engine.NewMessageHandler( + e.log, + engine.NewNotifier(), + engine.Pattern{ + Match: func(msg *engine.Message) bool { + _, ok := msg.Payload.(*messages.SyncRequest) + if ok { + e.metrics.MessageReceived(metrics.EngineSynchronization, metrics.MessageSyncRequest) + } + return ok + }, + Store: e.pendingSyncRequests, + }, + engine.Pattern{ + Match: func(msg *engine.Message) bool { + _, ok := msg.Payload.(*messages.RangeRequest) + if ok { + e.metrics.MessageReceived(metrics.EngineSynchronization, metrics.MessageRangeRequest) + } + return ok + }, + Store: e.pendingRangeRequests, + }, + engine.Pattern{ + Match: func(msg *engine.Message) bool { + _, ok := msg.Payload.(*messages.BatchRequest) + if ok { + e.metrics.MessageReceived(metrics.EngineSynchronization, metrics.MessageBatchRequest) + } + return ok + }, + Store: e.pendingBatchRequests, + }, + ) +} + +// setupResponseMessageHandler initializes the inbound queues and the MessageHandler for UNTRUSTED responses. +func (e *Engine) setupResponseMessageHandler() error { + syncResponseQueue, err := fifoqueue.NewFifoQueue( + fifoqueue.WithCapacity(defaultSyncResponseQueueCapacity)) + if err != nil { + return fmt.Errorf("failed to create queue for sync responses: %w", err) + } + + e.pendingSyncResponses = &engine.FifoMessageStore{ + FifoQueue: syncResponseQueue, + } + + blockResponseQueue, err := fifoqueue.NewFifoQueue( + fifoqueue.WithCapacity(defaultBlockResponseQueueCapacity)) + if err != nil { + return fmt.Errorf("failed to create queue for block responses: %w", err) + } + + e.pendingBlockResponses = &engine.FifoMessageStore{ + FifoQueue: blockResponseQueue, + } + + // define message queueing behaviour + e.responseMessageHandler = engine.NewMessageHandler( + e.log, + engine.NewNotifier(), + engine.Pattern{ + Match: func(msg *engine.Message) bool { + _, ok := msg.Payload.(*messages.SyncResponse) + if ok { + e.metrics.MessageReceived(metrics.EngineSynchronization, metrics.MessageSyncResponse) + } + return ok + }, + Store: e.pendingSyncResponses, + }, + engine.Pattern{ + Match: func(msg *engine.Message) bool { + _, ok := msg.Payload.(*messages.BlockResponse) + if ok { + e.metrics.MessageReceived(metrics.EngineSynchronization, metrics.MessageBlockResponse) + } + return ok + }, + Store: e.pendingBlockResponses, + }, + ) + + return nil +} + // Ready returns a ready channel that is closed once the engine has fully // started. For consensus engine, this is true once the underlying consensus // algorithm has started. func (e *Engine) Ready() <-chan struct{} { - if e.comp == nil { - panic("must initialize synchronization engine with comp engine") - } e.unit.Launch(e.checkLoop) + for i := 0; i < defaultEngineRequestsWorkers; i++ { + e.unit.Launch(e.requestProcessingLoop) + } + e.unit.Launch(e.responseProcessingLoop) + e.unit.Launch(e.finalizationProcessingLoop) return e.unit.Ready() } @@ -108,12 +297,10 @@ func (e *Engine) SubmitLocal(event interface{}) { // for processing in a non-blocking manner. It returns instantly and logs // a potential processing error internally when done. 
func (e *Engine) Submit(originID flow.Identifier, event interface{}) { - e.unit.Launch(func() { - err := e.Process(originID, event) - if err != nil { - engine.LogError(e.log, err) - } - }) + err := e.Process(originID, event) + if err != nil { + engine.LogError(e.log, err) + } } // ProcessLocal processes an event originating on the local node. @@ -124,59 +311,156 @@ func (e *Engine) ProcessLocal(event interface{}) error { // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. func (e *Engine) Process(originID flow.Identifier, event interface{}) error { - return e.unit.Do(func() error { - return e.process(originID, event) - }) -} - -// process processes events for the propagation engine on the consensus node. -func (e *Engine) process(originID flow.Identifier, event interface{}) error { - - switch ev := event.(type) { - case *messages.SyncRequest: - e.before(metrics.MessageSyncRequest) - defer e.after(metrics.MessageSyncRequest) - return e.onSyncRequest(originID, ev) - case *messages.SyncResponse: - e.before(metrics.MessageSyncResponse) - defer e.after(metrics.MessageSyncResponse) - return e.onSyncResponse(originID, ev) - case *messages.RangeRequest: - e.before(metrics.MessageRangeRequest) - defer e.after(metrics.MessageRangeRequest) - return e.onRangeRequest(originID, ev) - case *messages.BatchRequest: - e.before(metrics.MessageBatchRequest) - defer e.after(metrics.MessageBatchRequest) - return e.onBatchRequest(originID, ev) - case *messages.BlockResponse: - e.before(metrics.MessageBlockResponse) - defer e.after(metrics.MessageBlockResponse) - return e.onBlockResponse(originID, ev) + switch event.(type) { + case *messages.RangeRequest, *messages.BatchRequest, *messages.SyncRequest: + return e.requestMessageHandler.Process(originID, event) + case *messages.SyncResponse, *messages.BlockResponse: + return e.responseMessageHandler.Process(originID, event) default: return fmt.Errorf("invalid event type (%T)", event) } } -func (e *Engine) before(msg string) { - e.metrics.MessageReceived(metrics.EngineSynchronization, msg) - e.unit.Lock() +// OnFinalizedBlock is a callback conforming to `pubsub.OnBlockFinalizedConsumer`; it is registered with the +// FinalizationDistributor and notifies the finalization processing loop, which refreshes the cached finalized snapshot. 
+// CAUTION: the input to this callback is treated as trusted; callers must ensure that messages +// from external nodes are never passed into this function +func (e *Engine) OnFinalizedBlock(flow.Identifier) { + // notify that there is a new finalized block + e.finalizationEventNotifier.Notify() +} + +// requestProcessingLoop runs in a separate goroutine and processes queued requests +func (e *Engine) requestProcessingLoop() { + notifier := e.requestMessageHandler.GetNotifier() + for { + select { + case <-e.unit.Quit(): + return + case <-notifier: + err := e.processAvailableRequests() + if err != nil { + e.log.Fatal().Err(err).Msg("internal error processing queued message") + } + } + } +} + +// finalizationProcessingLoop runs in a separate goroutine and processes finalization events +func (e *Engine) finalizationProcessingLoop() { + notifier := e.finalizationEventNotifier.Channel() + for { + select { + case <-e.unit.Quit(): + return + case <-notifier: + err := e.onFinalizedBlock() + if err != nil { + e.log.Fatal().Err(err).Msg("could not process latest finalized block") + } + } + } +} + +// responseProcessingLoop runs in a separate goroutine and processes queued responses +func (e *Engine) responseProcessingLoop() { + notifier := e.responseMessageHandler.GetNotifier() + for { + select { + case <-e.unit.Quit(): + return + case <-notifier: + err := e.processAvailableResponses() + if err != nil { + e.log.Fatal().Err(err).Msg("internal error processing queued message") + } + } + } +} + +// processAvailableResponses processes pending responses, driving events from the networking layer into the business logic. +func (e *Engine) processAvailableResponses() error { + for { + select { + case <-e.unit.Quit(): + return nil + default: + } + + msg, ok := e.pendingSyncResponses.Get() + if ok { + err := e.onSyncResponse(msg.OriginID, msg.Payload.(*messages.SyncResponse)) + e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageSyncResponse) + if err != nil { + return fmt.Errorf("could not process sync response: %w", err) + } + continue + } + + msg, ok = e.pendingBlockResponses.Get() + if ok { + err := e.onBlockResponse(msg.OriginID, msg.Payload.(*messages.BlockResponse)) + e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageBlockResponse) + if err != nil { + return fmt.Errorf("could not process block response: %w", err) + } + continue + } + + // when there are no more messages in the queue, return so the loop can wait + // for the next incoming message to arrive. + return nil + } } -func (e *Engine) after(msg string) { - e.unit.Unlock() - e.metrics.MessageHandled(metrics.EngineSynchronization, msg) +// processAvailableRequests processes pending requests, driving events from the networking layer into the business logic. 
+func (e *Engine) processAvailableRequests() error { + + for { + select { + case <-e.unit.Quit(): + return nil + default: + } + + msg, ok := e.pendingSyncRequests.Get() + if ok { + err := e.onSyncRequest(msg.OriginID, msg.Payload.(*messages.SyncRequest)) + if err != nil { + return fmt.Errorf("could not process sync request: %w", err) + } + continue + } + + msg, ok = e.pendingRangeRequests.Get() + if ok { + err := e.onRangeRequest(msg.OriginID, msg.Payload.(*messages.RangeRequest)) + if err != nil { + return fmt.Errorf("could not process range request: %w", err) + } + continue + } + + msg, ok = e.pendingBatchRequests.Get() + if ok { + err := e.onBatchRequest(msg.OriginID, msg.Payload.(*messages.BatchRequest)) + if err != nil { + return fmt.Errorf("could not process batch request: %w", err) + } + continue + } + + // when there are no more messages in the queue, return so the loop can wait + // for the next incoming message to arrive. + return nil + } } // onSyncRequest processes an outgoing handshake; if we have a higher height, we // inform the other node of it, so they can organize their block downloads. If // we have a lower height, we add the difference to our own download queue. func (e *Engine) onSyncRequest(originID flow.Identifier, req *messages.SyncRequest) error { - - final, err := e.state.Final().Head() - if err != nil { - return fmt.Errorf("could not get finalized height: %w", err) - } + final := e.finalSnapshot().head // queue any missing heights as needed e.core.HandleHeight(final, req.Height) @@ -192,7 +476,7 @@ func (e *Engine) onSyncRequest(originID flow.Identifier, req *messages.SyncReque Height: final.Height, Nonce: req.Nonce, } - err = e.con.Unicast(res, originID) + err := e.con.Unicast(res, originID) if err != nil { return fmt.Errorf("could not send sync response: %w", err) } @@ -205,11 +489,7 @@ func (e *Engine) onSyncRequest(originID flow.Identifier, req *messages.SyncReque // onSyncResponse processes a synchronization response. func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResponse) error { - final, err := e.state.Final().Head() - if err != nil { - return fmt.Errorf("could not get finalized height: %w", err) - } - + final := e.finalSnapshot().head e.core.HandleHeight(final, res.Height) return nil } @@ -218,10 +498,7 @@ func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResp func (e *Engine) onRangeRequest(originID flow.Identifier, req *messages.RangeRequest) error { // get the latest final state to know if we can fulfill the request - head, err := e.state.Final().Head() - if err != nil { - return fmt.Errorf("could not get finalized head: %w", err) - } + head := e.finalSnapshot().head // if we don't have anything to send, we can bail right away if head.Height < req.FromHeight || req.FromHeight > req.ToHeight { @@ -253,7 +530,7 @@ func (e *Engine) onRangeRequest(originID flow.Identifier, req *messages.RangeReq Nonce: req.Nonce, Blocks: blocks, } - err = e.con.Unicast(res, originID) + err := e.con.Unicast(res, originID) if err != nil { return fmt.Errorf("could not send range response: %w", err) } @@ -314,32 +591,20 @@ func (e *Engine) onBatchRequest(originID flow.Identifier, req *messages.BatchReq // onBlockResponse processes a response containing a specifically requested block. 
func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockResponse) error { - // process the blocks one by one for _, block := range res.Blocks { - e.processIncomingBlock(originID, block) + if !e.core.HandleBlock(block.Header) { + continue + } + synced := &events.SyncedBlock{ + OriginID: originID, + Block: block, + } + e.comp.SubmitLocal(synced) } - return nil } -// processIncoming processes an incoming block, so we can take into account the -// overlap between block IDs and heights. -func (e *Engine) processIncomingBlock(originID flow.Identifier, block *flow.Block) { - - shouldProcess := e.core.HandleBlock(block.Header) - if !shouldProcess { - return - } - - synced := &events.SyncedBlock{ - OriginID: originID, - Block: block, - } - - e.comp.SubmitLocal(synced) -} - // checkLoop will regularly scan for items that need requesting. func (e *Engine) checkLoop() { pollChan := make(<-chan time.Time) @@ -369,19 +634,12 @@ CheckLoop: } case <-scan.C: - final, err := e.state.Final().Head() - if err != nil { - e.log.Error().Err(err).Msg("could not get final height") - continue - } - - e.unit.Lock() - ranges, batches := e.core.ScanPending(final) - err = e.sendRequests(ranges, batches) + snapshot := e.finalSnapshot() + ranges, batches := e.core.ScanPending(snapshot.head) + err := e.sendRequests(snapshot.participants, ranges, batches) if err != nil { e.log.Error().Err(err).Msg("could not send requests") } - e.unit.Unlock() } } @@ -392,28 +650,14 @@ CheckLoop: // pollHeight will send a synchronization request to three random nodes. func (e *Engine) pollHeight() error { - // get the last finalized header - final, err := e.state.Final().Head() - if err != nil { - return fmt.Errorf("could not get last finalized header: %w", err) - } - - // get all of the consensus nodes from the state - participants, err := e.state.Final().Identities(filter.And( - filter.HasRole(flow.RoleConsensus), - filter.Not(filter.HasNodeID(e.me.NodeID())), - )) - - if err != nil { - return fmt.Errorf("could not send get consensus identities: %w", err) - } + snapshot := e.finalSnapshot() // send the request for synchronization req := &messages.SyncRequest{ Nonce: rand.Uint64(), - Height: final.Height, + Height: snapshot.head.Height, } - err = e.con.Multicast(req, synccore.DefaultPollNodes, participants.NodeIDs()...) + err := e.con.Multicast(req, synccore.DefaultPollNodes, snapshot.participants.NodeIDs()...) if err != nil { return fmt.Errorf("could not send sync request: %w", err) } @@ -423,16 +667,8 @@ func (e *Engine) pollHeight() error { return err } -// sendRequests sends a request for each range and batch. -func (e *Engine) sendRequests(ranges []flow.Range, batches []flow.Batch) error { - - participants, err := e.state.Final().Identities(filter.And( - filter.HasRole(flow.RoleConsensus), - filter.Not(filter.HasNodeID(e.me.NodeID())), - )) - if err != nil { - return fmt.Errorf("could not get participants: %w", err) - } +// sendRequests sends a request for each range and batch using consensus participants from last finalized snapshot. 
+func (e *Engine) sendRequests(participants flow.IdentityList, ranges []flow.Range, batches []flow.Batch) error { var errs error for _, ran := range ranges { diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index afa77161d5c..cce472e67c0 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -93,6 +93,15 @@ func (ss *SyncSuite) SetupTest() { return ss.snapshot }, ) + ss.state.On("AtBlockID", mock.Anything).Return( + func(blockID flow.Identifier) protocolint.Snapshot { + if ss.head.ID() == blockID { + return ss.snapshot + } else { + return unittest.StateSnapshotForUnknownBlock() + } + }, + ).Maybe() // set up the snapshot mock ss.snapshot = &protocol.Snapshot{} @@ -402,7 +411,8 @@ func (ss *SyncSuite) TestSendRequests() { ) ss.core.On("BatchRequested", batches[0]) - err := ss.e.sendRequests(ranges, batches) + // exclude my node ID + err := ss.e.sendRequests(ss.participants[1:], ranges, batches) ss.Assert().Nil(err) ss.con.AssertExpectations(ss.T()) } @@ -414,3 +424,52 @@ func (ss *SyncSuite) TestStartStop() { <-ss.e.Done() }, time.Second) } + +// TestProcessingMultipleItems tests that items are processed asynchronously +func (ss *SyncSuite) TestProcessingMultipleItems() { + <-ss.e.Ready() + + originID := unittest.IdentifierFixture() + for i := 0; i < 5; i++ { + msg := &messages.SyncResponse{ + Nonce: uint64(i), + Height: uint64(1000 + i), + } + ss.core.On("HandleHeight", mock.Anything, msg.Height).Once() + require.NoError(ss.T(), ss.e.Process(originID, msg)) + } + + finalHeight := ss.head.Height + for i := 0; i < 5; i++ { + msg := &messages.SyncRequest{ + Nonce: uint64(i), + Height: finalHeight - 100, + } + + originID := unittest.IdentifierFixture() + ss.core.On("WithinTolerance", mock.Anything, mock.Anything).Return(false) + ss.core.On("HandleHeight", mock.Anything, msg.Height).Once() + ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil) + + require.NoError(ss.T(), ss.e.Process(originID, msg)) + } + + // give the engine at least some time to process the items + time.Sleep(time.Millisecond * 100) + + ss.core.AssertExpectations(ss.T()) +} + +// TestOnFinalizedBlock tests that when a new finalized block is discovered, the engine updates its cached +// snapshot to the latest state +func (ss *SyncSuite) TestOnFinalizedBlock() { + finalizedBlock := unittest.BlockHeaderWithParentFixture(ss.head) + // change head + ss.head = &finalizedBlock + + err := ss.e.onFinalizedBlock() + require.NoError(ss.T(), err) + actualSnapshot := ss.e.finalSnapshot() + require.ElementsMatch(ss.T(), actualSnapshot.participants, ss.participants[1:]) + require.Equal(ss.T(), actualSnapshot.head, &finalizedBlock) +} diff --git a/engine/common/synchronization/request_heap.go b/engine/common/synchronization/request_heap.go new file mode 100644 index 00000000000..b7e012d1a9b --- /dev/null +++ b/engine/common/synchronization/request_heap.go @@ -0,0 +1,74 @@ +package synchronization + +import ( + "sync" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/model/flow" +) + +// RequestHeap is a special structure that implements the engine.MessageStore interface and +// indexes requests by originator. A new request from an originator replaces that originator's previous one. +// In contrast to a default FIFO queue, it holds at most one request per origin ID. +// Retrieving a value from the queue, as well as ejecting, is pseudo-random. 
+type RequestHeap struct { + lock sync.Mutex + limit uint + requests map[flow.Identifier]*engine.Message +} + +func NewRequestHeap(limit uint) *RequestHeap { + return &RequestHeap{ + limit: limit, + requests: make(map[flow.Identifier]*engine.Message), + } +} + +// Put stores the message in the requests map, keyed by its OriginID. +// It always returns true +func (q *RequestHeap) Put(message *engine.Message) bool { + q.lock.Lock() + defer q.lock.Unlock() + // eject first if we are at max capacity; doing it in this order prevents + // the just-inserted item from being ejected + if _, found := q.requests[message.OriginID]; !found { + // if no message from the origin is stored, make sure we have room to store the new message: + q.reduce() + } + // at this point we can be sure that there is at least one slot + q.requests[message.OriginID] = message + return true +} + +// Get returns a pseudo-random element from the request storage, relying on Go map iteration order. +func (q *RequestHeap) Get() (*engine.Message, bool) { + q.lock.Lock() + defer q.lock.Unlock() + + var originID flow.Identifier + var msg *engine.Message + + if len(q.requests) == 0 { + return nil, false + } + + // pick the first element, relying on Go map iteration being pseudo-random + for originID, msg = range q.requests { + break + } + + delete(q.requests, originID) + + return msg, true +} + +// reduce ejects entries until we are within the configured memory pool size limit. +// When called at max capacity it ejects at least one element. +func (q *RequestHeap) reduce() { + for overCapacity := len(q.requests) - int(q.limit); overCapacity >= 0; overCapacity-- { + for originID := range q.requests { + delete(q.requests, originID) + break // eject exactly one entry per unit of overCapacity + } + } +} diff --git a/engine/common/synchronization/request_heap_test.go b/engine/common/synchronization/request_heap_test.go new file mode 100644 index 00000000000..41e5ab2ba12 --- /dev/null +++ b/engine/common/synchronization/request_heap_test.go @@ -0,0 +1,93 @@ +package synchronization + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestRequestQueue_Get tests that after pushing multiple items we can drain the queue using the Get method +func TestRequestQueue_Get(t *testing.T) { + q := NewRequestHeap(100) + items := 20 + messages := make(map[flow.Identifier]*engine.Message) + for i := 0; i < items; i++ { + msg := &engine.Message{ + OriginID: unittest.IdentifierFixture(), + Payload: unittest.IdentifierFixture(), + } + messages[msg.OriginID] = msg + require.True(t, q.Put(msg)) + } + + for i := 0; i < items; i++ { + msg, ok := q.Get() + require.True(t, ok) + expected, ok := messages[msg.OriginID] + require.True(t, ok) + require.Equal(t, expected, msg) + } + + // at this point the queue should be empty + _, ok := q.Get() + require.False(t, ok) +} + +// TestRequestQueue_Put tests that putting an item into the queue overwrites the previous one from the same origin +func TestRequestQueue_Put(t *testing.T) { + q := NewRequestHeap(100) + msg := &engine.Message{ + OriginID: unittest.IdentifierFixture(), + Payload: unittest.IdentifierFixture(), + } + require.True(t, q.Put(msg)) + require.Equal(t, msg, q.requests[msg.OriginID]) + + newMsg := &engine.Message{ + OriginID: msg.OriginID, + Payload: unittest.BlockFixture(), + } + + // this should overwrite the previous message + require.True(t, q.Put(newMsg)) + require.Equal(t, newMsg, q.requests[msg.OriginID]) +} + +// TestRequestQueue_PutAtMaxCapacity tests that putting 
an item over max capacity results in successful eject and put +func TestRequestQueue_PutAtMaxCapacity(t *testing.T) { + limit := uint(10) + q := NewRequestHeap(limit) + messages := make(map[flow.Identifier]*engine.Message) + for i := uint(0); i < limit; i++ { + msg := &engine.Message{ + OriginID: unittest.IdentifierFixture(), + Payload: unittest.IdentifierFixture(), + } + require.True(t, q.Put(msg)) + messages[msg.OriginID] = msg + } + newMsg := &engine.Message{ + OriginID: unittest.IdentifierFixture(), + Payload: unittest.BlockFixture(), + } + require.True(t, q.Put(newMsg)) + + ejectedCount := 0 + for { + m, ok := q.Get() + if !ok { + break + } + expectedMessage, found := messages[m.OriginID] + if !found { + ejectedCount++ + } else { + require.Equal(t, m, expectedMessage) + } + } + require.Equal(t, 1, ejectedCount) +} diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index 177ed9b98ee..290f2e41af9 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/consensus/hotstuff" mockhotstuff "github.com/onflow/flow-go/consensus/hotstuff/mocks" "github.com/onflow/flow-go/consensus/hotstuff/notifications" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/engine" collectioningest "github.com/onflow/flow-go/engine/collection/ingest" @@ -443,7 +444,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit deltas, err := ingestion.NewDeltas(1000) require.NoError(t, err) - checkerEngine := &CheckerMock{} + finalizationDistributor := pubsub.NewFinalizationDistributor() rootHead, rootQC := getRoot(t, &node) ingestionEngine, err := ingestion.New( @@ -475,7 +476,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit node.ProtocolEvents.AddConsumer(ingestionEngine) - followerCore, finalizer := createFollowerCore(t, &node, followerState, checkerEngine, rootHead, rootQC) + followerCore, finalizer := createFollowerCore(t, &node, followerState, finalizationDistributor, rootHead, rootQC) // initialize cleaner for DB cleaner := storage.NewCleaner(node.Log, node.DB, node.Metrics, flow.DefaultValueLogGCFrequency) @@ -497,6 +498,8 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit ) require.NoError(t, err) + finalizationDistributor.AddOnBlockFinalizedConsumer(syncEngine.OnFinalizedBlock) + return testmock.ExecutionNode{ GenericNode: node, MutableState: followerState, diff --git a/module/synchronization/core.go b/module/synchronization/core.go index b2c2c58c92c..f372404fe40 100644 --- a/module/synchronization/core.go +++ b/module/synchronization/core.go @@ -45,8 +45,7 @@ func DefaultConfig() Config { // Core should be wrapped by a type-aware engine that manages the specifics of // each chain. Example: https://github.com/onflow/flow-go/blob/master/engine/common/synchronization/engine.go // -// Core is NOT safe for concurrent use by multiple goroutines. Wrapping engines -// are responsible for avoid concurrent access. +// Core is safe for concurrent use by multiple goroutines. type Core struct { log zerolog.Logger Config Config @@ -97,9 +96,6 @@ func (c *Core) HandleBlock(header *flow.Header) bool { // If the height difference between local and the reported height, we do nothing. // Otherwise, we queue each missing height. 
func (c *Core) HandleHeight(final *flow.Header, height uint64) { - c.mu.Lock() - defer c.mu.Unlock() - // don't bother queueing anything if we're within tolerance if c.WithinTolerance(final, height) { return @@ -107,6 +103,8 @@ func (c *Core) HandleHeight(final *flow.Header, height uint64) { // if we are sufficiently behind, we want to sync the missing blocks if height > final.Height { + c.mu.Lock() + defer c.mu.Unlock() for h := final.Height + 1; h <= height; h++ { c.queueByHeight(h) } diff --git a/storage/badger/blocks.go b/storage/badger/blocks.go index 4449c27cf7c..8e4cfb6d4ce 100644 --- a/storage/badger/blocks.go +++ b/storage/badger/blocks.go @@ -77,12 +77,14 @@ func (b *Blocks) ByID(blockID flow.Identifier) (*flow.Block, error) { // ByHeight ... func (b *Blocks) ByHeight(height uint64) (*flow.Block, error) { - var blockID flow.Identifier - err := b.db.View(operation.LookupBlockHeight(height, &blockID)) + tx := b.db.NewTransaction(false) + defer tx.Discard() + + blockID, err := b.headers.retrieveIdByHeightTx(height)(tx) if err != nil { - return nil, fmt.Errorf("could not look up block: %w", err) + return nil, err } - return b.ByID(blockID) + return b.retrieveTx(blockID)(tx) } // ByCollectionID ... diff --git a/storage/badger/headers.go b/storage/badger/headers.go index 6f69c978656..5af7ee78694 100644 --- a/storage/badger/headers.go +++ b/storage/badger/headers.go @@ -107,6 +107,16 @@ func (h *Headers) retrieveTx(blockID flow.Identifier) func(*badger.Txn) (*flow.H } } +func (h *Headers) retrieveIdByHeightTx(height uint64) func(*badger.Txn) (flow.Identifier, error) { + return func(tx *badger.Txn) (flow.Identifier, error) { + blockID, err := h.heightCache.Get(height)(tx) + if err != nil { + return flow.ZeroID, fmt.Errorf("failed to retrieve block ID for height %d: %w", height, err) + } + return blockID.(flow.Identifier), nil + } +} + func (h *Headers) Store(header *flow.Header) error { return operation.RetryOnConflictTx(h.db, transaction.Update, h.storeTx(header)) } @@ -121,11 +131,11 @@ func (h *Headers) ByHeight(height uint64) (*flow.Header, error) { tx := h.db.NewTransaction(false) defer tx.Discard() - blockID, err := h.heightCache.Get(height)(tx) + blockID, err := h.retrieveIdByHeightTx(height)(tx) if err != nil { - return nil, fmt.Errorf("could not look up height: %w", err) + return nil, err } - return h.ByBlockID(blockID.(flow.Identifier)) + return h.retrieveTx(blockID)(tx) } func (h *Headers) ByParentID(parentID flow.Identifier) ([]*flow.Header, error) {
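Taken together, the node wiring changes above follow one pattern: construct a FinalizationDistributor, register consumers on it, and hand it to the consensus follower as the single notifier. The sketch below is a minimal, hypothetical example of that wiring, not code from this diff: the function and variable names are invented, while the pubsub calls (NewFinalizationDistributor, AddConsumer, AddOnBlockFinalizedConsumer) are the ones introduced here.

package main

import (
	"fmt"

	"github.com/onflow/flow-go/consensus/hotstuff"
	"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
	"github.com/onflow/flow-go/model/flow"
)

// wireFinalizationEvents registers two kinds of subscribers on one distributor:
// a full hotstuff.FinalizationConsumer (e.g. the ingestion or checker engine in the
// node binaries above) and a lightweight callback that only needs the finalized
// block ID (e.g. the synchronization engine's OnFinalizedBlock method). The
// distributor is then handed to consensus.NewFollower as the follower's notifier,
// so every finalization event fans out to everything registered here.
func wireFinalizationEvents(fullConsumer hotstuff.FinalizationConsumer, onFinalized func(flow.Identifier)) *pubsub.FinalizationDistributor {
	distributor := pubsub.NewFinalizationDistributor()
	if fullConsumer != nil {
		distributor.AddConsumer(fullConsumer)
	}
	distributor.AddOnBlockFinalizedConsumer(onFinalized)
	return distributor
}

func main() {
	// no full consumer in this toy example; just log finalized block IDs
	distributor := wireFinalizationEvents(nil, func(finalizedBlockID flow.Identifier) {
		fmt.Printf("finalized block: %x\n", finalizedBlockID)
	})
	_ = distributor
}

In the node binaries this is how the synchronization engine's OnFinalizedBlock ends up subscribed, replacing the earlier approach of passing a single engine (or a no-op consumer) directly to consensus.NewFollower.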