Skip to content

Commit

Permalink
shovel: bugfix multiple filters per block/event (#263)
Browse files Browse the repository at this point in the history
Prior to this commit, Shovel would return early when it encountered a
failed filter check. However, this is not how Shovel is supposed to work.
It is supposed to check all filters and pass the tx/event if at least
one of the filters evaluated to true.
  • Loading branch information
ryandotsmith committed Jul 2, 2024
1 parent 2096580 commit 3410a68
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 60 deletions.
120 changes: 62 additions & 58 deletions dig/dig.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,11 @@ type Filter struct {
Ref Ref `json:"filter_ref"`
}

func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d any) (bool, error) {
func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d any, frs *filterResults) error {
if len(f.Arg) == 0 && len(f.Ref.Integration) == 0 {
return true, nil
return nil
}

switch v := d.(type) {
case eth.Bytes:
d = []byte(v)
Expand All @@ -398,9 +399,9 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an
}
switch v := d.(type) {
case []byte:
var res bool
switch {
case strings.HasSuffix(f.Op, "contains"):
var res bool
switch {
case len(f.Ref.Table) > 0:
q := fmt.Sprintf(
Expand All @@ -416,7 +417,7 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an
res = false
case err != nil:
const tag = "filter using reference (%s %s): %w"
return false, fmt.Errorf(tag, f.Ref.Table, f.Ref.Column, err)
return fmt.Errorf(tag, f.Ref.Table, f.Ref.Column, err)
}
default:
for i := range f.Arg {
Expand All @@ -427,67 +428,66 @@ func (f Filter) Accept(ctx context.Context, pgmut *sync.Mutex, pg wpg.Conn, d an
}
}
if strings.HasPrefix(f.Op, "!") {
return !res, nil
res = !res
}
return res, nil
frs.add(res)
case f.Op == "eq" || f.Op == "ne":
var res bool
for i := range f.Arg {
if bytes.Contains(v, eth.DecodeHex(f.Arg[i])) {
res = true
break
}
}
if f.Op == "ne" {
return !res, nil
res = !res
}
return res, nil
default:
return true, nil
res = true
}
frs.add(res)
case string:
switch f.Op {
case "contains":
return slices.Contains(f.Arg, v), nil
frs.add(slices.Contains(f.Arg, v))
case "!contains":
return !slices.Contains(f.Arg, v), nil
frs.add(!slices.Contains(f.Arg, v))
case "eq":
return v == f.Arg[0], nil
frs.add(v == f.Arg[0])
case "ne":
return v != f.Arg[0], nil
frs.add(v != f.Arg[0])
}
case uint64:
i, err := strconv.ParseUint(f.Arg[0], 10, 64)
if err != nil {
return false, fmt.Errorf("unable to convert filter arg to int: %q", f.Arg[0])
return fmt.Errorf("unable to convert filter arg to int: %q", f.Arg[0])
}
switch f.Op {
case "eq":
return v == i, nil
frs.add(v == i)
case "ne":
return v != i, nil
frs.add(v != i)
case "gt":
return v > i, nil
frs.add(v > i)
case "lt":
return v < i, nil
frs.add(v < i)
}
case *uint256.Int:
i := &uint256.Int{}
if err := i.SetFromDecimal(f.Arg[0]); err != nil {
return false, fmt.Errorf("unable to convert filter arg dec to uint256: %q", f.Arg[0])
return fmt.Errorf("unable to convert filter arg dec to uint256: %q", f.Arg[0])
}
switch f.Op {
case "eq":
return v.Cmp(i) == 0, nil
frs.add(v.Cmp(i) == 0)
case "ne":
return v.Cmp(i) != 0, nil
frs.add(v.Cmp(i) != 0)
case "gt":
return v.Cmp(i) == 1, nil
frs.add(v.Cmp(i) == 1)
case "lt":
return v.Cmp(i) == -1, nil
frs.add(v.Cmp(i) == -1)
}
}
return true, nil
return nil
}

func parseArray(elm atype, s string) atype {
Expand Down Expand Up @@ -949,29 +949,47 @@ func (lwc *logWithCtx) get(name string) any {
}
}

type filterResults struct {
set bool
val bool
}

func (fr *filterResults) add(b bool) {
fr.set = true
if !fr.val {
fr.val = b
}
}

func (fr *filterResults) accept() bool {
if !fr.set {
return true
}
return fr.val
}

func (ig Integration) processTx(rows [][]any, lwc *logWithCtx, pgmut *sync.Mutex, pg wpg.Conn) ([][]any, bool, error) {
switch {
case ig.numSelected > 0:
return rows, false, nil
case ig.numBDSelected > 0:
frs := filterResults{}
row := make([]any, len(ig.coldefs))
for i, def := range ig.coldefs {
switch {
case !def.BlockData.Empty():
d := lwc.get(def.BlockData.Name)
accept, err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d)
if err != nil {
if err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil {
return nil, false, fmt.Errorf("checking filter: %w", err)
}
if !accept {
return rows, true, nil
}
row[i] = d
default:
return rows, false, fmt.Errorf("expected only blockdata coldef")
}
}
rows = append(rows, row)
if frs.accept() {
rows = append(rows, row)
}
}
return rows, true, nil
}
Expand All @@ -989,18 +1007,15 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute
}
for i := 0; i < ig.resultCache.Len(); i++ {
ictr, actr := 1, 0
frs := filterResults{}
row := make([]any, len(ig.coldefs))
for j, def := range ig.coldefs {
switch {
case def.Input.Indexed:
d := dbtype(def.Input.Type, lwc.l.Topics[ictr])
accept, err := def.Input.Accept(lwc.ctx, pgmut, pg, d)
if err != nil {
if err := def.Input.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil {
return nil, fmt.Errorf("checking filter: %w", err)
}
if !accept {
return rows, nil
}
row[j] = d
ictr++
case !def.BlockData.Empty():
Expand All @@ -1010,59 +1025,48 @@ func (ig Integration) processLog(rows [][]any, lwc *logWithCtx, pgmut *sync.Mute
d = i
default:
d = lwc.get(def.BlockData.Name)
accept, err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d)
if err != nil {
if err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil {
return nil, fmt.Errorf("checking filter: %w", err)
}
if !accept {
return rows, nil
}
}
row[j] = d
default:
d := dbtype(def.Input.Type, ig.resultCache.At(i)[actr])
accept, err := def.Input.Accept(lwc.ctx, pgmut, pg, d)
if err != nil {
if err := def.Input.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil {
return nil, fmt.Errorf("checking filter: %w", err)
}
if !accept {
return rows, nil
}
row[j] = d
actr++
}
}
rows = append(rows, row)
if frs.accept() {
rows = append(rows, row)
}
}
default:
frs := filterResults{}
row := make([]any, len(ig.coldefs))
for i, def := range ig.coldefs {
switch {
case def.Input.Indexed:
d := dbtype(def.Input.Type, lwc.l.Topics[1+i])
accept, err := def.Input.Accept(lwc.ctx, pgmut, pg, d)
if err != nil {
if err := def.Input.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil {
return nil, fmt.Errorf("checking filter: %w", err)
}
if !accept {
return rows, nil
}
row[i] = d
case !def.BlockData.Empty():
d := lwc.get(def.BlockData.Name)
accept, err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d)
if err != nil {
if err := def.BlockData.Accept(lwc.ctx, pgmut, pg, d, &frs); err != nil {
return nil, fmt.Errorf("checking filter: %w", err)
}
if !accept {
return rows, nil
}
row[i] = d
default:
return nil, fmt.Errorf("no rows for un-indexed data")
}
}
rows = append(rows, row)
if frs.accept() {
rows = append(rows, row)
}
}
return rows, nil
}
Expand Down
5 changes: 3 additions & 2 deletions dig/dig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,9 @@ func TestFilter(t *testing.T) {
},
}
for _, c := range cases {
got, err := c.f.Accept(context.Background(), mt, pg, c.d)
frs := filterResults{}
err := c.f.Accept(context.Background(), mt, pg, c.d, &frs)
tc.NoErr(t, err)
tc.WantGot(t, c.want, got)
tc.WantGot(t, c.want, frs.accept())
}
}
1 change: 1 addition & 0 deletions indexsupply.com/shovel/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The following resources are automatically deployed on a main commit:

On main but not yet associated with a new version tag.

- fix multiple filters per block/event
- fix filter operations on trace_action_value
- empty decoded bytes are stored as an empty byte array instead of NULL
- accept multiple URLs per source for redundancy
Expand Down

0 comments on commit 3410a68

Please sign in to comment.