diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index f48338b924..fb21857f1d 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -57,7 +57,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: cachix/install-nix-action@v18 + - uses: cachix/install-nix-action@v19 + with: + # pin to nix-2.13 to workaround compability issue of 2.14, + # see: https://github.com/cachix/install-nix-action/issues/161 + install_url: https://releases.nixos.org/nix/nix-2.13.3/install - uses: cachix/cachix-action@v12 with: name: ethermint @@ -73,7 +77,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2.3.4 - - uses: cachix/install-nix-action@v18 + - uses: cachix/install-nix-action@v19 + with: + # pin to nix-2.13 to workaround compability issue of 2.14, + # see: https://github.com/cachix/install-nix-action/issues/161 + install_url: https://releases.nixos.org/nix/nix-2.13.3/install - uses: cachix/cachix-action@v12 with: name: ethermint diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 33ca1ad133..9d702f836d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -87,7 +87,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: cachix/install-nix-action@v18 + - uses: cachix/install-nix-action@v19 + with: + # pin to nix-2.13 to workaround compability issue of 2.14, + # see: https://github.com/cachix/install-nix-action/issues/161 + install_url: https://releases.nixos.org/nix/nix-2.13.3/install - uses: cachix/cachix-action@v12 with: name: ethermint @@ -122,7 +126,11 @@ jobs: runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v2 - - uses: cachix/install-nix-action@v18 + - uses: cachix/install-nix-action@v19 + with: + # pin to nix-2.13 to workaround compability issue of 2.14, + # see: https://github.com/cachix/install-nix-action/issues/161 + install_url: https://releases.nixos.org/nix/nix-2.13.3/install - uses: cachix/cachix-action@v12 with: name: ethermint diff --git a/.semgrepignore b/.semgrepignore index cb655af10c..c06cf91c91 100644 --- a/.semgrepignore +++ b/.semgrepignore @@ -17,6 +17,7 @@ vendor/ # Common test paths test/ tests/ +testutil/ *_test.go # Semgrep rules folder diff --git a/go.mod b/go.mod index 7ae23473bf..f978446f5c 100644 --- a/go.mod +++ b/go.mod @@ -196,4 +196,5 @@ replace ( github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.7.0 // use cosmos flavored protobufs github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 + github.com/tendermint/tendermint => github.com/mmsqe/cometbft v0.34.23-0.20230301132024-c4eaf1aaf991 ) diff --git a/go.sum b/go.sum index c88654a6b9..140c3c9323 100644 --- a/go.sum +++ b/go.sum @@ -748,6 +748,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= github.com/mitchellh/pointerstructure v1.2.0/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8ohIXc3tViBH44KcwB2g4= +github.com/mmsqe/cometbft v0.34.23-0.20230301132024-c4eaf1aaf991 h1:LWEfabVIYuZgYWiG4PjuMcWaFaCi36V5mgjLPh+LIUU= +github.com/mmsqe/cometbft v0.34.23-0.20230301132024-c4eaf1aaf991/go.mod h1:rXVrl4OYzmIa1I91av3iLv2HS0fGSiucyW9J4aMTpKI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -979,8 +981,6 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E= github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME= -github.com/tendermint/tendermint v0.34.24 h1:879MKKJWYYPJEMMKME+DWUTY4V9f/FBpnZDI82ky+4k= -github.com/tendermint/tendermint v0.34.24/go.mod h1:rXVrl4OYzmIa1I91av3iLv2HS0fGSiucyW9J4aMTpKI= github.com/tendermint/tm-db v0.6.7 h1:fE00Cbl0jayAoqlExN6oyQJ7fR/ZtoVOmvPJ//+shu8= github.com/tendermint/tm-db v0.6.7/go.mod h1:byQDzFkZV1syXr/ReXS808NxA2xvyuuVgXOJ/088L6I= github.com/tidwall/btree v1.5.0 h1:iV0yVY/frd7r6qGBXfEYs7DH0gTDgrKTrDjS7xt/IyQ= diff --git a/gomod2nix.toml b/gomod2nix.toml index 65ab566060..5b36b0d597 100644 --- a/gomod2nix.toml +++ b/gomod2nix.toml @@ -451,8 +451,9 @@ schema = 3 version = "v0.16.0" hash = "sha256-JW4zO/0vMzf1dXLePOqaMtiLUZgNbuIseh9GV+jQlf0=" [mod."github.com/tendermint/tendermint"] - version = "v0.34.24" - hash = "sha256-3HFTv4XgN535RDaJ5OwUS+fnJHgkmLTwU7CNU2ilxEQ=" + version = "v0.34.23-0.20230301132024-c4eaf1aaf991" + hash = "sha256-jT1tAlKaMgvRxfNjc1O1WV/n7BzyqKUb8343i+uT6WM=" + replaced = "github.com/mmsqe/cometbft" [mod."github.com/tendermint/tm-db"] version = "v0.6.7" hash = "sha256-hl/3RrBrpkk2zA6dmrNlIYKs1/GfqegSscDSkA5Pjlo=" @@ -490,8 +491,8 @@ schema = 3 version = "v0.0.0-20220722155223-a9213eeb770e" hash = "sha256-kNgzydWRpjm0sZl4uXEs3LX5L0xjJtJRAFf/CTlYUN4=" [mod."golang.org/x/net"] - version = "v0.5.0" - hash = "sha256-HpbIAiLs7S1+tVsaSSdbCPw1IK43A0bFFuSzPSyjLbo=" + version = "v0.6.0" + hash = "sha256-e8F4kMogxT392mmimvcAmkUtD/5vcQRPWDwH238m8FU=" [mod."golang.org/x/oauth2"] version = "v0.0.0-20221014153046-6fdb5e3db783" hash = "sha256-IoygidVNqyAZmN+3macDeIefK8hhJToygpcqlwehdYQ=" @@ -499,14 +500,14 @@ schema = 3 version = "v0.1.0" hash = "sha256-Hygjq9euZ0qz6TvHYQwOZEjNiTbTh1nSLRAWZ6KFGR8=" [mod."golang.org/x/sys"] - version = "v0.4.0" - hash = "sha256-jchMzHCH5dg+IL/F+LqaX/fyAcB/nvHQpfBjqwaRJH0=" + version = "v0.5.0" + hash = "sha256-0LTr3KeJ1OMQAwYUQo1513dXJtQAJn5Dq8sFkc8ps1U=" [mod."golang.org/x/term"] - version = "v0.4.0" - hash = "sha256-wQKxHV10TU4vCU8Re2/hFmAbur/jRWEOB8QXBzgTFNY=" + version = "v0.5.0" + hash = "sha256-f3DiX7NkDsEZpPS+PbmnOH9F5WHFZ1sQrfFg/T2UPno=" [mod."golang.org/x/text"] - version = "v0.6.0" - hash = "sha256-+bpeRWR3relKACdal6NPj+eP5dnWCplTViArSN7/qA4=" + version = "v0.7.0" + hash = "sha256-ydgUqX+t5Qke16C6d3FP/06U/N1n+rUKpLRFj4KXjwk=" [mod."golang.org/x/xerrors"] version = "v0.0.0-20220907171357-04be3eba64a2" hash = "sha256-6+zueutgefIYmgXinOflz8qGDDDj0Zhv+2OkGhBTKno=" diff --git a/rpc/apis.go b/rpc/apis.go index e297721cf8..5e4511d2e2 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -33,8 +33,6 @@ import ( "github.com/evmos/ethermint/rpc/namespaces/ethereum/txpool" "github.com/evmos/ethermint/rpc/namespaces/ethereum/web3" ethermint "github.com/evmos/ethermint/types" - - rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" ) // RPC namespaces and API version @@ -60,7 +58,6 @@ const ( type APICreator = func( ctx *server.Context, clientCtx client.Context, - tendermintWebsocketClient *rpcclient.WSClient, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API @@ -72,7 +69,6 @@ func init() { apiCreators = map[string]APICreator{ EthNamespace: func(ctx *server.Context, clientCtx client.Context, - tmWSClient *rpcclient.WSClient, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -87,12 +83,12 @@ func init() { { Namespace: EthNamespace, Version: apiVersion, - Service: filters.NewPublicAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend), + Service: filters.NewPublicAPI(ctx.Logger, clientCtx, evmBackend), Public: true, }, } }, - Web3Namespace: func(*server.Context, client.Context, *rpcclient.WSClient, bool, ethermint.EVMTxIndexer) []rpc.API { + Web3Namespace: func(*server.Context, client.Context, bool, ethermint.EVMTxIndexer) []rpc.API { return []rpc.API{ { Namespace: Web3Namespace, @@ -102,7 +98,7 @@ func init() { }, } }, - NetNamespace: func(_ *server.Context, clientCtx client.Context, _ *rpcclient.WSClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { + NetNamespace: func(_ *server.Context, clientCtx client.Context, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { return []rpc.API{ { Namespace: NetNamespace, @@ -114,7 +110,6 @@ func init() { }, PersonalNamespace: func(ctx *server.Context, clientCtx client.Context, - _ *rpcclient.WSClient, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -128,7 +123,7 @@ func init() { }, } }, - TxPoolNamespace: func(ctx *server.Context, _ client.Context, _ *rpcclient.WSClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { + TxPoolNamespace: func(ctx *server.Context, _ client.Context, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { return []rpc.API{ { Namespace: TxPoolNamespace, @@ -140,7 +135,6 @@ func init() { }, DebugNamespace: func(ctx *server.Context, clientCtx client.Context, - _ *rpcclient.WSClient, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -156,7 +150,6 @@ func init() { }, MinerNamespace: func(ctx *server.Context, clientCtx client.Context, - _ *rpcclient.WSClient, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -176,7 +169,6 @@ func init() { // GetRPCAPIs returns the list of all APIs func GetRPCAPIs(ctx *server.Context, clientCtx client.Context, - tmWSClient *rpcclient.WSClient, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, selectedAPIs []string, @@ -185,7 +177,7 @@ func GetRPCAPIs(ctx *server.Context, for _, ns := range selectedAPIs { if creator, ok := apiCreators[ns]; ok { - apis = append(apis, creator(ctx, clientCtx, tmWSClient, allowUnprotectedTxs, indexer)...) + apis = append(apis, creator(ctx, clientCtx, allowUnprotectedTxs, indexer)...) } else { ctx.Logger.Error("invalid namespace value", "namespace", ns) } diff --git a/rpc/backend/mocks/client.go b/rpc/backend/mocks/client.go index fa60f0e03c..660b2a82a8 100644 --- a/rpc/backend/mocks/client.go +++ b/rpc/backend/mocks/client.go @@ -15,6 +15,7 @@ import ( mock "github.com/stretchr/testify/mock" types "github.com/tendermint/tendermint/types" + errors "github.com/pkg/errors" ) // Client is an autogenerated mock type for the Client type @@ -825,6 +826,10 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP return r0, r1 } +func (_m *Client) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) { + return nil, errors.New("Events is not implemented") +} + type mockConstructorTestingTNewClient interface { mock.TestingT Cleanup(func()) diff --git a/rpc/namespaces/ethereum/eth/filters/api.go b/rpc/namespaces/ethereum/eth/filters/api.go index 1083711a30..782bece40d 100644 --- a/rpc/namespaces/ethereum/eth/filters/api.go +++ b/rpc/namespaces/ethereum/eth/filters/api.go @@ -17,6 +17,7 @@ package filters import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -27,7 +28,6 @@ import ( "github.com/tendermint/tendermint/libs/log" coretypes "github.com/tendermint/tendermint/rpc/core/types" - rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" tmtypes "github.com/tendermint/tendermint/types" "github.com/ethereum/go-ethereum/common" @@ -93,14 +93,14 @@ type PublicFilterAPI struct { } // NewPublicAPI returns a new PublicFilterAPI instance. -func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI { +func NewPublicAPI(logger log.Logger, clientCtx client.Context, backend Backend) *PublicFilterAPI { logger = logger.With("api", "filter") api := &PublicFilterAPI{ logger: logger, clientCtx: clientCtx, backend: backend, filters: make(map[rpc.ID]*filter), - events: NewEventSystem(logger, tmWSClient), + events: NewEventSystem(logger, clientCtx.Client), } go api.timeoutLoop() @@ -158,7 +158,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { s: pendingTxSub, } - go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) { + go func(txsCh <-chan *coretypes.ResultEvents, errCh <-chan error) { defer cancelSubs() for { @@ -171,28 +171,29 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { return } - data, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) - if err != nil { - api.logger.Debug("fail to decode tx", "error", err.Error()) - continue - } + for _, item := range ev.Items { + var data tmtypes.EventDataTx + if err := json.Unmarshal(item.Data, &data); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } + tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) + if err != nil { + api.logger.Debug("fail to decode tx", "error", err.Error()) + continue + } - api.filtersMu.Lock() - if f, found := api.filters[pendingTxSub.ID()]; found { - for _, msg := range tx.GetMsgs() { - ethTx, ok := msg.(*evmtypes.MsgEthereumTx) - if ok { - f.hashes = append(f.hashes, ethTx.AsTransaction().Hash()) + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID()]; found { + for _, msg := range tx.GetMsgs() { + ethTx, ok := msg.(*evmtypes.MsgEthereumTx) + if ok { + f.hashes = append(f.hashes, ethTx.AsTransaction().Hash()) + } } } + api.filtersMu.Unlock() } - api.filtersMu.Unlock() case <-errCh: api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID()) @@ -224,7 +225,7 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su return nil, err } - go func(txsCh <-chan coretypes.ResultEvent) { + go func(txsCh <-chan *coretypes.ResultEvents) { defer cancelSubs() for { @@ -237,22 +238,24 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su return } - data, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } + for _, item := range ev.Items { + var data tmtypes.EventDataTx + if err := json.Unmarshal(item.Data, &data); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } - tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) - if err != nil { - api.logger.Debug("fail to decode tx", "error", err.Error()) - continue - } + tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) + if err != nil { + api.logger.Debug("fail to decode tx", "error", err.Error()) + continue + } - for _, msg := range tx.GetMsgs() { - ethTx, ok := msg.(*evmtypes.MsgEthereumTx) - if ok { - _ = notifier.Notify(rpcSub.ID, ethTx.AsTransaction().Hash()) + for _, msg := range tx.GetMsgs() { + ethTx, ok := msg.(*evmtypes.MsgEthereumTx) + if ok { + _ = notifier.Notify(rpcSub.ID, ethTx.AsTransaction().Hash()) + } } } case <-rpcSub.Err(): @@ -288,7 +291,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { api.filters[headerSub.ID()] = &filter{typ: filters.BlocksSubscription, deadline: time.NewTimer(deadline), hashes: []common.Hash{}, s: headerSub} - go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) { + go func(headersCh <-chan *coretypes.ResultEvents, errCh <-chan error) { defer cancelSubs() for { @@ -300,18 +303,18 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { api.filtersMu.Unlock() return } - - data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - api.filtersMu.Lock() - if f, found := api.filters[headerSub.ID()]; found { - f.hashes = append(f.hashes, common.BytesToHash(data.Header.Hash())) + for _, item := range ev.Items { + var data tmtypes.EventDataNewBlockHeader + if err := json.Unmarshal(item.Data, &data); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID()]; found { + f.hashes = append(f.hashes, common.BytesToHash(data.Header.Hash())) + } + api.filtersMu.Unlock() } - api.filtersMu.Unlock() case <-errCh: api.filtersMu.Lock() delete(api.filters, headerSub.ID()) @@ -339,7 +342,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er return &rpc.Subscription{}, err } - go func(headersCh <-chan coretypes.ResultEvent) { + go func(headersCh <-chan *coretypes.ResultEvents) { defer cancelSubs() for { @@ -349,18 +352,18 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er headersSub.Unsubscribe(api.events) return } + for _, item := range ev.Items { + var data tmtypes.EventDataNewBlockHeader + if err := json.Unmarshal(item.Data, &data); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } + baseFee := types.BaseFeeFromEvents(data.ResultBeginBlock.Events) - data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue + // TODO: fetch bloom from events + header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) + _ = notifier.Notify(rpcSub.ID, header) } - - baseFee := types.BaseFeeFromEvents(data.ResultBeginBlock.Events) - - // TODO: fetch bloom from events - header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) - _ = notifier.Notify(rpcSub.ID, header) case <-rpcSub.Err(): headersSub.Unsubscribe(api.events) return @@ -389,7 +392,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri return &rpc.Subscription{}, err } - go func(logsCh <-chan coretypes.ResultEvent) { + go func(logsCh <-chan *coretypes.ResultEvents) { defer cancelSubs() for { @@ -399,32 +402,30 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri logsSub.Unsubscribe(api.events) return } + for _, item := range ev.Items { + // filter only events from EVM module txs + if item.Event != evmtypes.TypeMsgEthereumTx { + // ignore transaction as it's not from the evm module + return + } - // filter only events from EVM module txs - _, isMsgEthereumTx := ev.Events[evmtypes.TypeMsgEthereumTx] - - if !isMsgEthereumTx { - // ignore transaction as it's not from the evm module - return - } - - // get transaction result data - dataTx, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } + var dataTx tmtypes.EventDataTx + if err := json.Unmarshal(item.Data, &dataTx); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } - txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) - if err != nil { - api.logger.Error("fail to decode tx response", "error", err) - return - } + txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) + if err != nil { + api.logger.Error("fail to decode tx response", "error", err) + return + } - logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) - for _, log := range logs { - _ = notifier.Notify(rpcSub.ID, log) + for _, log := range logs { + _ = notifier.Notify(rpcSub.ID, log) + } } case <-rpcSub.Err(): // client send an unsubscribe request logsSub.Unsubscribe(api.events) @@ -480,7 +481,7 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, s: logsSub, } - go func(eventCh <-chan coretypes.ResultEvent) { + go func(eventCh <-chan *coretypes.ResultEvents) { defer cancelSubs() for { @@ -492,25 +493,26 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, api.filtersMu.Unlock() return } - dataTx, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) - if err != nil { - api.logger.Error("fail to decode tx response", "error", err) - return - } + for _, item := range ev.Items { + var dataTx tmtypes.EventDataTx + if err := json.Unmarshal(item.Data, &dataTx); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } + txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) + if err != nil { + api.logger.Error("fail to decode tx response", "error", err) + return + } - logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics) + logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics) - api.filtersMu.Lock() - if f, found := api.filters[filterID]; found { - f.logs = append(f.logs, logs...) + api.filtersMu.Lock() + if f, found := api.filters[filterID]; found { + f.logs = append(f.logs, logs...) + } + api.filtersMu.Unlock() } - api.filtersMu.Unlock() case <-logsSub.Err(): api.filtersMu.Lock() delete(api.filters, filterID) diff --git a/rpc/namespaces/ethereum/eth/filters/filter_system.go b/rpc/namespaces/ethereum/eth/filters/filter_system.go index 5b0b1c2dc5..e86ea74334 100644 --- a/rpc/namespaces/ethereum/eth/filters/filter_system.go +++ b/rpc/namespaces/ethereum/eth/filters/filter_system.go @@ -18,20 +18,14 @@ package filters import ( "context" "fmt" - "sync" "time" - "github.com/pkg/errors" - - tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" + rpcclient "github.com/tendermint/tendermint/rpc/client" coretypes "github.com/tendermint/tendermint/rpc/core/types" - rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" tmtypes "github.com/tendermint/tendermint/types" - "github.com/ethereum/go-ethereum/common" - ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/rpc" @@ -43,7 +37,7 @@ import ( var ( txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() - evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", + evmEvents = tmquery.MustCompile(fmt.Sprintf("%s='%s' AND %s.%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx, sdk.EventTypeMessage, @@ -54,21 +48,9 @@ var ( // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria using the Tendermint's RPC client. type EventSystem struct { - logger log.Logger - ctx context.Context - tmWSClient *rpcclient.WSClient - - // light client mode - lightMode bool - - index filterIndex - topicChans map[string]chan<- coretypes.ResultEvent - indexMux *sync.RWMutex - - // Channels - install chan *Subscription // install filter for event notification - uninstall chan *Subscription // remove filter for event notification - eventBus pubsub.EventBus + logger log.Logger + ctx context.Context + client rpcclient.Client } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -77,27 +59,12 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(logger log.Logger, tmWSClient *rpcclient.WSClient) *EventSystem { - index := make(filterIndex) - for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { - index[i] = make(map[rpc.ID]*Subscription) - } - +func NewEventSystem(logger log.Logger, client rpcclient.Client) *EventSystem { es := &EventSystem{ - logger: logger, - ctx: context.Background(), - tmWSClient: tmWSClient, - lightMode: false, - index: index, - topicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), - indexMux: new(sync.RWMutex), - install: make(chan *Subscription), - uninstall: make(chan *Subscription), - eventBus: pubsub.NewEventBus(), + logger: logger, + ctx: context.Background(), + client: client, } - - go es.eventLoop() - go es.consumeEvents() return es } @@ -116,49 +83,83 @@ func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.Unsub ) ctx, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() - - existingSubs := es.eventBus.Topics() - for _, topic := range existingSubs { - if topic == sub.event { - eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) - if err != nil { - err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event) - return nil, nil, err - } - - sub.eventCh = eventCh - return sub, unsubFn, nil - } - } - + var query string switch sub.typ { - case filters.LogsSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - case filters.BlocksSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - case filters.PendingTransactionsSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) + case filters.LogsSubscription, filters.BlocksSubscription, filters.PendingTransactionsSubscription: + query = sub.event default: err = fmt.Errorf("invalid filter subscription type %d", sub.typ) } if err != nil { sub.err <- err - return nil, nil, err + return nil, func() { cancelFn() }, err } - - // wrap events in a go routine to prevent blocking - es.install <- sub - <-sub.installed - - eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) - } - + eventCh := make(chan *coretypes.ResultEvents) + filter := coretypes.EventFilter{Query: query} + go func() { + maxRetry := 3 + retry := maxRetry + defer func() { + close(eventCh) + }() + for { + select { + case <-ctx.Done(): + return + + default: + res, err := es.client.Events(ctx, &coretypes.RequestEvents{ + Filter: &filter, + MaxItems: 1, + After: sub.after, + WaitTime: 30 * time.Second, + IsLatest: true, + }) + if err != nil { + if retry--; retry >= 0 { + continue + } + sub.err <- err + return + } + retry = maxRetry + + items := res.Items + newest := res.Newest + if len(res.Items) > 0 { + var before string + for res.More { + if res != nil { + before = res.Items[len(res.Items)-1].Cursor + } + res, err = es.client.Events(ctx, &coretypes.RequestEvents{ + Filter: &filter, + MaxItems: 100, + After: sub.after, + Before: before, + }) + if err != nil { + if retry--; retry >= 0 { + continue + } + sub.err <- err + return + } + retry = maxRetry + items = append(items, res.Items...) + } + } + res.Items = items + sub.after = newest + if ctx.Err() == nil { + eventCh <- res + } + } + } + }() sub.eventCh = eventCh - return sub, unsubFn, nil + return sub, func() { cancelFn() }, nil } // SubscribeLogs creates a subscription that will write all logs matching the @@ -194,14 +195,12 @@ func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{ - id: rpc.NewID(), - typ: filters.LogsSubscription, - event: evmEvents, - logsCrit: crit, - created: time.Now().UTC(), - logs: make(chan []*ethtypes.Log), - installed: make(chan struct{}, 1), - err: make(chan error, 1), + id: rpc.NewID(), + typ: filters.LogsSubscription, + event: evmEvents, + logsCrit: crit, + created: time.Now().UTC(), + err: make(chan error, 1), } return es.subscribe(sub) } @@ -209,13 +208,11 @@ func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription // SubscribeNewHeads subscribes to new block headers events. func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{ - id: rpc.NewID(), - typ: filters.BlocksSubscription, - event: headerEvents, - created: time.Now().UTC(), - headers: make(chan *ethtypes.Header), - installed: make(chan struct{}, 1), - err: make(chan error, 1), + id: rpc.NewID(), + typ: filters.BlocksSubscription, + event: headerEvents, + created: time.Now().UTC(), + err: make(chan error, 1), } return es.subscribe(sub) } @@ -223,101 +220,11 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc // SubscribePendingTxs subscribes to new pending transactions events from the mempool. func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{ - id: rpc.NewID(), - typ: filters.PendingTransactionsSubscription, - event: txEvents, - created: time.Now().UTC(), - hashes: make(chan []common.Hash), - installed: make(chan struct{}, 1), - err: make(chan error, 1), + id: rpc.NewID(), + typ: filters.PendingTransactionsSubscription, + event: txEvents, + created: time.Now().UTC(), + err: make(chan error, 1), } return es.subscribe(sub) } - -type filterIndex map[filters.Type]map[rpc.ID]*Subscription - -// eventLoop (un)installs filters and processes mux events. -func (es *EventSystem) eventLoop() { - for { - select { - case f := <-es.install: - es.indexMux.Lock() - es.index[f.typ][f.id] = f - ch := make(chan coretypes.ResultEvent) - es.topicChans[f.event] = ch - if err := es.eventBus.AddTopic(f.event, ch); err != nil { - es.logger.Error("failed to add event topic to event bus", "topic", f.event, "error", err.Error()) - } - es.indexMux.Unlock() - close(f.installed) - case f := <-es.uninstall: - es.indexMux.Lock() - delete(es.index[f.typ], f.id) - - var channelInUse bool - for _, sub := range es.index[f.typ] { - if sub.event == f.event { - channelInUse = true - break - } - } - - // remove topic only when channel is not used by other subscriptions - if !channelInUse { - if err := es.tmWSClient.Unsubscribe(es.ctx, f.event); err != nil { - es.logger.Error("failed to unsubscribe from query", "query", f.event, "error", err.Error()) - } - - ch, ok := es.topicChans[f.event] - if ok { - es.eventBus.RemoveTopic(f.event) - close(ch) - delete(es.topicChans, f.event) - } - } - - es.indexMux.Unlock() - close(f.err) - } - } -} - -func (es *EventSystem) consumeEvents() { - for { - for rpcResp := range es.tmWSClient.ResponsesCh { - var ev coretypes.ResultEvent - - if rpcResp.Error != nil { - time.Sleep(5 * time.Second) - continue - } else if err := tmjson.Unmarshal(rpcResp.Result, &ev); err != nil { - es.logger.Error("failed to JSON unmarshal ResponsesCh result event", "error", err.Error()) - continue - } - - if len(ev.Query) == 0 { - // skip empty responses - continue - } - - es.indexMux.RLock() - ch, ok := es.topicChans[ev.Query] - es.indexMux.RUnlock() - if !ok { - es.logger.Debug("channel for subscription not found", "topic", ev.Query) - es.logger.Debug("list of available channels", "channels", es.eventBus.Topics()) - continue - } - - // gracefully handle lagging subscribers - t := time.NewTimer(time.Second) - select { - case <-t.C: - es.logger.Debug("dropped event during lagging subscription", "topic", ev.Query) - case ch <- ev: - } - } - - time.Sleep(time.Second) - } -} diff --git a/rpc/namespaces/ethereum/eth/filters/subscription.go b/rpc/namespaces/ethereum/eth/filters/subscription.go index cfdf2e67df..88feb2c3d4 100644 --- a/rpc/namespaces/ethereum/eth/filters/subscription.go +++ b/rpc/namespaces/ethereum/eth/filters/subscription.go @@ -18,8 +18,6 @@ package filters import ( "time" - "github.com/ethereum/go-ethereum/common" - ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/rpc" coretypes "github.com/tendermint/tendermint/rpc/core/types" @@ -27,17 +25,14 @@ import ( // Subscription defines a wrapper for the private subscription type Subscription struct { - id rpc.ID - typ filters.Type - event string - created time.Time - logsCrit filters.FilterCriteria - logs chan []*ethtypes.Log - hashes chan []common.Hash - headers chan *ethtypes.Header - installed chan struct{} // closed when the filter is installed - eventCh <-chan coretypes.ResultEvent - err chan error + id rpc.ID + typ filters.Type + event string + created time.Time + logsCrit filters.FilterCriteria + eventCh <-chan *coretypes.ResultEvents + err chan error + after string } // ID returns the underlying subscription RPC identifier. @@ -45,25 +40,8 @@ func (s Subscription) ID() rpc.ID { return s.id } -// Unsubscribe from the current subscription to Tendermint Websocket. It sends an error to the -// subscription error channel if unsubscribe fails. +// Keep life circle for posterity func (s *Subscription) Unsubscribe(es *EventSystem) { - go func() { - uninstallLoop: - for { - // write uninstall request and consume logs/hashes. This prevents - // the eventLoop broadcast method to deadlock when writing to the - // filter event channel while the subscription loop is waiting for - // this method to return (and thus not reading these events). - select { - case es.uninstall <- s: - break uninstallLoop - case <-s.logs: - case <-s.hashes: - case <-s.headers: - } - } - }() } // Err returns the error channel @@ -72,6 +50,6 @@ func (s *Subscription) Err() <-chan error { } // Event returns the tendermint result event channel -func (s *Subscription) Event() <-chan coretypes.ResultEvent { +func (s *Subscription) Event() <-chan *coretypes.ResultEvents { return s.eventCh } diff --git a/rpc/websockets.go b/rpc/websockets.go index 5f184f4f9c..db4e0f13a0 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/tendermint/tendermint/libs/log" - rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" tmtypes "github.com/tendermint/tendermint/types" "github.com/evmos/ethermint/rpc/ethereum/pubsub" @@ -89,7 +88,7 @@ type websocketsServer struct { logger log.Logger } -func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, tmWSClient *rpcclient.WSClient, cfg *config.Config) WebsocketsServer { +func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, cfg *config.Config) WebsocketsServer { logger = logger.With("api", "websocket-server") _, port, _ := net.SplitHostPort(cfg.JSONRPC.Address) @@ -98,7 +97,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, tmWSClient wsAddr: cfg.JSONRPC.WsAddress, certFile: cfg.TLS.CertificatePath, keyFile: cfg.TLS.KeyPath, - api: newPubSubAPI(clientCtx, logger, tmWSClient), + api: newPubSubAPI(clientCtx, logger), logger: logger, } } @@ -350,10 +349,10 @@ type pubSubAPI struct { } // newPubSubAPI creates an instance of the ethereum PubSub API. -func newPubSubAPI(clientCtx client.Context, logger log.Logger, tmWSClient *rpcclient.WSClient) *pubSubAPI { +func newPubSubAPI(clientCtx client.Context, logger log.Logger) *pubSubAPI { logger = logger.With("module", "websocket-client") return &pubSubAPI{ - events: rpcfilters.NewEventSystem(logger, tmWSClient), + events: rpcfilters.NewEventSystem(logger, clientCtx.Client), logger: logger, clientCtx: clientCtx, } @@ -401,34 +400,33 @@ func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (pubsub.Un if !ok { return } + for _, item := range event.Items { + var data tmtypes.EventDataNewBlockHeader + if err := json.Unmarshal(item.Data, &data); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } + header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: subID, + Result: header, + }, + } - data, ok := event.Data.(tmtypes.EventDataNewBlockHeader) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", event.Data)) - continue - } - - header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) - - // write to ws conn - res := &SubscriptionNotification{ - Jsonrpc: "2.0", - Method: "eth_subscription", - Params: &SubscriptionResult{ - Subscription: subID, - Result: header, - }, - } - - err = wsConn.WriteJSON(res) - if err != nil { - api.logger.Error("error writing header, will drop peer", "error", err.Error()) + err = wsConn.WriteJSON(res) + if err != nil { + api.logger.Error("error writing header, will drop peer", "error", err.Error()) - try(func() { - if err != websocket.ErrCloseSent { - _ = wsConn.Close() - } - }, api.logger, "closing websocket peer sub") + try(func() { + if err != websocket.ErrCloseSent { + _ = wsConn.Close() + } + }, api.logger, "closing websocket peer sub") + } } case err, ok := <-errCh: if !ok { @@ -574,41 +572,42 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac if !ok { return } + for _, item := range event.Items { + var dataTx tmtypes.EventDataTx + if err := json.Unmarshal(item.Data, &dataTx); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue + } - dataTx, ok := event.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", event.Data)) - continue - } - - txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) - if err != nil { - api.logger.Error("failed to decode tx response", "error", err.Error()) - return - } - - logs := rpcfilters.FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) - if len(logs) == 0 { - continue - } + txResponse, err := evmtypes.DecodeTxResponse(dataTx.TxResult.Result.Data) + if err != nil { + api.logger.Error("failed to decode tx response", "error", err.Error()) + return + } - for _, ethLog := range logs { - res := &SubscriptionNotification{ - Jsonrpc: "2.0", - Method: "eth_subscription", - Params: &SubscriptionResult{ - Subscription: subID, - Result: ethLog, - }, + logs := rpcfilters.FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + if len(logs) == 0 { + continue } - err = wsConn.WriteJSON(res) - if err != nil { - try(func() { - if err != websocket.ErrCloseSent { - _ = wsConn.Close() - } - }, api.logger, "closing websocket peer sub") + for _, ethLog := range logs { + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: subID, + Result: ethLog, + }, + } + + err = wsConn.WriteJSON(res) + if err != nil { + try(func() { + if err != websocket.ErrCloseSent { + _ = wsConn.Close() + } + }, api.logger, "closing websocket peer sub") + } } } case err, ok := <-errCh: @@ -635,38 +634,39 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) for { select { case ev := <-txsCh: - data, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - ethTxs, err := types.RawTxToEthTx(api.clientCtx, data.Tx) - if err != nil { - // not ethereum tx - continue - } - - for _, ethTx := range ethTxs { - // write to ws conn - res := &SubscriptionNotification{ - Jsonrpc: "2.0", - Method: "eth_subscription", - Params: &SubscriptionResult{ - Subscription: subID, - Result: ethTx.Hash, - }, + for _, item := range ev.Items { + var data tmtypes.EventDataTx + if err := json.Unmarshal(item.Data, &data); err != nil { + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", item.Data)) + continue } - - err = wsConn.WriteJSON(res) + ethTxs, err := types.RawTxToEthTx(api.clientCtx, data.Tx) if err != nil { - api.logger.Debug("error writing header, will drop peer", "error", err.Error()) + // not ethereum tx + continue + } - try(func() { - if err != websocket.ErrCloseSent { - _ = wsConn.Close() - } - }, api.logger, "closing websocket peer sub") + for _, ethTx := range ethTxs { + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: subID, + Result: ethTx.Hash, + }, + } + + err = wsConn.WriteJSON(res) + if err != nil { + api.logger.Debug("error writing header, will drop peer", "error", err.Error()) + + try(func() { + if err != websocket.ErrCloseSent { + _ = wsConn.Close() + } + }, api.logger, "closing websocket peer sub") + } } } case err, ok := <-errCh: diff --git a/server/json_rpc.go b/server/json_rpc.go index 7934a30c77..be2da444a5 100644 --- a/server/json_rpc.go +++ b/server/json_rpc.go @@ -36,13 +36,9 @@ import ( // StartJSONRPC starts the JSON-RPC server func StartJSONRPC(ctx *server.Context, clientCtx client.Context, - tmRPCAddr, - tmEndpoint string, config *config.Config, indexer ethermint.EVMTxIndexer, ) (*http.Server, chan struct{}, error) { - tmWsClient := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger) - logger := ctx.Logger.With("module", "geth") ethlog.Root().SetHandler(ethlog.FuncHandler(func(r *ethlog.Record) error { switch r.Lvl { @@ -61,7 +57,7 @@ func StartJSONRPC(ctx *server.Context, allowUnprotectedTxs := config.JSONRPC.AllowUnprotectedTxs rpcAPIArr := config.JSONRPC.API - apis := rpc.GetRPCAPIs(ctx, clientCtx, tmWsClient, allowUnprotectedTxs, indexer, rpcAPIArr) + apis := rpc.GetRPCAPIs(ctx, clientCtx, allowUnprotectedTxs, indexer, rpcAPIArr) for _, api := range apis { if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { @@ -121,8 +117,7 @@ func StartJSONRPC(ctx *server.Context, ctx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress) // allocate separate WS connection to Tendermint - tmWsClient = ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger) - wsSrv := rpc.NewWebsocketsServer(clientCtx, ctx.Logger, tmWsClient, config) + wsSrv := rpc.NewWebsocketsServer(clientCtx, ctx.Logger, config) wsSrv.Start() return httpSrv, httpSrvDone, nil } diff --git a/server/start.go b/server/start.go index 6236dd9c7a..c6b093f00b 100644 --- a/server/start.go +++ b/server/start.go @@ -562,10 +562,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt } clientCtx := clientCtx.WithChainID(genDoc.ChainID) - - tmEndpoint := "/websocket" - tmRPCAddr := cfg.RPC.ListenAddress - httpSrv, httpSrvDone, err = StartJSONRPC(ctx, clientCtx, tmRPCAddr, tmEndpoint, &config, idxer) + httpSrv, httpSrvDone, err = StartJSONRPC(ctx, clientCtx, &config, idxer) if err != nil { return err } diff --git a/server/util.go b/server/util.go index 6657f9900a..35d4a6a1e7 100644 --- a/server/util.go +++ b/server/util.go @@ -18,7 +18,6 @@ package server import ( "net" "net/http" - "time" "github.com/evmos/ethermint/server/config" "github.com/gorilla/mux" @@ -32,7 +31,6 @@ import ( tmcmd "github.com/tendermint/tendermint/cmd/tendermint/commands" tmlog "github.com/tendermint/tendermint/libs/log" - rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" ) // AddCommands adds server commands @@ -71,34 +69,6 @@ func AddCommands( ) } -func ConnectTmWS(tmRPCAddr, tmEndpoint string, logger tmlog.Logger) *rpcclient.WSClient { - tmWsClient, err := rpcclient.NewWS(tmRPCAddr, tmEndpoint, - rpcclient.MaxReconnectAttempts(256), - rpcclient.ReadWait(120*time.Second), - rpcclient.WriteWait(120*time.Second), - rpcclient.PingPeriod(50*time.Second), - rpcclient.OnReconnect(func() { - logger.Debug("EVM RPC reconnects to Tendermint WS", "address", tmRPCAddr+tmEndpoint) - }), - ) - - if err != nil { - logger.Error( - "Tendermint WS client could not be created", - "address", tmRPCAddr+tmEndpoint, - "error", err, - ) - } else if err := tmWsClient.OnStart(); err != nil { - logger.Error( - "Tendermint WS client could not start", - "address", tmRPCAddr+tmEndpoint, - "error", err, - ) - } - - return tmWsClient -} - func MountGRPCWebServices( router *mux.Router, grpcWeb *grpcweb.WrappedGrpcServer, diff --git a/tests/integration_tests/configs/default.jsonnet b/tests/integration_tests/configs/default.jsonnet index a8425405ca..36708ebce0 100644 --- a/tests/integration_tests/configs/default.jsonnet +++ b/tests/integration_tests/configs/default.jsonnet @@ -2,7 +2,7 @@ dotenv: '../../../scripts/.env', 'ethermint_9000-1': { cmd: 'ethermintd', - 'start-flags': '--trace', + 'start-flags': '--trace --log_level debug', config: { mempool: { // use v1 mempool to enable tx prioritization diff --git a/tests/integration_tests/hardhat/contracts/TestMessageCall.sol b/tests/integration_tests/hardhat/contracts/TestMessageCall.sol new file mode 100644 index 0000000000..b21adda267 --- /dev/null +++ b/tests/integration_tests/hardhat/contracts/TestMessageCall.sol @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: MIT +pragma solidity 0.8.10; + +contract Inner { + event TestEvent(uint256); + function test() public returns (uint256) { + emit TestEvent(42); + return 42; + } +} + +// An contract that do lots of message calls +contract TestMessageCall { + Inner _inner; + constructor() public { + _inner = new Inner(); + } + + function test(uint iterations) public returns (uint256) { + uint256 n = 0; + for (uint i = 0; i < iterations; i++) { + n += _inner.test(); + } + return n; + } + + function inner() public view returns (address) { + return address(_inner); + } +} diff --git a/tests/integration_tests/test_filters.py b/tests/integration_tests/test_filters.py index 353e82ed5f..5ef371e1b4 100644 --- a/tests/integration_tests/test_filters.py +++ b/tests/integration_tests/test_filters.py @@ -96,6 +96,16 @@ def test_get_logs_by_topic(cluster): assert len(logs) == 0 +def wait_filter(flt): + res = [] + for i in range(10): + print(i) + res = flt.get_new_entries() + if len(res) > 0: + break + return res + + def test_pending_transaction_filter(cluster): w3: Web3 = cluster.w3 flt = w3.eth.filter("pending") @@ -106,7 +116,8 @@ def test_pending_transaction_filter(cluster): w3_wait_for_new_blocks(w3, 1, sleep=0.1) # with tx txhash = send_successful_transaction(w3) - assert txhash in flt.get_new_entries() + txhashes = wait_filter(flt) + assert txhash in txhashes # check if tx_hash is valid tx = w3.eth.get_transaction(txhash) @@ -125,7 +136,7 @@ def test_block_filter(cluster): # with tx send_successful_transaction(w3) - block_hashes = flt.get_new_entries() + block_hashes = wait_filter(flt) assert len(block_hashes) >= 1 # check if the returned block hash is correct diff --git a/tests/integration_tests/test_websockets.py b/tests/integration_tests/test_websockets.py index aa045a087d..b3ec868707 100644 --- a/tests/integration_tests/test_websockets.py +++ b/tests/integration_tests/test_websockets.py @@ -1,11 +1,20 @@ -def test_single_request_netversion(ethermint): - ethermint.use_websocket() - eth_ws = ethermint.w3.provider +import asyncio +import json +import time +from collections import defaultdict - response = eth_ws.make_request("net_version", []) +import websockets +from web3 import Web3 - # net_version should be 9000 - assert response["result"] == "9000", "got " + response["result"] + ", expected 9000" +from .network import Ethermint +from .utils import ( + CONTRACTS, + KEYS, + deploy_contract, + send_raw_transactions, + sign_transaction, + wait_for_new_blocks, +) # note: # batch requests still not implemented in web3.py @@ -19,9 +28,132 @@ def test_batch_request_netversion(ethermint): return -def test_ws_subscribe_log(ethermint): - return +class Client: + def __init__(self, ws): + self._ws = ws + self._gen_id = 0 + self._subs = defaultdict(asyncio.Queue) + self._rsps = defaultdict(asyncio.Queue) + def gen_id(self): + self._gen_id += 1 + return self._gen_id -def test_ws_subscribe_newheads(ethermint): - return + async def receive_loop(self): + while True: + msg = json.loads(await self._ws.recv()) + if "id" in msg: + # responses + await self._rsps[msg["id"]].put(msg) + else: + # subscriptions + assert msg["method"] == "eth_subscription" + sub_id = msg["params"]["subscription"] + await self._subs[sub_id].put(msg["params"]["result"]) + + async def recv_response(self, rpcid): + rsp = await self._rsps[rpcid].get() + del self._rsps[rpcid] + return rsp + + async def recv_subscription(self, sub_id): + return await self._subs[sub_id].get() + + async def subscribe(self, *args): + rpcid = self.gen_id() + await self._ws.send( + json.dumps({"id": rpcid, "method": "eth_subscribe", "params": args}) + ) + rsp = await self.recv_response(rpcid) + assert "error" not in rsp + return rsp["result"] + + async def send(self, method, *args): + id = self.gen_id() + await self._ws.send(json.dumps({"id": id, "method": method, "params": args})) + rsp = await self.recv_response(id) + assert "error" not in rsp + return rsp["result"] + + def sub_qsize(self, sub_id): + return self._subs[sub_id].qsize() + + async def unsubscribe(self, sub_id): + rpcid = self.gen_id() + await self._ws.send( + json.dumps({"id": rpcid, "method": "eth_unsubscribe", "params": [sub_id]}) + ) + rsp = await self.recv_response(rpcid) + assert "error" not in rsp + return rsp["result"] + + +# TestEvent topic from TestMessageCall contract calculated from event signature +TEST_EVENT_TOPIC = Web3.keccak(text="TestEvent(uint256)") + + +def test_subscribe_basic(ethermint: Ethermint): + """ + test basic subscribe and unsubscribe + """ + cli = ethermint.cosmos_cli() + loop = asyncio.get_event_loop() + + async def assert_unsubscribe(c: Client, sub_id): + assert await c.unsubscribe(sub_id) + # check no more messages + await loop.run_in_executor(None, wait_for_new_blocks, cli, 2) + assert c.sub_qsize(sub_id) == 0 + # unsubscribe again return False + assert not await c.unsubscribe(sub_id) + + async def subscriber_test(c: Client): + sub_id = await c.subscribe("newHeads") + # wait for three new blocks + msgs = [await c.recv_subscription(sub_id) for i in range(3)] + # check blocks are consecutive + assert int(msgs[1]["number"], 0) == int(msgs[0]["number"], 0) + 1 + assert int(msgs[2]["number"], 0) == int(msgs[1]["number"], 0) + 1 + await assert_unsubscribe(c, sub_id) + + async def logs_test(c: Client, w3, contract, address): + sub_id = await c.subscribe("logs", {"address": address}) + iterations = 10000 + tx = contract.functions.test(iterations).build_transaction() + raw_transactions = [] + for key_from in KEYS.values(): + signed = sign_transaction(w3, tx, key_from) + raw_transactions.append(signed.rawTransaction) + send_raw_transactions(w3, raw_transactions) + total = len(KEYS) * iterations + msgs = [await c.recv_subscription(sub_id) for i in range(total)] + assert len(msgs) == total + assert all(msg["topics"] == [TEST_EVENT_TOPIC.hex()] for msg in msgs) + await assert_unsubscribe(c, sub_id) + + async def net_version_test(c: Client): + version = await c.send("net_version") + # net_version should be 9000 + assert version == "9000", "got " + version + ", expected 9000" + + async def async_test(): + async with websockets.connect(ethermint.w3_ws_endpoint) as ws: + c = Client(ws) + t = asyncio.create_task(c.receive_loop()) + # run three subscribers concurrently + await asyncio.gather(*[subscriber_test(c) for i in range(3)]) + await asyncio.gather(*[net_version_test(c)]) + contract, _ = deploy_contract(ethermint.w3, CONTRACTS["TestMessageCall"]) + inner = contract.caller.inner() + begin = time.time() + await asyncio.gather(*[logs_test(c, ethermint.w3, contract, inner)]) + print("msg call time", time.time() - begin) + t.cancel() + try: + await t + except asyncio.CancelledError: + # allow retry + pass + + timeout = 100 + loop.run_until_complete(asyncio.wait_for(async_test(), timeout)) diff --git a/tests/integration_tests/utils.py b/tests/integration_tests/utils.py index b277beeaa3..30b0c862f2 100644 --- a/tests/integration_tests/utils.py +++ b/tests/integration_tests/utils.py @@ -4,6 +4,7 @@ import subprocess import sys import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import bech32 @@ -32,6 +33,7 @@ "TestChainID": "ChainID.sol", "Mars": "Mars.sol", "StateContract": "StateContract.sol", + "TestMessageCall": "TestMessageCall.sol", } @@ -197,3 +199,12 @@ def parse_events(logs): ev["type"]: {attr["key"]: attr["value"] for attr in ev["attributes"]} for ev in logs[0]["events"] } + + +def send_raw_transactions(w3, raw_transactions): + with ThreadPoolExecutor(len(raw_transactions)) as exec: + tasks = [ + exec.submit(w3.eth.send_raw_transaction, raw) for raw in raw_transactions + ] + sended_hash_set = {future.result() for future in as_completed(tasks)} + return sended_hash_set diff --git a/testutil/network/util.go b/testutil/network/util.go index 03123cfbeb..42456e24a8 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -143,10 +143,7 @@ func startInProcess(cfg Config, val *Validator) error { return fmt.Errorf("validator %s context is nil", val.Moniker) } - tmEndpoint := "/websocket" - tmRPCAddr := val.RPCAddress - - val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, val.AppConfig, nil) + val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, val.AppConfig, nil) if err != nil { return err }