-
Notifications
You must be signed in to change notification settings - Fork 42
/
demand.go
307 lines (283 loc) · 11.7 KB
/
demand.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package module
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
cosmosMath "cosmossdk.io/math"
state "github.com/allora-network/allora-chain/x/emissions"
"github.com/allora-network/allora-chain/x/emissions/keeper"
sdk "github.com/cosmos/cosmos-sdk/types"
)
// A structure to hold the original value and a random tiebreaker
type SortableItem[T any] struct {
Value T
Weight uint64
Tiebreaker float64
}
type RequestId = string
type TopicId = uint64
type PriceAndReturn = struct {
Price cosmosMath.Uint
Return cosmosMath.Uint
}
type Demand struct {
Requests []state.InferenceRequest
FeesGenerated cosmosMath.Uint
}
// Sorts the given slice of topics in descending order according to their corresponding return, using randomness as tiebreaker
// e.g. ([]uint64{1, 2, 3}, map[uint64]uint64{1: 2, 2: 2, 3: 3}, 0) -> [3, 1, 2] or [3, 2, 1]
func SortTopicsByReturnDescWithRandomTiebreaker(valsToSort []state.Topic, weights map[TopicId]PriceAndReturn, randSeed uint64) []state.Topic {
// Convert the slice of Ts to a slice of SortableItems, each with a random tiebreaker
r := rand.New(rand.NewSource(int64(randSeed)))
items := make([]SortableItem[state.Topic], len(valsToSort))
for i, topic := range valsToSort {
items[i] = SortableItem[state.Topic]{topic, weights[topic.Id].Price.Uint64(), r.Float64()}
}
// Sort the slice of SortableItems
// If the values are equal, the tiebreaker will decide their order
sort.Slice(items, func(i, j int) bool {
if items[i].Value == items[j].Value {
return items[i].Tiebreaker > items[j].Tiebreaker
}
return items[i].Weight > items[j].Weight
})
// Extract and print the sorted values to demonstrate the sorting
sortedValues := make([]state.Topic, len(valsToSort))
for i, item := range items {
sortedValues[i] = item.Value
}
return sortedValues
}
// Check that a request:
//
// should be checked in this timestep
// AND did not expire
// AND would have enough funds to cover the potential next price
// AND admits and acceptable max price per inference
func IsValidAtPrice(
ctx sdk.Context,
k keeper.Keeper,
req state.InferenceRequest,
price cosmosMath.Uint,
currentTime uint64) (bool, error) {
reqId, err := req.GetRequestId()
if err != nil {
fmt.Println("Error getting request id: ", err)
return false, err
}
reqUnmetDemand, err := k.GetRequestDemand(ctx, reqId)
if err != nil {
fmt.Println("Error getting request demand: ", err)
return false, err
}
/*
fmt.Println("req.LastChecked+req.Cadence <= currentTime", req.LastChecked+req.Cadence <= currentTime)
fmt.Println("req.TimestampValidUntil > currentTime", req.TimestampValidUntil > currentTime)
fmt.Println("reqUnmetDemand.GTE(price)", reqUnmetDemand.GTE(price))
fmt.Println("req.MaxPricePerInference.GTE(price)", req.MaxPricePerInference.GTE(price))
*/
res :=
req.LastChecked+req.Cadence <= currentTime &&
req.TimestampValidUntil > currentTime &&
reqUnmetDemand.GTE(price) &&
req.MaxPricePerInference.GTE(price)
return res, nil
}
// Inactivates topics with below keeper.MIN_TOPIC_UNMET_DEMAND demand
// returns a list of topics that are still active after this operation
func InactivateLowDemandTopics(ctx context.Context, k keeper.Keeper) (remainingActiveTopics []*state.Topic, err error) {
remainingActiveTopics = make([]*state.Topic, 0)
topicsActive, err := k.GetActiveTopics(ctx)
if err != nil {
fmt.Println("Error getting active topics: ", err)
return nil, err
}
minTopicDemand, err := k.GetParamsMinTopicUnmetDemand(ctx)
if err != nil {
fmt.Println("Error getting min topic unmet demand: ", err)
return nil, err
}
for _, topic := range topicsActive {
topicUnmetDemand, err := k.GetTopicUnmetDemand(ctx, topic.Id)
if err != nil {
fmt.Println("Error getting unmet demand: ", err)
return nil, err
}
if topicUnmetDemand.LT(minTopicDemand) {
fmt.Printf("Inactivating topic due to no demand: %v metadata: %s\n", topic.Id, topic.Metadata)
k.InactivateTopic(ctx, topic.Id)
} else {
remainingActiveTopics = append(remainingActiveTopics, topic)
}
}
return remainingActiveTopics, nil
}
// Generate a demand curve, which is a data structure that captures the price
// that maximizes the demand drawn from valid requests. Each price is mapped to the list of people
// willing to pay AT LEAST that price for inference. Then the maximum amount of fees is found
// by multiplying the price by the number of requests willing to pay at least that price.
// TODO: think if we can sort the data structure first, then process it in order to do O(2*n)
// probably we can use some kind of ordered tree to do this
// instead of what we are currently doing which is O(n^2)
func GetRequestsThatMaxFees(
ctx sdk.Context,
k keeper.Keeper,
currentTime uint64,
requestsForGivenTopic []state.InferenceRequest) (
bestPrice cosmosMath.Uint,
maxFees cosmosMath.Uint,
requests []state.InferenceRequest,
err error) {
// Initialize a map of request price to map of valid requests
// map must be of type string, because the complex type of a Uint
// will not work for the map, equality test tests the pointer not the value
demandCurve := make(map[string]Demand)
requests = make([]state.InferenceRequest, 0)
maxFees = cosmosMath.NewUint(0)
bestPrice = cosmosMath.NewUint(0)
// Loop through inference requests and then loop again (nested) checking validity of all other inferences at the first inference's max price
for _, req := range requestsForGivenTopic {
// Check validity of current request at its own price
isValidAtPrice, err := IsValidAtPrice(ctx, k, req, req.MaxPricePerInference, currentTime)
if err != nil {
fmt.Println("Error checking if request is valid at price: ", err)
return cosmosMath.Uint{}, cosmosMath.Uint{}, nil, err
}
//fmt.Println("Req id ", req.TopicId, " is valid at price ", req.MaxPricePerInference, " : ", isValidAtPrice)
if isValidAtPrice {
price := req.MaxPricePerInference
priceStr := price.String()
_, exists := demandCurve[priceStr]
if exists {
// if the demand curve has already computed the list of buyers
// at this price level before, we dont need to do it again
continue
}
demandCurve[priceStr] = Demand{
Requests: make([]state.InferenceRequest, 0),
FeesGenerated: cosmosMath.ZeroUint()}
for _, req2 := range requestsForGivenTopic {
isValidAtPrice, err := IsValidAtPrice(ctx, k, req2, price, currentTime)
if err != nil {
fmt.Println("Error checking if request is valid at price: ", err)
return cosmosMath.Uint{}, cosmosMath.Uint{}, nil, err
}
if isValidAtPrice {
newFeesGenerated := demandCurve[priceStr].FeesGenerated.Add(price)
newRequests := append(demandCurve[priceStr].Requests, req2)
demandCurve[priceStr] = Demand{
Requests: newRequests,
FeesGenerated: newFeesGenerated}
if newFeesGenerated.GT(maxFees) {
maxFees = newFeesGenerated
bestPrice = price
}
}
}
}
}
if !bestPrice.IsZero() {
requests = demandCurve[bestPrice.String()].Requests
}
return bestPrice, maxFees, requests, nil
}
// The price of inference for a topic is determined by the price that maximizes the demand drawn from valid requests.
// Which topics get processed (inference solicitation and weight-adjustment) is based on ordering topics by their return
// at their optimal prices and then skimming the top.
func ChurnRequestsGetActiveTopicsAndDemand(ctx sdk.Context, k keeper.Keeper, currentTime uint64) ([]state.Topic, cosmosMath.Uint, error) {
topicsActive, err := InactivateLowDemandTopics(ctx, k)
if err != nil {
fmt.Println("Error getting active topics: ", err)
return nil, cosmosMath.Uint{}, err
}
topicsActiveWithDemand := make([]state.Topic, 0)
topicBestPrices := make(map[TopicId]PriceAndReturn)
requestsToDrawDemandFrom := make(map[TopicId][]state.InferenceRequest, 0)
for _, topic := range topicsActive {
inferenceRequests, err := k.GetMempoolInferenceRequestsForTopic(ctx, topic.Id)
if err != nil {
fmt.Println("Error getting mempool inference requests: ", err)
return nil, cosmosMath.Uint{}, err
}
priceOfMaxReturn, maxReturn, requestsToUse, err := GetRequestsThatMaxFees(ctx, k, currentTime, inferenceRequests)
if err != nil {
fmt.Println("Error getting requests that maximize fees: ", err)
return nil, cosmosMath.Uint{}, err
}
fmt.Println("Topic: ", topic.Id, " Price of max return: ", priceOfMaxReturn, " Max return: ", maxReturn, " Requests to use: ", len(requestsToUse))
topicsActiveWithDemand = append(topicsActiveWithDemand, *topic)
topicBestPrices[topic.Id] = PriceAndReturn{priceOfMaxReturn, maxReturn}
requestsToDrawDemandFrom[topic.Id] = requestsToUse
}
// Sort topics by topicBestPrices
sortedTopics := SortTopicsByReturnDescWithRandomTiebreaker(topicsActiveWithDemand, topicBestPrices, currentTime)
maxTopicsPerBlock, err := k.GetParamsMaxTopicsPerBlock(ctx)
if err != nil {
fmt.Println("Error getting max topics per block: ", err)
return nil, cosmosMath.Uint{}, err
}
// Take top keeper.MAX_TOPICS_PER_BLOCK number of topics with the highest demand
cutoff := uint(math.Min(float64(len(sortedTopics)), float64(maxTopicsPerBlock)))
topTopicsByReturn := sortedTopics[:cutoff]
// Reset Churn Ready Topics
err = k.ResetChurnReadyTopics(ctx)
if err != nil {
fmt.Println("Error resetting churn ready topics: ", err)
return nil, cosmosMath.Uint{}, err
}
// Determine how many funds to draw from demand and Remove depleted/insufficiently funded requests
totalFundsToDrawFromDemand := cosmosMath.NewUint(0)
var topicsToSetChurn []*state.Topic
for _, topic := range topTopicsByReturn {
// Log the accumulated met demand for each topic
k.AddTopicAccumulateMetDemand(ctx, topic.Id, topicBestPrices[topic.Id].Return)
// Draw demand from the valid requests
bestPrice := topicBestPrices[topic.Id].Price
numRequestsServed := 0
for _, req := range requestsToDrawDemandFrom[topic.Id] {
reqId, err := req.GetRequestId()
if err != nil {
fmt.Println("Error getting request demand: ", err)
return nil, cosmosMath.Uint{}, err
}
reqDemand, err := k.GetRequestDemand(ctx, reqId)
if err != nil {
fmt.Println("Error getting request demand: ", err)
return nil, cosmosMath.Uint{}, err
}
// all the previous conditionals were already applied to the requests in the previous loop
// => should never be negative
newReqDemand := reqDemand.Sub(bestPrice)
k.SetRequestDemand(ctx, reqId, newReqDemand)
// if the request is a one-shot request, remove it from the mempool
if req.Cadence == 0 {
k.RemoveFromMempool(ctx, req)
} else { // if it is a subscription check that the subscription has enough funds left to be worth serving
minRequestUnmetDemand, err := k.GetParamsMinRequestUnmetDemand(ctx)
if err != nil {
fmt.Println("Error getting min request unmet demand: ", err)
return nil, cosmosMath.Uint{}, err
}
if newReqDemand.LT(minRequestUnmetDemand) {
// Should convey to users to not surprise them. This helps prevent spamming the mempool with requests that are not worth serving
// The effectively burned dust is 1-time "cost" the consumer incurs when they create "subscriptions" they don't ever refill nor fill enough
// This encourages consumers to maximize how much they fund any single request, discouraging a pattern of many less-funded requests
k.RemoveFromMempool(ctx, req)
}
}
numRequestsServed++
}
totalFundsToDrawFromDemand = totalFundsToDrawFromDemand.Add(bestPrice.Mul(cosmosMath.NewUint(uint64(numRequestsServed))))
topicCopy := topic
topicsToSetChurn = append(topicsToSetChurn, &topicCopy)
}
// Set the topics as churn ready
err = k.SetChurnReadyTopics(ctx, state.TopicList{Topics: topicsToSetChurn})
if err != nil {
fmt.Println("Error setting churn ready topic: ", err)
return nil, cosmosMath.Uint{}, err
}
return topTopicsByReturn, totalFundsToDrawFromDemand, nil
}