diff --git a/CHANGELOG.md b/CHANGELOG.md index 44e328d971..45829ccf31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (ante) [tharsis#991](https://github.com/tharsis/ethermint/pull/991) Set an upper bound to gasWanted to prevent DoS attack. * (rpc) [tharsis#1006](https://github.com/tharsis/ethermint/pull/1006) Use `string` as the parameters type to correct ambiguous results. * (ante) [tharsis#1004](https://github.com/tharsis/ethermint/pull/1004) make MaxTxGasWanted configurable. +* (rpc) [tharsis#1012](https://github.com/tharsis/ethermint/pull/1012) fix the tx hash in filter entries created by `eth_newPendingTransactionFilter`. ## [v0.11.0] - 2022-03-06 diff --git a/rpc/apis.go b/rpc/apis.go index 7519e5c834..29c431a9fe 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -57,7 +57,7 @@ func GetRPCAPIs(ctx *server.Context, clientCtx client.Context, tmWSClient *rpccl rpc.API{ Namespace: EthNamespace, Version: apiVersion, - Service: filters.NewPublicAPI(ctx.Logger, tmWSClient, evmBackend), + Service: filters.NewPublicAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend), Public: true, }, ) diff --git a/rpc/ethereum/namespaces/eth/filters/api.go b/rpc/ethereum/namespaces/eth/filters/api.go index 5d6a788a1f..4c286312d0 100644 --- a/rpc/ethereum/namespaces/eth/filters/api.go +++ b/rpc/ethereum/namespaces/eth/filters/api.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/cosmos/cosmos-sdk/client" "github.com/tharsis/ethermint/rpc/ethereum/types" "github.com/tendermint/tendermint/libs/log" @@ -56,6 +57,7 @@ type filter struct { // information related to the Ethereum protocol such as blocks, transactions and logs. type PublicFilterAPI struct { logger log.Logger + clientCtx client.Context backend Backend events *EventSystem filtersMu sync.Mutex @@ -63,13 +65,14 @@ type PublicFilterAPI struct { } // NewPublicAPI returns a new PublicFilterAPI instance. -func NewPublicAPI(logger log.Logger, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI { +func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI { logger = logger.With("api", "filter") api := &PublicFilterAPI{ - logger: logger, - backend: backend, - filters: make(map[rpc.ID]*filter), - events: NewEventSystem(logger, tmWSClient), + logger: logger, + clientCtx: clientCtx, + backend: backend, + filters: make(map[rpc.ID]*filter), + events: NewEventSystem(logger, tmWSClient), } go api.timeoutLoop() @@ -141,11 +144,20 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { continue } - txHash := common.BytesToHash(tmtypes.Tx(data.Tx).Hash()) + tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) + if err != nil { + api.logger.Debug("fail to decode tx", "error", err.Error()) + continue + } api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID()]; found { - f.hashes = append(f.hashes, txHash) + for _, msg := range tx.GetMsgs() { + ethTx, ok := msg.(*evmtypes.MsgEthereumTx) + if ok { + f.hashes = append(f.hashes, common.HexToHash(ethTx.Hash)) + } + } } api.filtersMu.Unlock() case <-errCh: @@ -198,13 +210,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su continue } - txHash := common.BytesToHash(tmtypes.Tx(data.Tx).Hash()) - - // To keep the original behavior, send a single tx hash in one notification. - // TODO(rjl493456442) Send a batch of tx hashes in one notification - err = notifier.Notify(rpcSub.ID, txHash) + tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) if err != nil { - return + api.logger.Debug("fail to decode tx", "error", err.Error()) + continue + } + + for _, msg := range tx.GetMsgs() { + ethTx, ok := msg.(*evmtypes.MsgEthereumTx) + if ok { + _ = notifier.Notify(rpcSub.ID, common.HexToHash(ethTx.Hash)) + } } case <-rpcSub.Err(): pendingTxSub.Unsubscribe(api.events) @@ -314,11 +330,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er // TODO: fetch bloom from events header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) - err = notifier.Notify(rpcSub.ID, header) - if err != nil { - headersSub.err <- err - return - } + _ = notifier.Notify(rpcSub.ID, header) case <-rpcSub.Err(): headersSub.Unsubscribe(api.events) return @@ -381,10 +393,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) for _, log := range logs { - err = notifier.Notify(rpcSub.ID, log) - if err != nil { - return - } + _ = notifier.Notify(rpcSub.ID, log) } case <-rpcSub.Err(): // client send an unsubscribe request logsSub.Unsubscribe(api.events) diff --git a/tests/e2e/integration_test.go b/tests/e2e/integration_test.go index ace2ad248c..0052ee2ca0 100644 --- a/tests/e2e/integration_test.go +++ b/tests/e2e/integration_test.go @@ -721,3 +721,29 @@ func (s *IntegrationTestSuite) TestWeb3Sha3() { }) } } + +func (s *IntegrationTestSuite) TestPendingTransactionFilter() { + var ( + filterID string + filterResult []common.Hash + ) + // create filter + err := s.rpcClient.Call(&filterID, "eth_newPendingTransactionFilter") + s.Require().NoError(err) + // check filter result is empty + err = s.rpcClient.Call(&filterResult, "eth_getFilterChanges", filterID) + s.Require().NoError(err) + s.Require().Empty(filterResult) + // send transaction + signedTx := s.signValidTx(common.HexToAddress("0x378c50D9264C63F3F92B806d4ee56E9D86FfB3Ec"), big.NewInt(10)).AsTransaction() + err = s.network.Validators[0].JSONRPCClient.SendTransaction(s.ctx, signedTx) + s.Require().NoError(err) + + s.waitForTransaction() + s.expectSuccessReceipt(signedTx.Hash()) + + // check filter changes match the tx hash + err = s.rpcClient.Call(&filterResult, "eth_getFilterChanges", filterID) + s.Require().NoError(err) + s.Require().Equal([]common.Hash{signedTx.Hash()}, filterResult) +} diff --git a/testutil/network/util.go b/testutil/network/util.go index 1c4c8f50f5..cf41f2bebe 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -128,7 +128,7 @@ func startInProcess(cfg Config, val *Validator) error { } tmEndpoint := "/websocket" - tmRPCAddr := fmt.Sprintf("tcp://%s", val.AppConfig.GRPC.Address) + tmRPCAddr := val.RPCAddress val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, *val.AppConfig) if err != nil {