-
Notifications
You must be signed in to change notification settings - Fork 0
/
txListBySenderMap.go
171 lines (136 loc) · 4.83 KB
/
txListBySenderMap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package txcache
import (
"sync"
"github.com/bhagyaraj1208117/andes-core-go/core/atomic"
"github.com/bhagyaraj1208117/andes-storage-go/txcache/maps"
)
const numberOfScoreChunks = uint32(100)
// txListBySenderMap is a map-like structure for holding and accessing transactions by sender
type txListBySenderMap struct {
backingMap *maps.BucketSortedMap
senderConstraints senderConstraints
counter atomic.Counter
scoreComputer scoreComputer
txGasHandler TxGasHandler
txFeeHelper feeHelper
mutex sync.Mutex
}
// newTxListBySenderMap creates a new instance of TxListBySenderMap
func newTxListBySenderMap(
nChunksHint uint32,
senderConstraints senderConstraints,
scoreComputer scoreComputer,
txGasHandler TxGasHandler,
txFeeHelper feeHelper,
) *txListBySenderMap {
backingMap := maps.NewBucketSortedMap(nChunksHint, numberOfScoreChunks)
return &txListBySenderMap{
backingMap: backingMap,
senderConstraints: senderConstraints,
scoreComputer: scoreComputer,
txGasHandler: txGasHandler,
txFeeHelper: txFeeHelper,
}
}
// addTx adds a transaction in the map, in the corresponding list (selected by its sender)
func (txMap *txListBySenderMap) addTx(tx *WrappedTransaction) (bool, [][]byte) {
sender := string(tx.Tx.GetSndAddr())
listForSender := txMap.getOrAddListForSender(sender)
return listForSender.AddTx(tx, txMap.txGasHandler, txMap.txFeeHelper)
}
// getOrAddListForSender gets or lazily creates a list (using double-checked locking pattern)
func (txMap *txListBySenderMap) getOrAddListForSender(sender string) *txListForSender {
listForSender, ok := txMap.getListForSender(sender)
if ok {
return listForSender
}
txMap.mutex.Lock()
defer txMap.mutex.Unlock()
listForSender, ok = txMap.getListForSender(sender)
if ok {
return listForSender
}
return txMap.addSender(sender)
}
func (txMap *txListBySenderMap) getListForSender(sender string) (*txListForSender, bool) {
listForSenderUntyped, ok := txMap.backingMap.Get(sender)
if !ok {
return nil, false
}
listForSender := listForSenderUntyped.(*txListForSender)
return listForSender, true
}
func (txMap *txListBySenderMap) addSender(sender string) *txListForSender {
listForSender := newTxListForSender(sender, &txMap.senderConstraints, txMap.notifyScoreChange)
txMap.backingMap.Set(listForSender)
txMap.counter.Increment()
return listForSender
}
// This function should only be called in a critical section managed by a "txListForSender"
func (txMap *txListBySenderMap) notifyScoreChange(txList *txListForSender, scoreParams senderScoreParams) {
score := txMap.scoreComputer.computeScore(scoreParams)
txList.setLastComputedScore(score)
txMap.backingMap.NotifyScoreChange(txList, score)
}
// removeTx removes a transaction from the map
func (txMap *txListBySenderMap) removeTx(tx *WrappedTransaction) bool {
sender := string(tx.Tx.GetSndAddr())
listForSender, ok := txMap.getListForSender(sender)
if !ok {
// This happens when a sender whose transactions were selected for processing is removed from cache in the meantime.
// When it comes to remove one if its transactions due to processing (commited / finalized block), they don't exist in cache anymore.
log.Trace("txListBySenderMap.removeTx() detected slight inconsistency: sender of tx not in cache", "tx", tx.TxHash, "sender", []byte(sender))
return false
}
isFound := listForSender.RemoveTx(tx)
isEmpty := listForSender.IsEmpty()
if isEmpty {
txMap.removeSender(sender)
}
return isFound
}
func (txMap *txListBySenderMap) removeSender(sender string) bool {
_, removed := txMap.backingMap.Remove(sender)
if removed {
txMap.counter.Decrement()
}
return removed
}
// RemoveSendersBulk removes senders, in bulk
func (txMap *txListBySenderMap) RemoveSendersBulk(senders []string) uint32 {
numRemoved := uint32(0)
for _, senderKey := range senders {
if txMap.removeSender(senderKey) {
numRemoved++
}
}
return numRemoved
}
func (txMap *txListBySenderMap) notifyAccountNonce(accountKey []byte, nonce uint64) {
sender := string(accountKey)
listForSender, ok := txMap.getListForSender(sender)
if !ok {
return
}
listForSender.notifyAccountNonce(nonce)
}
func (txMap *txListBySenderMap) getSnapshotAscending() []*txListForSender {
itemsSnapshot := txMap.backingMap.GetSnapshotAscending()
listsSnapshot := make([]*txListForSender, len(itemsSnapshot))
for i, item := range itemsSnapshot {
listsSnapshot[i] = item.(*txListForSender)
}
return listsSnapshot
}
func (txMap *txListBySenderMap) getSnapshotDescending() []*txListForSender {
itemsSnapshot := txMap.backingMap.GetSnapshotDescending()
listsSnapshot := make([]*txListForSender, len(itemsSnapshot))
for i, item := range itemsSnapshot {
listsSnapshot[i] = item.(*txListForSender)
}
return listsSnapshot
}
func (txMap *txListBySenderMap) clear() {
txMap.backingMap.Clear()
txMap.counter.Set(0)
}