Skip to content

Commit

Permalink
Bucky/mempool txsmap (tendermint#3512)
Browse files Browse the repository at this point in the history
* mempool: resCb -> globalCb

* reqResCb takes an externalCb

* failing test for tendermint#3509

* txsMap is sync.Map

* update changelog
  • Loading branch information
ebuchman authored and brapse committed Jun 5, 2019
1 parent 2ac1c92 commit 6a2595c
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 60 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## v0.32.0
## v0.31.2

**
*March 30th, 2019*

This release fixes a regression from v0.31.1 where Tendermint panics under
mempool load for external ABCI apps.

### BREAKING CHANGES:

Expand All @@ -19,7 +22,7 @@

### IMPROVEMENTS:

- [CircleCI] \#3497 Move release management to CircleCI
- [circle] \#3497 Move release management to CircleCI

### BUG FIXES:

21 changes: 13 additions & 8 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ var _ Client = (*socketClient)(nil)
type socketClient struct {
cmn.BaseService

reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer
addr string
mustConnect bool
conn net.Conn

reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer

mtx sync.Mutex
addr string
conn net.Conn
err error
reqSent *list.List
resCb func(*types.Request, *types.Response) // listens to all callbacks
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.

}

Expand Down Expand Up @@ -86,6 +87,7 @@ func (cli *socketClient) OnStop() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
if cli.conn != nil {
// does this really need a mutex?
cli.conn.Close()
}

Expand Down Expand Up @@ -207,12 +209,15 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
reqres.Done() // Release waiters
cli.reqSent.Remove(next) // Pop first item from linked list

// Notify reqRes listener if set
// Notify reqRes listener if set (request specific callback).
// NOTE: it is possible this callback isn't set on the reqres object.
// at this point, in which case it will be called after, when it is set.
// TODO: should we move this after the resCb call so the order is always consistent?
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}

// Notify client listener if set
// Notify client listener if set (global callback).
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
Expand Down
2 changes: 1 addition & 1 deletion mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCacheRemove(t *testing.T) {
for i := 0; i < numTxs; i++ {
// probability of collision is 2**-256
txBytes := make([]byte, 32)
rand.Read(txBytes)
rand.Read(txBytes) // nolint: gosec
txs[i] = txBytes
cache.Push(txBytes)
// make sure its added to both the linked list and the map
Expand Down
117 changes: 69 additions & 48 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ func TxID(tx []byte) string {
return fmt.Sprintf("%X", types.Tx(tx).Hash())
}

// txKey is the fixed length array sha256 hash used as the key in maps.
func txKey(tx types.Tx) [sha256.Size]byte {
return sha256.Sum256(tx)
}

// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
// round. Transaction validity is checked using the CheckTx abci message before the transaction is
// added to the pool. The Mempool uses a concurrent list structure for storing transactions that
Expand All @@ -159,23 +164,27 @@ type Mempool struct {
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
// map for quick access to txs
// Used in CheckTx to record the tx sender.
txsMap map[[sha256.Size]byte]*clist.CElement
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
preCheck PreCheckFunc
postCheck PostCheckFunc

// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated
// in serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
preCheck PreCheckFunc
postCheck PostCheckFunc

// Atomic integers
// Map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
txsMap sync.Map

// Used to check if the mempool size is bigger than the allowed limit.
// See TxsBytes
txsBytes int64
// Atomic integers
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
txsBytes int64 // total size of mempool, in bytes

// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
Expand Down Expand Up @@ -203,7 +212,6 @@ func NewMempool(
config: config,
proxyAppConn: proxyAppConn,
txs: clist.New(),
txsMap: make(map[[sha256.Size]byte]*clist.CElement),
height: height,
rechecking: 0,
recheckCursor: nil,
Expand All @@ -216,7 +224,7 @@ func NewMempool(
} else {
mempool.cache = nopTxCache{}
}
proxyAppConn.SetResponseCallback(mempool.resCb)
proxyAppConn.SetResponseCallback(mempool.globalCb)
for _, option := range options {
option(mempool)
}
Expand Down Expand Up @@ -319,7 +327,7 @@ func (mem *Mempool) Flush() {
e.DetachPrev()
}

mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement)
mem.txsMap = sync.Map{}
_ = atomic.SwapInt64(&mem.txsBytes, 0)
}

Expand Down Expand Up @@ -380,13 +388,12 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo

// CACHE
if !mem.cache.Push(tx) {
// record the sender
e, ok := mem.txsMap[sha256.Sum256(tx)]
// The check is needed because tx may be in cache, but not in the mempool.
// E.g. after we've committed a block, txs are removed from the mempool,
// but not from the cache.
if ok {
memTx := e.Value.(*mempoolTx)
// Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap.Load(txKey(tx)); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
Expand Down Expand Up @@ -416,25 +423,21 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
if err = mem.proxyAppConn.Error(); err != nil {
return err
}

reqRes := mem.proxyAppConn.CheckTxAsync(tx)
if cb != nil {
composedCallback := func(res *abci.Response) {
mem.reqResCb(tx, txInfo.PeerID)(res)
cb(res)
}
reqRes.SetCallback(composedCallback)
} else {
reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID))
}
reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb))

return nil
}

// Global callback, which is called in the absence of the specific callback.
//
// In recheckTxs because no reqResCb (specific) callback is set, this callback
// will be called.
func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
// Global callback that will be called after every ABCI response.
// Having a single global callback avoids needing to set a callback for each request.
// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
// include this information. If we're not in the midst of a recheck, this function will just return,
// so the request specific callback can do the work.
// When rechecking, we don't need the peerID, so the recheck callback happens here.
func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
return
}
Expand All @@ -446,35 +449,50 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
mem.metrics.Size.Set(float64(mem.Size()))
}

// Specific callback, which allows us to incorporate local information, like
// the peer that sent us this tx, so we can avoid sending it back to the same
// peer.
// Request specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
func (mem *Mempool) reqResCb(tx []byte, peerID uint16) func(res *abci.Response) {
func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
return
// this should never happen
panic("recheck cursor is not nil in reqResCb")
}

mem.resCbFirstTime(tx, peerID, res)

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))

// passed in by the caller of CheckTx, eg. the RPC
if externalCb != nil {
externalCb(res)
}
}
}

// Called from:
// - resCbFirstTime (lock not held) if tx is valid
func (mem *Mempool) addTx(memTx *mempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap[sha256.Sum256(memTx.tx)] = e
mem.txsMap.Store(txKey(memTx.tx), e)
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}

// Called from:
// - Update (lock held) if tx was committed
// - resCbRecheck (lock not held) if tx was invalidated
func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
mem.txs.Remove(elem)
elem.DetachPrev()
delete(mem.txsMap, sha256.Sum256(tx))
mem.txsMap.Delete(txKey(tx))
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))

if removeFromCache {
Expand Down Expand Up @@ -733,7 +751,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
mem.recheckEnd = mem.txs.Back()

// Push txs to proxyAppConn
// NOTE: reqResCb may be called concurrently.
// NOTE: globalCb may be called concurrently.
for _, tx := range txs {
mem.proxyAppConn.CheckTxAsync(tx)
}
Expand All @@ -746,8 +764,11 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
type mempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
senders sync.Map // ids of peers who've sent us this tx (as a map for quick lookups)
tx types.Tx //

// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> bool
senders sync.Map
}

// Height returns the height for this transaction
Expand Down Expand Up @@ -798,7 +819,7 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
defer cache.mtx.Unlock()

// Use the tx hash in the cache
txHash := sha256.Sum256(tx)
txHash := txKey(tx)
if moved, exists := cache.map_[txHash]; exists {
cache.list.MoveToBack(moved)
return false
Expand All @@ -820,7 +841,7 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
// Remove removes the given tx from the cache.
func (cache *mapTxCache) Remove(tx types.Tx) {
cache.mtx.Lock()
txHash := sha256.Sum256(tx)
txHash := txKey(tx)
popped := cache.map_[txHash]
delete(cache.map_, txHash)
if popped != nil {
Expand Down
50 changes: 50 additions & 0 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"io/ioutil"
mrand "math/rand"
"os"
"path/filepath"
"testing"
Expand All @@ -18,6 +19,7 @@ import (

"github.com/tendermint/tendermint/abci/example/counter"
"github.com/tendermint/tendermint/abci/example/kvstore"
abciserver "github.com/tendermint/tendermint/abci/server"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
Expand Down Expand Up @@ -510,6 +512,54 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 0, mempool.TxsBytes())
}

// This will non-deterministically catch some concurrency failures like
// https://github.com/tendermint/tendermint/issues/3509
// TODO: all of the tests should probably also run using the remote proxy app
// since otherwise we're not actually testing the concurrency of the mempool here!
func TestMempoolRemoteAppConcurrency(t *testing.T) {
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmn.RandStr(6))
app := kvstore.NewKVStoreApplication()
cc, server := newRemoteApp(t, sockPath, app)
defer server.Stop()
config := cfg.ResetTestRoot("mempool_test")
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
defer cleanup()

// generate small number of txs
nTxs := 10
txLen := 200
txs := make([]types.Tx, nTxs)
for i := 0; i < nTxs; i++ {
txs[i] = cmn.RandBytes(txLen)
}

// simulate a group of peers sending them over and over
N := config.Mempool.Size
maxPeers := 5
for i := 0; i < N; i++ {
peerID := mrand.Intn(maxPeers)
txNum := mrand.Intn(nTxs)
tx := txs[int(txNum)]

// this will err with ErrTxInCache many times ...
mempool.CheckTxWithInfo(tx, nil, TxInfo{PeerID: uint16(peerID)})
}
err := mempool.FlushAppConn()
require.NoError(t, err)
}

// caller must close server
func newRemoteApp(t *testing.T, addr string, app abci.Application) (clientCreator proxy.ClientCreator, server cmn.Service) {
clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true)

// Start server
server = abciserver.NewSocketServer(addr, app)
server.SetLogger(log.TestingLogger().With("module", "abci-server"))
if err := server.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error())
}
return clientCreator, server
}
func checksumIt(data []byte) string {
h := sha256.New()
h.Write(data)
Expand Down

0 comments on commit 6a2595c

Please sign in to comment.