Skip to content

Commit

Permalink
Merge pull request onflow#910 from onflow/yurii/5615-synchronization-…
Browse files Browse the repository at this point in the history
…engine-queue

[Consensus] Queue for `synchronization.Engine`
  • Loading branch information
durkmurder committed Jul 5, 2021
2 parents d648e6a + a00f07e commit 2641774
Show file tree
Hide file tree
Showing 14 changed files with 689 additions and 167 deletions.
10 changes: 9 additions & 1 deletion cmd/access/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/engine"
Expand Down Expand Up @@ -59,6 +60,7 @@ func main() {
syncCore *synchronization.Core
rpcConf rpc.Config
rpcEng *rpc.Engine
finalizationDistributor *pubsub.FinalizationDistributor
collectionRPC access.AccessAPIClient
executionNodeAddress string // deprecated
historicalAccessRPCs []access.AccessAPIClient
Expand Down Expand Up @@ -275,9 +277,12 @@ func main() {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
}

finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(ingestEng)

// creates a consensus follower with ingestEngine as the notifier
// so that it gets notified upon each new finalized block
followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, ingestEng, node.RootBlock.Header, node.RootQC, finalized, pending)
followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, finalizationDistributor, node.RootBlock.Header, node.RootQC, finalized, pending)
if err != nil {
return nil, fmt.Errorf("could not initialize follower core: %w", err)
}
Expand Down Expand Up @@ -316,6 +321,9 @@ func main() {
if err != nil {
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
}

finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock)

return sync, nil
}).
Component("ping engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
Expand Down
14 changes: 8 additions & 6 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/pacemaker/timeout"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
Expand Down Expand Up @@ -64,8 +64,9 @@ func main() {
ingestConf ingest.Config
ingressConf ingress.Config

pools *epochpool.TransactionPools // epoch-scoped transaction pools
followerBuffer *buffer.PendingBlocks // pending block cache for follower
pools *epochpool.TransactionPools // epoch-scoped transaction pools
followerBuffer *buffer.PendingBlocks // pending block cache for follower
finalizationDistributor *pubsub.FinalizationDistributor

push *pusher.Engine
ing *ingest.Engine
Expand Down Expand Up @@ -184,8 +185,7 @@ func main() {
// initialize the verifier for the protocol consensus
verifier := verification.NewCombinedVerifier(mainConsensusCommittee, staking, beacon, merger)

// use proper engine for notifier to follower
notifier := notifications.NewNoopConsumer()
finalizationDistributor = pubsub.NewFinalizationDistributor()

finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
if err != nil {
Expand All @@ -199,7 +199,7 @@ func main() {
node.Storage.Headers,
finalizer,
verifier,
notifier,
finalizationDistributor,
node.RootBlock.Header,
node.RootQC,
finalized,
Expand Down Expand Up @@ -246,6 +246,8 @@ func main() {
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
}

finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock)

return sync, nil
}).
Component("ingestion engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
Expand Down
2 changes: 2 additions & 0 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ func main() {
return nil, fmt.Errorf("could not initialize synchronization engine: %w", err)
}

finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock)

return sync, nil
}).
Component("receipt requester engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
Expand Down
11 changes: 9 additions & 2 deletions cmd/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/engine"
Expand Down Expand Up @@ -70,6 +71,7 @@ func main() {
computationManager *computation.Manager
collectionRequester *requester.Engine
ingestionEng *ingestion.Engine
finalizationDistributor *pubsub.FinalizationDistributor
rpcConf rpc.Config
err error
executionState state.ExecutionState
Expand Down Expand Up @@ -380,9 +382,12 @@ func main() {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
}

finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(checkerEng)

// creates a consensus follower with ingestEngine as the notifier
// so that it gets notified upon each new finalized block
followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, checkerEng, node.RootBlock.Header, node.RootQC, finalized, pending)
followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, finalizationDistributor, node.RootBlock.Header, node.RootQC, finalized, pending)
if err != nil {
return nil, fmt.Errorf("could not create follower core logic: %w", err)
}
Expand Down Expand Up @@ -427,7 +432,7 @@ func main() {
)
return eng, err
}).
Component("sychronization engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
Component("synchronization engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
// initialize the synchronization engine
syncEngine, err = synchronization.New(
node.Logger,
Expand All @@ -443,6 +448,8 @@ func main() {
return nil, fmt.Errorf("could not initialize synchronization engine: %w", err)
}

finalizationDistributor.AddOnBlockFinalizedConsumer(syncEngine.OnFinalizedBlock)

return syncEngine, nil
}).
Component("grpc server", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
Expand Down
26 changes: 17 additions & 9 deletions cmd/verification/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
followereng "github.com/onflow/flow-go/engine/common/follower"
Expand Down Expand Up @@ -60,14 +61,15 @@ func main() {
processedBlockHeight *storage.ConsumerProgress // used in block consumer
chunkQueue *storage.ChunksQueue // used in chunk consumer

syncCore *synchronization.Core // used in follower engine
pendingBlocks *buffer.PendingBlocks // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
fetcherEngine *fetcher.Engine // the fetcher engine
requesterEngine *vereq.Engine // the requester engine
verifierEng *verifier.Engine // the verifier engine
chunkConsumer *chunkconsumer.ChunkConsumer
blockConsumer *blockconsumer.BlockConsumer
syncCore *synchronization.Core // used in follower engine
pendingBlocks *buffer.PendingBlocks // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
fetcherEngine *fetcher.Engine // the fetcher engine
requesterEngine *vereq.Engine // the requester engine
verifierEng *verifier.Engine // the verifier engine
chunkConsumer *chunkconsumer.ChunkConsumer
blockConsumer *blockconsumer.BlockConsumer
finalizationDistributor *pubsub.FinalizationDistributor

followerEng *followereng.Engine // the follower engine
collector module.VerificationMetrics // used to collect metrics of all engines
Expand Down Expand Up @@ -297,9 +299,12 @@ func main() {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
}

finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(blockConsumer)

// creates a consensus follower with ingestEngine as the notifier
// so that it gets notified upon each new finalized block
followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, blockConsumer, node.RootBlock.Header,
followerCore, err := consensus.NewFollower(node.Logger, committee, node.Storage.Headers, final, verifier, finalizationDistributor, node.RootBlock.Header,
node.RootQC, finalized, pending)
if err != nil {
return nil, fmt.Errorf("could not create follower core logic: %w", err)
Expand Down Expand Up @@ -339,6 +344,9 @@ func main() {
if err != nil {
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
}

finalizationDistributor.AddOnBlockFinalizedConsumer(sync.OnFinalizedBlock)

return sync, nil
}).
Run()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pubsub

import (
"github.com/onflow/flow-go/consensus/hotstuff"
"sync"

"github.com/onflow/flow-go/consensus/hotstuff/model"
Expand All @@ -12,9 +13,10 @@ type OnBlockIncorporatedConsumer = func(incorporatedBlockID flow.Identifier)

// FinalizationDistributor subscribes for finalization events from hotstuff and distributes it to subscribers
type FinalizationDistributor struct {
blockFinalizedConsumers []OnBlockFinalizedConsumer
blockIncorporatedConsumers []OnBlockIncorporatedConsumer
lock sync.RWMutex
blockFinalizedConsumers []OnBlockFinalizedConsumer
blockIncorporatedConsumers []OnBlockIncorporatedConsumer
hotStuffFinalizationConsumers []hotstuff.FinalizationConsumer
lock sync.RWMutex
}

func NewFinalizationDistributor() *FinalizationDistributor {
Expand All @@ -30,12 +32,19 @@ func (p *FinalizationDistributor) AddOnBlockFinalizedConsumer(consumer OnBlockFi
defer p.lock.Unlock()
p.blockFinalizedConsumers = append(p.blockFinalizedConsumers, consumer)
}

func (p *FinalizationDistributor) AddOnBlockIncorporatedConsumer(consumer OnBlockIncorporatedConsumer) {
p.lock.Lock()
defer p.lock.Unlock()
p.blockIncorporatedConsumers = append(p.blockIncorporatedConsumers, consumer)
}

func (p *FinalizationDistributor) AddConsumer(consumer hotstuff.FinalizationConsumer) {
p.lock.Lock()
defer p.lock.Unlock()
p.hotStuffFinalizationConsumers = append(p.hotStuffFinalizationConsumers, consumer)
}

func (p *FinalizationDistributor) OnEventProcessed() {}

func (p *FinalizationDistributor) OnBlockIncorporated(block *model.Block) {
Expand All @@ -44,6 +53,9 @@ func (p *FinalizationDistributor) OnBlockIncorporated(block *model.Block) {
for _, consumer := range p.blockIncorporatedConsumers {
consumer(block.BlockID)
}
for _, consumer := range p.hotStuffFinalizationConsumers {
consumer.OnBlockIncorporated(block)
}
}

func (p *FinalizationDistributor) OnFinalizedBlock(block *model.Block) {
Expand All @@ -52,9 +64,18 @@ func (p *FinalizationDistributor) OnFinalizedBlock(block *model.Block) {
for _, consumer := range p.blockFinalizedConsumers {
consumer(block.BlockID)
}
for _, consumer := range p.hotStuffFinalizationConsumers {
consumer.OnFinalizedBlock(block)
}
}

func (p *FinalizationDistributor) OnDoubleProposeDetected(*model.Block, *model.Block) {}
func (p *FinalizationDistributor) OnDoubleProposeDetected(block1, block2 *model.Block) {
p.lock.RLock()
defer p.lock.RUnlock()
for _, consumer := range p.hotStuffFinalizationConsumers {
consumer.OnDoubleProposeDetected(block1, block2)
}
}

func (p *FinalizationDistributor) OnReceiveVote(uint64, *model.Vote) {}

Expand Down
Loading

0 comments on commit 2641774

Please sign in to comment.