-
Notifications
You must be signed in to change notification settings - Fork 162
/
aggregate.go
117 lines (103 loc) · 2.71 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package worker
import (
"time"
"github.com/anyswap/CrossChain-Bridge/cmd/utils"
"github.com/anyswap/CrossChain-Bridge/mongodb"
"github.com/anyswap/CrossChain-Bridge/tokens/btc"
"github.com/anyswap/CrossChain-Bridge/tokens/btc/electrs"
)
// Package-level state used by the aggregate worker functions in this file.
// NOTE(review): none of these are guarded by a lock; the only accessor visible
// here is the goroutine started by StartAggregateJob — confirm nothing else
// touches them.
var (
	// utxoPageLimit is the page size passed to mongodb.FindP2shAddresses and the
	// step by which aggOffset advances between pages.
	utxoPageLimit = 100
	// aggSumVal is the running sum of the values of the utxos currently queued
	// in aggUtxos; aggregate resets it to 0.
	aggSumVal uint64
	// aggAddrs holds, for each queued utxo, the address it was found under. It
	// is appended to in lockstep with aggUtxos.
	aggAddrs []string
	// aggUtxos is the batch of utxos queued for the next aggregate call.
	aggUtxos []*electrs.ElectUtxo
	// aggOffset is the paging offset used by doAggregateJob when listing p2sh
	// addresses; it is reset to 0 at the start of every run.
	aggOffset int
	// aggInterval is how long loopDoAggregateJob sleeps after each run.
	aggInterval = 10 * time.Minute
)
// StartAggregateJob launches the background utxo aggregation loop.
//
// It does nothing when btc.BridgeInstance is nil. Otherwise it registers the
// worker with mongodb.MgoWaitGroup and runs loopDoAggregateJob in its own
// goroutine.
func StartAggregateJob() {
	if btc.BridgeInstance != nil {
		mongodb.MgoWaitGroup.Add(1)
		go loopDoAggregateJob()
	}
}
// loopDoAggregateJob runs doAggregateJob over and over, sleeping aggInterval
// after each run, and stops once utils.IsCleanuping reports true at the start
// of a round. It marks the worker as done on mongodb.MgoWaitGroup when it
// returns.
func loopDoAggregateJob() {
	defer mongodb.MgoWaitGroup.Done()

	// round is 1-based and only used for logging.
	round := 0
	for !utils.IsCleanuping() {
		round++
		logWorker("aggregate", "start aggregate job", "loop", round)
		doAggregateJob()
		logWorker("aggregate", "finish aggregate job", "loop", round)
		time.Sleep(aggInterval)
	}
}
// doAggregateJob performs one full pass over the stored p2sh addresses.
//
// Addresses are fetched page by page (utxoPageLimit per page, starting at
// offset 0) and each one is handed to findUtxosAndAggregate. The pass ends
// when a page comes back shorter than utxoPageLimit or when cleanup has begun.
// A failed page fetch is logged and retried at the same offset after a
// 3 second pause.
func doAggregateJob() {
	aggOffset = 0
	for !utils.IsCleanuping() {
		page, err := mongodb.FindP2shAddresses(aggOffset, utxoPageLimit)
		if err == nil {
			for _, entry := range page {
				findUtxosAndAggregate(entry.P2shAddress)
			}
			// A short page means there is nothing left to read.
			if len(page) < utxoPageLimit {
				return
			}
			aggOffset += utxoPageLimit
			continue
		}
		// Keep the offset unchanged so the same page is requested again.
		logWorkerError("aggregate", "FindP2shAddresses failed", err, "offset", aggOffset, "limit", utxoPageLimit)
		time.Sleep(3 * time.Second)
	}
}
// findUtxosAndAggregate looks up the utxos reported for addr and queues every
// usable one into the package-level batch (aggSumVal, aggAddrs, aggUtxos).
// After each addition it asks the bridge whether the batch should be
// aggregated and, if so, calls aggregate.
//
// A failed utxo lookup is logged and ends the call. An individual utxo is
// skipped when it is nil, has a nil or zero value, lacks a txid or vout, is
// already queued, cannot be checked for spending, has an unknown spent state,
// or is reported as spent.
func findUtxosAndAggregate(addr string) {
	findUtxos, err := btc.BridgeInstance.FindUtxos(addr)
	if err != nil {
		// Report the failure instead of silently treating it as "no utxos".
		logWorkerError("aggregate", "find utxos failed", err, "address", addr)
		return
	}
	for _, utxo := range findUtxos {
		if utxo == nil || utxo.Value == nil || *utxo.Value == 0 {
			continue
		}
		// Txid and Vout are dereferenced below and in isUtxoExist; a response
		// entry missing either would otherwise panic the worker goroutine.
		if utxo.Txid == nil || utxo.Vout == nil {
			logWorkerTrace("aggregate", "ignore utxo without txid or vout", "address", addr)
			continue
		}
		if isUtxoExist(utxo) {
			continue
		}
		outspend, err := btc.BridgeInstance.GetOutspend(*utxo.Txid, *utxo.Vout)
		if err != nil {
			logWorkerError("aggregate", "get out spend failed", err, "address", addr, "utxo", utxo.String())
			continue
		}
		// Without a spent flag the state is unknown; do not queue the utxo.
		if outspend.Spent == nil {
			logWorkerTrace("aggregate", "ignore utxo with unknown spent state", "address", addr, "utxo", utxo.String())
			continue
		}
		if *outspend.Spent {
			logWorkerTrace("aggregate", "ignore spent utxo", "address", addr, "utxo", utxo.String(), "outspend", outspend.String())
			continue
		}
		logWorker("aggregate", "find utxo", "address", addr, "utxo", utxo.String())
		aggSumVal += *utxo.Value
		aggAddrs = append(aggAddrs, addr)
		aggUtxos = append(aggUtxos, utxo)
		if btc.BridgeInstance.ShouldAggregate(len(aggUtxos), aggSumVal) {
			aggregate()
		}
	}
}
// isUtxoExist reports whether an entry with the same txid and vout as utxo is
// already queued in aggUtxos.
func isUtxoExist(utxo *electrs.ElectUtxo) bool {
	for i := 0; i < len(aggUtxos); i++ {
		queued := aggUtxos[i]
		if *queued.Txid != *utxo.Txid {
			continue
		}
		if *queued.Vout == *utxo.Vout {
			return true
		}
	}
	return false
}
// aggregate hands the queued addresses and utxos to the bridge's
// AggregateUtxos, logs the outcome, and then clears the batch state
// (aggSumVal, aggAddrs, aggUtxos) whether or not the call succeeded.
func aggregate() {
	txHash, err := btc.BridgeInstance.AggregateUtxos(aggAddrs, aggUtxos)
	if err == nil {
		logWorker("aggregate", "AggregateUtxos succeed", "txHash", txHash, "utxos", len(aggUtxos), "sumVal", aggSumVal)
	} else {
		logWorkerError("aggregate", "AggregateUtxos failed", err)
	}

	// Start the next batch from scratch.
	aggSumVal, aggAddrs, aggUtxos = 0, nil, nil
}