Skip to content
This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

Commit

Permalink
Make wallet rebroadcast transactions until they are confirmed
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Jan 5, 2018
1 parent 62ef2aa commit 97dc84c
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 14 deletions.
9 changes: 9 additions & 0 deletions modules/wallet/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ const (
// defragThreshold is the number of outputs a wallet is allowed before it is
// defragmented.
defragThreshold = 50

// rebroadcastInterval is the number of blocks the wallet will wait until
// it rebroadcasts an unconfirmed transaction by adding it to the
// transaction pool again.
rebroadcastInterval = 10

// rebroadcastMaxTries is the maximum number of times a transaction set
// will be rebroadcasted before the wallet stops tracking it
rebroadcastMaxTries = 10
)

var (
Expand Down
119 changes: 106 additions & 13 deletions modules/wallet/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ func (w *Wallet) ProcessConsensusChange(cc modules.ConsensusChange) {
if err := w.applyHistory(w.dbTx, cc); err != nil {
w.log.Println("ERROR: failed to apply consensus change:", err)
}
if err := w.rebroadcastOldTransactions(w.dbTx, cc); err != nil {
w.log.Println("ERROR: failed to rebroadcast transactions:", err)
}
if err := dbPutConsensusChangeID(w.dbTx, cc.ID); err != nil {
w.log.Println("ERROR: failed to update consensus change ID:", err)
}
Expand All @@ -453,6 +456,89 @@ func (w *Wallet) ProcessConsensusChange(cc modules.ConsensusChange) {
}
}

// isRelevantTxn checks if a txn is relevant to the Wallet
func (w *Wallet) isRelevantTxn(txn types.Transaction) (relevant bool) {
// determine whether transaction is relevant to the wallet
for _, sci := range txn.SiacoinInputs {
relevant = relevant || w.isWalletAddress(sci.UnlockConditions.UnlockHash())
}
for _, sco := range txn.SiacoinOutputs {
relevant = relevant || w.isWalletAddress(sco.UnlockHash)
}
return
}

// rebroadcastOldTransaction rebroadcasts transactions that haven't been
// confirmed within rebroadcastInterval blocks
func (w *Wallet) rebroadcastOldTransactions(tx *bolt.Tx, cc modules.ConsensusChange) error {
// Get the current consensus height
consensusHeight, err := dbGetConsensusHeight(tx)
if err != nil {
return err
}

// Mark reverted transactions as not confirmed
for _, block := range cc.RevertedBlocks {
for _, bts := range w.broadcastedTSets {
for _, txn := range block.Transactions {
if _, exists := bts.confirmedTxn[txn.ID()]; exists {
bts.confirmedTxn[txn.ID()] = false
}
}
}
}

// Mark applied transactions as confirmed
for _, block := range cc.AppliedBlocks {
for _, bts := range w.broadcastedTSets {
for _, txn := range block.Transactions {
if _, exists := bts.confirmedTxn[txn.ID()]; exists {
bts.confirmedTxn[txn.ID()] = true
}
}
}
}

// Check if all transactions of the set are confirmed
for tSetID, bts := range w.broadcastedTSets {
confirmed := true
for _, c := range bts.confirmedTxn {
if !c {
confirmed = false
break
}
}
// If the transaction set has been confirmed for one broadcast cycle it
// should be safe to remove it
if confirmed && consensusHeight > bts.height+rebroadcastInterval {
delete(w.broadcastedTSets, tSetID)
continue
}
// If the transaction set has been confirmed recently we wait a little
// bit longer before we remove it
if confirmed {
continue
}
// If the transaction set is not confirmed and hasn't been broadcasted
// for rebroadcastInterval blocks we try to broadcast it again
if consensusHeight > bts.height+rebroadcastInterval {
bts.height = consensusHeight
bts.tries++
go func() {
w.mu.Lock()
defer w.mu.Unlock()
if err := w.tpool.AcceptTransactionSet(bts.transactions); err != nil {
w.log.Println("WARNING: Rebroadcast failed: ", err)
}
}()
if bts.tries >= rebroadcastMaxTries {
delete(w.broadcastedTSets, tSetID)
}
}
}
return nil
}

// ReceiveUpdatedUnconfirmedTransactions updates the wallet's unconfirmed
// transaction set.
func (w *Wallet) ReceiveUpdatedUnconfirmedTransactions(diff *modules.TransactionPoolDiff) {
Expand Down Expand Up @@ -501,9 +587,25 @@ func (w *Wallet) ReceiveUpdatedUnconfirmedTransactions(diff *modules.Transaction
for _, unconfirmedTxnSet := range diff.AppliedTransactions {
// Mark all of the transactions that appeared in this set.
//
// TODO: Technically only necessary to mark the ones that are relevant
// to the wallet, but overhead should be low.
w.unconfirmedSets[unconfirmedTxnSet.ID] = unconfirmedTxnSet.IDs
relevant := false
for i := 0; !relevant && i < len(unconfirmedTxnSet.Transactions); i++ {
relevant = relevant || w.isRelevantTxn(unconfirmedTxnSet.Transactions[i])
}
// Only add relevant unconfirmed transactions to this set. If at least
// a single txn is relevant, the set is also relevant.
if relevant {
w.unconfirmedSets[unconfirmedTxnSet.ID] = unconfirmedTxnSet.IDs

// If the unconfirmed set doesn't exist yet, add it to broadcastedTSets
var err error
var bts *broadcastedTSet
if _, exists := w.broadcastedTSets[unconfirmedTxnSet.ID]; !exists {
bts, err = w.newBroadcastedTSet(unconfirmedTxnSet.Transactions)
}
if err == nil {
w.broadcastedTSets[unconfirmedTxnSet.ID] = bts
}
}

// Get the values for the spent outputs.
spentSiacoinOutputs := make(map[types.SiacoinOutputID]types.SiacoinOutput)
Expand All @@ -517,17 +619,8 @@ func (w *Wallet) ReceiveUpdatedUnconfirmedTransactions(diff *modules.Transaction

// Add each transaction to our set of unconfirmed transactions.
for i, txn := range unconfirmedTxnSet.Transactions {
// determine whether transaction is relevant to the wallet
relevant := false
for _, sci := range txn.SiacoinInputs {
relevant = relevant || w.isWalletAddress(sci.UnlockConditions.UnlockHash())
}
for _, sco := range txn.SiacoinOutputs {
relevant = relevant || w.isWalletAddress(sco.UnlockHash)
}

// only create a ProcessedTransaction if txn is relevant
if !relevant {
if !w.isRelevantTxn(txn) {
continue
}

Expand Down
34 changes: 33 additions & 1 deletion modules/wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ type spendableKey struct {
SecretKeys []crypto.SecretKey
}

// broadcastedTSet is a helper struct to keep track of transaction sets and to
// help rebroadcast them.
type broadcastedTSet struct {
height types.BlockHeight
tries int
confirmedTxn map[types.TransactionID]bool
transactions []types.Transaction
}

// newBroadcastedTSet
func (w *Wallet) newBroadcastedTSet(tSet []types.Transaction) (bts *broadcastedTSet, err error) {
bts = &broadcastedTSet{}
// Set the height
bts.height, err = dbGetConsensusHeight(w.dbTx)
if err != nil {
return
}
// Initialize confirmedTxn and transactions
bts.confirmedTxn = make(map[types.TransactionID]bool)
for _, txn := range tSet {
bts.confirmedTxn[txn.ID()] = false
bts.transactions = append(bts.transactions, txn)
}
return
}

// Wallet is an object that tracks balances, creates keys and addresses,
// manages building and sending transactions.
type Wallet struct {
Expand Down Expand Up @@ -82,6 +108,11 @@ type Wallet struct {
unconfirmedSets map[modules.TransactionSetID][]types.TransactionID
unconfirmedProcessedTransactions []modules.ProcessedTransaction

// broadcastedTSets tracks transactions that have been sent to the
// transaction pool for broadcasting. This allows the wallet to rebroadcast
// them if necessary
broadcastedTSets map[modules.TransactionSetID]*broadcastedTSet

// The wallet's database tracks its seeds, keys, outputs, and
// transactions. A global db transaction is maintained in memory to avoid
// excessive disk writes. Any operations involving dbTx must hold an
Expand Down Expand Up @@ -127,7 +158,8 @@ func newWallet(cs modules.ConsensusSet, tpool modules.TransactionPool, persistDi
keys: make(map[types.UnlockHash]spendableKey),
lookahead: make(map[types.UnlockHash]uint64),

unconfirmedSets: make(map[modules.TransactionSetID][]types.TransactionID),
unconfirmedSets: make(map[modules.TransactionSetID][]types.TransactionID),
broadcastedTSets: make(map[modules.TransactionSetID]*broadcastedTSet),

persistDir: persistDir,

Expand Down
83 changes: 83 additions & 0 deletions modules/wallet/wallet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,86 @@ func TestDistantWallets(t *testing.T) {
t.Fatal("wallet should not recognize coins sent to very high seed index")
}
}

// TestRebroadcastTransactions checks if transactions are correctly
// rebroadcasted after some time if they haven't been confirmed
func TestRebroadcastTransactions(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
wt, err := createWalletTester(t.Name(), &ProductionDependencies{})
if err != nil {
t.Fatal(err)
}
defer wt.closeWt()

// Get an address to send money to
uc, err := wt.wallet.NextAddress()
if err != nil {
t.Fatal(err)
}
// Send money to the address
_, err = wt.wallet.SendSiacoins(types.SiacoinPrecision, uc.UnlockHash())
if err != nil {
t.Fatal(err)
}
// The wallet should track the new tSet
if len(wt.wallet.broadcastedTSets) != 1 {
t.Fatalf("len(broadcastedTSets) should be %v but was %v",
1, len(wt.wallet.broadcastedTSets))
}
// Mine enough blocks for the wallet to stop tracking the tSet
for i := 0; i < rebroadcastInterval+1; i++ {
if _, err := wt.miner.AddBlock(); err != nil {
t.Fatal(err)
}
}
if len(wt.wallet.broadcastedTSets) > 0 {
t.Fatalf("len(broadcastedTSets) should be 0 but was %v",
len(wt.wallet.broadcastedTSets))
}

// Send some more money to the address
tSet, err := wt.wallet.SendSiacoins(types.SiacoinPrecision, uc.UnlockHash())
if err != nil {
t.Fatal(err)
}
// The wallet should track the new tSet
if len(wt.wallet.broadcastedTSets) != 1 {
t.Fatalf("len(broadcastedTSets) should be %v but was %v",
1, len(wt.wallet.broadcastedTSets))
}
// Mine a block to get the tSet confirmed
if _, err := wt.miner.AddBlock(); err != nil {
t.Fatal(err)
}
// Corrupt the new tSet to make sure the wallet believes it is not confirmed
tSetID := modules.TransactionSetID(crypto.HashAll(tSet))
bts := wt.wallet.broadcastedTSets[tSetID]
for tid := range bts.confirmedTxn {
bts.confirmedTxn[tid] = false
}
// Mine the same number of blocks. This time the wallet should still track
// the tSet afterwards.
for i := 0; i < rebroadcastInterval+1; i++ {
if _, err := wt.miner.AddBlock(); err != nil {
t.Fatal(err)
}
}
if len(wt.wallet.broadcastedTSets) != 1 {
t.Fatalf("The wallet should still track the tSet")
}
if bts.tries != 1 {
t.Fatalf("The transaction set's tries counter should be 1")
}
// Continue mining to make sure that the wallet stops tracking the tSet
// once the max number of retries is reached
for i := 0; i < rebroadcastInterval*rebroadcastMaxTries; i++ {
if _, err := wt.miner.AddBlock(); err != nil {
t.Fatal(err)
}
}
if _, exists := wt.wallet.broadcastedTSets[tSetID]; exists {
t.Fatalf("Wallet should drop txnSet after %v tries", rebroadcastMaxTries)
}
}

0 comments on commit 97dc84c

Please sign in to comment.