-
Notifications
You must be signed in to change notification settings - Fork 0
/
market_to_exchange_prices.go
127 lines (117 loc) · 4.09 KB
/
market_to_exchange_prices.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package types
import (
"sync"
"time"
gometrics "github.com/armon/go-metrics"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/furyanprotocol/v4-chain/protocol/daemons/pricefeed/api"
pricefeedmetrics "github.com/furyanprotocol/v4-chain/protocol/daemons/pricefeed/metrics"
"github.com/furyanprotocol/v4-chain/protocol/lib"
"github.com/furyanprotocol/v4-chain/protocol/lib/metrics"
"github.com/furyanprotocol/v4-chain/protocol/x/prices/types"
)
// MarketToExchangePrices maintains price info for multiple markets. Each
// market can support prices from multiple exchange sources. Specifically,
// MarketToExchangePrices supports methods to update prices and to retrieve
// median prices.
//
// The methods defined in this file acquire the embedded mutex before touching
// the market map, so they may be called from multiple goroutines. Because the
// struct embeds a sync.Mutex it must not be copied after first use; obtain
// instances through NewMarketToExchangePrices, which returns a pointer.
type MarketToExchangePrices struct {
	// Embedded lock guarding marketToExchangePrices. Embedding promotes
	// Lock/Unlock onto MarketToExchangePrices itself.
	sync.Mutex // lock
	// marketToExchangePrices maps a market id to the per-exchange prices
	// recorded for that market. Entries are created lazily by UpdatePrices.
	marketToExchangePrices map[uint32]*ExchangeToPrice // {k: market id, v: exchange prices}
	// maxPriceAge is the maximum age of a price before it is considered too stale to be used.
	// Prices older than this age will not be used to calculate the median price.
	// It is set once by the constructor and only read afterwards in this file.
	maxPriceAge time.Duration
}
// NewMarketToExchangePrices returns an empty, ready-to-use
// MarketToExchangePrices. maxPriceAge is the staleness window later applied
// by GetValidMedianPrices: prices older than readTime minus maxPriceAge are
// excluded from the median.
func NewMarketToExchangePrices(maxPriceAge time.Duration) *MarketToExchangePrices {
	mte := new(MarketToExchangePrices)
	// The map must be allocated up front; UpdatePrices writes into it.
	mte.marketToExchangePrices = map[uint32]*ExchangeToPrice{}
	mte.maxPriceAge = maxPriceAge
	return mte
}
// UpdatePrices applies a batch of per-market price updates while holding the
// lock. A market seen for the first time gets a fresh ExchangeToPrice entry.
// The exchange prices of each update are then forwarded to that entry's
// UpdatePrices method, which is responsible for deciding whether an incoming
// price replaces the stored one (per this method's contract: only when the
// update's timestamp is greater than the existing price's timestamp).
func (mte *MarketToExchangePrices) UpdatePrices(
	updates []*api.MarketPriceUpdate) {
	mte.Lock()
	defer mte.Unlock()

	for _, update := range updates {
		id := update.MarketId
		// Lazily register markets that have not been seen before.
		if _, tracked := mte.marketToExchangePrices[id]; !tracked {
			mte.marketToExchangePrices[id] = NewExchangeToPrice(id)
		}
		mte.marketToExchangePrices[id].UpdatePrices(update.ExchangePrices)
	}
}
// GetValidMedianPrices computes a median price for each of the given markets
// and returns them keyed by market ID. A market appears in the result only if
// all of the following hold:
//  1. price info has been recorded for the market;
//  2. after discarding prices older than readTime minus maxPriceAge (the
//     filtering itself is done by ExchangeToPrice.GetValidPrices), the number
//     of remaining prices is at least the market's MinExchanges;
//  3. lib.Median succeeds on the remaining prices.
//
// Markets that fail any condition are simply omitted from the result. Missing
// markets and median failures increment telemetry counters, and the number of
// valid prices per known market is published as a gauge. The lock is held for
// the whole iteration.
func (mte *MarketToExchangePrices) GetValidMedianPrices(
	marketParams []types.MarketParam,
	readTime time.Time,
) map[uint32]uint64 {
	// Prices last updated before this instant are considered stale.
	cutoffTime := readTime.Add(-mte.maxPriceAge)
	medians := make(map[uint32]uint64)

	// Small builders for the telemetry key and label arguments, which are
	// otherwise repeated verbatim at every call site.
	keyFor := func(metric string) []string {
		return []string{metrics.PricefeedServer, metric, metrics.Count}
	}
	labelsFor := func(marketId uint32) []gometrics.Label {
		return []gometrics.Label{pricefeedmetrics.GetLabelForMarketId(marketId)}
	}

	mte.Lock()
	defer mte.Unlock()

	for _, param := range marketParams {
		id := param.Id

		prices, known := mte.marketToExchangePrices[id]
		if !known {
			// Nothing has been reported for this market yet.
			telemetry.IncrCounterWithLabels(keyFor(metrics.NoMarketPrice), 1, labelsFor(id))
			continue
		}

		// Keep only the prices that are recent enough relative to readTime.
		fresh := prices.GetValidPrices(cutoffTime)
		telemetry.SetGaugeWithLabels(keyFor(metrics.ValidPrices), float32(len(fresh)), labelsFor(id))

		// Require at least MinExchanges fresh prices before trusting a median.
		if len(fresh) < int(param.MinExchanges) {
			continue
		}

		// Median reports an error for empty input (possible when MinExchanges is 0).
		median, err := lib.Median(fresh)
		if err != nil {
			telemetry.IncrCounterWithLabels(keyFor(metrics.NoValidMedianPrice), 1, labelsFor(id))
			continue
		}
		medians[id] = median
	}

	return medians
}