-
Notifications
You must be signed in to change notification settings - Fork 6
/
scanswaphistory.go
113 lines (100 loc) · 2.73 KB
/
scanswaphistory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package btc
import (
"time"
"github.com/fsn-dev/crossChain-Bridge/log"
"github.com/fsn-dev/crossChain-Bridge/tokens/tools"
)
// Timing parameters shared by the swap history scan loops.
var (
	// maxScanLifetime is the maximum age, in seconds (three days), of a
	// transaction's block time relative to the Unix time at which the first
	// scan loop started; older transactions end that loop.
	maxScanLifetime = int64(3 * 24 * 60 * 60)

	// retryIntervalInScanJob is how long a scan loop sleeps before retrying
	// after fetching the transaction history failed.
	retryIntervalInScanJob = time.Second * 3

	// restIntervalInScanJob is how long the continuous scan loop sleeps
	// before it restarts from the beginning of the history.
	restIntervalInScanJob = time.Second * 3
)
// StartSwapHistoryScanJob starts scanning the transaction history of the
// bridge's DCRM address for swaps.
//
// It launches the one-off first scan loop in its own goroutine and then runs
// the continuous history scan in the calling goroutine. The continuous scan
// loops without an exit path, so this method does not return.
func (b *Bridge) StartSwapHistoryScanJob() {
	log.Info("[scanhistory] start scan swap history job", "isSrc", b.IsSrc)

	// alreadyRecorded reports whether a swap for txid is already registered:
	// swapin records are consulted on the source side, swapout records
	// otherwise. b.IsSrc is evaluated on every call.
	alreadyRecorded := func(txid string) bool {
		if !b.IsSrc {
			return tools.IsSwapoutExist(txid)
		}
		return tools.IsSwapinExist(txid)
	}

	go b.scanFirstLoop(alreadyRecorded)

	b.scanTransactionHistory(alreadyRecorded)
}
// scanFirstLoop pages once through the transaction history of the DCRM
// address and submits every transaction that isProcessed does not report as
// already handled.
//
// Unlike scanTransactionHistory, meeting an already processed transaction
// does not stop the walk; it is skipped and the scan continues. The loop ends
// when a page comes back empty, when a transaction's block height is below
// the configured initial height, or when a transaction's block time is more
// than maxScanLifetime seconds older than the moment this function started.
//
// isProcessed reports whether the given txid has already been recorded.
func (b *Bridge) scanFirstLoop(isProcessed func(string) bool) {
	log.Info("[scanhistory] start first scan loop", "isSrc", b.IsSrc)

	var (
		nowTime       = time.Now().Unix()
		lastSeenTxid  = ""
		initialHeight = b.TokenConfig.InitialHeight
	)

	// The parameter is named blockTime rather than time so that it does not
	// shadow the imported time package inside the closure.
	isTooOld := func(blockTime *uint64) bool {
		return blockTime != nil && int64(*blockTime)+maxScanLifetime < nowTime
	}

firstLoop:
	for {
		txHistory, err := b.GetTransactionHistory(b.TokenConfig.DcrmAddress, lastSeenTxid)
		if err != nil {
			// Log before retrying, as scanTransactionHistory does, so that a
			// persistent failure is visible instead of a silent endless retry.
			log.Error("[scanhistory] first scan loop get tx history error", "isSrc", b.IsSrc, "err", err)
			time.Sleep(retryIntervalInScanJob)
			continue
		}
		if len(txHistory) == 0 {
			break
		}
		for _, tx := range txHistory {
			// A nil block height or block time never triggers a stop
			// condition; such transactions are still examined.
			if tx.Status.BlockHeight != nil && *tx.Status.BlockHeight < initialHeight {
				break firstLoop
			}
			if isTooOld(tx.Status.BlockTime) {
				break firstLoop
			}
			// NOTE(review): Txid is dereferenced without a nil check; this
			// assumes the history source always sets it — confirm.
			txid := *tx.Txid
			if !isProcessed(txid) {
				// The result is deliberately discarded: one failing
				// transaction must not abort the scan.
				_ = b.processSwapin(txid)
			}
		}
		// Continue with the page that follows the last transaction seen.
		lastSeenTxid = *txHistory[len(txHistory)-1].Txid
	}

	log.Info("[scanhistory] finish first scan loop", "isSrc", b.IsSrc)
}
// scanTransactionHistory continuously polls the transaction history of the
// DCRM address and submits transactions that isProcessed does not report as
// already handled. It never returns.
//
// Each pass fetches one page starting after the current cursor. The scan
// restarts from the beginning of the history (after sleeping for
// restIntervalInScanJob) when the page is empty, when a transaction's block
// height is below the configured initial height, or when an already
// processed transaction is met. Otherwise the cursor advances to the last
// transaction of the page and the next page is fetched immediately.
//
// isProcessed reports whether the given txid has already been recorded.
func (b *Bridge) scanTransactionHistory(isProcessed func(string) bool) {
	log.Info("[scanhistory] start scan swap history loop", "isSrc", b.IsSrc)

	minHeight := b.TokenConfig.InitialHeight
	cursor := ""

	for {
		page, err := b.GetTransactionHistory(b.TokenConfig.DcrmAddress, cursor)
		if err != nil {
			log.Error("[scanhistory] get tx history error", "isSrc", b.IsSrc, "err", err)
			time.Sleep(retryIntervalInScanJob)
			continue
		}

		// An empty page means there is nothing further to walk through.
		restart := len(page) == 0

		log.Info("[scanhistory] scan swap history", "isSrc", b.IsSrc, "count", len(page))

		for _, tx := range page {
			if height := tx.Status.BlockHeight; height != nil && *height < minHeight {
				restart = true
				break
			}
			id := *tx.Txid
			if isProcessed(id) {
				// rescan if already processed
				restart = true
				break
			}
			_ = b.processSwapin(id)
		}

		if !restart {
			// restart is false only for a non-empty page, so indexing the
			// last element is safe here.
			cursor = *page[len(page)-1].Txid
			continue
		}

		cursor = ""
		time.Sleep(restIntervalInScanJob)
	}
}