Skip to content

Commit

Permalink
Merge pull request #1256 from davecgh/server_onversion_use_local_for_…
Browse files Browse the repository at this point in the history
…net_addr

server: Use local addr var in version handler.
  • Loading branch information
Roasbeef committed Oct 13, 2018
2 parents 2a560b2 + 8c981e4 commit d2046ee
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 140 deletions.
20 changes: 20 additions & 0 deletions addrmgr/addrmanager.go
@@ -1,4 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -937,6 +938,25 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
a.addrNew[newBucket][rmkey] = rmka
}

// SetServices sets the services for the giiven address to the provided value.
func (a *AddrManager) SetServices(addr *wire.NetAddress, services wire.ServiceFlag) {
a.mtx.Lock()
defer a.mtx.Unlock()

ka := a.find(addr)
if ka == nil {
return
}

// Update the services if needed.
if ka.na.Services != services {
// ka.na is immutable, so replace it.
naCopy := *ka.na
naCopy.Services = services
ka.na = &naCopy
}
}

// AddLocalAddress adds na to the list of known local addresses to advertise
// with the given priority.
func (a *AddrManager) AddLocalAddress(na *wire.NetAddress, priority AddressPriority) error {
Expand Down
6 changes: 4 additions & 2 deletions peer/example_test.go
@@ -1,4 +1,5 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2015-2018 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -72,8 +73,9 @@ func Example_newOutboundPeer() {
Services: 0,
TrickleInterval: time.Second * 10,
Listeners: peer.MessageListeners{
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
fmt.Println("outbound: received version")
return nil
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
Expand Down
151 changes: 69 additions & 82 deletions peer/peer.go
Expand Up @@ -34,9 +34,9 @@ const (
// inv message to a peer.
DefaultTrickleInterval = 10 * time.Second

// minAcceptableProtocolVersion is the lowest protocol version that a
// MinAcceptableProtocolVersion is the lowest protocol version that a
// connected peer may support.
minAcceptableProtocolVersion = wire.MultipleAddressVersion
MinAcceptableProtocolVersion = wire.MultipleAddressVersion

// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 50
Expand Down Expand Up @@ -187,7 +187,9 @@ type MessageListeners struct {
OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)

// OnVersion is invoked when a peer receives a version bitcoin message.
OnVersion func(p *Peer, msg *wire.MsgVersion)
// The caller may return a reject message in which case the message will
// be sent to the peer and the peer will be disconnected.
OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject

// OnVerAck is invoked when a peer receives a verack bitcoin message.
OnVerAck func(p *Peer, msg *wire.MsgVerAck)
Expand Down Expand Up @@ -1369,7 +1371,7 @@ out:
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:

// Limit to one version message per peer.
p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
"duplicate version message", nil, true)
break out
Expand Down Expand Up @@ -1875,26 +1877,42 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}

// Notify and disconnect clients if the first message is not a version
// message.
msg, ok := remoteMsg.(*wire.MsgVersion)
if !ok {
reason := "a version message must precede all others"
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}

// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}

// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Negotiate the protocol version and set the services to what the remote
// peer advertised.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
p.services = msg.Services
p.flagsMtx.Unlock()
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)

// Updating a bunch of stats including block based stats, and the
// peer's time offset.
Expand All @@ -1904,22 +1922,10 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()

// Negotiate the protocol version.
// Set the peer's ID, user agent, and potentially the flag which
// specifies the witness support is enabled.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)

// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)

// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services

// Set the remote peer's user agent.
p.userAgent = msg.UserAgent

// Determine if the peer would like to receive witness data with
Expand All @@ -1938,36 +1944,33 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.wireEncoding = wire.WitnessEncoding
}

return nil
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}

remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)

rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
// Invoke the callback if specified.
if p.cfg.Listeners.OnVersion != nil {
rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
if rejectMsg != nil {
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(rejectMsg.Reason)
}
}

if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
// Send a reject message indicating the protocol version is
// obsolete and wait for the message to be sent before
// disconnecting.
reason := fmt.Sprintf("protocol version must be %d or greater",
MinAcceptableProtocolVersion)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}

if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
}
return nil
}

Expand All @@ -1992,7 +1995,8 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
theirNA.Services)
}
}

Expand Down Expand Up @@ -2020,25 +2024,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)

// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork

// Advertise the services flag
// Advertise local services.
msg.Services = p.cfg.Services

// Advertise our max supported protocol version.
Expand Down Expand Up @@ -2099,9 +2085,11 @@ func (p *Peer) start() error {
select {
case err := <-negotiateErr:
if err != nil {
p.Disconnect()
return err
}
case <-time.After(negotiateTimeout):
p.Disconnect()
return errors.New("protocol negotiation timeout")
}
log.Debugf("Connected to %s", p.Addr())
Expand Down Expand Up @@ -2224,14 +2212,13 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
}

if cfg.HostToNetAddress != nil {
na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services)
na, err := cfg.HostToNetAddress(host, uint16(port), 0)
if err != nil {
return nil, err
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port),
cfg.Services)
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
}

return p, nil
Expand Down
63 changes: 62 additions & 1 deletion peer/peer_test.go
@@ -1,4 +1,5 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -431,8 +432,9 @@ func TestPeerListeners(t *testing.T) {
OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
ok <- msg
},
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
ok <- msg
return nil
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
Expand Down Expand Up @@ -856,6 +858,65 @@ func TestUnsupportedVersionPeer(t *testing.T) {
}
}

// TestDuplicateVersionMsg ensures that receiving a version message after one
// has already been received results in the peer being disconnected.
func TestDuplicateVersionMsg(t *testing.T) {
// Create a pair of peers that are connected to each other using a fake
// connection.
verack := make(chan struct{})
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
ChainParams: &chaincfg.MainNetParams,
Services: 0,
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr)
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
outPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)
// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second):
t.Fatal("verack timeout")
}
}
// Queue a duplicate version message from the outbound peer and wait until
// it is sent.
done := make(chan struct{})
outPeer.QueueMessage(&wire.MsgVersion{}, done)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("send duplicate version timeout")
}
// Ensure the peer that is the recipient of the duplicate version closes the
// connection.
disconnected := make(chan struct{}, 1)
go func() {
inPeer.WaitForDisconnect()
disconnected <- struct{}{}
}()
select {
case <-disconnected:
case <-time.After(time.Second):
t.Fatal("peer did not disconnect")
}
}

func init() {
// Allow self connection when running the tests.
peer.TstAllowSelfConns()
Expand Down

0 comments on commit d2046ee

Please sign in to comment.