Skip to content
This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

Commit

Permalink
fix tpool locking bug
Browse files Browse the repository at this point in the history
I was just following orders
  • Loading branch information
lukechampine committed Jan 19, 2017
1 parent 625dfe6 commit 45d49cf
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 32 deletions.
2 changes: 2 additions & 0 deletions modules/consensus.go
Expand Up @@ -123,6 +123,8 @@ type (
// Synced indicates whether or not the ConsensusSet is synced with its
// peers.
Synced bool

TryTransactionSet func([]types.Transaction) (ConsensusChange, error)
}

// A SiacoinOutputDiff indicates the addition or removal of a SiacoinOutput in
Expand Down
3 changes: 1 addition & 2 deletions modules/consensus/accept.go
Expand Up @@ -283,11 +283,10 @@ func (cs *ConsensusSet) managedAcceptBlock(b types.Block) error {
}

// Updates complete, demote the lock.
cs.mu.Demote()
defer cs.mu.DemotedUnlock()
if len(changeEntry.AppliedBlocks) > 0 {
cs.readlockUpdateSubscribers(changeEntry)
}
cs.mu.Unlock()
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions modules/consensus/subscribe.go
Expand Up @@ -86,6 +86,10 @@ func (cs *ConsensusSet) computeConsensusChange(tx *bolt.Tx, ce changeEntry) (mod
if cs.synced && recentBlock == currentBlock {
cc.Synced = true
}

// Add the unexported tryTransactionSet function.
cc.TryTransactionSet = cs.tryTransactionSet

return cc, nil
}

Expand Down
44 changes: 33 additions & 11 deletions modules/consensus/validtransaction.go
Expand Up @@ -313,20 +313,12 @@ func validTransaction(tx *bolt.Tx, t types.Transaction) error {
return nil
}

// TryTransactionSet applies the input transactions to the consensus set to
// tryTransactionSet applies the input transactions to the consensus set to
// determine if they are valid. An error is returned IFF they are not a valid
// set in the current consensus set. The size of the transactions and the set
// is not checked. After the transactions have been validated, a consensus
// change is returned detailing the diffs that the transaciton set would have.
func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) {
err := cs.tg.Add()
if err != nil {
return modules.ConsensusChange{}, err
}
defer cs.tg.Done()
cs.mu.RLock()
defer cs.mu.RUnlock()

func (cs *ConsensusSet) tryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) {
// applyTransaction will apply the diffs from a transaction and store them
// in a block node. diffHolder is the blockNode that tracks the temporary
// changes. At the end of the function, all changes that were made to the
Expand All @@ -339,7 +331,7 @@ func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.Con
// manually manage the tx instead of using 'Update', but that has safety
// concerns and is more difficult to implement correctly.
errSuccess := errors.New("success")
err = cs.db.Update(func(tx *bolt.Tx) error {
err := cs.db.Update(func(tx *bolt.Tx) error {
diffHolder.Height = blockHeight(tx)
for _, txn := range txns {
err := validTransaction(tx, txn)
Expand All @@ -362,3 +354,33 @@ func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.Con
}
return cc, nil
}

// TryTransactionSet applies the input transactions to the consensus set to
// determine if they are valid. An error is returned IFF they are not a valid
// set in the current consensus set. The size of the transactions and the set
// is not checked. After the transactions have been validated, a consensus
// change is returned detailing the diffs that the transaciton set would have.
func (cs *ConsensusSet) TryTransactionSet(txns []types.Transaction) (modules.ConsensusChange, error) {
err := cs.tg.Add()
if err != nil {
return modules.ConsensusChange{}, err
}
defer cs.tg.Done()
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.tryTransactionSet(txns)
}

// LockedTryTransactionSet calls fn while under read-lock, passing it a
// version of TryTransactionSet that can be called under read-lock. This fixes
// an edge case in the transaction pool.
func (cs *ConsensusSet) LockedTryTransactionSet(fn func(func(txns []types.Transaction) (modules.ConsensusChange, error)) error) error {
err := cs.tg.Add()
if err != nil {
return err
}
defer cs.tg.Done()
cs.mu.RLock()
defer cs.mu.RUnlock()
return fn(cs.tryTransactionSet)
}
40 changes: 24 additions & 16 deletions modules/transactionpool/accept.go
Expand Up @@ -136,7 +136,7 @@ func (tp *TransactionPool) checkTransactionSetComposition(ts []types.Transaction

// handleConflicts detects whether the conflicts in the transaction pool are
// legal children of the new transaction pool set or not.
func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []TransactionSetID) error {
func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []TransactionSetID, txnFn func([]types.Transaction) (modules.ConsensusChange, error)) error {
// Create a list of all the transaction ids that compose the set of
// conflicts.
conflictMap := make(map[types.TransactionID]TransactionSetID)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T
conflicts = append(conflicts, conflict)
}
}
return tp.handleConflicts(dedupSet, conflicts)
return tp.handleConflicts(dedupSet, conflicts, txnFn)
}

// Merge all of the conflict sets with the input set (input set goes last
Expand All @@ -203,7 +203,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T
}

// Check that the transaction set is valid.
cc, err := tp.consensusSet.TryTransactionSet(superset)
cc, err := txnFn(superset)
if err != nil {
return modules.NewConsensusConflict("provided transaction set has prereqs, but is still invalid: " + err.Error())
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (tp *TransactionPool) handleConflicts(ts []types.Transaction, conflicts []T

// acceptTransactionSet verifies that a transaction set is allowed to be in the
// transaction pool, and then adds it to the transaction pool.
func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction, txnFn func([]types.Transaction) (modules.ConsensusChange, error)) error {
if len(ts) == 0 {
return errEmptySet
}
Expand Down Expand Up @@ -278,9 +278,9 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
}
}
if len(conflicts) > 0 {
return tp.handleConflicts(ts, conflicts)
return tp.handleConflicts(ts, conflicts, txnFn)
}
cc, err := tp.consensusSet.TryTransactionSet(ts)
cc, err := txnFn(ts)
if err != nil {
return modules.NewConsensusConflict("provided transaction set is standalone and invalid: " + err.Error())
}
Expand All @@ -300,18 +300,26 @@ func (tp *TransactionPool) acceptTransactionSet(ts []types.Transaction) error {
// transactions. If the transaction is accepted, it will be relayed to
// connected peers.
func (tp *TransactionPool) AcceptTransactionSet(ts []types.Transaction) error {
tp.mu.Lock()
defer tp.mu.Unlock()

err := tp.acceptTransactionSet(ts)
if err != nil {
return err
// assert on consensus set to get special method
cs, ok := tp.consensusSet.(interface {
LockedTryTransactionSet(fn func(func(txns []types.Transaction) (modules.ConsensusChange, error)) error) error
})
if !ok {
return errors.New("consensus set does not support LockedTryTransactionSet method")
}

// Notify subscribers and broadcast the transaction set.
go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers())
tp.updateSubscribersTransactions()
return nil
return cs.LockedTryTransactionSet(func(txnFn func(txns []types.Transaction) (modules.ConsensusChange, error)) error {
tp.mu.Lock()
defer tp.mu.Unlock()
err := tp.acceptTransactionSet(ts, txnFn)
if err != nil {
return err
}
// Notify subscribers and broadcast the transaction set.
go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers())
tp.updateSubscribersTransactions()
return nil
})
}

// relayTransactionSet is an RPC that accepts a transaction set from a peer. If
Expand Down
2 changes: 1 addition & 1 deletion modules/transactionpool/update.go
Expand Up @@ -108,7 +108,7 @@ func (tp *TransactionPool) ProcessConsensusChange(cc modules.ConsensusChange) {
// processing consensus changes. Overall, the locking is pretty fragile and
// more rules need to be put in place.
for _, set := range unconfirmedSets {
tp.acceptTransactionSet(set) // Error is not checked.
tp.acceptTransactionSet(set, cc.TryTransactionSet) // Error is not checked.
}

// Inform subscribers that an update has executed.
Expand Down
2 changes: 0 additions & 2 deletions modules/wallet/defrag_test.go
Expand Up @@ -112,8 +112,6 @@ func TestDefragOutputExhaustion(t *testing.T) {
t.SkipNow()
}

t.Skip("skipping until consensus consistency bug is fixed")

wt, err := createWalletTester("TestDefragOutputExhaustion")
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 45d49cf

Please sign in to comment.