Skip to content

Commit

Permalink
spv: Add non-wallet-backed mempool to syncer
Browse files Browse the repository at this point in the history
This adds a mempool on the SPV Syncer so that it can actually relay
transactions that are not relevant to the wallet.

Transactions that are relevant (i.e. stored in the wallet's DB) don't
need to be stored in the mempool since they are already accessible when
a GetData request comes in from a peer.
  • Loading branch information
matheusd committed Apr 10, 2020
1 parent fee9b28 commit 65b5361
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 2 deletions.
23 changes: 23 additions & 0 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,32 @@ func (s *Syncer) LoadTxFilter(ctx context.Context, reload bool, addrs []dcrutil.
// PublishTransactions implements the PublishTransaction method of the
// wallet.Peer interface.
func (s *Syncer) PublishTransactions(ctx context.Context, txs ...*wire.MsgTx) error {
// Figure out transactions that are not stored by the wallet and create
// an aux map so we can choose which need to be stored in the syncer's
// mempool.
walletBacked := make(map[chainhash.Hash]bool, len(txs))
relevant, _, err := s.wallet.DetermineRelevantTxs(ctx, txs...)
if err != nil {
return err
}
for _, tx := range relevant {
walletBacked[tx.TxHash()] = true
}

msg := wire.NewMsgInvSizeHint(uint(len(txs)))
for _, tx := range txs {
txHash := tx.TxHash()
if !walletBacked[txHash] {
// Load into the mempool and let the mempool handler
// know of it.
if _, loaded := s.mempool.LoadOrStore(txHash, tx); !loaded {
select {
case s.mempoolAdds <- &txHash:
case <-ctx.Done():
return ctx.Err()
}
}
}
err := msg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, &txHash))
if err != nil {
return errors.E(errors.Protocol, err)
Expand Down
40 changes: 38 additions & 2 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type Syncer struct {

// Holds all potential callbacks used to notify clients
notifications *Notifications

// Mempool for non-wallet-relevant transactions.
mempool sync.Map // k=chainhash.Hash v=*wire.MsgTx
mempoolAdds chan *chainhash.Hash
}

// Notifications struct to contain all of the upcoming callbacks that will
Expand Down Expand Up @@ -118,6 +122,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer {
rescanFilter: wallet.NewRescanFilter(nil, nil),
seenTxs: lru.NewCache(2000),
lp: lp,
mempoolAdds: make(chan *chainhash.Hash),
}
}

Expand Down Expand Up @@ -304,6 +309,8 @@ func (s *Syncer) Run(ctx context.Context) error {
g.Go(func() error { return s.connectToCandidates(ctx) })
}

g.Go(func() error { return s.handleMempool(ctx) })

s.wallet.SetNetworkBackend(s)
defer s.wallet.SetNetworkBackend(nil)

Expand Down Expand Up @@ -554,8 +561,16 @@ func (s *Syncer) receiveGetData(ctx context.Context) error {
rp.RemoteAddr(), err)
return
}
if len(missing) != 0 {
notFound = append(notFound, missing...)

// For the missing ones, attempt to search in
// the non-wallet-relevant syncer mempool.
for _, miss := range missing {
if v, ok := s.mempool.Load(miss.Hash); ok {
tx := v.(*wire.MsgTx)
foundTxs = append(foundTxs, tx)
continue
}
notFound = append(notFound, miss)
}
}

Expand Down Expand Up @@ -1279,3 +1294,24 @@ func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error {
}
return nil
}

// handleMempool handles eviction from the local mempool of non-wallet-backed
// transactions. It MUST be run as a goroutine.
func (s *Syncer) handleMempool(ctx context.Context) error {
const mempoolEvictionTimeout = 60 * time.Minute

for {
select {
case txHash := <-s.mempoolAdds:
go func() {
select {
case <-ctx.Done():
case <-time.After(mempoolEvictionTimeout):
s.mempool.Delete(txHash)
}
}()
case <-ctx.Done():
return ctx.Err()
}
}
}
18 changes: 18 additions & 0 deletions wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4090,6 +4090,24 @@ func (w *Wallet) isRelevantTx(dbtx walletdb.ReadTx, tx *wire.MsgTx) bool {
return false
}

// DetermineRelevantTxs splits the given transactions into slices of relevant
// and non-wallet-relevant transactions (respectively).
func (w *Wallet) DetermineRelevantTxs(ctx context.Context, txs ...*wire.MsgTx) ([]*wire.MsgTx, []*wire.MsgTx, error) {
var relevant, nonRelevant []*wire.MsgTx
err := walletdb.View(ctx, w.db, func(dbtx walletdb.ReadTx) error {
for _, tx := range txs {
switch w.isRelevantTx(dbtx, tx) {
case true:
relevant = append(relevant, tx)
default:
nonRelevant = append(nonRelevant, tx)
}
}
return nil
})
return relevant, nonRelevant, err
}

func (w *Wallet) appendRelevantOutpoints(relevant []wire.OutPoint, dbtx walletdb.ReadTx, tx *wire.MsgTx) []wire.OutPoint {
addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey)

Expand Down

0 comments on commit 65b5361

Please sign in to comment.