Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: fix pending for getLogs #24949

Merged
merged 3 commits into from Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 16 additions & 9 deletions accounts/abi/bind/backends/simulated.go
Expand Up @@ -63,9 +63,10 @@ type SimulatedBackend struct {
database ethdb.Database // In memory database to store our testing data
blockchain *core.BlockChain // Ethereum blockchain to handle the consensus

mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on request
mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live

Expand All @@ -84,8 +85,8 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
backend.rollback(blockchain.CurrentBlock())
return backend
}
Expand Down Expand Up @@ -662,7 +663,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
return fmt.Errorf("invalid transaction nonce: got %d, want %d", tx.Nonce(), nonce)
}
// Include tx in chain
blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
blocks, receipts := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
for _, tx := range b.pendingBlock.Transactions() {
block.AddTxWithChain(b.blockchain, tx)
}
Expand All @@ -672,6 +673,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
b.pendingReceipts = receipts[0]
return nil
}

Expand All @@ -683,7 +685,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaries to run from genesis to chain head
from := int64(0)
Expand All @@ -695,7 +697,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -816,8 +818,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
type filterBackend struct {
db ethdb.Database
bc *core.BlockChain
db ethdb.Database
bc *core.BlockChain
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
Expand All @@ -834,6 +837,10 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}

func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
number := rawdb.ReadHeaderNumber(fb.db, hash)
if number == nil {
Expand Down
46 changes: 38 additions & 8 deletions eth/filters/filter.go
Expand Up @@ -36,6 +36,7 @@ type Backend interface {
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
Expand Down Expand Up @@ -128,26 +129,35 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}
// Figure out the limits of the filter range
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}
head := header.Number.Uint64()

if f.begin == -1 {
var (
head = header.Number.Uint64()
end = uint64(f.end)
pending = f.end == rpc.PendingBlockNumber.Int64()
)
if f.begin == rpc.LatestBlockNumber.Int64() {
f.begin = int64(head)
}
end := uint64(f.end)
if f.end == -1 {
if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need to be fixed, if I request filtering only the pending block, these clauses will force the latest non-pending block to be also included in the dump.

end = head
}
// Gather all indexed logs, and finish with non indexed ones
var (
logs []*types.Log
err error
logs []*types.Log
err error
size, sections = f.backend.BloomStatus()
)
size, sections := f.backend.BloomStatus()
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
Expand All @@ -160,6 +170,13 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err
}
logs = append(logs, pendingLogs...)
}
return logs, err
}

Expand Down Expand Up @@ -272,6 +289,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
return nil, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() ([]*types.Log, error) {
block, receipts := f.backend.PendingBlockAndReceipts()
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
}
return nil, nil
}

func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
Expand Down
4 changes: 4 additions & 0 deletions eth/filters/filter_system_test.go
Expand Up @@ -106,6 +106,10 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types
return logs, nil
}

func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
}
Expand Down
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Expand Up @@ -65,6 +65,7 @@ type Backend interface {
BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error)
StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error)
StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error)
GetTd(ctx context.Context, hash common.Hash) *big.Int
GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error)
Expand Down