Skip to content

Commit

Permalink
tendermint#3186 revert remove setter of eventBus on block executor
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed Jan 28, 2019
1 parent c624fac commit 0f58d4f
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 28 deletions.
2 changes: 1 addition & 1 deletion blockchain/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), types.NopEventBus{},
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
sm.MockMempool{}, sm.MockEvidencePool{})

// let's add some blocks in
Expand Down
9 changes: 5 additions & 4 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,15 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
evpool := sm.MockEvidencePool{}

// Make ConsensusState
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
stateDB := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, eventBus, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
cs.SetEventBus(eventBus)
return cs
}
Expand Down
8 changes: 4 additions & 4 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,14 @@ func TestReactorWithEvidence(t *testing.T) {
evpool := newMockEvidencePool(addr)

// Make ConsensusState
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, eventBus, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.Start()
cs.SetEventBus(eventBus)

cs.SetTimeoutTicker(tickerFunc())
Expand Down
13 changes: 10 additions & 3 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ type Handshaker struct {
}

func NewHandshaker(stateDB dbm.DB, state sm.State,
store sm.BlockStore, eventBus types.BlockEventPublisher, genDoc *types.GenesisDoc) *Handshaker {
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {

return &Handshaker{
stateDB: stateDB,
initialState: state,
store: store,
eventBus: eventBus,
eventBus: types.NopEventBus{},
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
Expand All @@ -221,6 +221,12 @@ func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}

// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
h.eventBus = eventBus
}

func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
Expand Down Expand Up @@ -433,7 +439,8 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)

blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, h.eventBus, sm.MockMempool{}, sm.MockEvidencePool{})
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
blockExec.SetEventBus(h.eventBus)

var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
Expand Down
5 changes: 3 additions & 2 deletions consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,15 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}

handshaker := NewHandshaker(stateDB, state, blockStore, eventBus, gdoc)
handshaker := NewHandshaker(stateDB, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
if err != nil {
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
}

mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), eventBus, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
Expand Down
6 changes: 3 additions & 3 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {

// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, types.NopEventBus{}, genDoc)
handshaker := NewHandshaker(stateDB, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2)
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
Expand Down Expand Up @@ -387,7 +387,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {

func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), types.NopEventBus{}, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
newState, err := blockExec.ApplyBlock(st, blkID, blk)
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestInitChainUpdateValidators(t *testing.T) {

// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, types.NopEventBus{}, genDoc)
handshaker := NewHandshaker(stateDB, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator)
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
Expand Down
1 change: 1 addition & 0 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (cs *ConsensusState) SetLogger(l log.Logger) {
// SetEventBus sets event bus.
func (cs *ConsensusState) SetEventBus(b *types.EventBus) {
cs.eventBus = b
cs.blockExec.SetEventBus(b)
}

// StateMetrics sets the metrics.
Expand Down
2 changes: 1 addition & 1 deletion consensus/wal_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func WALGenerateNBlocks(wr io.Writer, numBlocks int) (err error) {
defer eventBus.Stop()
mempool := sm.MockMempool{}
evpool := sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), eventBus, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
Expand Down
8 changes: 6 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
}

// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

Expand Down Expand Up @@ -255,8 +259,9 @@ func NewNode(config *cfg.Config,
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, eventBus, genDoc)
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
return nil, fmt.Errorf("Error during handshake: %v", err)
}
Expand Down Expand Up @@ -352,7 +357,6 @@ func NewNode(config *cfg.Config,
stateDB,
blockExecLogger,
proxyApp.Consensus(),
eventBus,
mempool,
evidencePool,
sm.BlockExecutorWithMetrics(smMetrics),
Expand Down
1 change: 0 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ func TestCreateProposalBlock(t *testing.T) {
stateDB,
logger,
proxyApp.Consensus(),
types.NopEventBus{},
mempool,
evidencePool,
)
Expand Down
10 changes: 8 additions & 2 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {

// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, eventBus types.BlockEventPublisher, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
res := &BlockExecutor{
db: db,
proxyApp: proxyApp,
eventBus: eventBus,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
Expand All @@ -67,6 +67,12 @@ func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsen
return res
}

// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
blockExec.eventBus = eventBus
}

// CreateProposalBlock calls state.MakeBlock with evidence from the evpool
// and txs from the mempool. The max bytes must be big enough to fit the commit.
// Up to 1/10th of the block space is allcoated for maximum sized evidence.
Expand Down
7 changes: 4 additions & 3 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestApplyBlock(t *testing.T) {

state, stateDB := state(1, 1)

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), types.NopEventBus{},
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
MockMempool{}, MockEvidencePool{})

block := makeBlock(state, 1)
Expand Down Expand Up @@ -309,12 +309,13 @@ func TestEndBlockValidatorUpdates(t *testing.T) {

state, stateDB := state(1, 1)

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), MockMempool{}, MockEvidencePool{})

eventBus := types.NewEventBus()
err = eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), eventBus, MockMempool{}, MockEvidencePool{})
blockExec.SetEventBus(eventBus)

updatesCh := make(chan interface{}, 1)
err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh)
Expand Down
4 changes: 2 additions & 2 deletions state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestValidateBlockHeader(t *testing.T) {
var height int64 = 1 // TODO(#2589): generalize
state, stateDB := state(1, int(height))

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), nil, types.NopEventBus{}, nil, nil)
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), nil, nil, nil)

// A good block passes.
block := makeBlock(state, height)
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestValidateBlockEvidence(t *testing.T) {
var height int64 = 1 // TODO(#2589): generalize
state, stateDB := state(1, int(height))

blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), nil, types.NopEventBus{}, nil, nil)
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), nil, nil, nil)

// make some evidence
addr, _ := state.Validators.GetByIndex(0)
Expand Down

0 comments on commit 0f58d4f

Please sign in to comment.