Skip to content

Commit

Permalink
add skip index flag (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
ceyonur committed Nov 21, 2023
1 parent 25699fc commit e3f354e
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 23 deletions.
8 changes: 6 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type CacheConfig struct {
Preimages bool // Whether to store preimage of trie key to the disk
AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip
TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices
SkipTxIndexing bool // Whether to skip transaction indexing

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand Down Expand Up @@ -485,7 +486,9 @@ func (bc *BlockChain) dispatchTxUnindexer() {
// - updating the acceptor tip index
func (bc *BlockChain) writeBlockAcceptedIndices(b *types.Block) error {
batch := bc.db.NewBatch()
rawdb.WriteTxLookupEntriesByBlock(batch, b)
if !bc.cacheConfig.SkipTxIndexing {
rawdb.WriteTxLookupEntriesByBlock(batch, b)
}
if err := rawdb.WriteAcceptorTip(batch, b.Hash()); err != nil {
return fmt.Errorf("%w: failed to write acceptor tip key", err)
}
Expand Down Expand Up @@ -864,7 +867,8 @@ func (bc *BlockChain) ValidateCanonicalChain() error {
// Transactions are only indexed beneath the last accepted block, so we only check
// that the transactions have been indexed, if we are checking below the last accepted
// block.
shouldIndexTxs := bc.cacheConfig.TxLookupLimit == 0 || bc.lastAccepted.NumberU64() < current.Number.Uint64()+bc.cacheConfig.TxLookupLimit
shouldIndexTxs := !bc.cacheConfig.SkipTxIndexing &&
(bc.cacheConfig.TxLookupLimit == 0 || bc.lastAccepted.NumberU64() < current.Number.Uint64()+bc.cacheConfig.TxLookupLimit)
if current.Number.Uint64() <= bc.lastAccepted.NumberU64() && shouldIndexTxs {
// Ensure that all of the transactions have been stored correctly in the canonical
// chain
Expand Down
233 changes: 213 additions & 20 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,17 +655,25 @@ func TestTransactionIndices(t *testing.T) {
})
require.NoError(err)

blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewDummyEngine(&TestCallbacks), genDb, 10, 10, nil)
blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewDummyEngine(&TestCallbacks), genDb, 10, 10, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
require.NoError(err)
block.AddTx(tx)
})
require.NoError(err)

check := func(tail *uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
require.EqualValues(tail, stored)

var tailValue uint64
if tail == nil {
return
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}
for i := *tail; i <= chain.CurrentBlock().Number.Uint64(); i++ {

for i := tailValue; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
Expand All @@ -676,7 +684,7 @@ func TestTransactionIndices(t *testing.T) {
}
}

for i := uint64(0); i < *tail; i++ {
for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
Expand Down Expand Up @@ -721,29 +729,167 @@ func TestTransactionIndices(t *testing.T) {

// Reconstruct a block chain which only reserves limited tx indices
// 128 blocks were previously indexed. Now we add a new block at each test step.
limit := []uint64{130 /* 129 + 1 reserve all */, 64 /* drop stale */, 32 /* shorten history */}
tails := []uint64{0 /* reserve all */, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */}
for i, l := range limit {
conf.TxLookupLimit = l
limits := []uint64{
0, /* tip: 129 reserve all (don't run) */
131, /* tip: 130 reserve all */
140, /* tip: 131 reserve all */
64, /* tip: 132, limit:64 */
32, /* tip: 133, limit:32 */
}
for i, l := range limits {
t.Run(fmt.Sprintf("test-%d, limit: %d", i+1, l), func(t *testing.T) {
conf.TxLookupLimit = l

chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedHash)
require.NoError(err)

newBlks := blocks2[i : i+1]
_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
require.NoError(err)

err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
require.NoError(err)

chain.DrainAcceptorQueue()
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation

chain.Stop()
var tail *uint64
if l == 0 {
tail = nil
} else {
var tl uint64
if chain.CurrentBlock().Number.Uint64() > l {
// tail should be the first block number which is indexed
// i.e the first block number that's in the lookup range
tl = chain.CurrentBlock().Number.Uint64() - l + 1
}
tail = &tl
}

chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedHash)
require.NoError(err)
check(tail, chain)

lastAcceptedHash = chain.CurrentHeader().Hash()
})
}
}

newBlks := blocks2[i : i+1]
_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
func TestTransactionSkipIndexing(t *testing.T) {
// Configure and generate a sample block chain
require := require.New(t)
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
funds = big.NewInt(10000000000000)
gspec = &Genesis{
Config: &params.ChainConfig{HomesteadBlock: new(big.Int)},
Alloc: GenesisAlloc{addr1: {Balance: funds}},
}
signer = types.LatestSigner(gspec.Config)
)
genDb, blocks, _, err := GenerateChainWithGenesis(gspec, dummy.NewDummyEngine(&TestCallbacks), 5, 10, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
require.NoError(err)
block.AddTx(tx)
})
require.NoError(err)

err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewDummyEngine(&TestCallbacks), genDb, 5, 10, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
require.NoError(err)
block.AddTx(tx)
})
require.NoError(err)

chain.DrainAcceptorQueue()
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation
checkRemoved := func(tail *uint64, to uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
var tailValue uint64
if tail == nil {
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}

chain.Stop()
check(&tails[i], chain)
for i := tailValue; i < to; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := to; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be skipped, number %d hash %s", i, tx.Hash().Hex())
}
}
}

lastAcceptedHash = chain.CurrentHeader().Hash()
conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64,
SkipTxIndexing: true,
}

// test1: Init block chain and check all indices has been skipped.
chainDB := rawdb.NewMemoryDatabase()
chain, err := createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
require.NoError(err)
checkRemoved(nil, 0, chain) // check all indices has been skipped

// test2: specify lookuplimit with tx index skipping enabled. Blocks should not be indexed but tail should be updated.
conf.TxLookupLimit = 2
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
require.NoError(err)
tail := chain.CurrentBlock().Number.Uint64() - conf.TxLookupLimit + 1
checkRemoved(&tail, 0, chain)

// test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated.
conf.TxLookupLimit = 0
conf.SkipTxIndexing = false
chainDB = rawdb.NewMemoryDatabase()
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
require.NoError(err)
checkRemoved(nil, chain.CurrentBlock().Number.Uint64()+1, chain) // check all indices has been indexed

// now change tx index skipping to true and check that the indices are skipped for the last block
// and old indices are removed up to the tail, but [tail, current) indices are still there.
conf.TxLookupLimit = 2
conf.SkipTxIndexing = true
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
require.NoError(err)
tail = chain.CurrentBlock().Number.Uint64() - conf.TxLookupLimit + 1
checkRemoved(&tail, chain.CurrentBlock().Number.Uint64(), chain)
}

// TestCanonicalHashMarker tests all the canonical hash markers are updated/deleted
Expand Down Expand Up @@ -881,6 +1027,30 @@ func TestTxLookupBlockChain(t *testing.T) {
}
}

func TestTxLookupSkipIndexingBlockChain(t *testing.T) {
cacheConf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TxLookupLimit: 5,
SkipTxIndexing: true,
}
createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
return createBlockChain(db, cacheConf, gspec, lastAcceptedHash)
}
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
tt.testFunc(t, createTxLookupBlockChain)
})
}
}

func TestCreateThenDeletePreByzantium(t *testing.T) {
// We want to use pre-byzantium rules where we have intermediate state roots
// between transactions.
Expand Down Expand Up @@ -1176,3 +1346,26 @@ func TestEIP3651(t *testing.T) {
t.Fatalf("sender balance incorrect: expected %d, got %d", expected, actual)
}
}

func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Genesis, blocks types.Blocks, lastAcceptedHash common.Hash) (*BlockChain, error) {
chain, err := createBlockChain(db, cacheConfig, gspec, lastAcceptedHash)
if err != nil {
return nil, err
}
_, err = chain.InsertChain(blocks)
if err != nil {
return nil, err
}
for _, block := range blocks {
err := chain.Accept(block)
if err != nil {
return nil, err
}
}

chain.DrainAcceptorQueue()
time.Sleep(1000 * time.Millisecond) // Wait for indices initialisation

chain.Stop()
return chain, nil
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func New(
Preimages: config.Preimages,
AcceptedCacheSize: config.AcceptedCacheSize,
TxLookupLimit: config.TxLookupLimit,
SkipTxIndexing: config.SkipTxIndexing,
}
)

Expand Down
5 changes: 5 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,9 @@ type Config struct {
// * 0: means no limit
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
TxLookupLimit uint64

// SkipTxIndexing skips indexing transactions.
// This is useful for validators that don't need to index transactions.
// TxLookupLimit can be still used to control unindexing old transactions.
SkipTxIndexing bool
}
6 changes: 5 additions & 1 deletion plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ type Config struct {
// * 0: means no limit
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
TxLookupLimit uint64 `json:"tx-lookup-limit"`

// SkipTxIndexing skips indexing transactions.
// This is useful for validators that don't need to index transactions.
// TxLookupLimit can be still used to control unindexing old transactions.
SkipTxIndexing bool `json:"skip-tx-indexing"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down Expand Up @@ -290,7 +295,6 @@ func (c *Config) Validate() error {
if c.Pruning && c.CommitInterval == 0 {
return fmt.Errorf("cannot use commit interval of 0 with pruning enabled")
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ func (vm *VM) Initialize(
vm.ethConfig.SkipUpgradeCheck = vm.config.SkipUpgradeCheck
vm.ethConfig.AcceptedCacheSize = vm.config.AcceptedCacheSize
vm.ethConfig.TxLookupLimit = vm.config.TxLookupLimit
vm.ethConfig.SkipTxIndexing = vm.config.SkipTxIndexing

// Create directory for offline pruning
if len(vm.ethConfig.OfflinePruningDataDirectory) != 0 {
Expand Down

0 comments on commit e3f354e

Please sign in to comment.