Skip to content

Commit

Permalink
tendermint#3186 eventbus and indexservice start before handshaker
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed Jan 23, 2019
1 parent 2449bf7 commit 0424582
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 66 deletions.
2 changes: 1 addition & 1 deletion blockchain/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), types.NopEventBus{},
sm.MockMempool{}, sm.MockEvidencePool{})

// let's add some blocks in
Expand Down
9 changes: 4 additions & 5 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,14 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
evpool := sm.MockEvidencePool{}

// Make ConsensusState
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
stateDB := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, eventBus, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
cs.SetEventBus(eventBus)
return cs
}
Expand Down
8 changes: 4 additions & 4 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,14 @@ func TestReactorWithEvidence(t *testing.T) {
evpool := newMockEvidencePool(addr)

// Make ConsensusState
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, eventBus, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
cs.SetEventBus(eventBus)

cs.SetTimeoutTicker(tickerFunc())
Expand Down
6 changes: 4 additions & 2 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,21 @@ type Handshaker struct {
stateDB dbm.DB
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
genDoc *types.GenesisDoc
logger log.Logger

nBlocks int // number of blocks applied to the state
}

func NewHandshaker(stateDB dbm.DB, state sm.State,
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {
store sm.BlockStore, eventBus types.BlockEventPublisher, genDoc *types.GenesisDoc) *Handshaker {

return &Handshaker{
stateDB: stateDB,
initialState: state,
store: store,
eventBus: eventBus,
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
Expand Down Expand Up @@ -431,7 +433,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)

blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, h.eventBus, sm.MockMempool{}, sm.MockEvidencePool{})

var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
Expand Down
14 changes: 7 additions & 7 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,19 +326,19 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
}

handshaker := NewHandshaker(stateDB, state, blockStore, gdoc)
err = handshaker.Handshake(proxyApp)
if err != nil {
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
}

eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil {
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}

handshaker := NewHandshaker(stateDB, state, blockStore, eventBus, gdoc)
err = handshaker.Handshake(proxyApp)
if err != nil {
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
}

mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), eventBus, mempool, evpool)

consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
Expand Down
6 changes: 3 additions & 3 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {

// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, genDoc)
handshaker := NewHandshaker(stateDB, state, store, types.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2)
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
Expand Down Expand Up @@ -387,7 +387,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {

func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), types.NopEventBus{}, mempool, evpool)

blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
newState, err := blockExec.ApplyBlock(st, blkID, blk)
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestInitChainUpdateValidators(t *testing.T) {

// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, genDoc)
handshaker := NewHandshaker(stateDB, state, store, types.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator)
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
Expand Down
1 change: 0 additions & 1 deletion consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func (cs *ConsensusState) SetLogger(l log.Logger) {
// SetEventBus sets event bus.
func (cs *ConsensusState) SetEventBus(b *types.EventBus) {
cs.eventBus = b
cs.blockExec.SetEventBus(b)
}

// StateMetrics sets the metrics.
Expand Down
2 changes: 1 addition & 1 deletion consensus/wal_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func WALGenerateNBlocks(wr io.Writer, numBlocks int) (err error) {
defer eventBus.Stop()
mempool := sm.MockMempool{}
evpool := sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), eventBus, mempool, evpool)
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
Expand Down
66 changes: 38 additions & 28 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,45 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
}

eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

err = eventBus.Start()
if err != nil {
return nil, err
}

// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}

indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

err = indexerService.Start()
if err != nil {
return nil, err
}

// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker := cs.NewHandshaker(stateDB, state, blockStore, eventBus, genDoc)
handshaker.SetLogger(consensusLogger)
if err := handshaker.Handshake(proxyApp); err != nil {
return nil, fmt.Errorf("Error during handshake: %v", err)
Expand Down Expand Up @@ -317,6 +352,7 @@ func NewNode(config *cfg.Config,
stateDB,
blockExecLogger,
proxyApp.Consensus(),
eventBus,
mempool,
evidencePool,
sm.BlockExecutorWithMetrics(smMetrics),
Expand All @@ -343,35 +379,10 @@ func NewNode(config *cfg.Config,
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)

eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus)

// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}

indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

p2pLogger := logger.With("module", "p2p")
nodeInfo, err := makeNodeInfo(
config,
Expand Down Expand Up @@ -582,8 +593,7 @@ func (n *Node) OnStart() error {
}
}

// start tx indexer
return n.indexerService.Start()
return nil
}

// OnStop stops the Node. It implements cmn.Service.
Expand Down
11 changes: 2 additions & 9 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {

// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, eventBus types.BlockEventPublisher, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
res := &BlockExecutor{
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
eventBus: eventBus,
mempool: mempool,
evpool: evpool,
logger: logger,
Expand All @@ -68,12 +67,6 @@ func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsen
return res
}

// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
blockExec.eventBus = eventBus
}

// CreateProposalBlock calls state.MakeBlock with evidence from the evpool
// and txs from the mempool. The max bytes must be big enough to fit the commit.
// Up to 1/10th of the block space is allcoated for maximum sized evidence.
Expand Down
7 changes: 3 additions & 4 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestApplyBlock(t *testing.T) {

state, stateDB := state(1, 1)

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), types.NopEventBus{},
MockMempool{}, MockEvidencePool{})

block := makeBlock(state, 1)
Expand Down Expand Up @@ -309,13 +309,12 @@ func TestEndBlockValidatorUpdates(t *testing.T) {

state, stateDB := state(1, 1)

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
MockMempool{}, MockEvidencePool{})
eventBus := types.NewEventBus()
err = eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
blockExec.SetEventBus(eventBus)

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), eventBus, MockMempool{}, MockEvidencePool{})

updatesCh := make(chan interface{}, 1)
err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh)
Expand Down
2 changes: 1 addition & 1 deletion state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestValidateBlockHeader(t *testing.T) {
var height int64 = 1 // TODO(#2589): generalize
state, stateDB := state(1, int(height))

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), nil, nil, nil)
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), nil, types.NopEventBus{}, nil, nil)

// A good block passes.
block := makeBlock(state, height)
Expand Down

0 comments on commit 0424582

Please sign in to comment.