-
Notifications
You must be signed in to change notification settings - Fork 84
/
sub_task_runner.go
443 lines (401 loc) · 14.7 KB
/
sub_task_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
package client
import (
"context"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/constants"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_encoder"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/price_fetcher"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
pricetypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
"net/http"
"time"
gometrics "github.com/armon/go-metrics"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/api"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/handler"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/client/types"
pricefeedmetrics "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/metrics"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
)
// HttpClient is the shared, package-level HTTP client. `StartPriceFetcher` passes a pointer to it into the
// request handler it hands to the price fetcher's task loop. The transport caps the number of connections per
// host at `constants.MaxConnectionsPerExchange`.
// NOTE(review): no client-level `Timeout` is configured here; presumably request deadlines are enforced
// elsewhere (e.g. per-request contexts) - confirm before relying on it.
var HttpClient = http.Client{
	Transport: &http.Transport{
		MaxConnsPerHost: constants.MaxConnectionsPerExchange,
	},
}
// SubTaskRunnerImpl is the concrete, stateless implementation of the `SubTaskRunner` interface.
type SubTaskRunnerImpl struct{}

// Compile-time assertion that `*SubTaskRunnerImpl` satisfies the `SubTaskRunner` interface.
var _ SubTaskRunner = &SubTaskRunnerImpl{}
// SubTaskRunner is the interface for running pricefeed client task functions.
// Each method starts one long-running subtask of the pricefeed client. In the `SubTaskRunnerImpl`
// implementation in this file, the ticker-driven methods loop until a value is received on (or the
// channel is closed for) `stop`, and the encoder loops until its input channel is closed.
type SubTaskRunner interface {
	// StartPriceUpdater sends the prices cached in `exchangeToMarketPrices` to the pricefeed server via
	// `priceFeedServiceClient` on every tick of `ticker`, until `stop` fires.
	StartPriceUpdater(
		ctx context.Context,
		ticker *time.Ticker,
		stop <-chan bool,
		exchangeToMarketPrices types.ExchangeToMarketPrices,
		priceFeedServiceClient api.PriceFeedServiceClient,
		logger log.Logger,
	)
	// StartPriceEncoder consumes price fetcher responses for exchange `exchangeId` from `bCh` and processes
	// them with a price encoder built over `exchangeToMarketPrices`, until `bCh` is closed.
	StartPriceEncoder(
		exchangeId types.ExchangeId,
		configs types.PricefeedMutableMarketConfigs,
		exchangeToMarketPrices types.ExchangeToMarketPrices,
		logger log.Logger,
		bCh <-chan *price_fetcher.PriceFetcherSubtaskResponse,
	)
	// StartPriceFetcher runs the price fetcher task loop for a single exchange on every tick of `ticker`.
	// The fetcher is constructed with `bCh` as its output channel; when `stop` fires, `bCh` is closed.
	StartPriceFetcher(
		ticker *time.Ticker,
		stop <-chan bool,
		configs types.PricefeedMutableMarketConfigs,
		exchangeStartupConfig types.ExchangeStartupConfig,
		exchangeDetails types.ExchangeQueryDetails,
		queryHandler handler.ExchangeQueryHandler,
		logger log.Logger,
		bCh chan<- *price_fetcher.PriceFetcherSubtaskResponse,
	)
	// StartMarketParamUpdater refreshes `configs` with the market params returned by `pricesQueryClient`
	// on every tick of `ticker`, until `stop` fires.
	StartMarketParamUpdater(
		ctx context.Context,
		ticker *time.Ticker,
		stop <-chan bool,
		configs types.PricefeedMutableMarketConfigs,
		pricesQueryClient pricetypes.QueryClient,
		logger log.Logger,
	)
}
// StartPriceUpdater drives the price updater: on every tick it invokes `RunPriceUpdaterTaskLoop`, which
//  1. reads the `MarketPriceTimestamps` of all exchanges from the `ExchangeToMarketPrices` cache, and
//  2. converts them, together with their exchange ids, into an `UpdateMarketPricesRequest` that is sent to
//     the pricefeed server.
//
// The loop exits when a value is received on `stop` (or `stop` is closed). Any error returned by the task
// loop is escalated with a panic.
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group
// to signal task completion.
func (s *SubTaskRunnerImpl) StartPriceUpdater(
	ctx context.Context,
	ticker *time.Ticker,
	stop <-chan bool,
	exchangeToMarketPrices types.ExchangeToMarketPrices,
	priceFeedServiceClient api.PriceFeedServiceClient,
	logger log.Logger,
) {
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			// A failed price update is treated as unrecoverable for the updater.
			if err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger); err != nil {
				panic(err)
			}
		}
	}
}
// StartPriceEncoder builds a price encoder for `exchangeId` and then drains the shared channel `bCh`,
// handing every price fetcher response to the encoder. Per the encoder's design, responses are inserted into
// the `ExchangeToMarketPrices` cache, with currency conversions based on the index price of other markets
// performed as necessary.
// No ticker or stop signal is needed: the function returns once the price fetcher closes `bCh`, which is the
// point at which the encoder is considered done by the daemon.
// Failing to copy the exchange/market configs or to construct the encoder results in a panic.
func (s *SubTaskRunnerImpl) StartPriceEncoder(
	exchangeId types.ExchangeId,
	configs types.PricefeedMutableMarketConfigs,
	exchangeToMarketPrices types.ExchangeToMarketPrices,
	logger log.Logger,
	bCh <-chan *price_fetcher.PriceFetcherSubtaskResponse,
) {
	exchangeConfig, err := configs.GetExchangeMarketConfigCopy(exchangeId)
	if err != nil {
		panic(err)
	}

	marketIds := exchangeConfig.GetMarketIds()
	perMarketConfigs, err := configs.GetMarketConfigCopies(marketIds)
	if err != nil {
		panic(err)
	}

	encoder, err := price_encoder.NewPriceEncoder(
		exchangeConfig,
		perMarketConfigs,
		exchangeToMarketPrices,
		logger,
		bCh,
	)
	if err != nil {
		panic(err)
	}

	// Register the encoder with the mutable configs object.
	configs.AddPriceEncoder(encoder)

	// Consume responses until the sender closes the channel.
	for {
		response, open := <-bCh
		if !open {
			return
		}
		encoder.ProcessPriceFetcherResponse(response)
	}
}
// StartPriceFetcher builds a price fetcher for one exchange and runs its task loop on every tick. Per the
// fetcher's design, each run starts goroutines that each:
//  1. query a single market price from the exchange,
//  2. transform the response into a `MarketPriceTimestamp`, and
//  3. send the transformed response to a buffered channel shared across multiple goroutines.
//
// NOTE: the shared subtask response channel has a buffer size, and goroutines will block if the buffer is full.
// NOTE: the price fetcher kicks off 1 to n goroutines every time the subtask loop runs, but the subtask loop
// blocks until all of them are done, so these goroutines are not tracked by the wait group.
//
// When `stop` fires, `bCh` is closed to tell the encoder that no more responses will arrive, and the function
// returns. Failing to copy the configs or to construct the fetcher results in a panic.
func (s *SubTaskRunnerImpl) StartPriceFetcher(
	ticker *time.Ticker,
	stop <-chan bool,
	configs types.PricefeedMutableMarketConfigs,
	exchangeStartupConfig types.ExchangeStartupConfig,
	exchangeDetails types.ExchangeQueryDetails,
	queryHandler handler.ExchangeQueryHandler,
	logger log.Logger,
	bCh chan<- *price_fetcher.PriceFetcherSubtaskResponse,
) {
	exchangeConfig, err := configs.GetExchangeMarketConfigCopy(exchangeStartupConfig.ExchangeId)
	if err != nil {
		panic(err)
	}

	marketIds := exchangeConfig.GetMarketIds()
	perMarketConfigs, err := configs.GetMarketConfigCopies(marketIds)
	if err != nil {
		panic(err)
	}

	// Build the fetcher that will perform the exchange queries.
	fetcher, err := price_fetcher.NewPriceFetcher(
		exchangeStartupConfig,
		exchangeDetails,
		exchangeConfig,
		perMarketConfigs,
		queryHandler,
		logger,
		bCh,
	)
	if err != nil {
		panic(err)
	}

	// The configs object and the price fetcher reference each other during normal daemon operation, so the
	// configs object cannot be initialized with the fetcher. Instead the fetcher is created first and is then
	// registered here as one of the configs object's exchange config updaters.
	configs.AddPriceFetcher(fetcher)

	requestHandler := daemontypes.NewRequestHandlerImpl(&HttpClient)

	// Run the fetcher's task loop on every tick until asked to stop.
	for {
		select {
		case <-stop:
			// Closing the channel signals to the encoder that the price fetcher is done.
			close(bCh)
			return
		case <-ticker.C:
			// RunTaskLoop blocks until all of the query goroutines it starts are done, which is why they
			// are not tracked by the global wait group.
			fetcher.RunTaskLoop(requestHandler)
		}
	}
}
// StartMarketParamUpdater periodically runs the market param updater task loop, which refreshes the market
// parameters that control which markets the daemon queries and how they are queried and computed from each
// exchange. The task loop runs synchronously on every tick of `ticker`; the function returns when a value is
// received on `stop` (or `stop` is closed).
//
// Certain errors are reported at a lower severity during a startup grace period of
// `constants.PriceDaemonStartupErrorGracePeriod`, measured from the moment this function is called. Whether the
// grace period has elapsed is passed to `RunMarketParamUpdaterTaskLoop` on every run.
func (s *SubTaskRunnerImpl) StartMarketParamUpdater(
	ctx context.Context,
	ticker *time.Ticker,
	stop <-chan bool,
	configs types.PricefeedMutableMarketConfigs,
	pricesQueryClient pricetypes.QueryClient,
	logger log.Logger,
) {
	// Compute the end of the grace period once and compare against it on each tick. This avoids sharing a
	// flag with a background goroutine (an unsynchronized read/write data race) and avoids leaving a sleeping
	// goroutine behind when the updater is stopped before the grace period ends.
	gracePeriodEnd := time.Now().Add(constants.PriceDaemonStartupErrorGracePeriod)

	// Periodically update market parameters.
	for {
		select {
		case <-ticker.C:
			isPastGracePeriod := !time.Now().Before(gracePeriodEnd)
			RunMarketParamUpdaterTaskLoop(ctx, configs, pricesQueryClient, logger, isPastGracePeriod)
		case <-stop:
			return
		}
	}
}
// -------------------- Task Loops -------------------- //
// RunPriceUpdaterTaskLoop performs one run of the price updater: it fetches the current
// `exchangeId -> MarketPriceTimestamp` prices from `exchangeToMarketPrices`, converts them into a market price
// update request, and sends that request to the pricefeed server through `priceFeedServiceClient`.
//
// If there are no market price updates, nothing is sent and nil is returned. If sending fails, the error is
// logged, counted in telemetry, and returned to the caller.
func RunPriceUpdaterTaskLoop(
	ctx context.Context,
	exchangeToMarketPrices types.ExchangeToMarketPrices,
	priceFeedServiceClient api.PriceFeedServiceClient,
	logger log.Logger,
) error {
	logger = logger.With(constants.SubmoduleLogKey, constants.PriceUpdaterSubmoduleName)
	request := transformPriceUpdates(exchangeToMarketPrices.GetAllPrices())

	// Measure latency to send prices over gRPC. The timer intentionally starts after `GetAllPrices`, so that
	// call's latency is not included.
	sendStart := time.Now()
	defer telemetry.ModuleMeasureSince(
		metrics.PricefeedDaemon,
		sendStart,
		metrics.PriceUpdaterSendPrices,
		metrics.Latency,
	)

	// Sending a request of length 0 is a fatal error on the server side:
	//   panic: rpc error: code = Unknown desc = Market price update has length of 0.
	// An empty request is expected on startup until prices have been encoded into the in-memory
	// `exchangeToMarketPrices` map. After that point, there should be no price updates of length 0.
	if len(request.MarketPriceUpdates) == 0 {
		logger.Info("Price update had length of 0")
		telemetry.IncrCounter(
			1,
			metrics.PricefeedDaemon,
			metrics.PriceUpdaterZeroPrices,
			metrics.Count,
		)
		return nil
	}

	if _, err := priceFeedServiceClient.UpdateMarketPrices(ctx, request); err != nil {
		// Log the error that is about to be returned from the task loop and measure the failure.
		logger.Error("Failed to run price updater task loop for price daemon", "error", err)
		telemetry.IncrCounter(
			1,
			metrics.PricefeedDaemon,
			metrics.PriceUpdaterTaskLoop,
			metrics.Error,
		)
		return err
	}

	return nil
}
// RunMarketParamUpdaterTaskLoop performs one run of the market param updater: it queries all market params
// from `pricesQueryClient` and applies them to the shared, in-memory `PricefeedMutableMarketConfigs` object.
//
// `isPastGracePeriod` selects the severity used when the query itself fails: info while the daemon is still
// within its startup grace period, error afterwards. Update outcomes are reported per market via telemetry,
// and failures to apply updates are logged as errors.
func RunMarketParamUpdaterTaskLoop(
	ctx context.Context,
	configs types.PricefeedMutableMarketConfigs,
	pricesQueryClient pricetypes.QueryClient,
	logger log.Logger,
	isPastGracePeriod bool,
) {
	// Measure latency to fetch and parse the market params, and propagate all updates.
	loopStart := time.Now()
	defer telemetry.ModuleMeasureSince(
		metrics.PricefeedDaemon,
		loopStart,
		metrics.MarketUpdaterUpdateMarkets,
		metrics.Latency,
	)

	logger = logger.With(constants.SubmoduleLogKey, constants.MarketParamUpdaterSubmoduleName)

	// Query all market params from the query client.
	response, err := pricesQueryClient.AllMarketParams(ctx, &pricetypes.QueryAllMarketParamsRequest{})
	if err != nil {
		// When the daemon starts, there is normally a delay between when the prices daemon starts and the
		// prices query service becomes available. This is not a true error condition, so within the grace
		// period the failure is logged as info instead of error to avoid spurious error logs and alerts.
		if isPastGracePeriod {
			logger.Error("Failed to get all market params", "error", err)
		} else {
			logger.Info("Failed to get all market params", "error", err)
		}

		// Measure all failures to retrieve market params from the query client.
		telemetry.IncrCounter(
			1,
			metrics.PricefeedDaemon,
			metrics.MarketUpdaterGetAllMarketParams,
			metrics.Error,
		)
		return
	}

	// Update the shared, in-memory config with the latest market params, then report update success/failure
	// via metrics and logging.
	marketParamErrors, err := configs.UpdateMarkets(response.MarketParams)
	allUpdatesFailed := err != nil

	for _, param := range response.MarketParams {
		// An update counts as an error if this particular market failed to update, or if all markets failed.
		_, thisMarketFailed := marketParamErrors[param.Id]

		outcome := metrics.Success
		if thisMarketFailed || allUpdatesFailed {
			outcome = metrics.Error
		}

		telemetry.IncrCounterWithLabels(
			[]string{metrics.PricefeedDaemon, metrics.MarketUpdaterApplyMarketUpdates, outcome},
			1,
			[]gometrics.Label{
				pricefeedmetrics.GetLabelForMarketId(param.Id),
			},
		)
	}

	switch {
	case allUpdatesFailed:
		logger.Error(
			"Failed to apply all market updates",
			"error",
			err,
			"marketParamErrors",
			marketParamErrors,
		)
	case len(marketParamErrors) > 0:
		logger.Error(
			"Failed to apply some market updates",
			"marketParamErrors",
			marketParamErrors,
		)
	}
}
// -------------------- Task Loop Helpers -------------------- //
// transformPriceUpdates inverts a map keyed by exchange id (value: that exchange's list of market prices) into
// a market price update request that holds one `api.MarketPriceUpdate` per market, each listing the prices
// reported by the individual exchanges. A telemetry counter is incremented for every (exchange, market) price
// processed. The order of the updates in the returned request follows map iteration order and is therefore
// unspecified.
func transformPriceUpdates(
	updates map[types.ExchangeId][]types.MarketPriceTimestamp,
) *api.UpdateMarketPricesRequest {
	// Measure latency to transform prices being sent over gRPC.
	transformStart := time.Now()
	defer telemetry.ModuleMeasureSince(
		metrics.PricefeedDaemon,
		transformStart,
		metrics.PriceUpdaterTransformPrices,
		metrics.Latency,
	)

	// Group prices by market: marketId -> `api.MarketPriceUpdate`.
	updatesByMarket := make(map[types.MarketId]*api.MarketPriceUpdate)
	for exchangeId, priceTimestamps := range updates {
		for _, priceTimestamp := range priceTimestamps {
			marketId := priceTimestamp.MarketId

			telemetry.IncrCounterWithLabels(
				[]string{
					metrics.PricefeedDaemon,
					metrics.PriceUpdateCount,
					metrics.Count,
				},
				1,
				[]gometrics.Label{
					pricefeedmetrics.GetLabelForExchangeId(exchangeId),
					pricefeedmetrics.GetLabelForMarketId(marketId),
				},
			)

			// Lazily create the per-market entry the first time a market is seen.
			entry, seen := updatesByMarket[marketId]
			if !seen {
				entry = &api.MarketPriceUpdate{
					MarketId:       marketId,
					ExchangePrices: []*api.ExchangePrice{},
				}
				updatesByMarket[marketId] = entry
			}

			// Copy the timestamp into a fresh variable so that each exchange price points at its own value.
			lastUpdatedAt := priceTimestamp.LastUpdatedAt
			entry.ExchangePrices = append(entry.ExchangePrices, &api.ExchangePrice{
				ExchangeId:     exchangeId,
				Price:          priceTimestamp.Price,
				LastUpdateTime: &lastUpdatedAt,
			})
		}
	}

	// Collect every `api.MarketPriceUpdate` into the request to be sent by `client.UpdateMarketPrices`.
	marketPriceUpdates := make([]*api.MarketPriceUpdate, 0, len(updatesByMarket))
	for _, marketUpdate := range updatesByMarket {
		marketPriceUpdates = append(marketPriceUpdates, marketUpdate)
	}

	return &api.UpdateMarketPricesRequest{
		MarketPriceUpdates: marketPriceUpdates,
	}
}