Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/OpenBazaar/spvwallet
Browse files Browse the repository at this point in the history
  • Loading branch information
cpacia committed Aug 30, 2017
2 parents 9dff3e7 + 468f7d6 commit 773769f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 42 deletions.
2 changes: 1 addition & 1 deletion eight333.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (w *SPVWallet) Rebroadcast() {
if len(invMsg.InvList) == 0 { // nothing to broadcast, so don't
return
}
for _, peer := range w.peerManager.ConnectedPeers() {
for _, peer := range w.peerManager.ReadyPeers() {
peer.QueueMessage(invMsg, nil)
}
}
74 changes: 36 additions & 38 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package spvwallet

import (
"errors"
"net"
"strconv"
"sync"
"time"

"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand All @@ -10,10 +15,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil/bloom"
"golang.org/x/net/proxy"
"net"
"strconv"
"sync"
"time"
)

var (
Expand Down Expand Up @@ -77,9 +78,10 @@ type PeerManager struct {

sourceAddr *wire.NetAddress

peerConfig *peer.Config
connectedPeers map[uint64]*peer.Peer
peerMutex *sync.RWMutex
peerConfig *peer.Config
openPeers map[uint64]*peer.Peer
readyPeers map[*peer.Peer]struct{}
peerMutex *sync.RWMutex

trustedPeer net.Addr
downloadPeer *peer.Peer
Expand All @@ -103,7 +105,8 @@ func NewPeerManager(config *PeerManagerConfig) (*PeerManager, error) {
pm := &PeerManager{
addrManager: addrmgr.New(config.AddressCacheDir, nil),
peerMutex: new(sync.RWMutex),
connectedPeers: make(map[uint64]*peer.Peer),
openPeers: make(map[uint64]*peer.Peer),
readyPeers: make(map[*peer.Peer]struct{}),
downloadQueues: make(map[int32]map[chainhash.Hash]int32),
sourceAddr: wire.NewNetAddressIPPort(net.ParseIP("0.0.0.0"), defaultPort, 0),
trustedPeer: config.TrustedPeer,
Expand Down Expand Up @@ -170,11 +173,11 @@ func NewPeerManager(config *PeerManagerConfig) (*PeerManager, error) {
return pm, nil
}

func (pm *PeerManager) ConnectedPeers() []*peer.Peer {
func (pm *PeerManager) ReadyPeers() []*peer.Peer {
var peers []*peer.Peer
pm.peerMutex.RLock()
defer pm.peerMutex.RUnlock()
for _, peer := range pm.connectedPeers {
for peer := range pm.readyPeers {
peers = append(peers, peer)
}
return peers
Expand All @@ -189,7 +192,7 @@ func (pm *PeerManager) onConnection(req *connmgr.ConnReq, conn net.Conn) {
pm.peerMutex.Lock()
defer pm.peerMutex.Unlock()
if pm.proxy == nil {
for _, peer := range pm.connectedPeers {
for _, peer := range pm.openPeers {
if conn.RemoteAddr().String() == peer.Addr() {
pm.connManager.Disconnect(req.ID())
return
Expand All @@ -204,8 +207,8 @@ func (pm *PeerManager) onConnection(req *connmgr.ConnReq, conn net.Conn) {
return
}

// Add to connected peers
pm.connectedPeers[req.ID()] = p
// Add to open peers
pm.openPeers[req.ID()] = p

// Associate the connection with the peer
p.AssociateConnection(conn)
Expand All @@ -225,15 +228,8 @@ func (pm *PeerManager) onVerack(p *peer.Peer, msg *wire.MsgVerAck) {
p.NA().Services = p.Services()
if !(p.NA().HasService(wire.SFNodeBloom) && p.NA().HasService(wire.SFNodeNetwork)) ||
p.NA().HasService(SFNodeBitcoinCash) { // Don't connect to bitcoin cash nodes
pm.peerMutex.Lock()
for id, peer := range pm.connectedPeers {
if peer.ID() == p.ID() {
delete(pm.connectedPeers, id)
pm.connManager.Disconnect(id)
break
}
}
pm.peerMutex.Unlock()
// onDisconnection will be called
// which will remove the peer from openPeers
p.Disconnect()
return
}
Expand All @@ -249,6 +245,7 @@ func (pm *PeerManager) onVerack(p *peer.Peer, msg *wire.MsgVerAck) {
p.QueueMessage(filter.MsgFilterLoad(), nil)

pm.peerMutex.Lock()
pm.readyPeers[p] = struct{}{}
if pm.downloadPeer == nil {
pm.setDownloadPeer(p)
}
Expand All @@ -259,27 +256,26 @@ func (pm *PeerManager) onDisconnection(req *connmgr.ConnReq) {
// Remove from connected peers
pm.peerMutex.Lock()
defer pm.peerMutex.Unlock()
peer, ok := pm.connectedPeers[req.ID()]
if ok {
log.Debugf("Peer%d disconnected", peer.ID())
delete(pm.connectedPeers, req.ID())
_, ok := pm.downloadQueues[peer.ID()]
if ok {
delete(pm.downloadQueues, peer.ID())
}
peer, ok := pm.openPeers[req.ID()]
if !ok {
return
}
pm.connManager.Disconnect(req.ID())

log.Debugf("Peer%d disconnected", peer.ID())
delete(pm.openPeers, req.ID())
delete(pm.downloadQueues, peer.ID())
delete(pm.readyPeers, peer)

// If this was our download peer we lost, replace him
if pm.downloadPeer != nil && peer != nil {
if pm.downloadPeer.ID() == peer.ID() {
go pm.selectNewDownlaodPeer()
go pm.selectNewDownloadPeer()
}
}
}

func (pm *PeerManager) selectNewDownlaodPeer() {
for _, peer := range pm.connectedPeers {
func (pm *PeerManager) selectNewDownloadPeer() {
for peer := range pm.readyPeers {
pm.setDownloadPeer(peer)
break
}
Expand Down Expand Up @@ -327,7 +323,7 @@ func (pm *PeerManager) CheckForMoreBlocks(height uint32) bool {
defer pm.peerMutex.RUnlock()

moar := false
for _, peer := range pm.connectedPeers {
for peer := range pm.readyPeers {
if uint32(peer.LastBlock()) > height {
pm.downloadPeer = peer
go pm.startChainDownload(peer)
Expand Down Expand Up @@ -397,10 +393,10 @@ func (pm *PeerManager) queryDNSSeeds() {
// If we have connected peers let's use them to get more addresses. If not, use the DNS seeds
func (pm *PeerManager) getMoreAddresses() {
if pm.addrManager.NeedMoreAddresses() {
if len(pm.connectedPeers) > 0 {
if len(pm.readyPeers) > 0 {
pm.peerMutex.RLock()
log.Debug("Querying peers for more addresses")
for _, peer := range pm.connectedPeers {
for peer := range pm.readyPeers {
peer.QueueMessage(wire.NewMsgGetAddr(), nil)
}
pm.peerMutex.RUnlock()
Expand Down Expand Up @@ -438,13 +434,15 @@ func (pm *PeerManager) Stop() {
pm.peerMutex.Lock()
defer pm.peerMutex.Unlock()
wg := new(sync.WaitGroup)
for _, peer := range pm.connectedPeers {
for _, peer := range pm.openPeers {
wg.Add(1)
go func() {
// onDisconnection will be called.
peer.Disconnect()
peer.WaitForDisconnect()
wg.Done()
}()
}
pm.openPeers = make(map[uint64]*peer.Peer)
wg.Wait()
}
2 changes: 1 addition & 1 deletion sortsignsend.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *SPVWallet) Broadcast(tx *wire.MsgTx) error {
}

log.Debugf("Broadcasting tx %s to peers", tx.TxHash().String())
for _, peer := range s.peerManager.ConnectedPeers() {
for _, peer := range s.peerManager.ReadyPeers() {
peer.QueueMessage(invMsg, nil)
s.updateFilterAndSend(peer)
}
Expand Down
4 changes: 2 additions & 2 deletions wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (w *SPVWallet) Mnemonic() string {
}

func (w *SPVWallet) ConnectedPeers() []*peer.Peer {
return w.peerManager.ConnectedPeers()
return w.peerManager.ReadyPeers()
}

func (w *SPVWallet) CurrentAddress(purpose KeyPurpose) btc.Address {
Expand Down Expand Up @@ -356,7 +356,7 @@ func (w *SPVWallet) AddWatchedScript(script []byte) error {
err := w.txstore.WatchedScripts().Put(script)
w.txstore.PopulateAdrs()

for _, peer := range w.peerManager.ConnectedPeers() {
for _, peer := range w.peerManager.ReadyPeers() {
w.updateFilterAndSend(peer)
}
return err
Expand Down

0 comments on commit 773769f

Please sign in to comment.