Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatibility fixes/improvements for JSON/RPC filter polling #641

Merged
merged 6 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/klauspost/compress v1.15.5 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/umbracle/ethgo v0.1.3
github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570
github.com/valyala/fastjson v1.6.3 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,8 @@ github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3C
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/umbracle/ethgo v0.1.3 h1:s8D7Rmphnt71zuqrgsGTMS5gTNbueGO1zKLh7qsFzTM=
github.com/umbracle/ethgo v0.1.3/go.mod h1:g9zclCLixH8liBI27Py82klDkW7Oo33AxUOr+M9lzrU=
github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570 h1:/KyTftQQhxq0iRIVRocn0F2D4zoHmstIfB4FTDjsZbw=
github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570/go.mod h1:g9zclCLixH8liBI27Py82klDkW7Oo33AxUOr+M9lzrU=
github.com/umbracle/fastrlp v0.0.0-20220527094140-59d5dd30e722 h1:10Nbw6cACsnQm7r34zlpJky+IzxVLRk6MKTS2d3Vp0E=
github.com/umbracle/fastrlp v0.0.0-20220527094140-59d5dd30e722/go.mod h1:c8J0h9aULj2i3umrfyestM6jCq0LK0U6ly6bWy96nd4=
github.com/umbracle/go-eth-bn256 v0.0.0-20190607160430-b36caf4e0f6b h1:t3nz9xXkLZJz+ZlTGFT3ixsCGO5AHx1Yift2EAfjnnc=
Expand Down
63 changes: 41 additions & 22 deletions jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -17,7 +16,7 @@ import (
)

var (
ErrFilterDoesNotExists = errors.New("filter does not exists")
peterbroadhurst marked this conversation as resolved.
Show resolved Hide resolved
ErrFilterDoesNotExists = errors.New("filter not found")
peterbroadhurst marked this conversation as resolved.
Show resolved Hide resolved
ErrWSFilterDoesNotSupportGetChanges = errors.New("web socket Filter doesn't support to return a batch of the changes")
ErrCastingFilterToLogFilter = errors.New("casting filter object to logFilter error")
ErrBlockNotFound = errors.New("block not found")
Expand All @@ -41,8 +40,8 @@ type filter interface {
// getFilterBase returns filterBase that has common fields
getFilterBase() *filterBase

// getUpdates returns stored data in string
getUpdates() (string, error)
// getUpdates returns stored data in a JSON serializable form
getUpdates() (interface{}, error)

// sendUpdates write stored data to web socket stream
sendUpdates() error
Expand Down Expand Up @@ -120,15 +119,15 @@ func (f *blockFilter) takeBlockUpdates() []*types.Header {
}

// getUpdates returns updates of blocks in string
func (f *blockFilter) getUpdates() (string, error) {
func (f *blockFilter) getUpdates() (interface{}, error) {
headers := f.takeBlockUpdates()

updates := []string{}
for _, header := range headers {
updates = append(updates, header.Hash.String())
}

return fmt.Sprintf("[\"%s\"]", strings.Join(updates, "\",\"")), nil
return updates, nil
}

// sendUpdates writes the updates of blocks to web socket stream
Expand Down Expand Up @@ -177,15 +176,10 @@ func (f *logFilter) takeLogUpdates() []*Log {
}

// getUpdates returns stored logs in string
func (f *logFilter) getUpdates() (string, error) {
func (f *logFilter) getUpdates() (interface{}, error) {
logs := f.takeLogUpdates()

res, err := json.Marshal(logs)
if err != nil {
return "", err
}

return string(res), nil
return logs, nil
}

// sendUpdates writes stored logs to web socket stream
Expand Down Expand Up @@ -478,28 +472,42 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error)
return logFilter, nil
}

// GetFilterChanges returns the updates of the filter with given ID in string
func (f *FilterManager) GetFilterChanges(id string) (string, error) {
// GetFilterChanges returns the updates of the filter with given ID in string, and refreshes the timeout on the filter
func (f *FilterManager) GetFilterChanges(id string) (interface{}, error) {
filter, res, err := f.getFilterAndChanges(id)

if err == nil && !filter.isWS() {
// Refresh the timeout on this filter
f.lock.Lock()
f.refreshFilterTimeout(filter.getFilterBase())
f.lock.Unlock()
}

return res, err
}

// getFilterAndChanges returns the updates of the filter with given ID in string (read lock only)
func (f *FilterManager) getFilterAndChanges(id string) (filter, interface{}, error) {
f.lock.RLock()
defer f.lock.RUnlock()

filter, ok := f.filters[id]

if !ok {
return "", ErrFilterDoesNotExists
return nil, nil, ErrFilterDoesNotExists
}

// we cannot get updates from a ws filter with getFilterChanges
if filter.isWS() {
return "", ErrWSFilterDoesNotSupportGetChanges
return nil, nil, ErrWSFilterDoesNotSupportGetChanges
}

res, err := filter.getUpdates()
if err != nil {
return "", err
return nil, nil, err
}

return res, nil
return filter, res, nil
}

// Uninstall removes the filter with given ID from list
Expand All @@ -526,6 +534,19 @@ func (f *FilterManager) removeFilterByID(id string) bool {
return true
}

// refreshFilterTimeout updates the timeout for a filter to the current time
func (f *FilterManager) refreshFilterTimeout(filter *filterBase) {
f.timeouts.removeFilter(filter)
f.addFilterTimeout(filter)
}

// addFilterTimeout set timeout and add to heap
func (f *FilterManager) addFilterTimeout(filter *filterBase) {
filter.expiredAt = time.Now().Add(f.timeout)
f.timeouts.addFilter(filter)
f.emitSignalToUpdateCh()
}

// addFilter is an internal method to add given filter to list and heap
func (f *FilterManager) addFilter(filter filter) string {
f.lock.Lock()
Expand All @@ -537,9 +558,7 @@ func (f *FilterManager) addFilter(filter filter) string {

// Set timeout and add to heap if filter doesn't have web socket connection
if !filter.isWS() {
base.expiredAt = time.Now().Add(f.timeout)
f.timeouts.addFilter(base)
f.emitSignalToUpdateCh()
f.addFilterTimeout(base)
}

return base.id
Expand Down
22 changes: 6 additions & 16 deletions vendor/github.com/umbracle/ethgo/jsonrpc/eth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ github.com/syndtr/goleveldb/leveldb/opt
github.com/syndtr/goleveldb/leveldb/storage
github.com/syndtr/goleveldb/leveldb/table
github.com/syndtr/goleveldb/leveldb/util
# github.com/umbracle/ethgo v0.1.3
# github.com/umbracle/ethgo v0.1.4-0.20220722090909-c8ac32939570
## explicit; go 1.18
github.com/umbracle/ethgo
github.com/umbracle/ethgo/abi
Expand Down