Skip to content

Commit

Permalink
FAB-14068 filter creation and retrieval methods
Browse files Browse the repository at this point in the history
Implement the EthService methods:
 - NewBlockFilter
 - GetFilterLogs
 - GetFilterChanges

FilterID object for consistent parsing of input arguments.

filterEntry Object for consistent code flow when running filters.

filters time out after 5 minutes. need to move to a settable
constant so we can test.

Last block seen updated after executing filter.

LogsFilters are the same as a deferred call to GetLogs.

Change-Id: Ie64e4cf40a63959b9275fbdec255293a6b26f5a6
Signed-off-by: Morgan Bauer <mbauer@us.ibm.com>
  • Loading branch information
MHBauer committed Jan 22, 2020
1 parent 2b037c9 commit b0ff610
Show file tree
Hide file tree
Showing 12 changed files with 860 additions and 55 deletions.
126 changes: 126 additions & 0 deletions Fab3_Instructions.md
Expand Up @@ -26,6 +26,11 @@ Fab3 currently supports:
- [eth_getTransactionReceipt](#eth_getTransactionReceipt)
- [eth_getLogs](#eth_getLogs)
- [eth_getTransactionCount](#eth_getTransactionCount)
- [eth_newFilter](#eth_newFilter)
- [eth_newBlockFilter](#eth_newBlockFilter)
- [eth_uninstallFilter](#eth_uninstallFilter)
- [eth_getFilterChanges](#eth_getFilterChanges)
- [eth_getFilterLogs](#eth_getFilterLogs)

### net_version
`net_version` always returns the string `66616265766d`, which is the hex encoding
Expand Down Expand Up @@ -369,3 +374,124 @@ curl http://127.0.0.1:5000 -X POST -H "Content-Type:application/json" -d '{
{"jsonrpc":"2.0","result":"0x0","id":1}
```

### eth_newFilter
`eth_newFilter` takes the same arguments as [`eth_getLogs`](#eth_getLogs). It returns an
identifier to collect the log entries. The log filter is not run until
[`eth_getFilterChanges`](#eth_getFilterChanges) is called.

**Example**
```
curl http://127.0.0.1:5000 -X POST -H "Content-Type:application/json" -d '{
"jsonrpc": "2.0",
"id": 5,
"method": "eth_newFilter",
"params": [{
"toBlock": "latest",
"address": [
"0x6c27ec2ab7a4e81228080434d553fa198ddccfbc"
],
"topics": [
[],
[
"0000000000000000000000000000000000000000000000000000000000000000"
]
]
}]
}'
{"jsonrpc":"2.0","result":"0x1","id":5}
```

### eth_newBlockFilter
`eth_newBlockFilter` creates a filter of the blocks that arrive after creation
of the filter. An identifier is returned to refer to the filter in the future.

**Example**
```
curl http://127.0.0.1:5000 -X POST -H "Content-Type:application/json" -d '{
"jsonrpc":"2.0",
"method": "eth_newBlockFilter",
"id":1,
"params":[]
}'
{"jsonrpc":"2.0","result":"0x2","id":1}
```

### eth_uninstallFilter
`eth_uninstallFilter` takes a filter identifier and forgets the associated
filter.

**Example**
```
curl http://127.0.0.1:5000 -X POST -H "Content-Type:application/json" -d '{
"jsonrpc":"2.0",
"method": "eth_uninstallFilter",
"id":1,
"params":["0x2"]
}'
{"jsonrpc":"2.0","result":true,"id":1}
```

### eth_getFilterChanges
`eth_getFilterChanges` takes a filter identifier and returns the output
associated with the filter. For new block filters, that is an array of block
hashes. For log filters, it is the log entries as if from [`eth_getLogs`](#eth_getLogs).

**Example**
```
curl http://127.0.0.1:5000 -X POST -H "Content-Type:application/json" -d '{
"jsonrpc":"2.0",
"method": "eth_getFilterChanges",
"id": 6129484611666146000,
"params":["0x2"]
}'
{
"jsonrpc": "2.0",
"result": [
"0xcbe7100b09f4c5aaf2649936bf8ba65b90636ca375ec23e4a81801bffe996724",
"0xae3e4d4972e986d44d6bd830a2d40afa404a4e6b42b5429d2bba700b1e956e61",
"0x316f3cae866ae1f0c53ecce4d63378cc1ad2e3a3d7eea11315002c3e2f18d9ca",
"0x60d15a4cc589ac95723768a243edfa1fd432c4ea3ea83fe21938313780e8076d"
],
"id": 6129484611666146000
}
```

### eth_getFilterLogs
`eth_getFilterLogs` is an exact duplicate of `eth_getFilterChanges`. It takes
all of the same input and returns all of the exact same output, advancing the
state of referred log object in the same way as `eth_getFilterChanges`.

**Example**
```
curl http://127.0.0.1:5000 -X POST -H "Content-Type:application/json" -d '{
"jsonrpc":"2.0",
"method": "eth_getFilterLogs",
"id": 8674665223082154000,
"params":["0x1"]
}'
{
"jsonrpc": "2.0",
"result": [
{
"address": "0xb125f5af2083c8d86e36beeabf8be0ed78028fad",
"topics": [
"0xd81ec364c58bcc9b49b6c953fc8e1f1c158ee89255bae73029133234a2936aad",
"0x0000000000000000000000000000000000000000000000000000000000000000",
"0x3737373737373737373737373737373737373737373737373737373737373737"
],
"blockNumber": "0x4",
"transactionHash": "0xaa8e9ffa6a49f8f99e8bb82f3711681e6638f35c408eba6772e385cb6ebee4a0",
"transactionIndex": "0x0",
"blockHash": "0xbf6deecd5d248c6d0cdab0bff79aa94bd1dc9e72a3b5b3b9e98f5d0d14145a1a",
"logIndex": "0x0"
}
],
"id": 8674665223082154000
}
```
2 changes: 2 additions & 0 deletions evmcc/go.sum
Expand Up @@ -164,6 +164,7 @@ github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
Expand Down Expand Up @@ -200,6 +201,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/sykesm/zap-logfmt v0.0.2/go.mod h1:TerDJT124HaO8UTpZ2wJCipJRAKQ9XONM1mzUabIh6M=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
Expand Down
112 changes: 101 additions & 11 deletions fab3/ethservice.go
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
Expand Down Expand Up @@ -81,7 +82,10 @@ type EthService interface {
GetTransactionCount(r *http.Request, _ *interface{}, reply *string) error
GetLogs(*http.Request, *types.GetLogsArgs, *[]types.Log) error
NewFilter(*http.Request, *types.GetLogsArgs, *string) error
UninstallFilter(*http.Request, *string, *bool) error
NewBlockFilter(*http.Request, *interface{}, *string) error
UninstallFilter(*http.Request, *types.FilterID, *bool) error
GetFilterChanges(*http.Request, *types.FilterID, *[]interface{}) error
GetFilterLogs(*http.Request, *types.FilterID, *[]types.Log) error
}

type ethService struct {
Expand All @@ -91,7 +95,7 @@ type ethService struct {
ccid string
logger *zap.SugaredLogger
filterMapLock sync.Mutex
filterMap map[uint64]interface{}
filterMap map[uint64]filterEntry
filterSeq uint64
}

Expand All @@ -102,7 +106,7 @@ func NewEthService(channelClient ChannelClient, ledgerClient LedgerClient, chann
channelID: channelID,
ccid: ccid,
logger: logger.Named("ethservice"),
filterMap: make(map[uint64]interface{}),
filterMap: make(map[uint64]filterEntry),
}
}

Expand Down Expand Up @@ -218,7 +222,7 @@ func (s *ethService) GetTransactionReceipt(r *http.Request, txID *string, reply
func (s *ethService) Accounts(r *http.Request, arg *string, reply *[]string) error {
response, err := s.query(s.ccid, "account", [][]byte{})
if err != nil {
return fmt.Errorf("Failed to query the ledger: %s", err)
return errors.Wrap(err, "Failed to query the ledger")
}

*reply = []string{"0x" + strings.ToLower(string(response.Payload))}
Expand Down Expand Up @@ -528,21 +532,26 @@ func (s *ethService) GetLogs(r *http.Request, args *types.GetLogsArgs, logs *[]t
}

func (s *ethService) NewFilter(_ *http.Request, filter *types.GetLogsArgs, result *string) error {
logger := s.logger.With("method", "NewFilter")
latestExistingBlock, err := s.parseBlockNum("latest")
if err != nil {
return errors.Wrap(err, "latest block number not available at this time")
}

s.filterMapLock.Lock()
s.filterSeq++
index := s.filterSeq
s.filterMap[index] = filter
s.filterMap[index] = &logsFilter{logArgs: filter, latestBlockSeen: latestExistingBlock, lastAccessTime: time.Now()}
s.filterMapLock.Unlock()

logger.Debugf("filter created with ID %d, log args %v", index, filter)

*result = "0x" + strconv.FormatUint(index, 16)
return nil
}

func (s *ethService) UninstallFilter(_ *http.Request, filterID *string, removed *bool) error {
id, err := strconv.ParseUint(strip0x(*filterID), 16, 64)
if err != nil {
return errors.Wrap(err, "failed to parse filter id")
}

func (s *ethService) UninstallFilter(_ *http.Request, filterID *types.FilterID, removed *bool) error {
id := filterID.ID
s.filterMapLock.Lock()
defer s.filterMapLock.Unlock()

Expand All @@ -554,6 +563,87 @@ func (s *ethService) UninstallFilter(_ *http.Request, filterID *string, removed
return nil
}

func (s *ethService) NewBlockFilter(_ *http.Request, _ *interface{}, result *string) error {
logger := s.logger.With("method", "NewBlockFilter")
latestExistingBlock, err := s.parseBlockNum("latest")
if err != nil {
return errors.Wrap(err, "latest block number not available at this time")
}

s.filterMapLock.Lock()
s.filterSeq++
index := s.filterSeq
s.filterMap[index] = &newBlockFilter{latestBlockSeen: latestExistingBlock, lastAccessTime: time.Now()}
s.filterMapLock.Unlock()

logger.Debugf("filter created with ID %d, current latest block %d", index, latestExistingBlock)

*result = "0x" + strconv.FormatUint(index, 16)
return nil
}

func (s *ethService) GetFilterChanges(_ *http.Request, filterID *types.FilterID, logsOrBlocks *[]interface{}) error {
id := filterID.ID
var f filterEntry
// get the filter
s.filterMapLock.Lock()
if e, ok := s.filterMap[id]; ok {
f = e
} else {
s.filterMapLock.Unlock()
return fmt.Errorf("No filter found for id %d", id)
}
s.filterMapLock.Unlock()
// check the filter for validity by expiration time
if time.Now().After(f.LastAccessTime().Add(5 * time.Minute)) {
// remove the filter if not valid and return error
// delegate to existing function
var removed bool
s.UninstallFilter(nil, filterID, &removed)
return fmt.Errorf("No filter found for id %d", id)
}
// use the filter
ret, err := f.Filter(s)
if err != nil {
return errors.Wrapf(err, "failed to run filter id: %d", id)
}
*logsOrBlocks = ret
return nil
}

func (s *ethService) GetFilterLogs(_ *http.Request, filterID *types.FilterID, logs *[]types.Log) error {
logger := s.logger.With("method", "GetFilterLogs")
id := filterID.ID
var f *logsFilter
// get the filter
s.filterMapLock.Lock()
if e, ok := s.filterMap[id]; ok {
if f, ok = e.(*logsFilter); !ok {
s.filterMapLock.Unlock()
return fmt.Errorf("Filter id %d is not for retrieving logs, it is %T", id, f)
}
} else {
s.filterMapLock.Unlock()
return fmt.Errorf("No filter found for id %d", id)
}
s.filterMapLock.Unlock()
// check the filter for validity by expiration time
if time.Now().After(f.LastAccessTime().Add(5 * time.Minute)) {
// remove the filter if not valid and return error
// delegate to existing function
var removed bool
logger.Debugf("expired filter ID:%d removed", id)
s.UninstallFilter(nil, filterID, &removed)
return fmt.Errorf("No filter found for id %d", id)
}
// use the filter
err := s.GetLogs(nil, f.logArgs, logs)
if err != nil {
return errors.Wrapf(err, "failed to run filter id: %d", id)
}
return nil
}

func (s *ethService) query(ccid, function string, queryArgs [][]byte) (channel.Response, error) {
return s.channelClient.Query(channel.Request{
ChaincodeID: ccid,
Expand Down

0 comments on commit b0ff610

Please sign in to comment.