-
Notifications
You must be signed in to change notification settings - Fork 23
/
eth_api_filter.go
140 lines (116 loc) · 4.6 KB
/
eth_api_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package rpc
import (
"context"
"github.com/Conflux-Chain/confura/node"
"github.com/Conflux-Chain/confura/util/metrics"
"github.com/openweb3/go-rpc-provider"
web3Types "github.com/openweb3/web3go/types"
"github.com/pkg/errors"
)
const (
rpcMethodEthNewFilter = "eth_newFilter"
rpcMethodEthGetFilterLogs = "eth_getFilterLogs"
)
func isEthFilterRpcMethod(method string) bool {
switch method {
case "eth_newFilter", "eth_newBlockFilter", "eth_newPendingTransactionFilter":
return true
case "eth_getFilterChanges", "eth_getFilterLogs", "eth_uninstallFilter":
return true
default:
return false
}
}
// uniform virtual filter proxy error
func errVirtualFilterProxyErrorOrNil(err error) error {
return errors.WithMessage(err, "virtual filter proxy error")
}
// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
// Default criteria for the from and to block are "latest".
// Using "latest" as block number will return logs for mined blocks.
// Using "pending" as block number is not supported.
// In case logs are removed (chain reorg) previously returned logs are returned
// again but with the removed property set to true.
//
// In case "fromBlock" > "toBlock" an error is returned.
func (api *ethAPI) NewFilter(ctx context.Context, fq web3Types.FilterQuery) (*rpc.ID, error) {
w3c := GetEthClientFromContext(ctx)
metrics.UpdateEthRpcLogFilter(rpcMethodEthNewFilter, w3c.Eth, &fq)
if api.VirtualFilterClient != nil {
fid, err := api.VirtualFilterClient.NewFilter(w3c.URL, &fq)
return fid, errVirtualFilterProxyErrorOrNil(err)
}
return w3c.Filter.NewLogFilter(&fq)
}
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
// It is part of the filter package since polling goes with eth_getFilterChanges.
func (api *ethAPI) NewBlockFilter(ctx context.Context) (*rpc.ID, error) {
w3c := GetEthClientFromContext(ctx)
if api.VirtualFilterClient != nil {
fid, err := api.VirtualFilterClient.NewBlockFilter(w3c.URL)
return fid, errVirtualFilterProxyErrorOrNil(err)
}
return w3c.Filter.NewBlockFilter()
}
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *ethAPI) NewPendingTransactionFilter(ctx context.Context) (*rpc.ID, error) {
w3c := GetEthClientFromContext(ctx)
if api.VirtualFilterClient != nil {
fid, err := api.VirtualFilterClient.NewPendingTransactionFilter(w3c.URL)
return fid, errVirtualFilterProxyErrorOrNil(err)
}
return w3c.Filter.NewPendingTransactionFilter()
}
// UninstallFilter removes the filter with the given filter id.
func (api *ethAPI) UninstallFilter(ctx context.Context, fid rpc.ID) (bool, error) {
if api.VirtualFilterClient != nil {
ok, err := api.VirtualFilterClient.UninstallFilter(fid)
return ok, errVirtualFilterProxyErrorOrNil(err)
}
w3c := GetEthClientFromContext(ctx)
return w3c.Filter.UninstallFilter(fid)
}
// GetFilterChanges returns the logs for the filter with the given id since
// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending) Log filters return []Log.
func (api *ethAPI) GetFilterChanges(ctx context.Context, fid rpc.ID) (interface{}, error) {
if api.VirtualFilterClient != nil {
res, err := api.VirtualFilterClient.GetFilterChanges(fid)
return res, errVirtualFilterProxyErrorOrNil(err)
}
w3c := GetEthClientFromContext(ctx)
return w3c.Filter.GetFilterChanges(fid)
}
// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
func (api *ethAPI) GetFilterLogs(ctx context.Context, fid rpc.ID) ([]web3Types.Log, error) {
if api.VirtualFilterClient == nil {
// delegate to full node if no virtual filter client provided
w3c := GetEthClientFromContext(ctx)
return w3c.Filter.GetFilterLogs(fid)
}
fq, err := api.VirtualFilterClient.GetLogFilter(fid)
if err != nil {
return ethEmptyLogs, errVirtualFilterProxyErrorOrNil(err)
}
w3c, err := api.provider.GetClientByIP(ctx, node.GroupEthLogs)
if err != nil {
return ethEmptyLogs, errors.WithMessage(err, "failed to get client by ip")
}
return api.getLogs(ctx, w3c, fq, rpcMethodEthGetFilterLogs)
}
func uniformEthLogs(logs []web3Types.Log) []web3Types.Log {
if logs == nil {
return ethEmptyLogs
}
return logs
}