Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatibility fixes/improvements for JSON/RPC filter polling #641

Merged
merged 6 commits into from
Jul 29, 2022
28 changes: 11 additions & 17 deletions jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -17,7 +16,7 @@ import (
)

var (
ErrFilterDoesNotExists = errors.New("filter does not exists")
peterbroadhurst marked this conversation as resolved.
Show resolved Hide resolved
ErrFilterDoesNotExists = errors.New("filter not found")
peterbroadhurst marked this conversation as resolved.
Show resolved Hide resolved
ErrWSFilterDoesNotSupportGetChanges = errors.New("web socket Filter doesn't support to return a batch of the changes")
ErrCastingFilterToLogFilter = errors.New("casting filter object to logFilter error")
ErrBlockNotFound = errors.New("block not found")
Expand All @@ -41,8 +40,8 @@ type filter interface {
// getFilterBase returns filterBase that has common fields
getFilterBase() *filterBase

// getUpdates returns stored data in string
getUpdates() (string, error)
// getUpdates returns stored data in a JSON serializable form
getUpdates() (interface{}, error)

// sendUpdates write stored data to web socket stream
sendUpdates() error
Expand Down Expand Up @@ -120,15 +119,15 @@ func (f *blockFilter) takeBlockUpdates() []*types.Header {
}

// getUpdates returns updates of blocks in string
func (f *blockFilter) getUpdates() (string, error) {
func (f *blockFilter) getUpdates() (interface{}, error) {
headers := f.takeBlockUpdates()

updates := []string{}
for _, header := range headers {
updates = append(updates, header.Hash.String())
}

return fmt.Sprintf("[\"%s\"]", strings.Join(updates, "\",\"")), nil
return updates, nil
}

// sendUpdates writes the updates of blocks to web socket stream
Expand Down Expand Up @@ -177,15 +176,10 @@ func (f *logFilter) takeLogUpdates() []*Log {
}

// getUpdates returns stored logs in string
func (f *logFilter) getUpdates() (string, error) {
func (f *logFilter) getUpdates() (interface{}, error) {
logs := f.takeLogUpdates()

res, err := json.Marshal(logs)
if err != nil {
return "", err
}

return string(res), nil
return logs, nil
}

// sendUpdates writes stored logs to web socket stream
Expand Down Expand Up @@ -479,24 +473,24 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error)
}

// GetFilterChanges returns the updates of the filter with given ID in string
func (f *FilterManager) GetFilterChanges(id string) (string, error) {
func (f *FilterManager) GetFilterChanges(id string) (interface{}, error) {
f.lock.RLock()
defer f.lock.RUnlock()

filter, ok := f.filters[id]

if !ok {
return "", ErrFilterDoesNotExists
return nil, ErrFilterDoesNotExists
}

// we cannot get updates from a ws filter with getFilterChanges
if filter.isWS() {
return "", ErrWSFilterDoesNotSupportGetChanges
return nil, ErrWSFilterDoesNotSupportGetChanges
}

res, err := filter.getUpdates()
if err != nil {
return "", err
return nil, err
}

return res, nil
Expand Down