This repository has been archived by the owner on Oct 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
mempool.go
114 lines (102 loc) · 3.45 KB
/
mempool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package core
import (
"fmt"
"time"
abci "github.com/tendermint/abci/types"
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
//-----------------------------------------------------------------------------
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
// Returns right away, with no response
func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := mempool.CheckTx(tx, nil)
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}
// Returns with the response from CheckTx
func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
resCh <- res
})
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
res := <-resCh
r := res.GetCheckTx()
return &ctypes.ResultBroadcastTx{
Code: r.Code,
Data: r.Data,
Log: r.Log,
Hash: tx.Hash(),
}, nil
}
// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
// or if we timeout waiting for tx to commit.
// If CheckTx or DeliverTx fail, no error will be returned, but the returned result
// will contain a non-OK ABCI code.
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
deliverTxResCh := make(chan types.EventDataTx, 1)
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
deliverTxResCh <- data.Unwrap().(types.EventDataTx)
})
// broadcast the tx and register checktx callback
checkTxResCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
checkTxResCh <- res
})
if err != nil {
logger.Error("err", "err", err)
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
checkTxRes := <-checkTxResCh
checkTxR := checkTxRes.GetCheckTx()
if checkTxR.Code != abci.CodeType_OK {
// CheckTx failed!
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(),
DeliverTx: abci.Result{},
Hash: tx.Hash(),
}, nil
}
// Wait for the tx to be included in a block,
// timeout after something reasonable.
// TODO: configureable?
timer := time.NewTimer(60 * 2 * time.Second)
select {
case deliverTxRes := <-deliverTxResCh:
// The tx was included in a block.
deliverTxR := &abci.ResponseDeliverTx{
Code: deliverTxRes.Code,
Data: deliverTxRes.Data,
Log: deliverTxRes.Log,
}
logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(),
DeliverTx: deliverTxR.Result(),
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-timer.C:
logger.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(),
DeliverTx: abci.Result{},
Hash: tx.Hash(),
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}
panic("Should never happen!")
}
func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
txs := mempool.Reap(-1)
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
}
func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil
}