-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
priority_nonce.go
479 lines (404 loc) · 14.6 KB
/
priority_nonce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
package mempool
import (
"context"
"fmt"
"math"
"github.com/huandu/skiplist"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
)
// Compile-time assertions that the int64 instantiations of the mempool and its
// iterator satisfy this package's Mempool and Iterator interfaces.
var (
_ Mempool = (*PriorityNonceMempool[int64])(nil)
_ Iterator = (*PriorityNonceIterator[int64])(nil)
)
type (
// PriorityNonceMempoolConfig defines the configuration used to configure the
// PriorityNonceMempool. C is the type used to represent a tx priority.
PriorityNonceMempoolConfig[C comparable] struct {
// TxPriority defines the transaction priority and comparator.
TxPriority TxPriority[C]
// OnRead is a callback to be called when a tx is read from the mempool.
// NOTE(review): nothing in this file invokes OnRead — confirm where (or
// whether) it is called.
OnRead func(tx sdk.Tx)
// TxReplacement is a callback to be called when a duplicated transaction
// nonce is detected during mempool insert. An application can define a
// transaction replacement rule based on tx priority or certain transaction
// fields. Insert passes the stored priority (op), the incoming priority (np),
// the stored tx (oTx) and the incoming tx (nTx); returning false makes Insert
// fail with an error and keep the stored tx. When the callback is nil a
// duplicate always replaces the stored tx.
TxReplacement func(op, np C, oTx, nTx sdk.Tx) bool
// MaxTx sets the maximum number of transactions allowed in the mempool with
// the semantics:
// - if MaxTx == 0, there is no cap on the number of transactions in the mempool
// - if MaxTx > 0, `Insert` returns ErrMempoolTxMaxCapacity once the mempool
// already holds MaxTx transactions; `Insert` does not evict anything to
// make room.
// - if MaxTx < 0, `Insert` is a no-op.
MaxTx int
}
// PriorityNonceMempool is a mempool implementation that stores txs
// in a partially ordered set by 2 dimensions: priority, and sender-nonce
// (sequence number). Internally it uses one priority ordered skip list and one
// skip list per sender ordered by sender-nonce (sequence number). When there
// are multiple txs from the same sender, they are not always comparable by
// priority to other sender txs and must be partially ordered by both sender-nonce
// and priority.
PriorityNonceMempool[C comparable] struct {
// priorityIndex holds every tx keyed by its txMeta and ordered by
// skiplistComparable.
priorityIndex *skiplist.SkipList
// priorityCounts tracks how many stored txs carry each priority;
// reorderPriorityTies uses it to find priorities shared by several txs.
priorityCounts map[C]int
// senderIndices maps a sender address to that sender's txs, ordered by nonce.
senderIndices map[string]*skiplist.SkipList
// scores maps a txMeta holding only {nonce, sender} to a txMeta carrying the
// priority (and, once ties were reordered, the weight) under which the tx is
// currently stored in priorityIndex.
scores map[txMeta[C]]txMeta[C]
// cfg is the configuration the mempool was created with.
cfg PriorityNonceMempoolConfig[C]
}
// PriorityNonceIterator defines an iterator that is used for mempool iteration
// on Select().
PriorityNonceIterator[C comparable] struct {
// mempool is the mempool being iterated.
mempool *PriorityNonceMempool[C]
// priorityNode is the current position in the mempool's priority index. It is
// nil before the first advance and again once the index is exhausted.
priorityNode *skiplist.Element
// senderCursors records, per sender, the element of that sender's index that
// was most recently yielded.
senderCursors map[string]*skiplist.Element
// sender is the sender of the tx at priorityNode.
sender string
// nextPriority is the priority of the node following priorityNode, or
// TxPriority.MinValue when there is no such node.
nextPriority C
}
// TxPriority defines a type that is used to retrieve and compare transaction
// priorities. Priorities must be comparable.
TxPriority[C comparable] struct {
// GetTxPriority returns the priority of the transaction. A priority must be
// comparable via Compare.
GetTxPriority func(ctx context.Context, tx sdk.Tx) C
// Compare compares two transaction priorities. The result must be 0 if
// a == b, -1 if a < b, and +1 if a > b.
Compare func(a, b C) int
// MinValue defines the minimum priority value, e.g. MinInt64. This value is
// used when instantiating a new iterator and comparing weights.
MinValue C
}
// txMeta stores transaction metadata used in indices. It is used both as the
// key type of the skip lists and as the key and value type of the scores map.
txMeta[C comparable] struct {
// nonce is the sender's sequence number
nonce uint64
// priority is the transaction's priority
priority C
// sender is the transaction's sender
sender string
// weight is the transaction's weight, used as a tiebreaker for transactions
// with the same priority. Insert leaves it at its zero value;
// reorderPriorityTies fills it in.
weight C
// senderElement is a pointer to the transaction's element in the sender index
senderElement *skiplist.Element
}
)
// NewDefaultTxPriority builds the int64 TxPriority used by default: a
// transaction's priority is the Priority() of the SDK context it is handled
// under, priorities are ordered numerically, and math.MinInt64 is the minimum.
func NewDefaultTxPriority() TxPriority[int64] {
	var txp TxPriority[int64]

	// The tx argument is ignored; only the context carries the priority.
	txp.GetTxPriority = func(goCtx context.Context, _ sdk.Tx) int64 {
		sdkCtx := sdk.UnwrapSDKContext(goCtx)
		return sdkCtx.Priority()
	}
	txp.Compare = func(a, b int64) int {
		return skiplist.Int64.Compare(a, b)
	}
	txp.MinValue = math.MinInt64

	return txp
}
// DefaultPriorityNonceMempoolConfig returns an int64-priority configuration
// whose only populated field is TxPriority (from NewDefaultTxPriority). OnRead
// and TxReplacement are left nil and MaxTx is left at 0.
func DefaultPriorityNonceMempoolConfig() PriorityNonceMempoolConfig[int64] {
	var cfg PriorityNonceMempoolConfig[int64]
	cfg.TxPriority = NewDefaultTxPriority()
	return cfg
}
// skiplistComparable builds the comparator used by the priority index. Keys
// are txMeta values and are compared field by field, stopping at the first
// difference: priority, then weight, then sender, then nonce. Taken together
// these fields uniquely identify a transaction.
//
// Both arguments handed to the comparator must be txMeta[C]; anything else
// panics on the type assertion.
func skiplistComparable[C comparable](txPriority TxPriority[C]) skiplist.Comparable {
	order := func(lhs, rhs any) int {
		left := lhs.(txMeta[C])
		right := rhs.(txMeta[C])

		if byPriority := txPriority.Compare(left.priority, right.priority); byPriority != 0 {
			return byPriority
		}

		// Weight breaks ties between equal priorities. It is only computed
		// during .Select(...), so freshly inserted keys all carry the zero
		// weight.
		if byWeight := txPriority.Compare(left.weight, right.weight); byWeight != 0 {
			return byWeight
		}

		// With weight still zero after .Insert(...), sender and nonce are what
		// keep equal-priority transactions from overwriting one another in the
		// priority index.
		if bySender := skiplist.String.Compare(left.sender, right.sender); bySender != 0 {
			return bySender
		}

		return skiplist.Uint64.Compare(left.nonce, right.nonce)
	}

	return skiplist.LessThanFunc(order)
}
// NewPriorityMempool creates an empty PriorityNonceMempool from cfg. The
// priority index is ordered with skiplistComparable built from cfg.TxPriority;
// the per-priority counts, per-sender indices and scores all start out empty.
func NewPriorityMempool[C comparable](cfg PriorityNonceMempoolConfig[C]) *PriorityNonceMempool[C] {
	byPriority := skiplist.New(skiplistComparable(cfg.TxPriority))

	return &PriorityNonceMempool[C]{
		cfg:            cfg,
		priorityIndex:  byPriority,
		priorityCounts: map[C]int{},
		senderIndices:  map[string]*skiplist.SkipList{},
		scores:         map[txMeta[C]]txMeta[C]{},
	}
}
// DefaultPriorityMempool returns a PriorityNonceMempool[int64] created from
// DefaultPriorityNonceMempoolConfig, i.e. with no optional settings applied.
func DefaultPriorityMempool() *PriorityNonceMempool[int64] {
	cfg := DefaultPriorityNonceMempoolConfig()
	return NewPriorityMempool(cfg)
}
// NextSenderTx returns the next transaction for a given sender by nonce order,
// i.e. the transaction at the front of the sender's index. It returns nil when
// the sender is unknown or when the sender currently has no transactions.
func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx {
	senderIndex, ok := mp.senderIndices[sender]
	if !ok {
		return nil
	}

	// Remove takes txs out of a sender's index but never deletes the index from
	// senderIndices, so a known sender can have an empty list. Front() is nil
	// in that case and must not be dereferenced.
	cursor := senderIndex.Front()
	if cursor == nil {
		return nil
	}

	return cursor.Value.(sdk.Tx)
}
// Insert attempts to insert a Tx into the app-side mempool in O(log n) time,
// returning an error if unsuccessful. Sender and nonce are derived from the
// transaction's first signature.
//
// The configured MaxTx is checked first: a negative value makes Insert a no-op
// that returns nil, and a positive value makes Insert return
// ErrMempoolTxMaxCapacity once CountTx has reached it. Because this check runs
// before the duplicate lookup, it also applies when tx would merely replace an
// existing entry.
//
// Transactions are unique by sender and nonce. When an entry with the same
// sender and nonce already exists it is replaced by tx (possibly under a new
// priority, changing the total order of the mempool), unless a TxReplacement
// callback is configured and rejects the replacement; in that case an error is
// returned and the existing entry is kept.
//
// An error is also returned when tx does not implement
// signing.SigVerifiableTx, when its signatures cannot be retrieved, or when it
// carries no signatures.
func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
	switch {
	case mp.cfg.MaxTx < 0:
		return nil
	case mp.cfg.MaxTx > 0 && mp.CountTx() >= mp.cfg.MaxTx:
		return ErrMempoolTxMaxCapacity
	}

	// Report an unsupported tx type as an error instead of panicking on the
	// type assertion.
	sigTx, ok := tx.(signing.SigVerifiableTx)
	if !ok {
		return fmt.Errorf("tx of type %T does not implement signing.SigVerifiableTx", tx)
	}

	sigs, err := sigTx.GetSignaturesV2()
	if err != nil {
		return err
	}
	if len(sigs) == 0 {
		return fmt.Errorf("tx must have at least one signer")
	}

	sig := sigs[0]
	sender := sdk.AccAddress(sig.PubKey.Address()).String()
	priority := mp.cfg.TxPriority.GetTxPriority(ctx, tx)
	nonce := sig.Sequence
	key := txMeta[C]{nonce: nonce, priority: priority, sender: sender}

	senderIndex, ok := mp.senderIndices[sender]
	if !ok {
		// First tx seen for this sender: create its nonce-ordered index.
		senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
			return skiplist.Uint64.Compare(b.(txMeta[C]).nonce, a.(txMeta[C]).nonce)
		}))
		mp.senderIndices[sender] = senderIndex
	}

	// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
	// changed priority will create a new key, so we must remove the old key and
	// re-insert it to avoid having the same tx indexed twice under different
	// priorities.
	//
	// This O(log n) remove operation is rare and only happens when a tx's
	// priority changes.
	sk := txMeta[C]{nonce: nonce, sender: sender}
	if oldScore, txExists := mp.scores[sk]; txExists {
		if mp.cfg.TxReplacement != nil {
			// The sender index compares nonces only, so key locates the stored
			// tx regardless of its old priority.
			oldTx := senderIndex.Get(key).Value.(sdk.Tx)
			if !mp.cfg.TxReplacement(oldScore.priority, priority, oldTx, tx) {
				return fmt.Errorf(
					"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
					oldScore.priority,
					priority,
					oldTx,
					tx,
				)
			}
		}

		mp.priorityIndex.Remove(txMeta[C]{
			nonce:    nonce,
			sender:   sender,
			priority: oldScore.priority,
			weight:   oldScore.weight,
		})
		mp.priorityCounts[oldScore.priority]--
	}

	mp.priorityCounts[priority]++

	// Since senderIndex is scored by nonce, a changed priority will overwrite
	// the existing key.
	key.senderElement = senderIndex.Set(key, tx)

	mp.scores[sk] = txMeta[C]{priority: priority}
	mp.priorityIndex.Set(key, tx)

	return nil
}
// iteratePriority moves the iterator to the next node of the priority index
// (or to its front on the first call), records that node's sender and the
// priority of the node after it, and then resumes sender iteration via Next.
// It returns nil once the priority index is exhausted.
func (i *PriorityNonceIterator[C]) iteratePriority() Iterator {
	node := i.priorityNode
	if node != nil {
		node = node.Next()
	} else {
		// First call: start at the head of the priority index.
		node = i.mempool.priorityIndex.Front()
	}
	i.priorityNode = node

	// Ran off the end of the priority index.
	if node == nil {
		return nil
	}

	i.sender = node.Key().(txMeta[C]).sender

	// Remember the following priority; MinValue stands in when nothing follows.
	if following := node.Next(); following == nil {
		i.nextPriority = i.mempool.cfg.TxPriority.MinValue
	} else {
		i.nextPriority = following.Key().(txMeta[C]).priority
	}

	return i.Next()
}
// Next advances the iterator to the next transaction and returns the iterator,
// or nil when there is nothing left to iterate.
//
// It walks the current sender's nonce-ordered index. It switches to the next
// node of the priority index (via iteratePriority) when the sender has no
// further txs, when the sender's next tx has a lower priority than the next
// priority node, or when it ties on priority but carries a lower weight than
// that node.
func (i *PriorityNonceIterator[C]) Next() Iterator {
	if i.priorityNode == nil {
		return nil
	}

	cursor, ok := i.senderCursors[i.sender]
	if !ok {
		// beginning of sender iteration
		cursor = i.mempool.senderIndices[i.sender].Front()
	} else {
		// middle of sender iteration
		cursor = cursor.Next()
	}

	// end of sender iteration
	if cursor == nil {
		return i.iteratePriority()
	}

	key := cursor.Key().(txMeta[C])
	compare := i.mempool.cfg.TxPriority.Compare

	switch cmp := compare(key.priority, i.nextPriority); {
	case cmp < 0:
		// We've reached a transaction with a priority lower than the next
		// highest priority in the pool.
		return i.iteratePriority()
	case cmp == 0:
		// nextPriority falls back to MinValue when priorityNode is the last
		// node, so a tx whose priority equals MinValue can tie here with no
		// following node. Only consult the following node when it exists.
		if following := i.priorityNode.Next(); following != nil {
			// Weight is incorporated into the priority index key only (not the
			// sender index) so we must fetch it here from the scores map.
			weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight
			if compare(weight, following.Key().(txMeta[C]).weight) < 0 {
				return i.iteratePriority()
			}
		}
	}

	i.senderCursors[i.sender] = cursor
	return i
}
// Tx returns the transaction stored at the current sender's cursor, i.e. the
// one the iterator was last positioned on.
func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
	current := i.senderCursors[i.sender]
	return current.Value.(sdk.Tx)
}
// Select returns an iterator over the mempool's transactions, ordered by
// priority and sender-nonce, or nil when the mempool is empty. Both arguments
// are ignored.
//
// Before the iterator is created, entries whose priority is shared by more
// than one tx are re-keyed in the priority index with a tiebreak weight (see
// reorderPriorityTies). No transaction is added or removed, but the internal
// indices are updated.
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (mp *PriorityNonceMempool[C]) Select(_ context.Context, _ [][]byte) Iterator {
	if mp.priorityIndex.Len() == 0 {
		return nil
	}

	mp.reorderPriorityTies()

	it := new(PriorityNonceIterator[C])
	it.mempool = mp
	it.senderCursors = map[string]*skiplist.Element{}

	return it.iteratePriority()
}
// reorderKey describes one pending re-keying of a priority index entry, as
// collected by reorderPriorityTies.
type reorderKey[C comparable] struct {
// deleteKey is the key the entry is currently stored under.
deleteKey txMeta[C]
// insertKey is the key the entry is re-inserted under (deleteKey with an
// updated weight).
insertKey txMeta[C]
// tx is the transaction carried over to the new key.
tx sdk.Tx
}
// reorderPriorityTies assigns a weight to every tx whose priority is shared by
// more than one tx in the mempool and re-keys those txs in the priority index
// so that the weight takes part in the ordering. The matching scores entries
// are rewritten to the new keys.
//
// The affected entries are collected first and re-keyed afterwards, so the
// priority index is not modified while it is being walked.
func (mp *PriorityNonceMempool[C]) reorderPriorityTies() {
	var pending []reorderKey[C]

	for elem := mp.priorityIndex.Front(); elem != nil; elem = elem.Next() {
		oldKey := elem.Key().(txMeta[C])
		if mp.priorityCounts[oldKey.priority] <= 1 {
			// Priority is unique in the pool; no tiebreak needed.
			continue
		}

		weighted := oldKey
		weighted.weight = senderWeight(mp.cfg.TxPriority, oldKey.senderElement)
		pending = append(pending, reorderKey[C]{
			deleteKey: oldKey,
			insertKey: weighted,
			tx:        elem.Value.(sdk.Tx),
		})
	}

	for _, r := range pending {
		mp.priorityIndex.Remove(r.deleteKey)
		delete(mp.scores, txMeta[C]{nonce: r.deleteKey.nonce, sender: r.deleteKey.sender})

		mp.priorityIndex.Set(r.insertKey, r.tx)
		mp.scores[txMeta[C]{nonce: r.insertKey.nonce, sender: r.insertKey.sender}] = r.insertKey
	}
}
// senderWeight returns the tiebreak weight for the tx at senderCursor in its
// sender's nonce-ordered index. It is used to resolve priority collisions,
// that is when 2 or more txs from different senders have the same priority.
//
// The weight starts as the priority of the tx at senderCursor. The rest of the
// sender's list is then walked to its end, and every time an entry's priority
// differs (per Compare) from the current weight, that priority becomes the new
// weight. A nil senderCursor yields txPriority.MinValue.
//
// NOTE(review): earlier documentation described the weight as the priority of
// the first later tx that differs from the starting one, but the walk does not
// stop at the first difference — confirm which behaviour is intended.
func senderWeight[C comparable](txPriority TxPriority[C], senderCursor *skiplist.Element) C {
	if senderCursor == nil {
		return txPriority.MinValue
	}

	weight := senderCursor.Key().(txMeta[C]).priority
	for next := senderCursor.Next(); next != nil; next = next.Next() {
		if p := next.Key().(txMeta[C]).priority; txPriority.Compare(p, weight) != 0 {
			weight = p
		}
	}

	return weight
}
// CountTx reports how many transactions the mempool currently holds, taken as
// the length of the priority index.
func (mp *PriorityNonceMempool[C]) CountTx() int {
	total := mp.priorityIndex.Len()
	return total
}
// Remove removes a transaction from the mempool in O(log n) time, returning an
// error if unsuccessful. The tx is identified by the sender and nonce derived
// from its first signature.
//
// It returns ErrTxNotFound when no tx with that sender and nonce is stored. An
// error is also returned when tx does not implement signing.SigVerifiableTx,
// when its signatures cannot be retrieved, when it carries no signatures, or
// when the sender has no index. Nothing is modified in any of the error cases.
//
// The sender's index is emptied but kept in the mempool even when its last tx
// is removed.
func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
	// Report an unsupported tx type as an error instead of panicking on the
	// type assertion.
	sigTx, ok := tx.(signing.SigVerifiableTx)
	if !ok {
		return fmt.Errorf("tx of type %T does not implement signing.SigVerifiableTx", tx)
	}

	sigs, err := sigTx.GetSignaturesV2()
	if err != nil {
		return err
	}
	if len(sigs) == 0 {
		return fmt.Errorf("attempted to remove a tx with no signatures")
	}

	sig := sigs[0]
	sender := sdk.AccAddress(sig.PubKey.Address()).String()
	nonce := sig.Sequence

	scoreKey := txMeta[C]{nonce: nonce, sender: sender}
	score, ok := mp.scores[scoreKey]
	if !ok {
		return ErrTxNotFound
	}

	// The priority index key includes priority and weight, both of which are
	// only known from the scores entry.
	tk := txMeta[C]{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}

	senderTxs, ok := mp.senderIndices[sender]
	if !ok {
		return fmt.Errorf("sender %s not found", sender)
	}

	mp.priorityIndex.Remove(tk)
	senderTxs.Remove(tk)
	delete(mp.scores, scoreKey)
	mp.priorityCounts[score.priority]--

	return nil
}
// IsEmpty verifies that a PriorityNonceMempool[C] holds no transactions and
// that its bookkeeping agrees: the priority index must be empty, every
// per-priority count must be zero and every per-sender index must be empty.
// It returns an error describing the first violation found, or nil.
//
// mempool must be a *PriorityNonceMempool[C]; any other implementation panics
// on the type assertion.
func IsEmpty[C comparable](mempool Mempool) error {
	mp := mempool.(*PriorityNonceMempool[C])

	if mp.priorityIndex.Len() != 0 {
		return fmt.Errorf("priorityIndex not empty")
	}

	for priority := range mp.priorityCounts {
		if mp.priorityCounts[priority] != 0 {
			return fmt.Errorf("priorityCounts not zero at %v, got %v", priority, mp.priorityCounts[priority])
		}
	}

	for sender, index := range mp.senderIndices {
		if index.Len() != 0 {
			return fmt.Errorf("senderIndex not empty for sender %v", sender)
		}
	}

	return nil
}