-
Notifications
You must be signed in to change notification settings - Fork 24
/
cfx_api_filter.go
127 lines (104 loc) · 4.19 KB
/
cfx_api_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package rpc
import (
"context"
"github.com/Conflux-Chain/confura/node"
"github.com/Conflux-Chain/confura/util/metrics"
sdk "github.com/Conflux-Chain/go-conflux-sdk"
"github.com/Conflux-Chain/go-conflux-sdk/types"
"github.com/openweb3/go-rpc-provider"
"github.com/pkg/errors"
)
// RPC method names referenced by the filter handlers in this file.
const (
// rpcMethodCfxNewFilter is the method name NewFilter passes to
// metrics.UpdateCfxRpcLogFilter.
rpcMethodCfxNewFilter = "cfx_newFilter"
// rpcMethodCfxGetFilterLogs is the method name GetFilterLogs passes to
// api.getLogs.
rpcMethodCfxGetFilterLogs = "cfx_getFilterLogs"
)
// isCfxFilterRpcMethod reports whether method is one of the six cfx filter
// RPC method names: the three filter constructors, the two polling/query
// methods and the uninstall method. The comparison is exact (case-sensitive,
// no trimming).
func isCfxFilterRpcMethod(method string) bool {
	switch method {
	case "cfx_newFilter",
		"cfx_newBlockFilter",
		"cfx_newPendingTransactionFilter",
		"cfx_getFilterChanges",
		"cfx_getFilterLogs",
		"cfx_uninstallFilter":
		return true
	}

	return false
}
// NewFilter installs a log filter described by filterCrit and returns its id.
// The filter is meant for picking up logs as the state changes; it cannot be
// used to fetch logs that are already stored in the state.
//
// The request is always reported to metrics first. When a virtual filter
// client is configured the filter is created through it (keyed by the node URL
// of the context client) and its error is passed through
// errVirtualFilterProxyErrorOrNil; otherwise the call is forwarded to the
// client taken from ctx.
func (api *cfxAPI) NewFilter(ctx context.Context, filterCrit types.LogFilter) (*rpc.ID, error) {
	client := GetCfxClientFromContext(ctx)
	metrics.UpdateCfxRpcLogFilter(rpcMethodCfxNewFilter, client, &filterCrit)

	// No virtual filter service: hand the request straight to the context client.
	if api.VirtualFilterClient == nil {
		return client.(*sdk.Client).Filter().NewFilter(filterCrit)
	}

	filterID, err := api.VirtualFilterClient.NewFilter(client.GetNodeURL(), &filterCrit)
	return filterID, errVirtualFilterProxyErrorOrNil(err)
}
// NewBlockFilter installs a filter that reports blocks imported into the chain
// and returns its id. It lives with the other filter methods because its
// results are polled through cfx_getFilterChanges.
//
// When a virtual filter client is configured the filter is created through it
// (keyed by the node URL of the context client); otherwise the call is
// forwarded to the client taken from ctx.
func (api *cfxAPI) NewBlockFilter(ctx context.Context) (*rpc.ID, error) {
	client := GetCfxClientFromContext(ctx)

	// No virtual filter service: hand the request straight to the context client.
	if api.VirtualFilterClient == nil {
		return client.(*sdk.Client).Filter().NewBlockFilter()
	}

	filterID, err := api.VirtualFilterClient.NewBlockFilter(client.GetNodeURL())
	return filterID, errVirtualFilterProxyErrorOrNil(err)
}
// NewPendingTransactionFilter installs a filter that reports the hashes of
// transactions as they enter the pending state, and returns its id.
//
// It lives with the other filter methods because its results are polled
// through `cfx_getFilterChanges`, the same method used for log filters.
//
// When a virtual filter client is configured the filter is created through it
// (keyed by the node URL of the context client); otherwise the call is
// forwarded to the client taken from ctx.
func (api *cfxAPI) NewPendingTransactionFilter(ctx context.Context) (*rpc.ID, error) {
	client := GetCfxClientFromContext(ctx)

	// No virtual filter service: hand the request straight to the context client.
	if api.VirtualFilterClient == nil {
		return client.(*sdk.Client).Filter().NewPendingTransactionFilter()
	}

	filterID, err := api.VirtualFilterClient.NewPendingTransactionFilter(client.GetNodeURL())
	return filterID, errVirtualFilterProxyErrorOrNil(err)
}
// UninstallFilter removes the filter identified by fid and returns the
// boolean result reported by whichever backend handled the request.
//
// Without a virtual filter client the call is forwarded to the client taken
// from ctx; otherwise it goes to the virtual filter client and its error is
// passed through errVirtualFilterProxyErrorOrNil.
func (api *cfxAPI) UninstallFilter(ctx context.Context, fid rpc.ID) (bool, error) {
	// The context client is only looked up when it is actually needed.
	if api.VirtualFilterClient == nil {
		client := GetCfxClientFromContext(ctx)
		return client.(*sdk.Client).Filter().UninstallFilter(fid)
	}

	removed, err := api.VirtualFilterClient.UninstallFilter(fid)
	return removed, errVirtualFilterProxyErrorOrNil(err)
}
// GetFilterChanges returns what the filter identified by fid has collected
// since the previous call, which makes it suitable for polling.
//
// Pending transaction and block filters yield []types.Hash, while
// (pending) log filters yield []types.CfxFilterLog.
//
// Without a virtual filter client the call is forwarded to the client taken
// from ctx; otherwise it goes to the virtual filter client and its error is
// passed through errVirtualFilterProxyErrorOrNil.
func (api *cfxAPI) GetFilterChanges(ctx context.Context, fid rpc.ID) (interface{}, error) {
	// The context client is only looked up when it is actually needed.
	if api.VirtualFilterClient == nil {
		client := GetCfxClientFromContext(ctx)
		return client.(*sdk.Client).Filter().GetFilterChanges(fid)
	}

	changes, err := api.VirtualFilterClient.GetFilterChanges(fid)
	return changes, errVirtualFilterProxyErrorOrNil(err)
}
// GetFilterLogs returns the logs matching the filter identified by fid.
//
// Without a virtual filter client the call is forwarded to the client taken
// from ctx. With one, the stored filter criteria are looked up by fid and the
// logs are then fetched via api.getLogs using a client obtained from the
// provider for the node.GroupCfxLogs group. If either the lookup or the client
// selection fails, emptyLogs is returned together with the error.
func (api *cfxAPI) GetFilterLogs(ctx context.Context, fid rpc.ID) ([]types.Log, error) {
	if api.VirtualFilterClient == nil {
		client := GetCfxClientFromContext(ctx)
		return client.(*sdk.Client).Filter().GetFilterLogs(fid)
	}

	// Resolve the criteria the filter was created with.
	logFilter, err := api.VirtualFilterClient.GetLogFilter(fid)
	if err != nil {
		return emptyLogs, errVirtualFilterProxyErrorOrNil(err)
	}

	// Pick a client from the logs node group to serve the query.
	logsClient, err := api.provider.GetClientByIP(ctx, node.GroupCfxLogs)
	if err != nil {
		return emptyLogs, errors.WithMessage(err, "failed to get client by ip")
	}

	return api.getLogs(ctx, logsClient, *logFilter, rpcMethodCfxGetFilterLogs)
}
// uniformCfxLogs replaces a nil log slice with the package-level emptyLogs
// value. Any non-nil slice, including a zero-length one, is returned as is.
func uniformCfxLogs(logs []types.Log) []types.Log {
	if logs != nil {
		return logs
	}

	return emptyLogs
}