From b068f9d49154ca30749dd6db266c640194c81bee Mon Sep 17 00:00:00 2001 From: hackyminer Date: Sun, 10 Feb 2019 05:01:55 +0900 Subject: [PATCH] eth/filters: support reverse ordered logs --- core/bloombits/matcher.go | 30 +++++++++++++---- eth/filters/filter.go | 64 +++++++++++++++++++++++++++--------- eth/filters/filter_system.go | 9 +++-- 3 files changed, 79 insertions(+), 24 deletions(-) diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 3ec0d5ae94e54..3a93bb6b89a0b 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -163,6 +163,12 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin } sink := m.run(begin, end, cap(results), session) + start := begin + stop := end + if start > stop { + start, stop = stop, start + } + // Read the output from the result sink and deliver to the user session.pend.Add(1) go func() { @@ -183,13 +189,14 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin sectionStart := res.section * m.sectionSize first := sectionStart - if begin > first { - first = begin + if start > first { + first = start } last := sectionStart + m.sectionSize - 1 - if end < last { - last = end + if stop < last { + last = stop } + // Iterate over all the blocks in the section and return the matching ones for i := first; i <= last; i++ { // Skip the entire byte if no matches are found inside (and we're processing an entire byte!) @@ -231,13 +238,24 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch defer session.pend.Done() defer close(source) - for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ { + inc := int64(1) + if begin > end { + inc = -1 + } + + i := int64(begin / m.sectionSize) + for ; i != int64(end/m.sectionSize); i += inc { select { case <-session.quit: return - case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}: + case source <- &partialMatches{uint64(i), bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}: } } + select { + case <-session.quit: + return + case source <- &partialMatches{uint64(i), bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}: + } }() // Assemble the daisy-chained filtering pipeline next := source diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 071613ad7a158..dfcfd6cd8496a 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -135,8 +135,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } head := header.Number.Uint64() + begin := uint64(f.begin) if f.begin == -1 { - f.begin = int64(head) + begin = head } end := uint64(f.end) if f.end == -1 { @@ -145,31 +146,57 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // Gather all indexed logs, and finish with non indexed ones var ( logs []*types.Log + rest []*types.Log err error ) size, sections := f.backend.BloomStatus() - if indexed := sections * size; indexed > uint64(f.begin) { - if indexed > end { - logs, err = f.indexedLogs(ctx, end) - } else { - logs, err = f.indexedLogs(ctx, indexed-1) + + indexed := sections * size + if begin > end { + rest, err = f.unindexedLogs(ctx, begin, indexed) + if err != nil { + return rest, err + } + } + if indexed > begin || indexed > end { + if indexed > begin && indexed > end { + logs, err = f.indexedLogs(ctx, begin, end) + } else if begin < end && end >= indexed { + logs, err = f.indexedLogs(ctx, begin, indexed-1) + } else if begin > end && begin >= indexed { + logs, err = f.indexedLogs(ctx, indexed-1, end) } if err != nil { + if len(rest) > 0 { + logs = append(rest, logs...) + } return logs, err } } - rest, err := f.unindexedLogs(ctx, end) - logs = append(logs, rest...) + + if end >= begin { + rest, err = f.unindexedLogs(ctx, uint64(f.begin), end) + logs = append(logs, rest...) + } else { + logs = append(rest, logs...) + } return logs, err } // indexedLogs returns the logs matching the filter criteria based on the bloom // bits indexed available locally or via the network. -func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { +func (f *Filter) indexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) { // Create a matcher session and request servicing from the backend matches := make(chan uint64, 64) - session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) + start := int64(begin) + stop := int64(end) + inc := int64(1) + if start > stop { + inc = -1 + } + + session, err := f.matcher.Start(ctx, uint64(start), uint64(stop), matches) if err != nil { return nil, err } @@ -187,11 +214,11 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err if !ok { err := session.Error() if err == nil { - f.begin = int64(end) + 1 + f.begin = stop + inc } return logs, err } - f.begin = int64(number) + 1 + f.begin = int64(number) + inc // Retrieve the suggested block and pull any truly matching logs header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) @@ -212,11 +239,18 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err // indexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. -func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { +func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) { var logs []*types.Log - for ; f.begin <= int64(end); f.begin++ { - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) + start := int64(begin) + stop := int64(end) + inc := int64(1) + if start > stop { + inc = -1 + } + + for ; start != stop; start += inc { + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(start)) if header == nil || err != nil { return logs, err } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 70139c1a96ece..5f57777542b67 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -194,7 +194,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription { // SubscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. Default value for the from and to -// block is "latest". If the fromBlock > toBlock an error is returned. +// block is "latest". func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { @@ -217,7 +217,7 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return es.subscribeLogs(crit, logs), nil } // only interested in mined logs within a specific block range - if from >= 0 && to >= 0 && to >= from { + if from >= 0 && to >= 0 { return es.subscribeLogs(crit, logs), nil } // interested in mined logs from a specific block number, new logs and pending logs @@ -228,7 +228,10 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ if from >= 0 && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil } - return nil, fmt.Errorf("invalid from and to block combination: from > to") + if to >= 0 && from == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + return nil, fmt.Errorf("invalid from and to block combination") } // subscribeMinedPendingLogs creates a subscription that returned mined and