Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Problem: eth_newPendingTransactionFilter don't return correct tx hash #1012

Merged
merged 9 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (ante) [tharsis#991](https://github.com/tharsis/ethermint/pull/991) Set an upper bound to gasWanted to prevent DoS attack.
* (rpc) [tharsis#1006](https://github.com/tharsis/ethermint/pull/1006) Use `string` as the parameters type to correct ambiguous results.
* (ante) [tharsis#1004](https://github.com/tharsis/ethermint/pull/1004) make MaxTxGasWanted configurable.
* (rpc) [tharsis#1012](https://github.com/tharsis/ethermint/pull/1012) fix the tx hash in filter entries created by `eth_newPendingTransactionFilter`.

## [v0.11.0] - 2022-03-06

Expand Down
2 changes: 1 addition & 1 deletion rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetRPCAPIs(ctx *server.Context, clientCtx client.Context, tmWSClient *rpccl
rpc.API{
Namespace: EthNamespace,
Version: apiVersion,
Service: filters.NewPublicAPI(ctx.Logger, tmWSClient, evmBackend),
Service: filters.NewPublicAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend),
Public: true,
},
)
Expand Down
53 changes: 31 additions & 22 deletions rpc/ethereum/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/cosmos/cosmos-sdk/client"
"github.com/tharsis/ethermint/rpc/ethereum/types"

"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -56,20 +57,22 @@ type filter struct {
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
logger log.Logger
clientCtx client.Context
backend Backend
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
}

// NewPublicAPI returns a new PublicFilterAPI instance.
func NewPublicAPI(logger log.Logger, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI {
func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI {
logger = logger.With("api", "filter")
api := &PublicFilterAPI{
logger: logger,
backend: backend,
filters: make(map[rpc.ID]*filter),
events: NewEventSystem(logger, tmWSClient),
logger: logger,
clientCtx: clientCtx,
backend: backend,
filters: make(map[rpc.ID]*filter),
events: NewEventSystem(logger, tmWSClient),
}

go api.timeoutLoop()
Expand Down Expand Up @@ -141,11 +144,20 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
continue
}

txHash := common.BytesToHash(tmtypes.Tx(data.Tx).Hash())
tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx)
if err != nil {
api.logger.Debug("fail to decode tx", "error", err.Error())
continue
}

api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID()]; found {
f.hashes = append(f.hashes, txHash)
for _, msg := range tx.GetMsgs() {
ethTx, ok := msg.(*evmtypes.MsgEthereumTx)
if ok {
f.hashes = append(f.hashes, common.HexToHash(ethTx.Hash))
}
}
}
api.filtersMu.Unlock()
case <-errCh:
Expand Down Expand Up @@ -198,13 +210,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
continue
}

txHash := common.BytesToHash(tmtypes.Tx(data.Tx).Hash())

// To keep the original behavior, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
err = notifier.Notify(rpcSub.ID, txHash)
tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx)
if err != nil {
return
api.logger.Debug("fail to decode tx", "error", err.Error())
continue
}

for _, msg := range tx.GetMsgs() {
ethTx, ok := msg.(*evmtypes.MsgEthereumTx)
if ok {
_ = notifier.Notify(rpcSub.ID, common.HexToHash(ethTx.Hash))
}
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe(api.events)
Expand Down Expand Up @@ -314,11 +330,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er

// TODO: fetch bloom from events
header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee)
err = notifier.Notify(rpcSub.ID, header)
if err != nil {
headersSub.err <- err
return
}
_ = notifier.Notify(rpcSub.ID, header)
case <-rpcSub.Err():
headersSub.Unsubscribe(api.events)
return
Expand Down Expand Up @@ -381,10 +393,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri
logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)

for _, log := range logs {
err = notifier.Notify(rpcSub.ID, log)
if err != nil {
return
}
_ = notifier.Notify(rpcSub.ID, log)
}
case <-rpcSub.Err(): // client send an unsubscribe request
logsSub.Unsubscribe(api.events)
Expand Down
26 changes: 26 additions & 0 deletions tests/e2e/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,3 +721,29 @@ func (s *IntegrationTestSuite) TestWeb3Sha3() {
})
}
}

func (s *IntegrationTestSuite) TestPendingTransactionFilter() {
var (
filterID string
filterResult []common.Hash
)
// create filter
err := s.rpcClient.Call(&filterID, "eth_newPendingTransactionFilter")
yihuang marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NoError(err)
// check filter result is empty
err = s.rpcClient.Call(&filterResult, "eth_getFilterChanges", filterID)
s.Require().NoError(err)
s.Require().Empty(filterResult)
// send transaction
signedTx := s.signValidTx(common.HexToAddress("0x378c50D9264C63F3F92B806d4ee56E9D86FfB3Ec"), big.NewInt(10)).AsTransaction()
err = s.network.Validators[0].JSONRPCClient.SendTransaction(s.ctx, signedTx)
s.Require().NoError(err)

s.waitForTransaction()
s.expectSuccessReceipt(signedTx.Hash())

// check filter changes match the tx hash
err = s.rpcClient.Call(&filterResult, "eth_getFilterChanges", filterID)
s.Require().NoError(err)
s.Require().Equal([]common.Hash{signedTx.Hash()}, filterResult)
}
2 changes: 1 addition & 1 deletion testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func startInProcess(cfg Config, val *Validator) error {
}

tmEndpoint := "/websocket"
tmRPCAddr := fmt.Sprintf("tcp://%s", val.AppConfig.GRPC.Address)
tmRPCAddr := val.RPCAddress

val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, *val.AppConfig)
if err != nil {
Expand Down