
Commit

specification and implementation for content addressable transaction pool (#1)
cmwaters committed Jan 2, 2023
1 parent e2b06dd commit 18db218
Showing 19 changed files with 2,997 additions and 21 deletions.
config/config.go (3 additions & 4 deletions)
@@ -28,6 +28,7 @@ const (
// Default is v0.
MempoolV0 = "v0"
MempoolV1 = "v1"
MempoolV2 = "v2"
)

// NOTE: Most of the structs & relevant comments + the
@@ -687,9 +688,7 @@ type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - (default) FIFO mempool.
// 2) "v1" - prioritized mempool.
- // WARNING: There's a known memory leak with the prioritized mempool
- // that the team are working on. Read more here:
- // https://github.com/tendermint/tendermint/issues/8775
+ // 3) "v2" - content addressable transaction pool
Version string `mapstructure:"version"`
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
@@ -735,7 +734,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
- Version: MempoolV0,
+ Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
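
With this change the default mempool version moves from "v0" to "v2". For orientation, a minimal sketch of how the new default could be checked or overridden; it only uses DefaultMempoolConfig and the version constants shown in the hunks above, assumes the standard github.com/tendermint/tendermint/config import path, and is not part of this commit:

package main

import (
    "fmt"

    cfg "github.com/tendermint/tendermint/config"
)

func main() {
    // DefaultMempoolConfig now selects the content addressable pool ("v2").
    mem := cfg.DefaultMempoolConfig()
    fmt.Println(mem.Version) // prints "v2"

    // Operators who prefer the previous FIFO behaviour can set the version back explicitly.
    mem.Version = cfg.MempoolV0
}
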
config/toml.go (1 addition & 0 deletions)
@@ -345,6 +345,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
# Mempool version to use:
# 1) "v0" - (default) FIFO mempool.
# 2) "v1" - prioritized mempool.
# 3) "v2" - content addressable transaction pool
version = "{{ .Mempool.Version }}"
recheck = {{ .Mempool.Recheck }}
mempool/cat/cache.go (new file, 237 additions)
@@ -0,0 +1,237 @@
package cat

import (
    "container/list"
    "time"

    tmsync "github.com/tendermint/tendermint/libs/sync"
    "github.com/tendermint/tendermint/types"
)

// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache
// only stores the hash of the raw transaction.
type LRUTxCache struct {
    mtx      tmsync.Mutex
    size     int
    cacheMap map[types.TxKey]*list.Element
    list     *list.List
}

func NewLRUTxCache(cacheSize int) *LRUTxCache {
    return &LRUTxCache{
        size:     cacheSize,
        cacheMap: make(map[types.TxKey]*list.Element, cacheSize),
        list:     list.New(),
    }
}

// GetList returns the underlying linked-list that backs the LRU cache. Note,
// this should be used for testing purposes only!
func (c *LRUTxCache) GetList() *list.List {
    return c.list
}

// Reset clears the cache, dropping all stored keys.
func (c *LRUTxCache) Reset() {
    c.mtx.Lock()
    defer c.mtx.Unlock()

    c.cacheMap = make(map[types.TxKey]*list.Element, c.size)
    c.list.Init()
}

// Push adds the given tx key to the cache, returning true if it was not
// already present. When the cache is full, the least recently used entry is
// evicted to make room.
func (c *LRUTxCache) Push(txKey types.TxKey) bool {
    if c.size == 0 {
        return true
    }

    c.mtx.Lock()
    defer c.mtx.Unlock()

    moved, ok := c.cacheMap[txKey]
    if ok {
        c.list.MoveToBack(moved)
        return false
    }

    if c.list.Len() >= c.size {
        front := c.list.Front()
        if front != nil {
            frontKey := front.Value.(types.TxKey)
            delete(c.cacheMap, frontKey)
            c.list.Remove(front)
        }
    }

    e := c.list.PushBack(txKey)
    c.cacheMap[txKey] = e

    return true
}

// Remove evicts the given tx key from the cache if it is present.
func (c *LRUTxCache) Remove(txKey types.TxKey) {
    c.mtx.Lock()
    defer c.mtx.Unlock()

    e := c.cacheMap[txKey]
    delete(c.cacheMap, txKey)

    if e != nil {
        c.list.Remove(e)
    }
}

// Has reports whether the given tx key is in the cache.
func (c *LRUTxCache) Has(txKey types.TxKey) bool {
    if c.size == 0 {
        return false
    }

    c.mtx.Lock()
    defer c.mtx.Unlock()

    _, ok := c.cacheMap[txKey]
    return ok
}

// EvictedTxInfo records the metadata of a transaction at the time it was
// evicted from the pool.
type EvictedTxInfo struct {
    timeEvicted time.Time
    priority    int64
    gasWanted   int64
    sender      string
    peers       map[uint16]bool
}

// EvictedTxCache is a size-bounded, thread-safe cache of metadata for
// recently evicted transactions, keyed by tx key.
type EvictedTxCache struct {
    mtx   tmsync.Mutex
    size  int
    cache map[types.TxKey]*EvictedTxInfo
}

func NewEvictedTxCache(size int) *EvictedTxCache {
    return &EvictedTxCache{
        size:  size,
        cache: make(map[types.TxKey]*EvictedTxInfo),
    }
}

// Has reports whether the cache holds an entry for the given tx key.
func (c *EvictedTxCache) Has(txKey types.TxKey) bool {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    _, exists := c.cache[txKey]
    return exists
}

// Push stores the metadata of an evicted transaction, dropping the oldest
// entry if the cache grows past its size limit.
func (c *EvictedTxCache) Push(wtx *WrappedTx) {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    c.cache[wtx.key] = &EvictedTxInfo{
        timeEvicted: time.Now().UTC(),
        priority:    wtx.priority,
        gasWanted:   wtx.gasWanted,
        sender:      wtx.sender,
        peers:       wtx.peers,
    }
    // if cache too large, remove the oldest entry
    if len(c.cache) > c.size {
        oldestTxKey := wtx.key
        oldestTxTime := time.Now().UTC()
        for key, info := range c.cache {
            if info.timeEvicted.Before(oldestTxTime) {
                oldestTxTime = info.timeEvicted
                oldestTxKey = key
            }
        }
        delete(c.cache, oldestTxKey)
    }
}

// Pop removes and returns the entry for the given tx key, or nil if there is
// no such entry.
func (c *EvictedTxCache) Pop(txKey types.TxKey) *EvictedTxInfo {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    info, exists := c.cache[txKey]
    if !exists {
        return nil
    } else {
        delete(c.cache, txKey)
        return info
    }
}

// Prune removes all entries that were evicted before the given time.
func (c *EvictedTxCache) Prune(limit time.Time) {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    for key, info := range c.cache {
        if info.timeEvicted.Before(limit) {
            delete(c.cache, key)
        }
    }
}

// SeenTxSet records transactions that have been
// seen by other peers but not yet by us
type SeenTxSet struct {
    mtx  tmsync.Mutex
    size int
    set  map[types.TxKey]timestampedPeerSet
}

type timestampedPeerSet struct {
    peers map[uint16]bool
    time  time.Time
}

func NewSeenTxSet(size int) *SeenTxSet {
    return &SeenTxSet{
        size: size,
        set:  make(map[types.TxKey]timestampedPeerSet),
    }
}

// Add records that the given peer has seen the transaction with the given
// key, evicting the oldest entry if the set exceeds its size limit.
func (s *SeenTxSet) Add(txKey types.TxKey, peer uint16) {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    seenSet, exists := s.set[txKey]
    if !exists {
        s.set[txKey] = timestampedPeerSet{
            peers: map[uint16]bool{peer: true},
            time:  time.Now().UTC(),
        }
        s.constrainSize()
    } else {
        seenSet.peers[peer] = true
    }
}

func (s *SeenTxSet) constrainSize() {
    if len(s.set) > s.size {
        var (
            oldestTxKey types.TxKey
            oldestTime  time.Time
        )
        for key, set := range s.set {
            if oldestTime.IsZero() || set.time.Before(oldestTime) {
                oldestTxKey = key
                oldestTime = set.time
            }
        }
        delete(s.set, oldestTxKey)
    }
}

// Pop removes and returns the set of peers that have been seen with the given
// tx key, or nil if the key is unknown.
func (s *SeenTxSet) Pop(txKey types.TxKey) map[uint16]bool {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    seenSet, exists := s.set[txKey]
    if !exists {
        return nil
    } else {
        delete(s.set, txKey)
        return seenSet.peers
    }
}

// Len returns the number of cached items. Mostly used for testing.
func (s *SeenTxSet) Len() int {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    return len(s.set)
}
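
Before the tests, a short usage sketch tying the three structures above together. The constructors and method signatures come from cache.go and the test file below; the transaction bytes, peer ID, and cache sizes are illustrative only, and this snippet is not part of the commit:

package main

import (
    "fmt"

    "github.com/tendermint/tendermint/mempool/cat"
    "github.com/tendermint/tendermint/types"
)

func main() {
    tx := types.Tx("example-tx")
    key := tx.Key()

    // LRUTxCache deduplicates transactions by key; Push reports whether the key was new.
    cache := cat.NewLRUTxCache(1000)
    fmt.Println(cache.Push(key)) // true: first time this key is pushed
    fmt.Println(cache.Push(key)) // false: already cached

    // SeenTxSet tracks which peers have advertised a tx we have not yet received.
    seen := cat.NewSeenTxSet(1000)
    seen.Add(key, 1)           // peer ID 1 claims to have the tx
    fmt.Println(seen.Pop(key)) // map[1:true]: peers known to have the tx

    // EvictedTxCache keeps bounded metadata about recently evicted transactions.
    evicted := cat.NewEvictedTxCache(1000)
    evicted.Push(cat.NewWrappedTx(tx, key, 10, 1, 5, "")) // arguments mirror the test below
    fmt.Println(evicted.Has(key)) // true until pruned, popped, or displaced
}

A design point shared by all three structures is that they never hold the raw transaction bytes, only the transaction key and small per-transaction metadata.
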
mempool/cat/cache_test.go (new file, 94 additions)
@@ -0,0 +1,94 @@
package cat_test

import (
    "math/rand"
    "testing"
    "time"

    "github.com/stretchr/testify/require"

    "github.com/tendermint/tendermint/mempool/cat"
    "github.com/tendermint/tendermint/types"
)

func TestSeenTxSet(t *testing.T) {
    var (
        tx1Key        = types.Tx("tx1").Key()
        tx2Key        = types.Tx("tx2").Key()
        tx3Key        = types.Tx("tx3").Key()
        peer1  uint16 = 1
        peer2  uint16 = 2
    )

    seenSet := cat.NewSeenTxSet(2)
    require.Nil(t, seenSet.Pop(tx1Key))

    seenSet.Add(tx1Key, peer1)
    seenSet.Add(tx1Key, peer2)
    seenSet.Add(tx1Key, peer1)
    peers := seenSet.Pop(tx1Key)
    require.NotNil(t, peers)
    require.Equal(t, map[uint16]bool{peer1: true, peer2: true}, peers)
    seenSet.Add(tx2Key, peer1)
    seenSet.Add(tx3Key, peer1)
    seenSet.Add(tx1Key, peer1)
    require.Equal(t, 2, seenSet.Len())
}

func TestCacheRemove(t *testing.T) {
    cache := cat.NewLRUTxCache(100)
    numTxs := 10

    txs := make([][32]byte, numTxs)
    for i := 0; i < numTxs; i++ {
        // probability of collision is 2**-256
        txBytes := make([]byte, 32)
        _, err := rand.Read(txBytes)
        require.NoError(t, err)

        copy(txs[i][:], txBytes)
        cache.Push(txs[i])

        // make sure it's added to both the linked list and the map
        require.Equal(t, i+1, cache.GetList().Len())
    }

    for i := 0; i < numTxs; i++ {
        cache.Remove(txs[i])
        // make sure it's removed from both the map and the linked list
        require.Equal(t, numTxs-(i+1), cache.GetList().Len())
    }
}

func TestEvictedTxCache(t *testing.T) {
    var (
        tx1  = types.Tx("tx1")
        tx2  = types.Tx("tx2")
        tx3  = types.Tx("tx3")
        wtx1 = cat.NewWrappedTx(
            tx1, tx1.Key(), 10, 1, 5, "",
        )
        wtx2 = cat.NewWrappedTx(
            tx2, tx2.Key(), 10, 1, 5, "",
        )
        wtx3 = cat.NewWrappedTx(
            tx3, tx3.Key(), 10, 1, 5, "",
        )
    )

    cache := cat.NewEvictedTxCache(2)
    require.False(t, cache.Has(tx1.Key()))
    require.Nil(t, cache.Pop(tx1.Key()))
    cache.Push(wtx1)
    require.True(t, cache.Has(tx1.Key()))
    require.NotNil(t, cache.Pop(tx1.Key()))
    cache.Push(wtx1)
    time.Sleep(1 * time.Millisecond)
    cache.Push(wtx2)
    time.Sleep(1 * time.Millisecond)
    cache.Push(wtx3)
    require.False(t, cache.Has(tx1.Key()))
    cache.Prune(time.Now().UTC().Add(1 * time.Second))
    require.False(t, cache.Has(tx2.Key()))
    require.False(t, cache.Has(tx3.Key()))
}