-
Notifications
You must be signed in to change notification settings - Fork 86
/
exchange_to_market_prices.go
137 lines (118 loc) · 4.45 KB
/
exchange_to_market_prices.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package types
import (
"errors"
"fmt"
"time"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
)
// ExchangeToMarketPrices maintains price info for multiple exchanges. Each exchange can support
// prices from multiple market sources. Methods are goroutine safe in the underlying MarketToPrice
// objects.
type ExchangeToMarketPrices interface {
UpdatePrice(
exchangeId ExchangeId,
marketPriceTimestamp *MarketPriceTimestamp,
)
GetAllPrices() map[ExchangeId][]MarketPriceTimestamp
GetIndexPrice(
marketId MarketId,
cutoffTime time.Time,
resolver types.Resolver,
) (
medianPrice uint64,
numPricesMedianized int,
)
}
type ExchangeToMarketPricesImpl struct {
// {k: exchangeId, v: market prices, read-write lock}
ExchangeMarketPrices map[ExchangeId]*MarketToPrice
}
// Enforce conformity of ExchangeToMarketPricesImpl to ExchangeToMarketPrices interface at compile time.
var _ ExchangeToMarketPrices = &ExchangeToMarketPricesImpl{}
// NewExchangeToMarketPrices creates a new ExchangeToMarketPrices. Since `ExchangeToMarketPrices` is
// not goroutine safe to write to, all key-value store creation is done on initialization.
// Validation is also done to verify `exchangeIds` is a valid input.
func NewExchangeToMarketPrices(exchangeIds []ExchangeId) (ExchangeToMarketPrices, error) {
// Verify `ExchangeToMarketPrices` will not be initialized without `exchangeIds`.
if len(exchangeIds) == 0 {
return nil, errors.New("exchangeIds must not be empty")
}
exchangeToMarketPrices := &ExchangeToMarketPricesImpl{
ExchangeMarketPrices: make(map[ExchangeId]*MarketToPrice, len(exchangeIds)),
}
for _, exchangeId := range exchangeIds {
// Verify no `exchangeIds` are duplicates.
if _, ok := exchangeToMarketPrices.ExchangeMarketPrices[exchangeId]; ok {
return nil, fmt.Errorf("exchangeId: '%v' appears twice in request", exchangeId)
}
exchangeToMarketPrices.ExchangeMarketPrices[exchangeId] = NewMarketToPrice()
}
return exchangeToMarketPrices, nil
}
// UpdatePrice updates a price for a market for an exchange. Prices are only updated if the
// timestamp on the updates are greater than the timestamp on existing prices. NOTE:
// `UpdatePrice` will only ever read from `ExchangeMarketPrices` and calls a
// goroutine-safe method on the fetched `MarketToPrice`.
// Note: if an invalid `exchangeId` is being written to the `UpdatePrice` it is possible the
// underlying map was corrupted or the priceDaemon logic is invalid. Therefore, `UpdatePrice`
// will panic.
func (exchangeToMarketPrices *ExchangeToMarketPricesImpl) UpdatePrice(
exchangeId ExchangeId,
marketPriceTimestamp *MarketPriceTimestamp,
) {
// Measure latency to update price in in-memory map.
defer telemetry.ModuleMeasureSince(
metrics.PricefeedDaemon,
time.Now(),
metrics.PriceEncoderUpdatePrice,
metrics.Latency,
)
exchangeToMarketPrices.ExchangeMarketPrices[exchangeId].UpdatePrice(marketPriceTimestamp)
}
// GetAllPrices returns a map of exchangeIds to a list of all `MarketPriceTimestamps` for the exchange.
func (exchangeToMarketPrices *ExchangeToMarketPricesImpl) GetAllPrices() map[ExchangeId][]MarketPriceTimestamp {
// Measure latency to get all prices from in-memory map.
defer telemetry.ModuleMeasureSince(
metrics.PricefeedDaemon,
time.Now(),
metrics.GetAllPrices_MarketIdToPrice,
metrics.Latency,
)
exchangeIdToPrices := make(
map[ExchangeId][]MarketPriceTimestamp,
len(exchangeToMarketPrices.ExchangeMarketPrices),
)
for exchangeId, mtp := range exchangeToMarketPrices.ExchangeMarketPrices {
marketPrices := mtp.GetAllPrices()
exchangeIdToPrices[exchangeId] = marketPrices
}
return exchangeIdToPrices
}
// GetIndexPrice returns the index price for a given marketId, disallowing prices that are older than cutoffTime.
// If no valid prices are found, an error is returned.
func (exchangeToMarketPrices *ExchangeToMarketPricesImpl) GetIndexPrice(
marketId MarketId,
cutoffTime time.Time,
resolver types.Resolver,
) (
medianPrice uint64,
numPricesMedianized int,
) {
prices := make([]uint64, 0, len(exchangeToMarketPrices.ExchangeMarketPrices))
for _, mtp := range exchangeToMarketPrices.ExchangeMarketPrices {
price, ok := mtp.GetValidPriceForMarket(marketId, cutoffTime)
if ok {
prices = append(prices, price)
}
}
if len(prices) == 0 {
return 0, 0
}
median, err := resolver(prices)
if err != nil {
return 0, 0
}
return median, len(prices)
}