eth/filters: support reverse ordered logs
hackmod committed Feb 10, 2019
1 parent 27e3f96 · commit b068f9d
Showing 3 changed files with 79 additions and 24 deletions.
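All three files apply the same direction-aware iteration pattern: derive a ±1 step from the relative order of the bounds, loop with != instead of <=, and emit the final value separately where the range is inclusive. A minimal standalone sketch of that pattern (illustrative only, not code from this commit; walkRange is a hypothetical helper):

package main

import "fmt"

// walkRange visits every value from begin to end inclusive, stepping
// forward when begin <= end and backward when begin > end. Signed
// arithmetic is used so the step can be negative.
func walkRange(begin, end uint64, visit func(uint64)) {
	inc := int64(1)
	if begin > end {
		inc = -1
	}
	for i := int64(begin); i != int64(end); i += inc {
		visit(uint64(i))
	}
	visit(end) // the loop condition excludes the last value, so emit it here
}

func main() {
	walkRange(5, 2, func(n uint64) { fmt.Println(n) }) // prints 5 4 3 2
}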
30 changes: 24 additions & 6 deletions core/bloombits/matcher.go
@@ -163,6 +163,12 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
 	}
 	sink := m.run(begin, end, cap(results), session)
 
+	start := begin
+	stop := end
+	if start > stop {
+		start, stop = stop, start
+	}
+
 	// Read the output from the result sink and deliver to the user
 	session.pend.Add(1)
 	go func() {
@@ -183,13 +189,14 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
 				sectionStart := res.section * m.sectionSize
 
 				first := sectionStart
-				if begin > first {
-					first = begin
+				if start > first {
+					first = start
 				}
 				last := sectionStart + m.sectionSize - 1
-				if end < last {
-					last = end
+				if stop < last {
+					last = stop
 				}
+
 				// Iterate over all the blocks in the section and return the matching ones
 				for i := first; i <= last; i++ {
 					// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
@@ -231,13 +238,24 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch
 		defer session.pend.Done()
 		defer close(source)
 
-		for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
+		inc := int64(1)
+		if begin > end {
+			inc = -1
+		}
+
+		i := int64(begin / m.sectionSize)
+		for ; i != int64(end/m.sectionSize); i += inc {
 			select {
 			case <-session.quit:
 				return
-			case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
+			case source <- &partialMatches{uint64(i), bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
 			}
 		}
+		select {
+		case <-session.quit:
+			return
+		case source <- &partialMatches{uint64(i), bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
+		}
 	}()
 	// Assemble the daisy-chained filtering pipeline
 	next := source
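Two things happen in the matcher changes above: Start normalizes a reversed request into an ascending start/stop pair so the per-section clamping keeps working, while run feeds sections in request order and delivers the final section through the extra select after the loop, since the != condition excludes it. A simplified stand-in for the clamping logic (not commit code; sectionSize is a parameter here for illustration):

package main

import "fmt"

// clampToSection clips one bloom section's block range to a requested
// [begin, end] range that may arrive in either order. A simplified
// stand-in for the logic in Matcher.Start above.
func clampToSection(section, sectionSize, begin, end uint64) (first, last uint64) {
	start, stop := begin, end
	if start > stop {
		start, stop = stop, start // normalize a reversed request
	}
	first = section * sectionSize
	last = first + sectionSize - 1
	if start > first {
		first = start
	}
	if stop < last {
		last = stop
	}
	return first, last
}

func main() {
	// Section 1 of size 4096 covers blocks 4096..8191; a reversed
	// request for blocks 9000..5000 clips to 5000..8191.
	fmt.Println(clampToSection(1, 4096, 9000, 5000)) // 5000 8191
}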
64 changes: 49 additions & 15 deletions eth/filters/filter.go
@@ -135,8 +135,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
 	}
 	head := header.Number.Uint64()
 
+	begin := uint64(f.begin)
 	if f.begin == -1 {
-		f.begin = int64(head)
+		begin = head
 	}
 	end := uint64(f.end)
 	if f.end == -1 {
@@ -145,31 +146,57 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
 	// Gather all indexed logs, and finish with non indexed ones
 	var (
 		logs []*types.Log
+		rest []*types.Log
 		err  error
 	)
 	size, sections := f.backend.BloomStatus()
-	if indexed := sections * size; indexed > uint64(f.begin) {
-		if indexed > end {
-			logs, err = f.indexedLogs(ctx, end)
-		} else {
-			logs, err = f.indexedLogs(ctx, indexed-1)
+
+	indexed := sections * size
+	if begin > end {
+		rest, err = f.unindexedLogs(ctx, begin, indexed)
+		if err != nil {
+			return rest, err
+		}
+	}
+	if indexed > begin || indexed > end {
+		if indexed > begin && indexed > end {
+			logs, err = f.indexedLogs(ctx, begin, end)
+		} else if begin < end && end >= indexed {
+			logs, err = f.indexedLogs(ctx, begin, indexed-1)
+		} else if begin > end && begin >= indexed {
+			logs, err = f.indexedLogs(ctx, indexed-1, end)
 		}
 		if err != nil {
+			if len(rest) > 0 {
+				logs = append(rest, logs...)
+			}
 			return logs, err
 		}
 	}
-	rest, err := f.unindexedLogs(ctx, end)
-	logs = append(logs, rest...)
+
+	if end >= begin {
+		rest, err = f.unindexedLogs(ctx, uint64(f.begin), end)
+		logs = append(logs, rest...)
+	} else {
+		logs = append(rest, logs...)
+	}
 	return logs, err
 }
 
 // indexedLogs returns the logs matching the filter criteria based on the bloom
 // bits indexed available locally or via the network.
-func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+func (f *Filter) indexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
 	// Create a matcher session and request servicing from the backend
 	matches := make(chan uint64, 64)
 
-	session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
+	start := int64(begin)
+	stop := int64(end)
+	inc := int64(1)
+	if start > stop {
+		inc = -1
+	}
+
+	session, err := f.matcher.Start(ctx, uint64(start), uint64(stop), matches)
 	if err != nil {
 		return nil, err
 	}
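With both directions allowed, the rewritten Logs above has to split the request against the bloom-index boundary on either side: a fully indexed range goes straight to indexedLogs, a forward range that overruns the boundary is clipped at indexed-1, and a reversed range first scans its unindexed head block by block. A compact restatement of that branch logic (a sketch for illustration, not commit code):

package main

import "fmt"

// splitIndexed mirrors the branch structure above: given a request
// [begin, end] in either order and `indexed`, the first block without
// bloom-index coverage, it reports the sub-range that can be served
// from the index, if any.
func splitIndexed(begin, end, indexed uint64) (from, to uint64, ok bool) {
	switch {
	case indexed > begin && indexed > end: // fully indexed, either direction
		return begin, end, true
	case begin < end && begin < indexed && end >= indexed: // forward, unindexed tail
		return begin, indexed - 1, true
	case begin > end && begin >= indexed && end < indexed: // reverse, unindexed head
		return indexed - 1, end, true
	}
	return 0, 0, false // nothing in the range is indexed
}

func main() {
	fmt.Println(splitIndexed(100, 50, 200)) // 100 50 true: reverse, fully indexed
	fmt.Println(splitIndexed(300, 50, 200)) // 199 50 true: head 300..200 is unindexed
}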
@@ -187,11 +214,11 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
 			if !ok {
 				err := session.Error()
 				if err == nil {
-					f.begin = int64(end) + 1
+					f.begin = stop + inc
 				}
 				return logs, err
 			}
-			f.begin = int64(number) + 1
+			f.begin = int64(number) + inc
 
 			// Retrieve the suggested block and pull any truly matching logs
 			header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
@@ -212,11 +239,18 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
 
 // indexedLogs returns the logs matching the filter criteria based on raw block
 // iteration and bloom matching.
-func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
 	var logs []*types.Log
 
-	for ; f.begin <= int64(end); f.begin++ {
-		header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
+	start := int64(begin)
+	stop := int64(end)
+	inc := int64(1)
+	if start > stop {
+		inc = -1
+	}
+
+	for ; start != stop; start += inc {
+		header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(start))
 		if header == nil || err != nil {
 			return logs, err
 		}
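The reworked unindexedLogs walks headers in request order with the same signed-step loop; note that start != stop makes the stop bound exclusive here, unlike the old f.begin <= end form. A standalone sketch of the walk (fetchHeader is a hypothetical stand-in for backend.HeaderByNumber):

package main

import "fmt"

// fetchHeader stands in for f.backend.HeaderByNumber in the loop above.
func fetchHeader(n int64) string { return fmt.Sprintf("header #%d", n) }

func main() {
	// A reversed range, walked the way the rewritten loop does it:
	// signed counter, ±1 step, != as the termination test.
	begin, end := int64(7), int64(3)
	inc := int64(1)
	if begin > end {
		inc = -1
	}
	for i := begin; i != end; i += inc {
		fmt.Println(fetchHeader(i)) // headers #7, #6, #5, #4 (stop is exclusive)
	}
}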
9 changes: 6 additions & 3 deletions eth/filters/filter_system.go
@@ -194,7 +194,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription {
 
 // SubscribeLogs creates a subscription that will write all logs matching the
 // given criteria to the given logs channel. Default value for the from and to
-// block is "latest". If the fromBlock > toBlock an error is returned.
+// block is "latest".
 func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
 	var from, to rpc.BlockNumber
 	if crit.FromBlock == nil {
@@ -217,7 +217,7 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
 		return es.subscribeLogs(crit, logs), nil
 	}
 	// only interested in mined logs within a specific block range
-	if from >= 0 && to >= 0 && to >= from {
+	if from >= 0 && to >= 0 {
 		return es.subscribeLogs(crit, logs), nil
 	}
 	// interested in mined logs from a specific block number, new logs and pending logs
@@ -228,7 +228,10 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
 	if from >= 0 && to == rpc.LatestBlockNumber {
 		return es.subscribeLogs(crit, logs), nil
 	}
-	return nil, fmt.Errorf("invalid from and to block combination: from > to")
+	if to >= 0 && from == rpc.LatestBlockNumber {
+		return es.subscribeLogs(crit, logs), nil
+	}
+	return nil, fmt.Errorf("invalid from and to block combination")
 }
 
 // subscribeMinedPendingLogs creates a subscription that returned mined and
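After dropping the to >= from requirement and adding the latest-to-concrete branch, the range acceptance in SubscribeLogs can be summarized as below (a sketch covering only the branches visible in these hunks; the pending-only and latest-only branches earlier in the function are elided, and latestBlock is a local stand-in for rpc.LatestBlockNumber):

package main

import "fmt"

// latestBlock is a local stand-in for rpc.LatestBlockNumber.
const latestBlock = int64(-1)

// acceptsRange mirrors the branches shown in the hunks above; since this
// commit a reverse range (from > to) subscribes instead of erroring.
func acceptsRange(from, to int64) bool {
	switch {
	case from >= 0 && to >= 0: // any concrete range, forward or reverse
		return true
	case from >= 0 && to == latestBlock: // from a block to new heads
		return true
	case to >= 0 && from == latestBlock: // new: latest back to a block
		return true
	}
	return false
}

func main() {
	fmt.Println(acceptsRange(100, 50))         // true since this commit
	fmt.Println(acceptsRange(latestBlock, 50)) // true: the new branch
}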