Skip to content

Commit

Permalink
rpc: avoid leaking threads (#8329)
Browse files Browse the repository at this point in the history
Co-authored-by: Sam Kleinman <garen@tychoish.com>
  • Loading branch information
tnasu and tychoish committed Jul 13, 2023
1 parent 482b4d5 commit 1552e91
Showing 1 changed file with 74 additions and 51 deletions.
125 changes: 74 additions & 51 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,31 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca
// DeliverTx result.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
res, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{})
resCh := make(chan *ocabci.Response, 1)
err := env.Mempool.CheckTxSync(tx, func(res *ocabci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
}

}, mempl.TxInfo{})
if err != nil {
return nil, err
}
r := res.GetCheckTx()
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
MempoolError: r.MempoolError,
Hash: tx.Hash(),
}, nil

select {
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case res := <-resCh:
r := res.GetCheckTx()
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Codespace: r.Codespace,
Hash: tx.Hash(),
}, nil
}
}

// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
Expand Down Expand Up @@ -80,53 +92,64 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
}
}()

// Broadcast tx and check tx
checkTxResMsg, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{})
// Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *ocabci.Response, 1)
err = env.Mempool.CheckTxSync(tx, func(res *ocabci.Response) {
select {
case <-ctx.Context().Done():
case checkTxResCh <- res:
}
}, mempl.TxInfo{})
if err != nil {
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("error on broadcastTxCommit: %v", err)
}
checkTxRes := checkTxResMsg.GetCheckTx()
if checkTxRes.Code != ocabci.CodeTypeOK {
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, nil
}

// Wait for the tx to be included in a block or timeout.
select {
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := msg.Data().(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: deliverTxRes.Result,
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
var reason string
if deliverTxSub.Err() == nil {
reason = "Ostracon exited"
} else {
reason = deliverTxSub.Err().Error()
case <-ctx.Context().Done():
return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err())
case checkTxResMsg := <-checkTxResCh:
checkTxRes := checkTxResMsg.GetCheckTx()
if checkTxRes.Code != abci.CodeTypeOK {
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, nil
}

// Wait for the tx to be included in a block or timeout.
select {
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := msg.Data().(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: deliverTxRes.Result,
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
var reason string
if deliverTxSub.Err() == nil {
reason = "Tendermint exited"
} else {
reason = deliverTxSub.Err().Error()
}
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
case <-time.After(env.Config.TimeoutBroadcastTxCommit):
err = errors.New("timed out waiting for tx to be included in a block")
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
}
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
case <-time.After(env.Config.TimeoutBroadcastTxCommit):
err = errors.New("timed out waiting for tx to be included in a block")
env.Logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
}
}

Expand Down

0 comments on commit 1552e91

Please sign in to comment.