/
trade_scraper.go
119 lines (101 loc) · 3.41 KB
/
trade_scraper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package scraper
import (
"time"
"github.com/diamcircle/go/services/ticker/internal/utils"
auroraclient "github.com/diamcircle/go/clients/auroraclient"
hProtocol "github.com/diamcircle/go/protocols/aurora"
)
// checkRecords check if a list of records contains entries older than minTime. If it does,
// it will return a filtered page with only the passing records and lastPage = true.
func (c *ScraperConfig) checkRecords(trades []hProtocol.Trade, minTime time.Time) (lastPage bool, cleanTrades []hProtocol.Trade) {
lastPage = false
for _, t := range trades {
if t.LedgerCloseTime.After(minTime) {
NormalizeTradeAssets(&t)
cleanTrades = append(cleanTrades, t)
} else {
c.Logger.Debug("Reached entries older than the acceptable time range:", t.LedgerCloseTime)
lastPage = true
return
}
}
return
}
// retrieveTrades retrieves trades from the Aurora API for the last timeDelta period.
// If limit = 0, will fetch all trades within that period.
func (c *ScraperConfig) retrieveTrades(since time.Time, limit int) (trades []hProtocol.Trade, err error) {
r := auroraclient.TradeRequest{Limit: 200, Order: auroraclient.OrderDesc}
tradesPage, err := c.Client.Trades(r)
if err != nil {
return
}
for tradesPage.Links.Next.Href != tradesPage.Links.Self.Href {
// Enforcing time boundaries:
last, cleanTrades := c.checkRecords(tradesPage.Embedded.Records, since)
trades = append(trades, cleanTrades...)
if last {
break
}
// Enforcing limit of results:
if limit != 0 {
numTrades := len(trades)
if numTrades >= limit {
diff := numTrades - limit
trades = trades[0 : numTrades-diff]
break
}
}
// Finding next page's params:
nextURL := tradesPage.Links.Next.Href
n, err := nextCursor(nextURL)
if err != nil {
return trades, err
}
c.Logger.Debug("Cursor currently at:", n)
r.Cursor = n
err = utils.Retry(5, 5*time.Second, c.Logger, func() error {
tradesPage, err = c.Client.Trades(r)
if err != nil {
c.Logger.Info("Aurora rate limit reached!")
}
return err
})
if err != nil {
return trades, err
}
}
return
}
// streamTrades streams trades directly from aurora and calls the handler function
// whenever a new trade appears.
func (c *ScraperConfig) streamTrades(h auroraclient.TradeHandler, cursor string) error {
if cursor == "" {
cursor = "now"
}
r := auroraclient.TradeRequest{
Limit: 200,
Cursor: cursor,
}
return r.StreamTrades(*c.Ctx, c.Client, h)
}
// addNativeData adds additional fields when one of the assets is native.
func addNativeData(trade *hProtocol.Trade) {
if trade.BaseAssetType == "native" {
trade.BaseAssetCode = "XLM"
trade.BaseAssetIssuer = "native"
}
if trade.CounterAssetType == "native" {
trade.CounterAssetCode = "XLM"
trade.CounterAssetIssuer = "native"
}
}
// reverseAssets swaps out the base and counter assets of a trade.
func reverseAssets(trade *hProtocol.Trade) {
trade.BaseAmount, trade.CounterAmount = trade.CounterAmount, trade.BaseAmount
trade.BaseAccount, trade.CounterAccount = trade.CounterAccount, trade.BaseAccount
trade.BaseAssetCode, trade.CounterAssetCode = trade.CounterAssetCode, trade.BaseAssetCode
trade.BaseAssetType, trade.CounterAssetType = trade.CounterAssetType, trade.BaseAssetType
trade.BaseAssetIssuer, trade.CounterAssetIssuer = trade.CounterAssetIssuer, trade.BaseAssetIssuer
trade.BaseIsSeller = !trade.BaseIsSeller
trade.Price.N, trade.Price.D = trade.Price.D, trade.Price.N
}