From 45d49cff0fb02e0744c4c1bad94a865e363fb55c Mon Sep 17 00:00:00 2001 From: lukechampine Date: Thu, 19 Jan 2017 17:32:58 -0500 Subject: [PATCH] fix tpool locking bug I was just following orders --- modules/consensus.go | 2 ++ modules/consensus/accept.go | 3 +- modules/consensus/subscribe.go | 4 +++ modules/consensus/validtransaction.go | 44 ++++++++++++++++++++------- modules/transactionpool/accept.go | 40 ++++++++++++++---------- modules/transactionpool/update.go | 2 +- modules/wallet/defrag_test.go | 2 -- 7 files changed, 65 insertions(+), 32 deletions(-) diff --git a/modules/consensus.go b/modules/consensus.go index 4c82652764..f8e1037312 100644 --- a/modules/consensus.go +++ b/modules/consensus.go @@ -123,6 +123,8 @@ type ( // Synced indicates whether or not the ConsensusSet is synced with its // peers. Synced bool + + TryTransactionSet func([]types.Transaction) (ConsensusChange, error) } // A SiacoinOutputDiff indicates the addition or removal of a SiacoinOutput in diff --git a/modules/consensus/accept.go b/modules/consensus/accept.go index 8768b008f4..e9ac6eb8f1 100644 --- a/modules/consensus/accept.go +++ b/modules/consensus/accept.go @@ -283,11 +283,10 @@ func (cs *ConsensusSet) managedAcceptBlock(b types.Block) error { } // Updates complete, demote the lock. - cs.mu.Demote() - defer cs.mu.DemotedUnlock() if len(changeEntry.AppliedBlocks) > 0 { cs.readlockUpdateSubscribers(changeEntry) } + cs.mu.Unlock() return nil } diff --git a/modules/consensus/subscribe.go b/modules/consensus/subscribe.go index 5077e8966b..3571da2adc 100644 --- a/modules/consensus/subscribe.go +++ b/modules/consensus/subscribe.go @@ -86,6 +86,10 @@ func (cs *ConsensusSet) computeConsensusChange(tx *bolt.Tx, ce changeEntry) (mod if cs.synced && recentBlock == currentBlock { cc.Synced = true } + + // Add the unexported tryTransactionSet function. + cc.TryTransactionSet = cs.tryTransactionSet + return cc, nil } diff --git a/modules/consensus/validtransaction.go b/modules/consensus/validtransaction.go index 0229281ca5..76fd6a61fc 100644 --- a/modules/consensus/validtransaction.go +++ b/modules/consensus/validtransaction.go @@ -313,20 +313,12 @@ func validTransaction(tx *bolt.Tx, t types.Transaction) error { return nil } -// TryTransactionSet applies the input transactions to the consensus set to +// tryTransactionSet applies the input transactions to the consensus set to // determine if they are valid. An error is returned IFF they are not a valid // set in the current consensus set. The size of the transactions and the set // is not checked. After the transactions have been validated, a consensus // change is returned detailing the diffs that the transaciton set would have. -func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) { - err := cs.tg.Add() - if err != nil { - return modules.ConsensusChange{}, err - } - defer cs.tg.Done() - cs.mu.RLock() - defer cs.mu.RUnlock() - +func (cs *ConsensusSet) tryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) { // applyTransaction will apply the diffs from a transaction and store them // in a block node. diffHolder is the blockNode that tracks the temporary // changes. At the end of the function, all changes that were made to the @@ -339,7 +331,7 @@ func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.Con // manually manage the tx instead of using 'Update', but that has safety // concerns and is more difficult to implement correctly. errSuccess := errors.New("success") - err = cs.db.Update(func(tx *bolt.Tx) error { + err := cs.db.Update(func(tx *bolt.Tx) error { diffHolder.Height = blockHeight(tx) for _, txn := range txns { err := validTransaction(tx, txn) @@ -362,3 +354,33 @@ func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.Con } return cc, nil } + +// TryTransactionSet applies the input transactions to the consensus set to +// determine if they are valid. An error is returned IFF they are not a valid +// set in the current consensus set. The size of the transactions and the set +// is not checked. After the transactions have been validated, a consensus +// change is returned detailing the diffs that the transaciton set would have. +func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) { + err := cs.tg.Add() + if err != nil { + return modules.ConsensusChange{}, err + } + defer cs.tg.Done() + cs.mu.RLock() + defer cs.mu.RUnlock() + return cs.tryTransactionSet(txns) +} + +// LockedTryTransactionSet calls fn while under read-lock, passing it a +// version of TryTransactionSet that can be called under read-lock. This fixes +// an edge case in the transaction pool. +func (cs *ConsensusSet) LockedTryTransactionSet(fn func(func(txns []types.Transaction) (modules.ConsensusChange, error)) error) error { + err := cs.tg.Add() + if err != nil { + return err + } + defer cs.tg.Done() + cs.mu.RLock() + defer cs.mu.RUnlock() + return fn(cs.tryTransactionSet) +} diff --git a/modules/transactionpool/accept.go b/modules/transactionpool/accept.go index f409ed8ea3..937b4dbead 100644 --- a/modules/transactionpool/accept.go +++ b/modules/transactionpool/accept.go @@ -136,7 +136,7 @@ func (tp *TransactionPool) checkTransactionSetComposition(ts []types.Transaction // handleConflicts detects whether the conflicts in the transaction pool are // legal children of the new transaction pool set or not. -func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []TransactionSetID) error { +func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []TransactionSetID, txnFn func([]types.Transaction) (modules.ConsensusChange, error)) error { // Create a list of all the transaction ids that compose the set of // conflicts. conflictMap := make(map[types.TransactionID]TransactionSetID) @@ -176,7 +176,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T conflicts = append(conflicts, conflict) } } - return tp.handleConflicts(dedupSet, conflicts) + return tp.handleConflicts(dedupSet, conflicts, txnFn) } // Merge all of the conflict sets with the input set (input set goes last @@ -203,7 +203,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T } // Check that the transaction set is valid. - cc, err := tp.consensusSet.TryTransactionSet(superset) + cc, err := txnFn(superset) if err != nil { return modules.NewConsensusConflict("provided transaction set has prereqs, but is still invalid: " + err.Error()) } @@ -235,7 +235,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T // acceptTransactionSet verifies that a transaction set is allowed to be in the // transaction pool, and then adds it to the transaction pool. -func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error { +func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction, txnFn func([]types.Transaction) (modules.ConsensusChange, error)) error { if len(ts) == 0 { return errEmptySet } @@ -278,9 +278,9 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error { } } if len(conflicts) > 0 { - return tp.handleConflicts(ts, conflicts) + return tp.handleConflicts(ts, conflicts, txnFn) } - cc, err := tp.consensusSet.TryTransactionSet(ts) + cc, err := txnFn(ts) if err != nil { return modules.NewConsensusConflict("provided transaction set is standalone and invalid: " + err.Error()) } @@ -300,18 +300,26 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error { // transactions. If the transaction is accepted, it will be relayed to // connected peers. func (tp *TransactionPool) AcceptTransactionSet(ts []types.Transaction) error { - tp.mu.Lock() - defer tp.mu.Unlock() - - err := tp.acceptTransactionSet(ts) - if err != nil { - return err + // assert on consensus set to get special method + cs, ok := tp.consensusSet.(interface { + LockedTryTransactionSet(fn func(func(txns []types.Transaction) (modules.ConsensusChange, error)) error) error + }) + if !ok { + return errors.New("consensus set does not support LockedTryTransactionSet method") } - // Notify subscribers and broadcast the transaction set. - go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers()) - tp.updateSubscribersTransactions() - return nil + return cs.LockedTryTransactionSet(func(txnFn func(txns []types.Transaction) (modules.ConsensusChange, error)) error { + tp.mu.Lock() + defer tp.mu.Unlock() + err := tp.acceptTransactionSet(ts, txnFn) + if err != nil { + return err + } + // Notify subscribers and broadcast the transaction set. + go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers()) + tp.updateSubscribersTransactions() + return nil + }) } // relayTransactionSet is an RPC that accepts a transaction set from a peer. If diff --git a/modules/transactionpool/update.go b/modules/transactionpool/update.go index aefd5fd486..e64058d8dc 100644 --- a/modules/transactionpool/update.go +++ b/modules/transactionpool/update.go @@ -108,7 +108,7 @@ func (tp *TransactionPool) ProcessConsensusChange(cc modules.ConsensusChange) { // processing consensus changes. Overall, the locking is pretty fragile and // more rules need to be put in place. for _, set := range unconfirmedSets { - tp.acceptTransactionSet(set) // Error is not checked. + tp.acceptTransactionSet(set, cc.TryTransactionSet) // Error is not checked. } // Inform subscribers that an update has executed. diff --git a/modules/wallet/defrag_test.go b/modules/wallet/defrag_test.go index c5d00344ba..7de2950977 100644 --- a/modules/wallet/defrag_test.go +++ b/modules/wallet/defrag_test.go @@ -112,8 +112,6 @@ func TestDefragOutputExhaustion(t *testing.T) { t.SkipNow() } - t.Skip("skipping until consensus consistency bug is fixed") - wt, err := createWalletTester("TestDefragOutputExhaustion") if err != nil { t.Fatal(err)