[marketplace] Availability improvements #535
Conversation
Force-pushed from d43aa61 to 1426f7c
- add requestId and slotIndex to Reservation (hopefully these will prove useful when we persist Reservations until requests are completed, to add back bytes to Availability)
- add querying of all reservations, with accompanying tests
- change from find to findAvailabilities
- move onCleanUp from SalesContext to SalesAgent as it was getting overwritten for each slot processed
- remove sales agent AFTER deleting reservation, as this was causing some SIGSEGVs
- retrofit testsales and testslotqueue to match updated Reservations module API
Apply to onStore errors, as we are seeing undetailed errors in the dist test logs
Because availability filtering on push was removed, when availability is added and past storage request events are queried, those requests need to be filtered by availability before being added to the queue.
Force-pushed from 1426f7c to 0c23626
This reverts commit 0c23626.
I like these changes a lot, the code cleans up nicely by moving to the reservations model instead of using availability.used.
codex/sales/reservations.nim
Outdated
for a in availabilities:
  if availability =? (await a) and not availability.used:
    ret.add availability
# NOTICE: there is a swallowed deserialization error
Should we log it?
Surprisingly, I can't seem to log it because there is an odd compilation error, which I suspect has something to do with questionable. If I change this to:
for storable in storables.items:
  if bytes =? (await storable):
    without obj =? T.fromJson(bytes), e:
      error "deserialization error", error = e.msg
      continue
    ret.add obj
I get the compilation error:
/Users/egonat/repos/status-im/nim-codex/tests/codex/sales/testsales.nim(442, 41) template/generic instantiation of `all` from here
/Users/egonat/repos/status-im/nim-codex/codex/sales/reservations.nim(458, 15) template/generic instantiation of `without` from here
/Users/egonat/repos/status-im/nim-codex/vendor/questionable/questionable/withoutresult.nim(36, 3) template/generic instantiation of `without` from here
/Users/egonat/repos/status-im/nim-codex/codex/sales/reservations.nim(459, 15) template/generic instantiation of `error` from here
/Users/egonat/repos/status-im/nim-codex/vendor/nim-chronicles/chronicles.nim(363, 10) template/generic instantiation of `log` from here
/Users/egonat/repos/status-im/nim-codex/vendor/nim-chronicles/chronicles.nim(332, 21) Error: undeclared identifier: 'activeChroniclesStream'
This is extremely odd because there are other uses of without and chronicles error logging in the same module, and there are no compilation errors there.
Any ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For future reference: needed to export chronicles.
Due to the generic type parameter, this proc is instantiated at the call site, in a different module, and that scope does not contain chronicles unless the current module exports it.
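
As a minimal sketch of that fix (hypothetical module and proc names, not the PR's actual code):

# storables_sketch.nim -- a generic proc that logs via chronicles
import pkg/chronicles
export chronicles  # without this, instantiating `describe` from another module
                   # fails with "undeclared identifier: 'activeChroniclesStream'"

proc describe*[T](value: T) =
  error "describing value", value = $value

# caller.nim -- the generic is instantiated here, in the caller's scope,
# which must be able to resolve chronicles' logging templates
import storables_sketch
describe(42)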
- ensure compiler support for changes to SomeStorableObject types in Reservations.storables
- do not swallow json deserialization error
This should be g2g Mark, ready for another look.
Pretty great work! I like the testing! Although I have a few comments, please have a look ;-)
codex/sales.nim
Outdated
  let agent = newSalesAgent(
    sales.context,
    slot.request.id,
    slot.slotIndex,
    some slot.request)

- agent.context.onCleanUp = proc {.async.} = await sales.remove(agent)
+ agent.onCleanUp = proc {.async.} = await sales.remove(agent)
This should be set to await sales.cleanUp(agent, done) as well, otherwise loaded Sales won't do proper cleanup.
Good catch. I must've missed this because of the rebased changes (on top of your changes that modified at which point we complete the processing future).
The loaded slots are not processed in the context of the slot queue, however, so I've created a dummy future to pass in to sales.cleanUp.
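
A rough sketch of that wiring, based on the newSalesAgent call shown above; mySlots and SaleUnknown are assumed names here, not necessarily the PR's actual helpers:

proc load*(sales: Sales) {.async.} =
  for slot in (await sales.mySlots()):            # assumed helper returning the node's filled slots
    let agent = newSalesAgent(
      sales.context,
      slot.request.id,
      slot.slotIndex,
      some slot.request)
    # loaded slots are not processed by a slot queue worker, so a dummy future
    # is created purely so that sales.cleanUp has something to complete
    let done = newFuture[void]("sales.load.cleanUp")
    agent.onCleanUp = proc {.async.} = await sales.cleanUp(agent, done)
    agent.start(SaleUnknown())                    # assumed starting state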
codex/sales/salesagent.nim
Outdated
onCleanUp*: OnCleanUp

OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}
Why did you duplicate the onCleanUp callback here from context? And why didn't you move it here and remove it from context? Now I believe it is duplicated, right? Even though only this new one is being used?
IMHO I would leave it at the context as there are also other callbacks...
Took me a while to remember, thank god for well-documented git comments 😄
You're right, this should not be duplicated, which has been updated (by removing OnCleanUp from SalesContext).
The reason this was moved is that every time processSlot was called, the context's onCleanUp callback was overwritten, taking the done future in its closure with it. Effectively, done was only called for the last processed slot, and the other workers would remain blocked. This will likely also happen with onFilled, so I've moved that to SalesAgent as well.
We have slot/agent-level callbacks that are specific to the slot, whose closure will be overwritten for each slot that is processed. Then we have sales-level callbacks, whose closure will not be overwritten for each slot processed. I've added some comments regarding this.
OnSale is a sales-level callback, which is triggered once a slot is filled. It is only used in the tests, which I don't particularly care for, but I don't think we have better options for testing that filled is called at this point.
See df7ac21 for all the above changes.
- remove duplicated OnCleanUp from SalesContext, as it is better-placed at the agent/slot-level than the sales-level, as its closure will get overwritten for each processed slot
- remove onSale call from the filled state, which is now called in sales.filled. Note, onSale is only being used in the tests
- in sales.load, call sales.cleanUp when agent.onCleanUp is triggered, to ensure that reservations are released correctly
👍
else:
  trace "deleted unused reservation"
This will log this message for every reservation deleted, right? Maybe only one would be sufficient? 😅
@@ -21,7 +21,7 @@ asyncchecksuite "Reservations module":
     reservations: Reservations

   setup:
-    randomize()
+    randomize(1.int64) # create reproducible results
This is also possible, but it will run with the same inputs every time. If it is not a problem then keep it.
Fixes #534.
Problem
When Availabilities are created, the amount of bytes in the Availability is reserved in the repo, so those bytes on disk cannot otherwise be written to. When a request for storage is received by a node, if a previously created Availability is matched, an attempt will be made to fill a slot in the request (more accurately, the request's slots are added to the SlotQueue, and eventually those slots will be processed).

During download, bytes that were reserved for the Availability were released (as they were written to disk). To prevent more bytes from being released than were reserved in the Availability, the Availability was marked as used during the download, so that no other requests would match the Availability, and therefore no new downloads (and byte releases) would begin.

The unfortunate downside to this is that the number of Availabilities a node has determines its download concurrency capacity. If, for example, a node creates a single Availability that covers all available disk space the operator is willing to use, that single Availability would mean that only one download could occur at a time, meaning the node could potentially miss out on storage opportunities.
Solution
To alleviate the concurrency issue, each time a slot is processed, a Reservation is created, which takes size (aka reserved bytes) away from the Availability and stores them in the Reservation object. This can be done as many times as needed, as long as there are enough bytes remaining in the Availability. Therefore, concurrent downloads are no longer limited by the number of Availabilities. Instead, they would more likely be limited by the SlotQueue's maxWorkers.

From a database design perspective, an Availability has zero or more Reservations.
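
As a rough sketch of that one-to-many shape (field names beyond requestId, slotIndex, and size are illustrative assumptions, not the PR's exact types):

type
  Availability* = object
    id*: string              # AvailabilityId; the real type is likely a byte array
    size*: uint64            # bytes still available to reserve

  Reservation* = object
    id*: string              # ReservationId
    availabilityId*: string  # each Reservation belongs to exactly one Availability
    size*: uint64            # bytes taken from the Availability
    requestId*: string       # storage request this reservation serves
    slotIndex*: uint64       # slot within that request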
Reservations are persisted in the RepoStore's metadata, along with Availabilities. The metadata store key path for Reservations is meta / sales / reservations / <availabilityId> / <reservationId>, while Availabilities are stored one level up, eg meta / sales / reservations / <availabilityId>, allowing all Reservations for an Availability to be queried (this is not currently needed, but may be useful when work to restore Availability size is implemented, more on this later).
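
For illustration, that key layout could be built roughly like this (a string-based sketch; the actual module uses the datastore Key type, and these helper names are assumptions):

const salesPrefix = "meta/sales/reservations"

proc availabilityKey(availabilityId: string): string =
  salesPrefix & "/" & availabilityId

proc reservationKey(availabilityId, reservationId: string): string =
  availabilityKey(availabilityId) & "/" & reservationId

# querying with the availabilityKey prefix returns every Reservation
# stored under that Availability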
Lifecycle
When a Reservation is created, its size is deducted from the Availability, and when a Reservation is deleted, any remaining size (bytes not written to disk) is returned to the Availability. If the request finishes, is cancelled (expired), or an error occurs, the Reservation is deleted (and any undownloaded bytes are returned to the Availability). In addition, when the Sales module starts, any Reservations that are not actively being used in a filled slot are deleted.
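
Building on the hypothetical types sketched above, the size accounting might look roughly like this (proc names are assumptions, not the module's actual API):

proc createReservation(avail: var Availability, bytes: uint64): Reservation =
  doAssert bytes <= avail.size, "not enough bytes remaining in Availability"
  avail.size -= bytes                                  # bytes move out of the Availability...
  Reservation(availabilityId: avail.id, size: bytes)   # ...and into the new Reservation

proc deleteReservation(avail: var Availability, res: Reservation) =
  avail.size += res.size                               # remaining (undownloaded) bytes flow back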
Having a Reservation persisted until after a storage request is completed will allow the originally set Availability size to be reclaimed once the request's contract has been completed. This feature is yet to be implemented; however, the work in this PR is a step towards enabling it.
Unknowns
Reservation size is determined by StorageAsk.slotSize. If, during download, more bytes than slotSize are attempted to be downloaded, the Reservation update will fail, and the state machine will move to a SaleErrored state, deleting the Reservation. This will likely prevent the slot from being filled.
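
A hypothetical sketch of that failure mode, reusing the illustrative Reservation type from above (the proc name is an assumption):

proc release(res: var Reservation, bytes: uint64): bool =
  ## called as downloaded bytes are written to disk
  if bytes > res.size:
    return false     # update fails; the sale transitions to SaleErrored
  res.size -= bytes  # bytes now on disk no longer need to be reserved
  true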
Notes
Based on #514