Skip to content

Commit

Permalink
feat: async reactor receiving
Browse files Browse the repository at this point in the history
This commit is that port the following PR to ebony.
#135
  • Loading branch information
brew0722 authored and Jiyong Ha committed May 26, 2021
1 parent 2e4c6dd commit bc1b4af
Show file tree
Hide file tree
Showing 33 changed files with 416 additions and 112 deletions.
7 changes: 5 additions & 2 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type BlockchainReactor struct {

// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
fastSync bool, async bool, recvBufSize int) *BlockchainReactor {

if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
Expand All @@ -90,7 +90,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, async, recvBufSize)
return bcR
}

Expand All @@ -103,6 +103,9 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
// call BaseReactor's OnStart()
bcR.BaseReactor.OnStart()

err := bcR.pool.Start()
if err != nil {
return err
Expand Down
39 changes: 25 additions & 14 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func newBlockchainReactor(
logger log.Logger,
genDoc *types.GenesisDoc,
privVals []types.PrivValidator,
maxBlockHeight int64) BlockchainReactorPair {
maxBlockHeight int64,
async bool,
recvBufSize int) BlockchainReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
Expand Down Expand Up @@ -125,7 +127,7 @@ func newBlockchainReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync, async, recvBufSize)
bcReactor.SetLogger(logger.With("module", "blockchain"))

return BlockchainReactorPair{bcReactor, proxyApp}
Expand All @@ -140,10 +142,12 @@ func TestNoBlockResponse(t *testing.T) {

reactorPairs := make([]BlockchainReactorPair, 2)

reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)

p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s

Expand Down Expand Up @@ -202,7 +206,7 @@ func TestBadBlockStopsPeer(t *testing.T) {

// Other chain needs a different validator set
otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)
otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight, config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)

defer func() {
err := otherChain.reactor.Stop()
Expand All @@ -213,12 +217,17 @@ func TestBadBlockStopsPeer(t *testing.T) {

reactorPairs := make([]BlockchainReactorPair, 4)

reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)

switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)

switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s

Expand Down Expand Up @@ -254,10 +263,12 @@ func TestBadBlockStopsPeer(t *testing.T) {
// race, but can't be easily avoided.
reactorPairs[3].reactor.store = otherChain.reactor.store

lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
reactorPairs = append(reactorPairs, lastReactorPair)

switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s

Expand Down
9 changes: 6 additions & 3 deletions blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type BlockchainReactor struct {

// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
fastSync bool, async bool, recvBufSize int) *BlockchainReactor {

if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
Expand Down Expand Up @@ -99,8 +99,8 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
}
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
// bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, async, recvBufSize)
//bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)

return bcR
}
Expand Down Expand Up @@ -140,6 +140,9 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
func (bcR *BlockchainReactor) OnStart() error {
bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
// call BaseReactor's OnStart()
bcR.BaseReactor.OnStart()

go bcR.poolRoutine()
}
return nil
Expand Down
14 changes: 9 additions & 5 deletions blockchain/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func newBlockchainReactor(
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}

bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync,
config.P2P.RecvAsync, config.P2P.BlockchainRecvBufSize)
bcReactor.SetLogger(logger.With("module", "blockchain"))

return bcReactor
Expand All @@ -160,7 +161,8 @@ func newBlockchainReactorPair(
maxBlockHeight int64) BlockchainReactorPair {

consensusReactor := &consensusReactorTest{}
consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor)
consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor,
config.P2P.RecvAsync, config.P2P.ConsensusRecvBufSize)

return BlockchainReactorPair{
newBlockchainReactor(t, logger, genDoc, privVals, maxBlockHeight),
Expand Down Expand Up @@ -193,7 +195,7 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
reactorPairs[0] = newBlockchainReactorPair(t, logger, genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactorPair(t, logger, genDoc, privVals, 0)

p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
moduleName := fmt.Sprintf("blockchain-%v", i)
Expand Down Expand Up @@ -273,7 +275,8 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
reactorPairs[i] = newBlockchainReactorPair(t, logger[i], genDoc, privVals, height)
}

switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch {
switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
reactorPairs[i].conR.mtx.Lock()
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
Expand Down Expand Up @@ -315,7 +318,8 @@ outerFor:
lastReactorPair := newBlockchainReactorPair(t, lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)

switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch,
config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR)
s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR)
moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1)
Expand Down
16 changes: 16 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,16 @@ type P2PConfig struct { //nolint: maligned
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`

// Reactor async receive
RecvAsync bool `mapstructure:"recv_async"`

// Size of receive buffer used in async receiving
PexRecvBufSize int `mapstructure:"pex_recv_buf_size"`
EvidenceRecvBufSize int `mapstructure:"evidence_recv_buf_size"`
MempoolRecvBufSize int `mapstructure:"mempool_recv_buf_size"`
ConsensusRecvBufSize int `mapstructure:"consensus_recv_buf_size"`
BlockchainRecvBufSize int `mapstructure:"blockchain_recv_buf_size"`

// Testing params.
// Force dial to fail
TestDialFail bool `mapstructure:"test_dial_fail"`
Expand Down Expand Up @@ -612,6 +622,12 @@ func DefaultP2PConfig() *P2PConfig {
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
RecvAsync: true,
PexRecvBufSize: 1000,
EvidenceRecvBufSize: 1000,
MempoolRecvBufSize: 1000,
ConsensusRecvBufSize: 1000,
BlockchainRecvBufSize: 1000,
TestDialFail: false,
TestFuzz: false,
TestFuzzConfig: DefaultFuzzConnConfig(),
Expand Down
12 changes: 12 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,18 @@ allow_duplicate_ip = {{ .P2P.AllowDuplicateIP }}
handshake_timeout = "{{ .P2P.HandshakeTimeout }}"
dial_timeout = "{{ .P2P.DialTimeout }}"
# Sync/async of reactor's receive function
recv_async = {{ .P2P.RecvAsync }}
# Size of channel buffer of reactor
pex_recv_buf_size = {{ .P2P.PexRecvBufSize }}
mempool_recv_buf_size = {{ .P2P.MempoolRecvBufSize }}
evidence_recv_buf_size = {{ .P2P.EvidenceRecvBufSize }}
consensus_recv_buf_size = {{ .P2P.ConsensusRecvBufSize }}
blockchain_recv_buf_size = {{ .P2P.BlockchainRecvBufSize }}
##### mempool configuration options #####
#######################################################
### Mempool Configuration Option ###
#######################################################
Expand Down
18 changes: 13 additions & 5 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

config2 "github.com/line/ostracon/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -101,7 +102,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, nValidators)
for i := 0; i < nValidators; i++ {
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
reactors[i] = NewReactor(css[i], true, true, 1000) // so we dont start the consensus states
reactors[i].SetLogger(css[i].Logger)

// eventBus is already started with the cs
Expand All @@ -118,7 +119,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
Expand Down Expand Up @@ -297,7 +298,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
config.P2P,
i,
"foo", "1.0.0",
func(i int, sw *p2p.Switch) *p2p.Switch {
func(i int, sw *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
return sw
})
switches[i].SetLogger(p2pLogger.With("validator", i))
Expand Down Expand Up @@ -331,7 +332,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)

conR := NewReactor(css[i], true) // so we don't start the consensus states
conR := NewReactor(css[i], true, true, 1000) // so we don't start the consensus states
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)

Expand Down Expand Up @@ -359,7 +360,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
}
}()

p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch, config *config2.P2PConfig) *p2p.Switch {
// ignore new switch s, we already made ours
switches[i].AddReactor("CONSENSUS", reactors[i])
return switches[i]
Expand Down Expand Up @@ -552,3 +553,10 @@ func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
func (br *ByzantineReactor) RecvRoutine() {
br.reactor.RecvRoutine()
}

func (br *ByzantineReactor) GetRecvChan() chan *p2p.BufferedMsg {
return br.reactor.GetRecvChan()
}
9 changes: 6 additions & 3 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ type ReactorOption func(*Reactor)

// NewReactor returns a new Reactor with the given
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
func NewReactor(consensusState *State, fastSync bool, async bool, recvBufSize int, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
waitSync: fastSync,
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, async, recvBufSize)

for _, option := range options {
option(conR)
Expand All @@ -74,6 +74,9 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
func (conR *Reactor) OnStart() error {
conR.Logger.Info("Reactor ", "waitSync", conR.WaitSync())

// call BaseReactor's OnStart()
conR.BaseReactor.OnStart()

// start routine that computes peer statistics for evaluating peer quality
go conR.peerStatsRoutine()

Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
for i := 0; i < n; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
if err != nil { t.Fatal(err)}*/
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
reactors[i] = NewReactor(css[i], true, true, 1000) // so we dont start the consensus states
reactors[i].SetLogger(css[i].Logger)

// eventBus is already started with the cs
Expand All @@ -73,7 +73,7 @@ func startConsensusNet(t *testing.T, css []*State, n int) (
}
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch, config *cfg.P2PConfig) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
Expand Down

0 comments on commit bc1b4af

Please sign in to comment.