add skip index flag #399

Merged · 11 commits · Nov 21, 2023
core/blockchain.go (8 changes: 6 additions & 2 deletions)
@@ -161,6 +161,7 @@ type CacheConfig struct {
Preimages bool // Whether to store preimage of trie key to the disk
AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip
TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices
SkipTxIndexing bool // Whether to skip transaction indexing

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
@@ -485,7 +486,9 @@ func (bc *BlockChain) dispatchTxUnindexer() {
// - updating the acceptor tip index
func (bc *BlockChain) writeBlockAcceptedIndices(b *types.Block) error {
batch := bc.db.NewBatch()
rawdb.WriteTxLookupEntriesByBlock(batch, b)
if !bc.cacheConfig.SkipTxIndexing {
rawdb.WriteTxLookupEntriesByBlock(batch, b)
}
if err := rawdb.WriteAcceptorTip(batch, b.Hash()); err != nil {
return fmt.Errorf("%w: failed to write acceptor tip key", err)
}
@@ -864,7 +867,8 @@ func (bc *BlockChain) ValidateCanonicalChain() error {
// Transactions are only indexed beneath the last accepted block, so we only check
// that the transactions have been indexed, if we are checking below the last accepted
// block.
shouldIndexTxs := bc.cacheConfig.TxLookupLimit == 0 || bc.lastAccepted.NumberU64() < current.Number.Uint64()+bc.cacheConfig.TxLookupLimit
shouldIndexTxs := !bc.cacheConfig.SkipTxIndexing &&
(bc.cacheConfig.TxLookupLimit == 0 || bc.lastAccepted.NumberU64() < current.Number.Uint64()+bc.cacheConfig.TxLookupLimit)
if current.Number.Uint64() <= bc.lastAccepted.NumberU64() && shouldIndexTxs {
// Ensure that all of the transactions have been stored correctly in the canonical
// chain
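To make the new gating explicit, here is a minimal sketch (not part of the diff) of the predicate that the ValidateCanonicalChain change inlines as shouldIndexTxs; writeBlockAcceptedIndices applies the same SkipTxIndexing guard before writing lookup entries for a newly accepted block. The function name expectTxIndexed is illustrative only.

// Sketch only: ValidateCanonicalChain only runs this check for blocks at or
// below the last accepted block.
func expectTxIndexed(skipTxIndexing bool, txLookupLimit, lastAccepted, blockNumber uint64) bool {
	if skipTxIndexing {
		return false // indexing is disabled entirely by the new flag
	}
	// A limit of 0 means every block is indexed; otherwise only blocks within
	// the most recent txLookupLimit blocks of the last accepted tip are.
	return txLookupLimit == 0 || lastAccepted < blockNumber+txLookupLimit
}
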
core/blockchain_test.go (233 changes: 213 additions & 20 deletions)
@@ -655,17 +655,25 @@ func TestTransactionIndices(t *testing.T) {
})
require.NoError(err)

blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewDummyEngine(&TestCallbacks), genDb, 10, 10, nil)
blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewDummyEngine(&TestCallbacks), genDb, 10, 10, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
require.NoError(err)
block.AddTx(tx)
})
require.NoError(err)

check := func(tail *uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
require.EqualValues(tail, stored)

var tailValue uint64
if tail == nil {
return
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}
for i := *tail; i <= chain.CurrentBlock().Number.Uint64(); i++ {

for i := tailValue; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
@@ -676,7 +684,7 @@ func TestTransactionIndices(t *testing.T) {
}
}

for i := uint64(0); i < *tail; i++ {
for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
@@ -721,29 +729,167 @@ func TestTransactionIndices(t *testing.T) {

// Reconstruct a block chain which only reserves limited tx indices
// 128 blocks were previously indexed. Now we add a new block at each test step.
limit := []uint64{130 /* 129 + 1 reserve all */, 64 /* drop stale */, 32 /* shorten history */}
tails := []uint64{0 /* reserve all */, 67 /* 130 - 64 + 1 */, 100 /* 131 - 32 + 1 */}
for i, l := range limit {
conf.TxLookupLimit = l
limits := []uint64{
0, /* tip: 129 reserve all (don't run) */
131, /* tip: 130 reserve all */
140, /* tip: 131 reserve all */
64, /* tip: 132, limit:64 */
32, /* tip: 133, limit:32 */
}
for i, l := range limits {
t.Run(fmt.Sprintf("test-%d, limit: %d", i+1, l), func(t *testing.T) {
conf.TxLookupLimit = l

chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedHash)
require.NoError(err)

newBlks := blocks2[i : i+1]
_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
require.NoError(err)

err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
require.NoError(err)

chain.DrainAcceptorQueue()
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation

chain.Stop()
var tail *uint64
if l == 0 {
tail = nil
} else {
var tl uint64
if chain.CurrentBlock().Number.Uint64() > l {
// tail should be the first block number which is indexed
// i.e. the first block number that's in the lookup range
tl = chain.CurrentBlock().Number.Uint64() - l + 1
}
tail = &tl
}

chain, err := createBlockChain(chainDB, conf, gspec, lastAcceptedHash)
require.NoError(err)
check(tail, chain)

lastAcceptedHash = chain.CurrentHeader().Hash()
})
}
}

newBlks := blocks2[i : i+1]
_, err = chain.InsertChain(newBlks) // Feed chain a higher block to trigger indices updater.
func TestTransactionSkipIndexing(t *testing.T) {
// Configure and generate a sample block chain
require := require.New(t)
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
funds = big.NewInt(10000000000000)
gspec = &Genesis{
Config: &params.ChainConfig{HomesteadBlock: new(big.Int)},
Alloc: GenesisAlloc{addr1: {Balance: funds}},
}
signer = types.LatestSigner(gspec.Config)
)
genDb, blocks, _, err := GenerateChainWithGenesis(gspec, dummy.NewDummyEngine(&TestCallbacks), 5, 10, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
require.NoError(err)
block.AddTx(tx)
})
require.NoError(err)

err = chain.Accept(newBlks[0]) // Accept the block to trigger indices updater.
blocks2, _, err := GenerateChain(gspec.Config, blocks[len(blocks)-1], dummy.NewDummyEngine(&TestCallbacks), genDb, 5, 10, func(i int, block *BlockGen) {
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
require.NoError(err)
block.AddTx(tx)
})
require.NoError(err)

chain.DrainAcceptorQueue()
time.Sleep(50 * time.Millisecond) // Wait for indices initialisation
checkRemoved := func(tail *uint64, to uint64, chain *BlockChain) {
stored := rawdb.ReadTxIndexTail(chain.db)
var tailValue uint64
if tail == nil {
require.Nil(stored)
tailValue = 0
} else {
require.EqualValues(*tail, *stored, "expected tail %d, got %d", *tail, *stored)
tailValue = *tail
}

chain.Stop()
check(&tails[i], chain)
for i := tailValue; i < to; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.NotNilf(index, "Miss transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := uint64(0); i < tailValue; i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
}
}

for i := to; i <= chain.CurrentBlock().Number.Uint64(); i++ {
block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i)
if block.Transactions().Len() == 0 {
continue
}
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash())
require.Nilf(index, "Transaction indices should be skipped, number %d hash %s", i, tx.Hash().Hex())
}
}
}

lastAcceptedHash = chain.CurrentHeader().Hash()
conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64,
SkipTxIndexing: true,
}

// test1: Init blockchain and check that all indices have been skipped.
chainDB := rawdb.NewMemoryDatabase()
chain, err := createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
require.NoError(err)
checkRemoved(nil, 0, chain) // check that all indices have been skipped

// test2: specify a lookup limit with tx index skipping enabled. Blocks should not be indexed, but the tail should be updated.
conf.TxLookupLimit = 2
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
require.NoError(err)
tail := chain.CurrentBlock().Number.Uint64() - conf.TxLookupLimit + 1
checkRemoved(&tail, 0, chain)

// test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated.
conf.TxLookupLimit = 0
conf.SkipTxIndexing = false
chainDB = rawdb.NewMemoryDatabase()
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
require.NoError(err)
checkRemoved(nil, chain.CurrentBlock().Number.Uint64()+1, chain) // check that all indices have been indexed

// test4: change tx index skipping back to true and check that indices are skipped for the last block,
// that old indices are removed up to the tail, and that [tail, current) indices are still there.
conf.TxLookupLimit = 2
conf.SkipTxIndexing = true
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
require.NoError(err)
tail = chain.CurrentBlock().Number.Uint64() - conf.TxLookupLimit + 1
checkRemoved(&tail, chain.CurrentBlock().Number.Uint64(), chain)
}

// TestCanonicalHashMarker tests all the canonical hash markers are updated/deleted
@@ -881,6 +1027,30 @@ func TestTxLookupBlockChain(t *testing.T) {
}
}

func TestTxLookupSkipIndexingBlockChain(t *testing.T) {
cacheConf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TxLookupLimit: 5,
SkipTxIndexing: true,
}
createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
return createBlockChain(db, cacheConf, gspec, lastAcceptedHash)
}
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
tt.testFunc(t, createTxLookupBlockChain)
})
}
}

func TestCreateThenDeletePreByzantium(t *testing.T) {
// We want to use pre-byzantium rules where we have intermediate state roots
// between transactions.
@@ -1176,3 +1346,26 @@ func TestEIP3651(t *testing.T) {
t.Fatalf("sender balance incorrect: expected %d, got %d", expected, actual)
}
}

func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Genesis, blocks types.Blocks, lastAcceptedHash common.Hash) (*BlockChain, error) {
chain, err := createBlockChain(db, cacheConfig, gspec, lastAcceptedHash)
if err != nil {
return nil, err
}
_, err = chain.InsertChain(blocks)
if err != nil {
return nil, err
}
for _, block := range blocks {
err := chain.Accept(block)
if err != nil {
return nil, err
}
}

chain.DrainAcceptorQueue()
time.Sleep(1000 * time.Millisecond) // Wait for indices initialisation

chain.Stop()
return chain, nil
}
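As a reference for the arithmetic in these tests, here is a minimal sketch (not part of the diff) of how the expected tx-index tail is derived from the chain tip and TxLookupLimit; the concrete numbers mirror the comments in the limits slice of TestTransactionIndices above (e.g. a tip of 132 with a limit of 64 gives a tail of 132 - 64 + 1 = 69). The helper name expectedTail is illustrative only.

// Sketch only: mirrors the tail computation used inside the test loop above.
func expectedTail(tip, limit uint64) *uint64 {
	if limit == 0 {
		return nil // a limit of 0 keeps every index, so there is no tail
	}
	var tail uint64
	if tip > limit {
		tail = tip - limit + 1 // first block number still inside the lookup range
	}
	return &tail
}
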
eth/backend.go (1 change: 1 addition & 0 deletions)
@@ -205,6 +205,7 @@ func New(
Preimages: config.Preimages,
AcceptedCacheSize: config.AcceptedCacheSize,
TxLookupLimit: config.TxLookupLimit,
SkipTxIndexing: config.SkipTxIndexing,
}
)

eth/ethconfig/config.go (5 changes: 5 additions & 0 deletions)
@@ -158,4 +158,9 @@ type Config struct {
// * 0: means no limit
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
TxLookupLimit uint64

// SkipTxIndexing skips indexing transactions.
// This is useful for validators that don't need to index transactions.
// TxLookupLimit can still be used to control unindexing of old transactions.
SkipTxIndexing bool
}
plugin/evm/config.go (6 changes: 5 additions & 1 deletion)
@@ -198,6 +198,11 @@ type Config struct {
// * 0: means no limit
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
TxLookupLimit uint64 `json:"tx-lookup-limit"`

// SkipTxIndexing skips indexing transactions.
// This is useful for validators that don't need to index transactions.
// TxLookupLimit can still be used to control unindexing of old transactions.
SkipTxIndexing bool `json:"skip-tx-indexing"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
@@ -290,7 +295,6 @@ func (c *Config) Validate() error {
if c.Pruning && c.CommitInterval == 0 {
return fmt.Errorf("cannot use commit interval of 0 with pruning enabled")
}

return nil
}

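With the plugin/evm config field above, enabling the flag from the node side comes down to setting the new JSON key. A hypothetical chain-config snippet follows; only skip-tx-indexing and tx-lookup-limit come from this diff, and the config file location and any other keys are whatever your deployment already uses.

{
  "skip-tx-indexing": true,
  "tx-lookup-limit": 1024
}

With this combination, newly accepted blocks get no tx lookup entries, while the unindexer can still prune previously written entries older than the most recent 1024 blocks.
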
plugin/evm/vm.go (1 change: 1 addition & 0 deletions)
@@ -521,6 +521,7 @@ func (vm *VM) Initialize(
vm.ethConfig.SkipUpgradeCheck = vm.config.SkipUpgradeCheck
vm.ethConfig.AcceptedCacheSize = vm.config.AcceptedCacheSize
vm.ethConfig.TxLookupLimit = vm.config.TxLookupLimit
vm.ethConfig.SkipTxIndexing = vm.config.SkipTxIndexing

// Create directory for offline pruning
if len(vm.ethConfig.OfflinePruningDataDirectory) != 0 {