/
governor_prices.go
301 lines (251 loc) · 9.39 KB
/
governor_prices.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// This file contains the code to query for and update token prices for the chain governor.
//
// The initial prices are read from the static config (tokens.go). After that, prices are
// queried from CoinGecko. The chain governor then uses the maximum of the static price and
// the latest CoinGecko price. The CoinGecko poll interval is specified by coinGeckoQueryIntervalInMins.
package governor
import (
"context"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"strings"
"time"
"go.uber.org/zap"
"github.com/deltaswapio/deltaswap/node/pkg/common"
"github.com/deltaswapio/deltaswap/node/pkg/db"
"github.com/deltaswapio/deltaswap/node/pkg/supervisor"
)
// The CoinGecko API is documented here: https://www.coingecko.com/en/api/documentation
// An example of the query to be generated: https://api.coingecko.com/api/v3/simple/price?ids=gemma-extending-tech,bitcoin,weth&vs_currencies=usd
const (
	// coinGeckoQueryIntervalInMins is the number of minutes between successive rounds of CoinGecko price queries.
	coinGeckoQueryIntervalInMins = 15

	// tokensPerCoinGeckoQuery caps how many token IDs go into a single CoinGecko query. The full token list is
	// split into chunks of this size, one query per chunk.
	tokensPerCoinGeckoQuery = 200
)
// initCoinGecko prepares the CoinGecko price queries for every configured CoinGecko ID and stores them
// on the governor. When run is true and at least one query was built, it also registers PriceQuery with
// the supervisor under the name "govpricer" so prices are refreshed periodically.
//
// It returns the error from supervisor.Run, if any; otherwise nil.
func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error {
	// Gather every CoinGecko ID we know about; map iteration order is unspecified, so the order of
	// IDs within the queries may differ from run to run.
	coinGeckoIds := make([]string, 0, len(gov.tokensByCoinGeckoId))
	for coinGeckoId := range gov.tokensByCoinGeckoId {
		coinGeckoIds = append(coinGeckoIds, coinGeckoId)
	}

	// Split the IDs into chunks and build one query per chunk.
	gov.coinGeckoQueries = createCoinGeckoQueries(coinGeckoIds, tokensPerCoinGeckoQuery)
	for i := range gov.coinGeckoQueries {
		gov.logger.Info("coingecko query: ", zap.Int("queryIdx", i), zap.String("query", gov.coinGeckoQueries[i]))
	}

	switch {
	case len(gov.coinGeckoQueries) == 0:
		gov.logger.Info("did not find any tokens, nothing to do!")
		return nil
	case !run:
		// The caller only wanted the queries built (e.g. CheckQuery); do not start the poller.
		return nil
	}

	return supervisor.Run(ctx, "govpricer", gov.PriceQuery)
}
// createCoinGeckoQueries creates the set of CoinGecko queries, breaking idList into consecutive chunks of
// at most tokensPerQuery IDs and building one query URL per chunk (in input order).
//
// An empty idList yields a nil slice. A non-positive tokensPerQuery (which previously caused a
// divide-by-zero panic) is treated as "no chunking": all IDs are placed in a single query.
func createCoinGeckoQueries(idList []string, tokensPerQuery int) []string {
	if len(idList) == 0 {
		return nil
	}

	// Clamp the chunk size so the loop below always makes progress and the capacity math cannot overflow.
	if tokensPerQuery <= 0 || tokensPerQuery > len(idList) {
		tokensPerQuery = len(idList)
	}

	queries := make([]string, 0, (len(idList)+tokensPerQuery-1)/tokensPerQuery)
	for start := 0; start < len(idList); start += tokensPerQuery {
		end := start + tokensPerQuery
		if end > len(idList) {
			end = len(idList)
		}
		queries = append(queries, createCoinGeckoQuery(strings.Join(idList[start:end], ",")))
	}

	return queries
}

// createCoinGeckoQuery creates a CoinGecko "simple price" query URL for the specified comma separated
// list of IDs, requesting prices in USD. The parameters are URL-encoded, so the commas appear as %2C.
func createCoinGeckoQuery(ids string) string {
	params := url.Values{}
	params.Add("ids", ids)
	params.Add("vs_currencies", "usd")

	return "https://api.coingecko.com/api/v3/simple/price?" + params.Encode()
}
// PriceQuery is the entry point for the routine that periodically queries CoinGecko for prices.
// It performs one query right away and then one every coinGeckoQueryIntervalInMins minutes until
// ctx is cancelled, at which point it returns nil.
func (gov *ChainGovernor) PriceQuery(ctx context.Context) error {
	// Query errors are deliberately dropped here: queryCoinGecko has already logged them and reverted
	// the affected prices to their configured values, and a CoinGecko failure must not bring down the phylax.
	_ = gov.queryCoinGecko()

	interval := time.Duration(coinGeckoQueryIntervalInMins) * time.Minute
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	done := ctx.Done()
	for {
		select {
		case <-ticker.C:
			_ = gov.queryCoinGecko()
		case <-done:
			return nil
		}
	}
}
// queryCoinGecko sends a series of one or more queries to the CoinGecko server to get the latest prices. It can
// return an error, but that is only used by the tool that validates the query. In the actual governor,
// it just logs the error and we will try again next interval.
//
// If any chunk query fails, every token is reverted to its configured price and the query error is returned.
// Otherwise each token with a usable response gets its CoinGecko price recorded and its effective price
// recomputed via updatePrice; tokens with a missing or unparseable response are reverted to their configured
// price and a generic error is returned.
func (gov *ChainGovernor) queryCoinGecko() error {
	// Run all the chunk queries first, without holding the lock, and merge the responses.
	result := make(map[string]interface{})
	for queryIdx, query := range gov.coinGeckoQueries {
		thisResult, err := gov.queryCoinGeckoChunk(query)
		if err != nil {
			gov.logger.Error("CoinGecko query failed", zap.Int("queryIdx", queryIdx), zap.String("query", query), zap.Error(err))
			gov.revertAllPrices()
			return err
		}

		for key, value := range thisResult {
			result[key] = value
		}

		// Pause between chunk queries (presumably to stay within CoinGecko's rate limit).
		time.Sleep(1 * time.Second)
	}

	now := time.Now()

	gov.mutex.Lock()
	defer gov.mutex.Unlock()

	// Start with every known ID marked as "not yet updated"; entries are removed as they are updated,
	// so whatever remains at the end needs to be reverted to its configured price.
	localTokenMap := make(map[string][]*tokenEntry, len(gov.tokensByCoinGeckoId))
	for coinGeckoId, cge := range gov.tokensByCoinGeckoId {
		localTokenMap[coinGeckoId] = cge
	}

	for coinGeckoId, data := range result {
		cge, exists := gov.tokensByCoinGeckoId[coinGeckoId]
		if !exists {
			gov.logger.Error("received a CoinGecko response for an unexpected symbol", zap.String("coinGeckoId", coinGeckoId))
			continue
		}

		price, ok := parseCoinGeckoPrice(data)
		if !ok {
			gov.logger.Error("failed to parse CoinGecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId))
			// By continuing, we leave this one in the local map so the price will get reverted below.
			continue
		}

		for _, te := range cge {
			te.coinGeckoPrice = big.NewFloat(price)
			te.updatePrice()
			te.priceTime = now
		}

		delete(localTokenMap, coinGeckoId)
	}

	if len(localTokenMap) == 0 {
		return nil
	}

	for _, lcge := range localTokenMap {
		for _, te := range lcge {
			gov.logger.Error("did not receive a CoinGecko response for symbol, reverting to configured price",
				zap.String("symbol", te.symbol),
				zap.String("coinGeckoId",
					te.coinGeckoId),
				zap.Stringer("cfgPrice", te.cfgPrice),
			)

			// Store a copy rather than the cfgPrice pointer itself. updatePrice writes through te.price
			// in place, so sharing the pointer would let a later CoinGecko update overwrite the
			// configured price.
			te.price = new(big.Float).Set(te.cfgPrice)
			// Don't update the timestamp so we'll know when we last received an update from CoinGecko.
		}
	}

	return fmt.Errorf("failed to update prices for some tokens")
}

// parseCoinGeckoPrice extracts the USD price from the per-token entry of a CoinGecko response.
// If a price is not set in CoinGecko, they return an empty entry; that is treated as a zero price.
// The second result is false when the entry is not an object, has no "usd" field, or the field is not a number.
func parseCoinGeckoPrice(data interface{}) (float64, bool) {
	m, ok := data.(map[string]interface{})
	if !ok {
		return 0, false
	}

	if len(m) == 0 {
		return 0, true
	}

	// A missing key yields a nil interface, which fails the assertion just like a wrongly typed value.
	price, ok := m["usd"].(float64)
	if !ok {
		return 0, false
	}

	return price, true
}
// queryCoinGeckoChunk sends a single CoinGecko query and returns the decoded JSON object, keyed by the
// top-level fields of the response.
//
// It returns an error (and a nil map) if the request fails or times out, the body cannot be read, the body
// contains "error_code", the HTTP status is not 200 OK, or the body is not valid JSON.
func (gov *ChainGovernor) queryCoinGeckoChunk(query string) (map[string]interface{}, error) {
	// Bound the whole request (connect, headers and body). The default client has no timeout, so a
	// stalled connection would otherwise block the price update routine indefinitely.
	const requestTimeout = 30 * time.Second

	gov.logger.Debug("executing CoinGecko query", zap.String("query", query))

	client := http.Client{Timeout: requestTimeout}
	response, err := client.Get(query) //nolint:gosec,noctx
	if err != nil {
		return nil, fmt.Errorf("failed to query CoinGecko: %w", err)
	}

	defer func() {
		// Use a separate variable so the enclosing function's err is not overwritten.
		if closeErr := response.Body.Close(); closeErr != nil {
			gov.logger.Error("failed to close CoinGecko query", zap.Error(closeErr))
		}
	}()

	responseData, err := io.ReadAll(response.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read CoinGecko response: %w", err)
	}

	// Check for an error report in the body before looking at the status, so the body text is included
	// in the returned error whenever one is present.
	resp := string(responseData)
	if strings.Contains(resp, "error_code") {
		return nil, fmt.Errorf("CoinGecko query failed: %s", resp)
	}

	if response.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("CoinGecko query failed with status %q: %s", response.Status, resp)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(responseData, &result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal CoinGecko json: %w", err)
	}

	return result, nil
}
// revertAllPrices reverts the price of all tokens to the configured prices. It is used when a CoinGecko query fails.
// It acquires gov.mutex itself, so the caller must not already hold it.
func (gov *ChainGovernor) revertAllPrices() {
	gov.mutex.Lock()
	defer gov.mutex.Unlock()

	for _, cge := range gov.tokensByCoinGeckoId {
		for _, te := range cge {
			gov.logger.Info("reverting to configured price",
				zap.String("symbol", te.symbol),
				zap.String("coinGeckoId", te.coinGeckoId),
				zap.Stringer("cfgPrice", te.cfgPrice),
			)

			// Store a copy rather than the cfgPrice pointer itself. updatePrice writes through te.price
			// in place, so sharing the pointer would let a later CoinGecko update overwrite the
			// configured price.
			te.price = new(big.Float).Set(te.cfgPrice)
			// Don't update the timestamp so we'll know when we last received an update from CoinGecko.
		}
	}
}
// updatePrice recomputes the effective price of a single token as max(coinGeckoPrice, configuredPrice),
// which is the value used for computing notional value. When no CoinGecko price has been recorded yet,
// the configured price is used.
//
// The receiver is a copy, but the result is written through the te.price pointer, so the change is
// visible on the original entry.
func (te tokenEntry) updatePrice() {
	effective := te.coinGeckoPrice
	if effective == nil || effective.Cmp(te.cfgPrice) < 0 {
		effective = te.cfgPrice
	}

	te.price.Set(effective)
}
// CheckQuery is a free function used to test that the CoinGecko query still works after the mainnet token list has
// been updated. It builds a mainnet governor backed by a mock database, generates the CoinGecko queries without
// starting the periodic price routine, and runs them once. The first error encountered is returned.
func CheckQuery(logger *zap.Logger) error {
	logger.Info("Instantiating governor.")

	var mockDB db.MockGovernorDB
	gov := NewChainGovernor(logger, &mockDB, common.MainNet)
	if initErr := gov.initConfig(); initErr != nil {
		return initErr
	}

	logger.Info("Building CoinGecko query.")

	// run=false: only build the queries, do not start the background poller.
	err := gov.initCoinGecko(context.Background(), false)
	if err != nil {
		return err
	}

	logger.Info("Initiating CoinGecko query.")

	err = gov.queryCoinGecko()
	if err != nil {
		return err
	}

	logger.Info("CoinGecko query complete.")
	return nil
}