-
Notifications
You must be signed in to change notification settings - Fork 0
/
tx_cacher.go
132 lines (120 loc) · 3.85 KB
/
tx_cacher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package core
import (
"runtime"
"github.com/PhoenixGlobal/Phoenix-Chain-SDK/libs/log"
"github.com/PhoenixGlobal/Phoenix-Chain-SDK/ethereum/core/types"
)
// senderCacher is a concurrent transaction sender recoverer anc cacher.
var SenderCacher = NewTxSenderCacher(runtime.NumCPU())
// txSenderCacherRequest is a request for recovering transaction senders with a
// specific signature scheme and caching it into the transactions themselves.
//
// The inc field defines the number of transactions to skip after each recovery,
// which is used to feed the same underlying input array to different threads but
// ensure they process the early transactions fast.
type txSenderCacherRequest struct {
signer types.Signer
txs []*types.Transaction
inc int
doneCh chan int
starts int
}
// txSenderCacher is a helper structure to concurrently ecrecover transaction
// senders from digital signatures on background threads.
type txSenderCacher struct {
threads int
tasks chan *txSenderCacherRequest
txPool *TxPool
}
//todoewTxSenderCacher creates a new transaction sender background cacher and starts
// as many processing goroutines as allowed by the GOMAXPROCS on construction.
func NewTxSenderCacher(threads int) *txSenderCacher {
cacher := &txSenderCacher{
tasks: make(chan *txSenderCacherRequest, threads),
threads: threads,
}
for i := 0; i < threads; i++ {
go cacher.cache()
}
return cacher
}
// if set txpool ,will find from txpool first,if txpool have the tx,will not cal from any more
func (cacher *txSenderCacher) SetTxPool(txPool *TxPool) {
cacher.txPool = txPool
}
// cache is an infinite loop, caching transaction senders from various forms of
// data structures.
func (cacher *txSenderCacher) cache() {
for task := range cacher.tasks {
txCal := 0
for i := task.starts; i < len(task.txs); i += task.inc {
types.Sender(task.signer, task.txs[i])
txCal++
}
if task.doneCh != nil {
task.doneCh <- txCal
}
}
}
// recover recovers the senders from a batch of transactions and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
// If there's nothing to recover, abort
if len(txs) == 0 {
return
}
// Ensure we have meaningful task sizes and schedule the recoveries
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
}
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs,
inc: tasks,
starts: i,
}
}
}
// recoverFromBlock recovers the senders from block and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) RecoverFromBlock(signer types.Signer, block *types.Block) {
count := len(block.Transactions())
if count == 0 {
return
}
txs := make([]*types.Transaction, 0, count)
if cacher.txPool != nil && cacher.txPool.count() >= 200 {
for i, tx := range block.Transactions() {
if txInPool := cacher.txPool.Get(tx.Hash()); txInPool != nil {
block.Transactions()[i].CacheFromAddr(signer, txInPool.FromAddr(signer))
} else {
txs = append(txs, block.Transactions()[i])
}
}
} else {
txs = block.Transactions()
}
if len(txs) == 0 {
return
}
// Ensure we have meaningful task sizes and schedule the recoveries
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
}
log.Trace("Start recover tx FromBlock", "number", block.Number(), "txs", len(txs), "tasks", tasks)
block.CalTxFromCH = make(chan int, tasks)
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs,
inc: tasks,
doneCh: block.CalTxFromCH,
starts: i,
}
}
}