Skip to content

Commit

Permalink
Problem: pending tx comes after get mined (#488)
Browse files Browse the repository at this point in the history
* Problem: pending tx comes after get mined

* use sdk mempool

* doc

* cleanup

* fix

* fix test

* fix lint

* fix

---------

Co-authored-by: huangyi <huang@crypto.com>
  • Loading branch information
mmsqe and yihuang committed Jun 6, 2024
1 parent 179e436 commit 6f4031d
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (rpc) [#473](https://github.com/crypto-org-chain/ethermint/pull/473) Avoid panic on invalid elasticity_multiplier.
* (rpc) [#474](https://github.com/crypto-org-chain/ethermint/pull/474), [#476](https://github.com/crypto-org-chain/ethermint/pull/441) Align genesis related cmd.
* (rpc) [#480](https://github.com/crypto-org-chain/ethermint/pull/480), [#482](https://github.com/crypto-org-chain/ethermint/pull/482) Fix parsed logs from old events.
* (rpc) [#488](https://github.com/crypto-org-chain/ethermint/pull/488) Fix handling of pending transactions related APIs.

### Improvements

Expand Down
19 changes: 19 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ var (

type GenesisState map[string]json.RawMessage

type PendingTxListener func([]byte)

// var _ server.Application (*EthermintApp)(nil)

// EthermintApp implements an extended ABCI application. It is an application
Expand All @@ -205,6 +207,8 @@ type EthermintApp struct {

invCheckPeriod uint

pendingTxListeners []PendingTxListener

// keys to access the substores
keys map[string]*storetypes.KVStoreKey
tkeys map[string]*storetypes.TransientStoreKey
Expand Down Expand Up @@ -1063,6 +1067,21 @@ func (app *EthermintApp) GetStoreKey(name string) storetypes.StoreKey {
return app.okeys[name]
}

// RegisterPendingTxListener is used by json-rpc server to listen to pending transactions in CheckTx.
func (app *EthermintApp) RegisterPendingTxListener(listener PendingTxListener) {
app.pendingTxListeners = append(app.pendingTxListeners, listener)
}

func (app *EthermintApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
res, err := app.BaseApp.CheckTx(req)
if err == nil && res.Code == 0 && req.Type == abci.CheckTxType_New {
for _, listener := range app.pendingTxListeners {
listener(req.Tx)
}
}
return res, err
}

// RegisterSwaggerAPI registers swagger route with API Server
func RegisterSwaggerAPI(_ client.Context, rtr *mux.Router) {
root, err := fs.Sub(docs.SwaggerUI, "swagger-ui")
Expand Down
4 changes: 2 additions & 2 deletions rpc/namespaces/ethereum/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
}

id := rpc.NewID()
_, offset := api.events.TxStream().ReadNonBlocking(-1)
_, offset := api.events.PendingTxStream().ReadNonBlocking(-1)
api.filters[id] = &filter{
typ: filters.PendingTransactionsSubscription,
deadline: time.NewTimer(deadline),
Expand Down Expand Up @@ -321,7 +321,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
switch f.typ {
case filters.PendingTransactionsSubscription:
var hashes []common.Hash
hashes, f.offset = api.events.TxStream().ReadAllNonBlocking(f.offset)
hashes, f.offset = api.events.PendingTxStream().ReadAllNonBlocking(f.offset)
return returnHashes(hashes), nil
case filters.BlocksSubscription:
var headers []stream.RPCHeader
Expand Down
77 changes: 32 additions & 45 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
)

var (
txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String()
evmEvents = cmtquery.MustCompile(fmt.Sprintf("%s='%s' AND %s.%s='%s'",
tmtypes.EventTypeKey,
tmtypes.EventTx,
Expand All @@ -51,22 +50,26 @@ type RPCStream struct {
logger log.Logger
txDecoder sdk.TxDecoder

headerStream *Stream[RPCHeader]
txStream *Stream[common.Hash]
logStream *Stream[*ethtypes.Log]
headerStream *Stream[RPCHeader]
pendingTxStream *Stream[common.Hash]
logStream *Stream[*ethtypes.Log]

wg sync.WaitGroup
}

func NewRPCStreams(evtClient rpcclient.EventsClient, logger log.Logger, txDecoder sdk.TxDecoder) (*RPCStream, error) {
func NewRPCStreams(
evtClient rpcclient.EventsClient,
logger log.Logger,
txDecoder sdk.TxDecoder,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
txStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
}

ctx := context.Background()
Expand All @@ -76,14 +79,6 @@ func NewRPCStreams(evtClient rpcclient.EventsClient, logger log.Logger, txDecode
return nil, err
}

chTx, err := s.evtClient.Subscribe(ctx, streamSubscriberName, txEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(ctx, streamSubscriberName); err != nil {
s.logger.Error("failed to unsubscribe", "err", err)
}
return nil, err
}

chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
Expand All @@ -92,7 +87,7 @@ func NewRPCStreams(evtClient rpcclient.EventsClient, logger log.Logger, txDecode
return nil, err
}

go s.start(&s.wg, chBlocks, chTx, chLogs)
go s.start(&s.wg, chBlocks, chLogs)

return s, nil
}
Expand All @@ -109,18 +104,34 @@ func (s *RPCStream) HeaderStream() *Stream[RPCHeader] {
return s.headerStream
}

func (s *RPCStream) TxStream() *Stream[common.Hash] {
return s.txStream
func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
return s.pendingTxStream
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
return s.logStream
}

// ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx.
func (s *RPCStream) ListenPendingTx(bytes []byte) {
tx, err := s.txDecoder(bytes)
if err != nil {
s.logger.Error("fail to decode tx", "error", err.Error())
return
}

var hashes []common.Hash
for _, msg := range tx.GetMsgs() {
if ethTx, ok := msg.(*evmtypes.MsgEthereumTx); ok {
hashes = append(hashes, ethTx.AsTransaction().Hash())
}
}
s.pendingTxStream.Add(hashes...)
}

func (s *RPCStream) start(
wg *sync.WaitGroup,
chBlocks <-chan coretypes.ResultEvent,
chTx <-chan coretypes.ResultEvent,
chLogs <-chan coretypes.ResultEvent,
) {
wg.Add(1)
Expand Down Expand Up @@ -150,31 +161,7 @@ func (s *RPCStream) start(
// TODO: fetch bloom from events
header := types.EthHeaderFromTendermint(data.Block.Header, ethtypes.Bloom{}, baseFee)
s.headerStream.Add(RPCHeader{EthHeader: header, Hash: common.BytesToHash(data.Block.Header.Hash())})
case ev, ok := <-chTx:
if !ok {
chTx = nil
break
}

data, ok := ev.Data.(tmtypes.EventDataTx)
if !ok {
s.logger.Error("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data))
continue
}

tx, err := s.txDecoder(data.Tx)
if err != nil {
s.logger.Error("fail to decode tx", "error", err.Error())
continue
}

var hashes []common.Hash
for _, msg := range tx.GetMsgs() {
if ethTx, ok := msg.(*evmtypes.MsgEthereumTx); ok {
hashes = append(hashes, ethTx.AsTransaction().Hash())
}
}
s.txStream.Add(hashes...)
case ev, ok := <-chLogs:
if !ok {
chLogs = nil
Expand All @@ -201,7 +188,7 @@ func (s *RPCStream) start(
s.logStream.Add(txLogs...)
}

if chBlocks == nil && chTx == nil && chLogs == nil {
if chBlocks == nil && chLogs == nil {
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
//nolint: errcheck
go api.events.TxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
for _, hash := range items {
// write to ws conn
res := &SubscriptionNotification{
Expand Down
8 changes: 8 additions & 0 deletions server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cosmos/cosmos-sdk/server"
ethlog "github.com/ethereum/go-ethereum/log"
ethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/evmos/ethermint/app"
"github.com/evmos/ethermint/rpc"
"github.com/evmos/ethermint/rpc/stream"

Expand All @@ -41,12 +42,17 @@ const (
MaxRetry = 6
)

type AppWithPendingTxStream interface {
RegisterPendingTxListener(listener app.PendingTxListener)
}

// StartJSONRPC starts the JSON-RPC server
func StartJSONRPC(srvCtx *server.Context,
clientCtx client.Context,
g *errgroup.Group,
config *config.Config,
indexer ethermint.EVMTxIndexer,
app AppWithPendingTxStream,
) (*http.Server, chan struct{}, error) {
logger := srvCtx.Logger.With("module", "geth")

Expand All @@ -69,6 +75,8 @@ func StartJSONRPC(srvCtx *server.Context,
return nil, nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
}

app.RegisterPendingTxListener(rpcStream.ListenPendingTx)

ethlog.Root().SetHandler(ethlog.FuncHandler(func(r *ethlog.Record) error {
switch r.Lvl {
case ethlog.LvlTrace, ethlog.LvlDebug:
Expand Down
10 changes: 8 additions & 2 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
defer apiSrv.Close()
}

clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer)
clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer, app)
if httpSrv != nil {
defer func() {
shutdownCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -655,20 +655,26 @@ func startJSONRPCServer(
config config.Config,
genDocProvider node.GenesisDocProvider,
idxer ethermint.EVMTxIndexer,
app types.Application,
) (ctx client.Context, httpSrv *http.Server, httpSrvDone chan struct{}, err error) {
ctx = clientCtx
if !config.JSONRPC.Enable {
return
}

txApp, ok := app.(AppWithPendingTxStream)
if !ok {
return ctx, httpSrv, httpSrvDone, fmt.Errorf("json-rpc server requires AppWithPendingTxStream")
}

genDoc, err := genDocProvider()
if err != nil {
return ctx, httpSrv, httpSrvDone, err
}

ctx = clientCtx.WithChainID(genDoc.ChainID)
g.Go(func() error {
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer)
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer, txApp)
return err
})
return
Expand Down
5 changes: 4 additions & 1 deletion testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func startInProcess(cfg Config, val *Validator) error {
return fmt.Errorf("validator %s context is nil", val.Moniker)
}

val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig, nil)
val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(
val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig,
nil, app.(server.AppWithPendingTxStream),
)
if err != nil {
return err
}
Expand Down

0 comments on commit 6f4031d

Please sign in to comment.