/
txputter.go
138 lines (123 loc) · 3.46 KB
/
txputter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package rpc
import (
"bytes"
"context"
"reflect"
"time"
"github.com/aergoio/aergo-actor/actor"
"github.com/aergoio/aergo-lib/log"
"github.com/aergoio/aergo/v2/p2p/p2putil"
"github.com/aergoio/aergo/v2/pkg/component"
"github.com/aergoio/aergo/v2/types"
"github.com/aergoio/aergo/v2/types/message"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// actorBufSize is the capacity of the putter's job queue (see newPutter),
// which bounds how many mempool put requests Commit keeps outstanding at
// once. The timeout retry budget (maxRetry) is derived from it as well.
const actorBufSize = 10
// txPutter pushes a batch of transactions to the mempool service through the
// component hub and collects one CommitResult per transaction.
type txPutter struct {
	// Collaborators and settings.
	hub          component.ICompSyncRequester // used to issue RequestFuture calls to message.MemPoolSvc
	logger       *log.Logger                  // logger created with the name "txputter"
	actorTimeout time.Duration                // timeout handed to every RequestFuture call
	maxRetry     int                          // upper bound on timeout retries during one Commit

	// ctx is polled by Commit before each result is read; once it is done,
	// Commit returns ctx.Err().
	ctx context.Context

	// Input batch and progress through it.
	Txs    []*types.Tx // transactions to put, in caller order
	txSize int         // len(Txs), captured at construction time
	offset int         // index of the next transaction in Txs that has not been examined yet

	// Bookkeeping for outstanding requests; rs and futures are indexed like Txs.
	q       *p2putil.PressableQueue // indices (int) of transactions whose request is outstanding
	rs      []*types.CommitResult   // per-transaction results, filled in as transactions are examined
	futures []*actor.Future         // latest future issued for each transaction sent to the mempool
}
// newPutter builds a txPutter for txs. Requests are sent through hub with the
// given per-request timeout, and ctx is consulted by Commit for cancellation.
// The job queue is created with capacity actorBufSize, the retry budget is
// twice that, and the result and future slices are sized to len(txs).
func newPutter(ctx context.Context, txs []*types.Tx, hub component.ICompSyncRequester, timeout time.Duration) *txPutter {
	count := len(txs)
	return &txPutter{
		hub:          hub,
		logger:       log.NewLogger("txputter"),
		actorTimeout: timeout,
		maxRetry:     actorBufSize * 2,
		ctx:          ctx,
		Txs:          txs,
		txSize:       count,
		q:            p2putil.NewPressableQueue(actorBufSize),
		rs:           make([]*types.CommitResult, count),
		futures:      make([]*actor.Future, count),
	}
}
// Commit sends the putter's transactions to the mempool service and records
// the outcome of each one in m.rs.
//
// It first issues requests until the job queue is full or no transaction is
// left, then drains the queue: for every finished request the result is
// written and the next pending transaction, if any, is sent. A request that
// ends with actor.ErrTimeout is re-sent as long as the total number of retries
// stays below m.maxRetry.
//
// Commit returns m.ctx.Err() when the context is done, the future's error when
// a request fails for a reason other than a retryable timeout (or the retry
// budget is exhausted), and nil once the queue has been drained. On an early
// return, results of transactions that were not processed yet are left as
// they were.
func (m *txPutter) Commit() error {
	// Phase 1: send tx messages to the mempool until there is nothing left to
	// put or the job queue is full.
	for {
		if idx := m.putToNextTx(); idx < 0 || m.q.Full() {
			break
		}
	}

	// Phase 2: consume outstanding requests one by one, refilling the queue
	// after every success.
	toRetry := 0
	for !m.q.Empty() {
		// Non-blocking cancellation check before reading the next result.
		select {
		case <-m.ctx.Done():
			return m.ctx.Err()
		default:
		}

		i := m.q.Poll().(int)
		result, err := m.futures[i].Result()
		if err == nil {
			m.writeResult(result, i)
			m.putToNextTx()
			continue
		}

		// Error reported by the actor: only timeouts are retried, and only
		// while the shared retry budget lasts.
		if err != actor.ErrTimeout || toRetry >= m.maxRetry {
			m.logger.Debug().Err(err).Int("idx", i).Int("retryCnt", toRetry).Msg("Exiting commit")
			return err
		}
		toRetry++
		m.logger.Debug().Int("idx", i).Int("retryCnt", toRetry).Msg("Retrying timeout job")
		m.rePutTx(i) // retry
	}

	m.logger.Debug().Int("txSize", m.txSize).Int("retryCnt", toRetry).Msg("putting txs complete")
	return nil
}
// writeResult stores the outcome of the request for transaction i in m.rs[i].
// result is expected to be a *message.MemPoolPutRsp, whose Err field becomes
// the outcome; any other type is recorded as a codes.Internal status error.
// The error is mapped through convertError into the Error field and, when
// non-nil, its text is copied into Detail.
func (m *txPutter) writeResult(result interface{}, i int) {
	var err error
	if rsp, ok := result.(*message.MemPoolPutRsp); ok {
		err = rsp.Err
	} else {
		err = status.Errorf(codes.Internal, "internal type (%v) error", reflect.TypeOf(result))
	}

	entry := m.rs[i]
	entry.Error = convertError(err)
	if err != nil {
		entry.Detail = err.Error()
	}
}
// putToNextTx advances through m.Txs from m.offset and sends the first
// transaction whose stored hash matches its calculated hash to the mempool
// service. Transactions with a mismatching hash are skipped and marked
// CommitStatus_TX_INVALID_HASH in their result. Every examined transaction
// gets a fresh CommitResult carrying its hash.
//
// It returns the index of the transaction that was sent, or -1 when no
// transaction is left to send.
func (m *txPutter) putToNextTx() int {
	for m.offset < m.txSize {
		cur := m.offset
		tx := m.Txs[cur]
		given := tx.Hash
		res := &types.CommitResult{Hash: given}
		m.rs[cur] = res

		calculated := tx.CalculateTxHash()
		if bytes.Equal(given, calculated) {
			m.futures[cur] = m.hub.RequestFuture(message.MemPoolSvc,
				&message.MemPoolPut{Tx: tx},
				m.actorTimeout, "rpc.(*AergoRPCService).CommitTX")
			m.q.Offer(cur)
			// Step past the sent transaction so the next call resumes after it.
			m.offset = cur + 1
			m.logger.Trace().Object("tx", types.LogTxHash{Tx: tx}).Msg("putting tx to mempool")
			return cur
		}

		m.logger.Trace().Stringer("calculated", types.LogBase58(calculated)).Stringer("in", types.LogBase58(given)).Msg("tx hash mismatch")
		res.Error = types.CommitStatus_TX_INVALID_HASH
		m.offset++
	}
	return -1
}
// rePutTx sends transaction i to the mempool service again, replaces its
// stored future with the new one and offers the index back to the job queue.
func (m *txPutter) rePutTx(i int) {
	target := m.Txs[i]
	req := &message.MemPoolPut{Tx: target}
	m.futures[i] = m.hub.RequestFuture(message.MemPoolSvc, req,
		m.actorTimeout, "rpc.(*AergoRPCService).CommitTX")
	m.q.Offer(i)
	m.logger.Trace().Object("tx", types.LogTxHash{Tx: target}).Msg("putting tx to mempool")
}