clist_mempool.go
package mempool
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/big"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/VictoriaMetrics/fastcache"
"github.com/tendermint/go-amino"
"github.com/brc20-collab/brczero/libs/system/trace"
abci "github.com/brc20-collab/brczero/libs/tendermint/abci/types"
cfg "github.com/brc20-collab/brczero/libs/tendermint/config"
"github.com/brc20-collab/brczero/libs/tendermint/libs/clist"
"github.com/brc20-collab/brczero/libs/tendermint/libs/log"
tmmath "github.com/brc20-collab/brczero/libs/tendermint/libs/math"
"github.com/brc20-collab/brczero/libs/tendermint/proxy"
"github.com/brc20-collab/brczero/libs/tendermint/types"
)
type TxInfoParser interface {
GetRawTxInfo(tx types.Tx) ExTxInfo
GetTxHistoryGasUsed(tx types.Tx, gasLimit int64) (int64, bool)
GetRealTxFromRawTx(rawTx types.Tx) abci.TxEssentials
}
var (
// GlobalRecommendedGP is initialized to 1Wei
GlobalRecommendedGP = big.NewInt(1)
IsCongested = false
)
const (
ZeroTxPath = "/crawler/zeroindexer/"
CrawlerHeightPath = "/crawler/height"
)
//--------------------------------------------------------------------------------
// CListMempool is an ordered in-memory pool for transactions before they are
// proposed in a consensus round. Transaction validity is checked using the
// CheckTx abci message before the transaction is added to the pool. The
// mempool uses a concurrent list structure for storing transactions that can
// be efficiently accessed by multiple concurrent readers.
type CListMempool struct {
// Atomic integers
height int64 // the last block Update()'d to
btcHeight int64 // the last btc height
txsBytes int64 // total size of mempool, in bytes
// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
config *cfg.MempoolConfig
// Exclusive mutex for Update method to prevent concurrent execution of
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx sync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc
//bcTxsList *clist.CList // only for tx sort model
proxyAppConn proxy.AppConnMempool
// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated in
// serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
// Save wtx as value if occurs or save nil as value
cache txCache
eventBus types.TxEventPublisher
logger log.Logger
pguLogger log.Logger
metrics *Metrics
pendingPool *PendingPool
accountRetriever AccountRetriever
pendingPoolNotify chan map[string]uint64
consumePendingTxQueue chan *AddressNonce
consumePendingTxQueueLimit int
txInfoparser TxInfoParser
checkCnt int64
checkRPCCnt int64
checkP2PCnt int64
checkTotalTime int64
checkRpcTotalTime int64
checkP2PTotalTime int64
txs ITransactionQueue
// btc height -> brczero data
zeroTxs map[int64]*types.ZeroData
zeroMtx sync.RWMutex
zeroReorgChan chan int64
simQueue chan *mempoolTx
rmPendingTxChan chan types.EventDataRmPendingTx
gpo *Oracle
info pguInfo
pullTicker *time.Ticker
fastsyncEndHeight int64
}
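// pguInfo accumulates a transaction count and the total gas those txs used; reset clears both counters.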
type pguInfo struct {
txCount int64
gasUsed int64
}
func (p *pguInfo) reset() {
p.txCount = 0
p.gasUsed = 0
}
var _ Mempool = &CListMempool{}
// CListMempoolOption sets an optional parameter on the mempool.
type CListMempoolOption func(*CListMempool)
// NewCListMempool returns a new mempool with the given configuration and connection to an application.
func NewCListMempool(
config *cfg.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
height int64,
latestBTCHeight int64,
options ...CListMempoolOption,
) *CListMempool {
var txQueue ITransactionQueue
if config.SortTxByGp {
txQueue = NewOptimizedTxQueue(int64(config.TxPriceBump))
} else {
txQueue = NewBaseTxQueue()
}
gpoConfig := NewGPOConfig(cfg.DynamicConfig.GetDynamicGpWeight(), cfg.DynamicConfig.GetDynamicGpCheckBlocks())
gpo := NewOracle(gpoConfig)
mempool := &CListMempool{
config: config,
proxyAppConn: proxyAppConn,
height: height,
recheckCursor: nil,
recheckEnd: nil,
eventBus: types.NopEventBus{},
logger: log.NewNopLogger(),
pguLogger: log.NewNopLogger(),
metrics: NopMetrics(),
txs: txQueue,
zeroTxs: make(map[int64]*types.ZeroData),
zeroReorgChan: make(chan int64),
simQueue: make(chan *mempoolTx, 100000),
gpo: gpo,
btcHeight: latestBTCHeight,
fastsyncEndHeight: 0,
}
crawlerH, err := mempool.pullCrawlerHeight()
if err != nil {
mempool.logger.Error(fmt.Sprintf("pull crawler height failed: %s", err.Error()))
} else if h := int64(crawlerH) - mempool.config.FastSyncHeightGap; h > 0 {
mempool.fastsyncEndHeight = h
}
if config.PendingRemoveEvent {
mempool.rmPendingTxChan = make(chan types.EventDataRmPendingTx, 1000)
go mempool.fireRmPendingTxEvents()
}
for i := 0; i < cfg.DynamicConfig.GetPGUConcurrency(); i++ {
go mempool.simulationRoutine()
}
if cfg.DynamicConfig.GetMempoolCacheSize() > 0 {
mempool.cache = newMapTxCache(cfg.DynamicConfig.GetMempoolCacheSize())
} else {
mempool.cache = nopTxCache{}
}
proxyAppConn.SetResponseCallback(mempool.globalCb)
for _, option := range options {
option(mempool)
}
if config.EnablePendingPool {
mempool.pendingPool = newPendingPool(config.PendingPoolSize, config.PendingPoolPeriod,
config.PendingPoolReserveBlocks, config.PendingPoolMaxTxPerAddress)
mempool.pendingPoolNotify = make(chan map[string]uint64, 1)
go mempool.pendingPoolJob()
// consumePendingTxQueueLimit reuses PendingPoolSize, since consumePendingTxQueue only carries txs drained from the pending pool.
mempool.consumePendingTxQueueLimit = mempool.config.PendingPoolSize
mempool.consumePendingTxQueue = make(chan *AddressNonce, mempool.consumePendingTxQueueLimit)
go mempool.consumePendingTxQueueJob()
}
mempool.pullTicker = time.NewTicker(PullZeroDataInterval)
go mempool.pullZeroDataRoutine()
return mempool
}
// NOTE: not thread safe - should only be called once, on startup
func (mem *CListMempool) EnableTxsAvailable() {
mem.txsAvailable = make(chan struct{}, 1)
}
// SetEventBus sets the event bus used to publish tx events.
func (mem *CListMempool) SetEventBus(eventBus types.TxEventPublisher) {
mem.eventBus = eventBus
}
// SetLogger sets the Logger.
func (mem *CListMempool) SetLogger(l log.Logger) {
mem.logger = l
mem.pguLogger = l.With("module", "pgu")
}
// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is run before CheckTx.
func WithPreCheck(f PreCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.preCheck = f }
}
// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is run after CheckTx.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.postCheck = f }
}
// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Lock() {
mem.updateMtx.Lock()
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Unlock() {
mem.updateMtx.Unlock()
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Size() int {
return mem.txs.Len()
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsBytes() int64 {
return atomic.LoadInt64(&mem.txsBytes)
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Height() int64 {
return atomic.LoadInt64(&mem.height)
}
// Lock() must be held by the caller during execution.
func (mem *CListMempool) FlushAppConn() error {
return mem.proxyAppConn.FlushSync()
}
// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
func (mem *CListMempool) Flush() {
mem.updateMtx.Lock()
defer mem.updateMtx.Unlock()
for e := mem.txs.Front(); e != nil; e = e.Next() {
mem.removeTx(e)
}
_ = atomic.SwapInt64(&mem.txsBytes, 0)
mem.cache.Reset()
}
// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
// FIXME: leaking implementation details!
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsFront() *clist.CElement {
return mem.txs.Front()
}
func (mem *CListMempool) BroadcastTxsFront() *clist.CElement {
return mem.txs.BroadcastFront()
}
// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element)
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
return mem.txs.TxsWaitChan()
}
// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
//
// It gets called from another goroutine.
//
// CONTRACT: Either cb will get called, or err returned.
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
timeStart := int64(0)
if cfg.DynamicConfig.GetMempoolCheckTxCost() {
timeStart = time.Now().UnixMicro()
}
txSize := len(tx)
// Old logic: deleting the lowest-gas-price tx is not allowed, so we must first check whether the mempool is full.
if !mem.GetEnableDeleteMinGPTx() {
if err := mem.isFull(txSize); err != nil {
return err
}
}
// TODO
// New logic: even when the mempool is full, check whether the tx's gas price exceeds that of the
// minimum-gas-price tx in the mempool, and if so, evict that tx. But since the mempool sits below the
// ABCI layer, it cannot read the tx's gas price here, so the gas price cannot be pre-checked at this
// point; perhaps the ABCI boundary could be relaxed for this.
// The size of the corresponding amino-encoded TxMessage
// can't be larger than the maxMsgSize, otherwise we can't
// relay it to peers.
if txSize > mem.config.MaxTxBytes {
return ErrTxTooLarge{mem.config.MaxTxBytes, txSize}
}
var nonce uint64
wCMTx := mem.CheckAndGetWrapCMTx(tx, txInfo)
if wCMTx != nil {
txInfo.wrapCMTx = wCMTx
tx = wCMTx.GetTx()
nonce = wCMTx.GetNonce()
mem.logger.Debug("checkTx is wrapCMTx", "nonce", nonce)
}
txkey := txKey(tx)
// CACHE
if !mem.cache.PushKey(txkey) {
// Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if ele, ok := mem.txs.Load(txkey); ok {
memTx := ele.Value.(*mempoolTx)
memTx.senderMtx.Lock()
memTx.senders[txInfo.SenderID] = struct{}{}
memTx.senderMtx.Unlock()
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
}
return ErrTxInCache
}
// END CACHE
mem.updateMtx.RLock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.updateMtx.RUnlock()
var err error
if mem.preCheck != nil {
if err = mem.preCheck(tx); err != nil {
return ErrPreCheck{err}
}
}
// NOTE: proxyAppConn may error if tx buffer is full
if err = mem.proxyAppConn.Error(); err != nil {
return err
}
if txInfo.from != "" {
types.SignatureCache().Add(txkey[:], txInfo.from)
}
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx, Type: txInfo.checkType, From: txInfo.wtx.GetFrom(), Nonce: nonce})
if r, ok := reqRes.Response.Value.(*abci.Response_CheckTx); ok {
gasLimit := r.CheckTx.GasWanted
if cfg.DynamicConfig.GetMaxGasUsedPerBlock() > -1 {
txInfo.gasUsed, txInfo.isGasPrecise = mem.txInfoparser.GetTxHistoryGasUsed(tx, gasLimit) // r.CheckTx.GasWanted is gasLimit
if txInfo.gasUsed <= 0 {
txInfo.gasUsed, _ = mem.simulateTx(tx, gasLimit)
}
mem.logger.Info(fmt.Sprintf("mempool.SimulateTx: txhash<%s>, gasLimit<%d>, gasUsed<%d>",
hex.EncodeToString(tx.Hash()), r.CheckTx.GasWanted, txInfo.gasUsed))
}
if txInfo.gasUsed <= 0 || txInfo.gasUsed > gasLimit {
txInfo.gasUsed = gasLimit
}
}
reqRes.SetCallback(mem.reqResCb(tx, txInfo, cb))
atomic.AddInt64(&mem.checkCnt, 1)
if cfg.DynamicConfig.GetMempoolCheckTxCost() {
pastTime := time.Now().UnixMicro() - timeStart
if txInfo.SenderID != 0 {
atomic.AddInt64(&mem.checkP2PCnt, 1)
atomic.AddInt64(&mem.checkP2PTotalTime, pastTime)
} else {
atomic.AddInt64(&mem.checkRPCCnt, 1)
atomic.AddInt64(&mem.checkRpcTotalTime, pastTime)
}
atomic.AddInt64(&mem.checkTotalTime, pastTime)
}
return nil
}
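// ZeroReorgChan returns the channel on which the mempool signals the BTC height at which a reorg was detected.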
func (mem *CListMempool) ZeroReorgChan() <-chan int64 {
return mem.zeroReorgChan
}
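// pullZeroDataRoutine drives periodic zero-data pulls from the crawler using pullTicker.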
func (mem *CListMempool) pullZeroDataRoutine() {
for {
// Block on the ticker so this loop does not busy-spin between pulls.
select {
case <-mem.pullTicker.C:
mem.pullZeroDataTask()
}
}
}
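// pullZeroDataTask fetches zero data for the next BTC height from the crawler, confirms older heights once
// they can no longer be reorged, and signals zeroReorgChan when the local and crawler block hashes diverge.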
func (mem *CListMempool) pullZeroDataTask() {
mem.logger.Info("start pullZeroDataTask")
// If the number of cached zero-data entries exceeds ZeroDataCacheSize, slow down the pull rate.
if uint64(len(mem.zeroTxs)) > mem.config.ZeroDataCacheSize {
mem.pullTicker.Reset(5 * PullZeroDataInterval)
return
} else {
mem.pullTicker.Reset(PullZeroDataInterval)
}
mem.zeroMtx.Lock()
// maxBtcHeight means max btc height saved in the current mempool
maxBtcHeight := mem.getZeroDataMaxHeight()
mem.zeroMtx.Unlock()
// insertHeight means latest btc block height to be fetched
insertHeight := maxBtcHeight + 1
txs, btcBlockHash, err := mem.pullZeroData(insertHeight)
if err != nil {
mem.logger.Error(fmt.Sprintf("pull zero data at height %d failed: %s", insertHeight, err.Error()))
return
}
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
if insertHeight <= mem.fastsyncEndHeight {
mem.insertZeroData(insertHeight, btcBlockHash, txs)
mem.confirmZeroDataByBTCHeight(insertHeight)
// Fast sync mode: pull data every 500ms (half the normal interval).
mem.pullTicker.Reset(PullZeroDataInterval / 2)
return
}
// After successfully fetching the data for the new btc height:
pendingConfirmedH := maxBtcHeight - BtcConfirmedGap
// note: pendingConfirmedData is local data and may differ from the btc node
pendingConfirmedData, err := mem.getZeroDataByBTCHeight(pendingConfirmedH)
if err != nil || pendingConfirmedData.IsConfirmed {
// If either:
// 1. err != nil, meaning the earlier data has already been applied (ApplyBlock) and cleared, or
// 2. the data at pendingConfirmedH has been confirmed, i.e. a reorg is impossible,
// then the new data should be added to the mempool.
mem.logger.Debug(fmt.Sprintf("insert zero data at height: %d, btcBlockHash: %s", insertHeight, btcBlockHash))
mem.insertZeroData(insertHeight, btcBlockHash, txs)
return
}
_, curBtcBlockHash, err := mem.pullZeroData(pendingConfirmedH)
// judge if the pendingConfirmedH should be confirmed
if err != nil {
mem.logger.Error(fmt.Sprintf("pull zero data at height %d failed: %s", pendingConfirmedH, err.Error()))
} else if pendingConfirmedData.BTCBlockHash == curBtcBlockHash {
// If the crawler and the mempool report the same btcBlockHash for this height,
// the height can be confirmed and the new data added to the mempool.
mem.confirmZeroDataByBTCHeight(pendingConfirmedH)
mem.logger.Debug(fmt.Sprintf("insert zero data at height: %d, btcBlockHash: %s", insertHeight, btcBlockHash))
mem.insertZeroData(insertHeight, btcBlockHash, txs)
} else {
// If the btcBlockHash differs at this height, a reorg has occurred.
mem.logger.Error("reorg!")
mem.zeroReorgChan <- pendingConfirmedH
for h := pendingConfirmedH; h <= maxBtcHeight; h++ {
delete(mem.zeroTxs, h)
}
}
}
// pullCrawlerHeight is used for fast sync
func (mem *CListMempool) pullCrawlerHeight() (uint64, error) {
baseUrl := mem.config.ZeroDataUrl
hUrl := fmt.Sprintf("%s%s", baseUrl, CrawlerHeightPath)
res, err := http.Get(hUrl)
if err != nil {
return 0, fmt.Errorf("get crawler height url %s failed: %s", hUrl, err.Error())
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return 0, fmt.Errorf("read all body failed: %s", err.Error())
}
var apiResp types.ZeroAPIResponse[types.CrawlerHeightData]
err = json.Unmarshal(body, &apiResp)
if err != nil {
return 0, fmt.Errorf("json unmarshal api response failed: %s", err.Error())
}
return apiResp.Data.CrawlerHeight, nil
}
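// pullZeroData fetches all zero txs for the given BTC height from the crawler, following pagination until
// the reported total is reached, and returns the RLP-encoded txs together with the BTC block hash.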
func (mem *CListMempool) pullZeroData(btcHeight int64) ([]types.Tx, string, error) {
// e.g., http://127.0.0.1:81/api/v1/crawler/zeroindexer/
var page uint = 1
var sum uint = 0
txs := make([]types.Tx, 0)
btcBlockHash := ""
baseUrl := mem.config.ZeroDataUrl
limit := mem.config.PullZeroDataLimit
heightStr := strconv.FormatInt(btcHeight, 10)
for {
var pUrl string
if limit <= 0 {
pUrl = fmt.Sprintf("%s%s%s", baseUrl, ZeroTxPath, heightStr)
} else {
pUrl = fmt.Sprintf("%s%s%s?page=%d&limit=%d", baseUrl, ZeroTxPath, heightStr, page, limit)
}
zeroRespData, err := getUrl(pUrl, heightStr)
if err != nil {
return nil, "", err
}
//todo: process btcfee
for _, tx := range zeroRespData.ZeroTxs {
txBytes, err := rlp.EncodeToBytes(tx)
if err != nil {
return nil, "", fmt.Errorf("rlp encode zero tx failed: %s", err.Error())
}
txs = append(txs, txBytes)
}
btcBlockHash = zeroRespData.BTCBlockHash
sum += zeroRespData.Count
if sum == zeroRespData.Sum {
break
} else if sum > zeroRespData.Sum {
return nil, "", fmt.Errorf("pagination process failed: %s", err.Error())
}
page++
}
return txs, btcBlockHash, nil
}
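// getUrl issues a GET request against the crawler API and decodes the zero-data response, returning an error
// when the API reports a non-OK code or an empty BTC block hash.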
func getUrl(pUrl string, heightStr string) (*types.ZeroResponseData, error) {
res, err := http.Get(pUrl)
if err != nil {
return nil, fmt.Errorf("get protocol url %s failed: %s", pUrl, err.Error())
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("read all body failed: %s", err.Error())
}
var apiResp types.ZeroAPIResponse[types.ZeroResponseData]
err = json.Unmarshal(body, &apiResp)
if err != nil {
return nil, fmt.Errorf("json unmarshal api response failed: %s", err.Error())
}
if apiResp.Code != types.OK_CODE {
return nil, fmt.Errorf("get api response error: %s", apiResp.Msg)
}
if apiResp.Data.BTCBlockHash == "" {
return nil, fmt.Errorf("zero data cannot be fetched at height %s", heightStr)
}
return &apiResp.Data, nil
}
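// InsertZeroData stores the zero data for the given BTC height while holding the zero-data lock.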
func (mem *CListMempool) InsertZeroData(btcHeight int64, btcBlockHash string, txs []types.Tx) {
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
mem.insertZeroData(btcHeight, btcBlockHash, txs)
}
func (mem *CListMempool) insertZeroData(btcHeight int64, btcBlockHash string, txs []types.Tx) {
brc0d := &types.ZeroData{
Txs: txs,
BTCBlockHash: btcBlockHash,
IsConfirmed: false,
Delivered: false,
}
brc0d.ZeroHash()
mem.zeroTxs[btcHeight] = brc0d
}
func (mem *CListMempool) GetZeroDataByBTCHeight(btcHeight int64) (types.ZeroData, error) {
mem.zeroMtx.RLock()
defer mem.zeroMtx.RUnlock()
return mem.getZeroDataByBTCHeight(btcHeight)
}
func (mem *CListMempool) getZeroDataByBTCHeight(btcHeight int64) (types.ZeroData, error) {
if d, ok := mem.zeroTxs[btcHeight]; ok {
return *d, nil
}
return types.ZeroData{}, fmt.Errorf("zero data at height %d does not exist", btcHeight)
}
func (mem *CListMempool) ConfirmZeroDataByBTCHeight(btcHeight int64) {
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
mem.confirmZeroDataByBTCHeight(btcHeight)
}
func (mem *CListMempool) confirmZeroDataByBTCHeight(btcHeight int64) {
if d, ok := mem.zeroTxs[btcHeight]; ok {
d.ToConfirmed()
}
}
func (mem *CListMempool) DelAllPrevZeroDataBeforeHeight(height int64) {
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
mem.delAllPrevZeroDataBeforeHeight(height)
}
func (mem *CListMempool) delAllPrevZeroDataBeforeHeight(height int64) {
if len(mem.zeroTxs) == 0 {
return
}
for h := range mem.zeroTxs {
if h < height {
delete(mem.zeroTxs, h)
}
}
}
func (mem *CListMempool) GetZeroDataMinHeight() int64 {
mem.zeroMtx.RLock()
defer mem.zeroMtx.RUnlock()
return mem.getZeroDataMinHeight()
}
func (mem *CListMempool) getZeroDataMinHeight() int64 {
if len(mem.zeroTxs) == 0 {
return 0
}
var btcH int64 = math.MaxInt64
for h := range mem.zeroTxs {
if h < btcH {
btcH = h
}
}
return btcH
}
func (mem *CListMempool) GetZeroDataMaxHeight() int64 {
mem.zeroMtx.RLock()
defer mem.zeroMtx.RUnlock()
return mem.getZeroDataMaxHeight()
}
func (mem *CListMempool) getZeroDataMaxHeight() int64 {
if len(mem.zeroTxs) == 0 {
return atomic.LoadInt64(&mem.btcHeight)
}
var btcH int64 = 0
for h := range mem.zeroTxs {
if h > btcH {
btcH = h
}
}
return btcH
}
func (mem *CListMempool) SetZeroDataDelivered(btcH int64, value bool) {
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
mem.setZeroDataDelivered(btcH, value)
}
func (mem *CListMempool) setZeroDataDelivered(btcH int64, value bool) {
mem.zeroTxs[btcH].Delivered = value
}
func (mem *CListMempool) DelZeroDataByBTCHeight(btcHeight int64) {
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
mem.delZeroDataByBTCHeight(btcHeight)
}
func (mem *CListMempool) delZeroDataByBTCHeight(btcHeight int64) {
if _, ok := mem.zeroTxs[btcHeight]; ok {
delete(mem.zeroTxs, btcHeight)
}
}
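// GetCurrentZeroData returns a copy of all zero data currently held by the mempool, keyed by BTC height.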
func (mem *CListMempool) GetCurrentZeroData() map[int64]types.ZeroData {
mem.zeroMtx.Lock()
defer mem.zeroMtx.Unlock()
res := make(map[int64]types.ZeroData)
for h, d := range mem.zeroTxs {
res[h] = *d
}
return res
}
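// CheckAndGetWrapCMTx returns the WrapCMTx carried by txInfo (p2p path); for txs received over RPC it tries
// to decode the raw bytes as a WrapCMTx and returns nil if the tx is not wrapped.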
func (mem *CListMempool) CheckAndGetWrapCMTx(tx types.Tx, txInfo TxInfo) *types.WrapCMTx {
if txInfo.wrapCMTx != nil { // from p2p
return txInfo.wrapCMTx
}
// from rpc should check if the tx is WrapCMTx
wtx := &types.WrapCMTx{}
err := cdc.UnmarshalJSON(tx, &wtx)
if err != nil {
return nil
}
return wtx
}
// Global callback that will be called after every ABCI response.
// Having a single global callback avoids needing to set a callback for each request.
// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
// include this information. If we're not in the midst of a recheck, this function will just return,
// so the request specific callback can do the work.
//
// When rechecking, we don't need the peerID, so the recheck callback happens
// here.
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
return
}
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)
// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}
// Request specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTx to record PeerID who sent us the tx.
func (mem *CListMempool) reqResCb(
tx []byte,
txInfo TxInfo,
externalCb func(*abci.Response),
) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
// this should never happen
panic("recheck cursor is not nil in reqResCb")
}
mem.resCbFirstTime(tx, txInfo, res)
// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
if mem.pendingPool != nil {
mem.metrics.PendingPoolSize.Set(float64(mem.pendingPool.Size()))
}
// passed in by the caller of CheckTx, eg. the RPC
if externalCb != nil {
externalCb(res)
}
}
}
// Called from:
// - resCbFirstTime (lock not held) if tx is valid
func (mem *CListMempool) addTx(memTx *mempoolTx) error {
if err := mem.txs.Insert(memTx); err != nil {
return err
}
if cfg.DynamicConfig.GetMaxGasUsedPerBlock() > -1 && cfg.DynamicConfig.GetEnablePGU() && atomic.LoadUint32(&memTx.isSim) == 0 {
select {
case mem.simQueue <- memTx:
default:
mem.logger.Error("tx simulation queue is full")
}
}
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
mem.eventBus.PublishEventPendingTx(types.EventDataTx{
TxResult: types.TxResult{
Height: memTx.height,
Tx: memTx.tx,
},
Nonce: memTx.senderNonce,
})
return nil
}
// Called from:
// - Update (lock held) if tx was committed
// - resCbRecheck (lock not held) if tx was invalidated
func (mem *CListMempool) removeTx(elem *clist.CElement) {
mem.txs.Remove(elem)
tx := elem.Value.(*mempoolTx).tx
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
}
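// removeTxByKey removes the tx with the given key from the queue, adjusts the mempool byte counter,
// and returns the removed element (nil if it was not found).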
func (mem *CListMempool) removeTxByKey(key [32]byte) (elem *clist.CElement) {
elem = mem.txs.RemoveByKey(key)
if elem != nil {
tx := elem.Value.(*mempoolTx).tx
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
}
return
}
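// isFull returns ErrMempoolIsFull when adding a tx of txSize bytes would exceed the configured
// mempool size or the total bytes limit.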
func (mem *CListMempool) isFull(txSize int) error {
var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)
if memSize >= cfg.DynamicConfig.GetMempoolSize() || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize, cfg.DynamicConfig.GetMempoolSize(),
txsBytes, mem.config.MaxTxsBytes,
}
}
return nil
}
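// addPendingTx adds a tx directly to the mempool when its nonce is zero or not greater than the expected
// nonce; a tx with exactly the expected nonce also schedules consumption of queued successors, while a tx
// with a higher, non-contiguous nonce is parked in the pending pool.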
func (mem *CListMempool) addPendingTx(memTx *mempoolTx) error {
// nonce is continuous
expectedNonce := memTx.senderNonce
pendingNonce, ok := mem.GetPendingNonce(memTx.from)
if ok {
expectedNonce = pendingNonce + 1
}
txNonce := memTx.realTx.GetNonce()
mem.logger.Debug("mempool", "addPendingTx", hex.EncodeToString(memTx.realTx.TxHash()), "nonce", memTx.realTx.GetNonce(), "gp", memTx.realTx.GetGasPrice(), "pending Nouce", pendingNonce, "excepectNouce", expectedNonce)
if txNonce == 0 || txNonce < expectedNonce {
return mem.addTx(memTx)
}
// add pending tx
if txNonce == expectedNonce {
err := mem.addTx(memTx)
if err == nil {
addrNonce := addressNoncePool.Get().(*AddressNonce)
addrNonce.addr = memTx.from
addrNonce.nonce = txNonce + 1
select {
case mem.consumePendingTxQueue <- addrNonce:
default:
// Dropping the notification here may leave a user's pending tx unpacked into a block under extreme
// conditions (mem.consumePendingTxQueue is blocked because the mempool is full).
// We accept this so that block production is never stalled.
addressNoncePool.Put(addrNonce)
mem.logger.Error("mempool", "addPendingTx", "when consumePendingTxQueue and mempool is full, disable consume pending tx")
}
//go mem.consumePendingTx(memTx.from, txNonce+1)
}
return err
}
// add tx to PendingPool
if err := mem.pendingPool.validate(memTx.from, memTx.tx); err != nil {
return err
}
pendingTx := memTx
mem.pendingPool.addTx(pendingTx)
mem.logger.Debug("mempool", "add-pending-Tx", hex.EncodeToString(memTx.realTx.TxHash()), "nonce", memTx.realTx.GetNonce(), "gp", memTx.realTx.GetGasPrice())
mem.logger.Debug("pending pool addTx", "tx", pendingTx)
return nil
}
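// consumePendingTx moves consecutive txs for address out of the pending pool into the mempool, starting at
// nonce, waiting and retrying while the mempool is full and the tx cannot displace the lowest-gas-price entry.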
func (mem *CListMempool) consumePendingTx(address string, nonce uint64) {
for {
pendingTx := mem.pendingPool.getTx(address, nonce)
if pendingTx == nil {
return
}
if err := mem.isFull(len(pendingTx.tx)); err != nil {
minGPTx := mem.txs.Back().Value.(*mempoolTx)
// If deleteMinGPTx is disabled (old logic), or it is enabled (new logic) but the pending tx's gas price
// does not exceed the bumped minimum gas price in the mempool, wait for a period and retry.
thresholdGasPrice := MultiPriceBump(minGPTx.realTx.GetGasPrice(), int64(mem.config.TxPriceBump))
if !mem.GetEnableDeleteMinGPTx() || (mem.GetEnableDeleteMinGPTx() && thresholdGasPrice.Cmp(pendingTx.realTx.GetGasPrice()) >= 0) {
time.Sleep(time.Duration(mem.pendingPool.period) * time.Second)
continue
}
}
mem.logger.Debug("mempool", "consumePendingTx", hex.EncodeToString(pendingTx.realTx.TxHash()), "nonce", pendingTx.realTx.GetNonce(), "gp", pendingTx.realTx.GetGasPrice())
mempoolTx := pendingTx
mempoolTx.height = mem.Height()
if err := mem.addTx(mempoolTx); err != nil {
mem.logger.Error(fmt.Sprintf("Pending Pool add tx failed:%s", err.Error()))
mem.pendingPool.removeTx(address, nonce)
return
}
mem.logger.Info("Added good transaction",
"tx", txIDStringer{mempoolTx.tx, mempoolTx.height},
"height", mempoolTx.height,
"total", mem.Size(),
)
mem.notifyTxsAvailable()
mem.pendingPool.removeTx(address, nonce)
nonce++
}
}
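// logAddTxData is a reusable container for the key/value pairs emitted by logAddTx; instances are pooled
// in logAddTxDataPool to avoid per-tx allocations.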
type logAddTxData struct {
Params [8]interface{}
TxID txIDStringer
Height int64
Total int
}
var logAddTxDataPool = sync.Pool{
New: func() interface{} {
return &logAddTxData{}
},
}
func (mem *CListMempool) logAddTx(memTx *mempoolTx, r *abci.Response_CheckTx) {
logAddTxData := logAddTxDataPool.Get().(*logAddTxData)
logAddTxData.TxID = txIDStringer{memTx.tx, memTx.height}
logAddTxData.Height = memTx.height
logAddTxData.Total = mem.Size()
params := &logAddTxData.Params
params[0] = "tx"
params[1] = &logAddTxData.TxID
params[2] = "res"
params[3] = r
params[4] = "height"
params[5] = &logAddTxData.Height
params[6] = "total"
params[7] = &logAddTxData.Total
mem.logger.Info("Added good transaction", params[:8]...)
logAddTxDataPool.Put(logAddTxData)
}
// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.