/
trade_scraper.go
110 lines (94 loc) · 3.2 KB
/
trade_scraper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package scraper
import (
"time"
horizonclient "github.com/stellar/go/clients/horizonclient"
hProtocol "github.com/stellar/go/protocols/horizon"
)
// checkRecords check if a list of records contains entries older than minTime. If it does,
// it will return a filtered page with only the passing records and lastPage = true.
func (c *ScraperConfig) checkRecords(trades []hProtocol.Trade, minTime time.Time) (lastPage bool, cleanTrades []hProtocol.Trade) {
lastPage = false
for _, t := range trades {
if t.LedgerCloseTime.After(minTime) {
NormalizeTradeAssets(&t)
cleanTrades = append(cleanTrades, t)
} else {
c.Logger.Debugln("Reached entries older than the acceptable time range:", t.LedgerCloseTime)
lastPage = true
return
}
}
return
}
// retrieveTrades retrieves trades from the Horizon API for the last timeDelta period.
// If limit = 0, will fetch all trades within that period.
func (c *ScraperConfig) retrieveTrades(since time.Time, limit int) (trades []hProtocol.Trade, err error) {
r := horizonclient.TradeRequest{Limit: 200, Order: horizonclient.OrderDesc}
tradesPage, err := c.Client.Trades(r)
if err != nil {
return
}
for tradesPage.Links.Next.Href != tradesPage.Links.Self.Href {
// Enforcing time boundaries:
last, cleanTrades := c.checkRecords(tradesPage.Embedded.Records, since)
trades = append(trades, cleanTrades...)
if last {
break
}
// Enforcing limit of results:
if limit != 0 {
numTrades := len(trades)
if numTrades >= limit {
diff := numTrades - limit
trades = trades[0 : numTrades-diff]
break
}
}
// Finding next page's params:
nextURL := tradesPage.Links.Next.Href
n, err := nextCursor(nextURL)
if err != nil {
return trades, err
}
c.Logger.Debugln("Cursor currently at:", n)
r.Cursor = n
tradesPage, err = c.Client.Trades(r)
if err != nil {
return trades, err
}
}
return
}
// streamTrades streams trades directly from horizon and calls the handler function
// whenever a new trade appears.
func (c *ScraperConfig) streamTrades(h horizonclient.TradeHandler, cursor string) error {
if cursor == "" {
cursor = "now"
}
r := horizonclient.TradeRequest{
Limit: 200,
Cursor: cursor,
}
return r.StreamTrades(*c.Ctx, c.Client, h)
}
// addNativeData adds additional fields when one of the assets is native.
func addNativeData(trade *hProtocol.Trade) {
if trade.BaseAssetType == "native" {
trade.BaseAssetCode = "XLM"
trade.BaseAssetIssuer = "native"
}
if trade.CounterAssetType == "native" {
trade.CounterAssetCode = "XLM"
trade.CounterAssetIssuer = "native"
}
}
// reverseAssets swaps out the base and counter assets of a trade.
func reverseAssets(trade *hProtocol.Trade) {
trade.BaseAmount, trade.CounterAmount = trade.CounterAmount, trade.BaseAmount
trade.BaseAccount, trade.CounterAccount = trade.CounterAccount, trade.BaseAccount
trade.BaseAssetCode, trade.CounterAssetCode = trade.CounterAssetCode, trade.BaseAssetCode
trade.BaseAssetType, trade.CounterAssetType = trade.CounterAssetType, trade.BaseAssetType
trade.BaseAssetIssuer, trade.CounterAssetIssuer = trade.CounterAssetIssuer, trade.BaseAssetIssuer
trade.BaseIsSeller = !trade.BaseIsSeller
trade.Price.N, trade.Price.D = trade.Price.D, trade.Price.N
}