Skip to content

Commit

Permalink
[P2P] Bug fix and small change of whitelist
Browse files Browse the repository at this point in the history
- Fix add or remove whitelist conf was not applied immediately
- Change behavior of whitelist not to affect outbounding peers
  • Loading branch information
hayarobi committed Sep 3, 2019
1 parent d1f6e4b commit 3de0486
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 39 deletions.
37 changes: 22 additions & 15 deletions p2p/p2p.go
Expand Up @@ -86,7 +86,7 @@ func (p2ps *P2P) AfterStart() {
}
p2ps.lm.Start()
p2ps.mutex.Lock()
p2ps.setSelfRole()
p2ps.checkConsensus()
p2ps.Logger.Info().Array("supportedVersions", p2putil.NewLogStringersMarshaller(versions, 10)).Str("role", p2ps.selfRole.String()).Msg("Starting p2p component")
nt := p2ps.nt
nt.Start()
Expand All @@ -99,19 +99,14 @@ func (p2ps *P2P) AfterStart() {
p2ps.mm.Start()
}

func (p2ps *P2P) setSelfRole() {
func (p2ps *P2P) checkConsensus() {
// set role of self peer
ccinfo := p2ps.consacc.ConsensusInfo()
if ccinfo.Type == "raft" {
if !p2ps.useRaft {
panic("configuration failure. ")
panic("configuration failure. consensus type of genesis block and consensus accessor are differ")
}
}
if p2ps.cfg.Consensus.EnableBp {
p2ps.selfRole = p2pcommon.BlockProducer
} else {
p2ps.selfRole = p2pcommon.Watcher
}
}

// BeforeStop is called before actor hub stops. it finishes underlying peer manager
Expand Down Expand Up @@ -178,6 +173,12 @@ func (p2ps *P2P) initP2P(cfg *config.Config, chainSvc *chain.ChainService) {

useRaft := genesis.ConsensusType() == consensus.ConsensusName[consensus.ConsensusRAFT]
p2ps.useRaft = useRaft
if p2ps.cfg.Consensus.EnableBp {
p2ps.selfRole = p2pcommon.BlockProducer
} else {
p2ps.selfRole = p2pcommon.Watcher
}

netTransport := transport.NewNetworkTransport(cfg.P2P, p2ps.Logger)
signer := newDefaultMsgSigner(p2pkey.NodePrivKey(), p2pkey.NodePubKey(), p2pkey.NodeID())

Expand Down Expand Up @@ -284,30 +285,36 @@ func (p2ps *P2P) Receive(context actor.Context) {
case message.GetRaftTransport:
context.Respond(raftsupport.NewAergoRaftTransport(p2ps.Logger, p2ps.nt, p2ps.pm, p2ps.mf, p2ps.consacc, msg.Cluster))
case message.P2PWhiteListConfEnableEvent:
p2ps.Logger.Debug().Bool("enabled", msg.On).Msg("p2p whitelist conf changed")
p2ps.Logger.Debug().Bool("enabled", msg.On).Msg("p2p whitelist on/off changed")
// TODO do more fine grained work
p2ps.lm.RefineList()
// disconnect newly blacklisted peer.
p2ps.banIfFound()
p2ps.checkAndBanInboundPeers()
case message.P2PWhiteListConfSetEvent:
p2ps.Logger.Debug().Array("enabled", p2putil.NewLogStringsMarshaller(msg.Values, 10)).Msg("p2p whitelist conf changed")
p2ps.Logger.Debug().Array("entries", p2putil.NewLogStringsMarshaller(msg.Values, 10)).Msg("p2p whitelist entries changed")
// TODO do more fine grained work
p2ps.lm.RefineList()
// disconnect newly blacklisted peer.
p2ps.banIfFound()
p2ps.checkAndBanInboundPeers()
}
}

func (p2ps *P2P) banIfFound() {
func (p2ps *P2P) checkAndBanInboundPeers() {
for _, peer := range p2ps.pm.GetPeers() {
// FIXME ip check should be currently connected ip address
ip, err := network.GetSingleIPAddress(peer.Meta().IPAddress)
if err != nil {
p2ps.Error().Str(p2putil.LogPeerName, peer.Name()).Err(err).Msg("Failed to get ip address of peer")
p2ps.Warn().Str(p2putil.LogPeerName, peer.Name()).Err(err).Msg("Failed to get ip address of peer")
continue
}
// TODO temporal treatment. need more works.
// just inbound peers will be disconnected
if peer.Meta().Outbound {
p2ps.Debug().Str(p2putil.LogPeerName, peer.Name()).Err(err).Msg("outbound peer is not banned")
continue
}
if banned, _ := p2ps.lm.IsBanned(ip.String(), peer.ID()); banned {
p2ps.Error().Str(p2putil.LogPeerName, peer.Name()).Msg("peer is banned by list manager")
p2ps.Info().Str(p2putil.LogPeerName, peer.Name()).Msg("peer is banned by list manager")
peer.Stop()
}
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/p2p_test.go
Expand Up @@ -121,7 +121,7 @@ func TestP2P_banIfFound(t *testing.T) {
for i:=0; i<sampleCnt; i++ {
mPeer := p2pmock.NewMockRemotePeer(ctrl)
mPeer.EXPECT().ID().Return(pids[i])
mPeer.EXPECT().Meta().Return(p2pcommon.PeerMeta{IPAddress:addr})
mPeer.EXPECT().Meta().Return(p2pcommon.PeerMeta{IPAddress:addr, Outbound:false}).MinTimes(1)
mPeer.EXPECT().Name().Return("peer "+pids[i].ShortString()).AnyTimes()
if tt.inWhite[i] == 0 {
mPeer.EXPECT().Stop()
Expand All @@ -137,7 +137,7 @@ func TestP2P_banIfFound(t *testing.T) {
}
p2ps.BaseComponent = component.NewBaseComponent(message.P2PSvc, p2ps, log.NewLogger("p2p"))

p2ps.banIfFound()
p2ps.checkAndBanInboundPeers()
})
}
}
7 changes: 5 additions & 2 deletions p2p/p2pcommon/peerrole.go
Expand Up @@ -14,14 +14,17 @@ const (
BlockProducer
Watcher
_
RaftProducer // node that is ready to produce a block (can be a leader or follower)
RaftWatcher // node that is not produce block.
)
//go:generate stringer -type=PeerRole

type PeerRoleManager interface {
UpdateBP(toAdd []types.PeerID, toRemove []types.PeerID)

// SelfRole returns role of this peer itself
SelfRole() PeerRole
// GetRole returns role of remote peer
GetRole(pid types.PeerID) PeerRole
// NotifyNewBlockMsg selects target peers with the appropriate role and sends them a NewBlockNotice
NotifyNewBlockMsg(mo MsgOrder, peers []RemotePeer) (skipped, sent int)
}
//go:generate mockgen -source=peerrole.go -package=p2pmock -destination=../p2pmock/mock_peerrole.go
Expand Down
19 changes: 4 additions & 15 deletions p2p/p2pcommon/peerrole_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions p2p/p2pmock/mock_peerrole.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions p2p/rolemanager.go
Expand Up @@ -38,6 +38,10 @@ func (rm *RaftRoleManager) UpdateBP(toAdd []types.PeerID, toRemove []types.PeerI
rm.p2ps.pm.UpdatePeerRole(changes)
}

func (rm *RaftRoleManager) SelfRole() p2pcommon.PeerRole {
return rm.p2ps.selfRole
}

func (rm *RaftRoleManager) GetRole(pid types.PeerID) p2pcommon.PeerRole {
rm.raftMutex.Lock()
defer rm.raftMutex.Unlock()
Expand Down Expand Up @@ -78,6 +82,10 @@ func (rm *DefaultRoleManager) UpdateBP(toAdd []types.PeerID, toRemove []types.Pe
rm.p2ps.pm.UpdatePeerRole(changes)
}

func (rm *DefaultRoleManager) SelfRole() p2pcommon.PeerRole {
return rm.p2ps.selfRole
}

func (rm *DefaultRoleManager) GetRole(pid types.PeerID) p2pcommon.PeerRole {
prettyID := pid.Pretty()
bps := rm.p2ps.consacc.ConsensusInfo().Bps
Expand Down
9 changes: 5 additions & 4 deletions p2p/waitpeermanager.go
Expand Up @@ -155,10 +155,11 @@ func (dpm *basePeerManager) connectWaitingPeers(maxJob int) {
if _, exist := dpm.workingJobs[wp.Meta.ID]; exist {
continue
}
if banned, _ := dpm.lm.IsBanned(wp.Meta.IPAddress, wp.Meta.ID); banned {
dpm.logger.Info().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(wp.Meta)).Msg("Skipping banned peer")
continue
}
// 2019.09.02 connecting to outbound peer is not affected by whitelist. inbound peer will block
//if banned, _ := dpm.lm.IsBanned(wp.Meta.IPAddress, wp.Meta.ID); banned {
// dpm.logger.Info().Str(p2putil.LogPeerName, p2putil.ShortMetaForm(wp.Meta)).Msg("Skipping banned peer")
// continue
//}
dpm.logger.Info().Int("trial", wp.TrialCnt).Str(p2putil.LogPeerID, p2putil.ShortForm(wp.Meta.ID)).Msg("Starting scheduled try to connect peer")

dpm.workingJobs[wp.Meta.ID] = ConnWork{Meta: wp.Meta, PeerID: wp.Meta.ID, StartTime: time.Now()}
Expand Down
2 changes: 1 addition & 1 deletion rpc/rpc.go
Expand Up @@ -233,7 +233,7 @@ func (ns *RPC) Receive(context actor.Context) {
if err := json.Unmarshal([]byte(e.JsonArgs), &values); err != nil {
return
}
msg := &message.P2PWhiteListConfSetEvent{
msg := message.P2PWhiteListConfSetEvent{
Values: values,
}
ns.TellTo(message.P2PSvc, msg)
Expand Down

0 comments on commit 3de0486

Please sign in to comment.