Skip to content
This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

fix tpool locking bug #1555

Merged
merged 1 commit into from Jan 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions modules/consensus.go
Expand Up @@ -123,6 +123,12 @@ type (
// Synced indicates whether or not the ConsensusSet is synced with its
// peers.
Synced bool

// TryTransactionSet is an unlocked version of
// ConsensusSet.TryTransactionSet. This allows the TryTransactionSet
// function to be called by a subscriber during
// ProcessConsensusChange.
TryTransactionSet func([]types.Transaction) (ConsensusChange, error)
}

// A SiacoinOutputDiff indicates the addition or removal of a SiacoinOutput in
Expand Down
3 changes: 1 addition & 2 deletions modules/consensus/accept.go
Expand Up @@ -283,11 +283,10 @@ func (cs *ConsensusSet) managedAcceptBlock(b types.Block) error {
}

// Updates complete, demote the lock.
cs.mu.Demote()
defer cs.mu.DemotedUnlock()
if len(changeEntry.AppliedBlocks) > 0 {
cs.readlockUpdateSubscribers(changeEntry)
}
cs.mu.Unlock()
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions modules/consensus/subscribe.go
Expand Up @@ -86,6 +86,10 @@ func (cs *ConsensusSet) computeConsensusChange(tx *bolt.Tx, ce changeEntry) (mod
if cs.synced && recentBlock == currentBlock {
cc.Synced = true
}

// Add the unexported tryTransactionSet function.
cc.TryTransactionSet = cs.tryTransactionSet

return cc, nil
}

Expand Down
44 changes: 33 additions & 11 deletions modules/consensus/validtransaction.go
Expand Up @@ -313,20 +313,12 @@ func validTransaction(tx *bolt.Tx, t types.Transaction) error {
return nil
}

// TryTransactionSet applies the input transactions to the consensus set to
// tryTransactionSet applies the input transactions to the consensus set to
// determine if they are valid. An error is returned IFF they are not a valid
// set in the current consensus set. The size of the transactions and the set
// is not checked. After the transactions have been validated, a consensus
// change is returned detailing the diffs that the transaciton set would have.
func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) {
err := cs.tg.Add()
if err != nil {
return modules.ConsensusChange{}, err
}
defer cs.tg.Done()
cs.mu.RLock()
defer cs.mu.RUnlock()

func (cs *ConsensusSet) tryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) {
// applyTransaction will apply the diffs from a transaction and store them
// in a block node. diffHolder is the blockNode that tracks the temporary
// changes. At the end of the function, all changes that were made to the
Expand All @@ -339,7 +331,7 @@ func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.Con
// manually manage the tx instead of using 'Update', but that has safety
// concerns and is more difficult to implement correctly.
errSuccess := errors.New("success")
err = cs.db.Update(func(tx *bolt.Tx) error {
err := cs.db.Update(func(tx *bolt.Tx) error {
diffHolder.Height = blockHeight(tx)
for _, txn := range txns {
err := validTransaction(tx, txn)
Expand All @@ -362,3 +354,33 @@ func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.Con
}
return cc, nil
}

// TryTransactionSet applies the input transactions to the consensus set to
// determine if they are valid. An error is returned IFF they are not a valid
// set in the current consensus set. The size of the transactions and the set
// is not checked. After the transactions have been validated, a consensus
// change is returned detailing the diffs that the transaciton set would have.
func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) {
err := cs.tg.Add()
if err != nil {
return modules.ConsensusChange{}, err
}
defer cs.tg.Done()
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.tryTransactionSet(txns)
}

// LockedTryTransactionSet calls fn while under read-lock, passing it a
// version of TryTransactionSet that can be called under read-lock. This fixes
// an edge case in the transaction pool.
func (cs *ConsensusSet) LockedTryTransactionSet(fn func(func(txns []types.Transaction) (modules.ConsensusChange, error)) error) error {
err := cs.tg.Add()
if err != nil {
return err
}
defer cs.tg.Done()
cs.mu.RLock()
defer cs.mu.RUnlock()
return fn(cs.tryTransactionSet)
}
40 changes: 24 additions & 16 deletions modules/transactionpool/accept.go
Expand Up @@ -136,7 +136,7 @@ func (tp *TransactionPool) checkTransactionSetComposition(ts []types.Transaction

// handleConflicts detects whether the conflicts in the transaction pool are
// legal children of the new transaction pool set or not.
func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []TransactionSetID) error {
func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []TransactionSetID, txnFn func([]types.Transaction) (modules.ConsensusChange, error)) error {
// Create a list of all the transaction ids that compose the set of
// conflicts.
conflictMap := make(map[types.TransactionID]TransactionSetID)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T
conflicts = append(conflicts, conflict)
}
}
return tp.handleConflicts(dedupSet, conflicts)
return tp.handleConflicts(dedupSet, conflicts, txnFn)
}

// Merge all of the conflict sets with the input set (input set goes last
Expand All @@ -203,7 +203,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T
}

// Check that the transaction set is valid.
cc, err := tp.consensusSet.TryTransactionSet(superset)
cc, err := txnFn(superset)
if err != nil {
return modules.NewConsensusConflict("provided transaction set has prereqs, but is still invalid: " + err.Error())
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T

// acceptTransactionSet verifies that a transaction set is allowed to be in the
// transaction pool, and then adds it to the transaction pool.
func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction, txnFn func([]types.Transaction) (modules.ConsensusChange, error)) error {
if len(ts) == 0 {
return errEmptySet
}
Expand Down Expand Up @@ -278,9 +278,9 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
}
}
if len(conflicts) > 0 {
return tp.handleConflicts(ts, conflicts)
return tp.handleConflicts(ts, conflicts, txnFn)
}
cc, err := tp.consensusSet.TryTransactionSet(ts)
cc, err := txnFn(ts)
if err != nil {
return modules.NewConsensusConflict("provided transaction set is standalone and invalid: " + err.Error())
}
Expand All @@ -300,18 +300,26 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
// transactions. If the transaction is accepted, it will be relayed to
// connected peers.
func (tp *TransactionPool) AcceptTransactionSet(ts []types.Transaction) error {
tp.mu.Lock()
defer tp.mu.Unlock()

err := tp.acceptTransactionSet(ts)
if err != nil {
return err
// assert on consensus set to get special method
cs, ok := tp.consensusSet.(interface {
LockedTryTransactionSet(fn func(func(txns []types.Transaction) (modules.ConsensusChange, error)) error) error
})
if !ok {
return errors.New("consensus set does not support LockedTryTransactionSet method")
}

// Notify subscribers and broadcast the transaction set.
go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers())
tp.updateSubscribersTransactions()
return nil
return cs.LockedTryTransactionSet(func(txnFn func(txns []types.Transaction) (modules.ConsensusChange, error)) error {
tp.mu.Lock()
defer tp.mu.Unlock()
err := tp.acceptTransactionSet(ts, txnFn)
if err != nil {
return err
}
// Notify subscribers and broadcast the transaction set.
go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers())
tp.updateSubscribersTransactions()
return nil
})
}

// relayTransactionSet is an RPC that accepts a transaction set from a peer. If
Expand Down
2 changes: 1 addition & 1 deletion modules/transactionpool/update.go
Expand Up @@ -108,7 +108,7 @@ func (tp *TransactionPool) ProcessConsensusChange(cc modules.ConsensusChange) {
// processing consensus changes. Overall, the locking is pretty fragile and
// more rules need to be put in place.
for _, set := range unconfirmedSets {
tp.acceptTransactionSet(set) // Error is not checked.
tp.acceptTransactionSet(set, cc.TryTransactionSet) // Error is not checked.
}

// Inform subscribers that an update has executed.
Expand Down
2 changes: 0 additions & 2 deletions modules/wallet/defrag_test.go
Expand Up @@ -112,8 +112,6 @@ func TestDefragOutputExhaustion(t *testing.T) {
t.SkipNow()
}

t.Skip("skipping until consensus consistency bug is fixed")

wt, err := createWalletTester("TestDefragOutputExhaustion")
if err != nil {
t.Fatal(err)
Expand Down