
Slot queue #455

Merged
merged 68 commits into from
Jul 25, 2023

Conversation

emizzle
Contributor

@emizzle emizzle commented Jun 21, 2023

Closes #429.

Implementation of a slot queue for the sales module as described in the slot queue design document

To support contract event subscriptions in the slot queue, events in the Sales and SalesAgent modules were overhauled. Contract events are now subscribed to in the Sales module, with each event invoking SalesAgent callbacks. This prevents each SalesAgent from creating its own event subscriptions which, on a busy host, could quickly have led to the blockchain provider hitting its cap of event emitters, as well as to a higher memory footprint.

  1. Relies on Support slot queue codex-contracts-eth#61 (merged and submodule commit updated)
  2. Depends on Query past contract events nim-ethers#51.
  3. Bumps nim-json-rpc submodule to 0bf2bcb
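The single-subscription dispatch pattern described above can be sketched as follows. This is a minimal Python illustration (the codebase is Nim), and all names here are hypothetical, not the actual module API:

```python
class SalesAgent:
    """One agent per sale; receives events via callbacks rather than
    holding its own contract subscription."""
    def __init__(self, request_id):
        self.request_id = request_id
        self.fulfilled = []

    def on_fulfilled(self, request_id):
        # each agent filters for the request it cares about
        if request_id == self.request_id:
            self.fulfilled.append(request_id)


class Sales:
    """Holds the single contract-event subscription and fans each
    event out to every agent's callback."""
    def __init__(self):
        self.agents = []

    def on_request_fulfilled(self, request_id):
        # one upstream subscription, N local callbacks: the provider
        # sees one event emitter regardless of how many agents exist
        for agent in list(self.agents):
            agent.on_fulfilled(request_id)


sales = Sales()
agent = SalesAgent("req-1")
sales.agents.append(agent)
sales.on_request_fulfilled("req-1")
print(agent.fulfilled)  # ['req-1']
```

With this shape, adding a hundred agents still costs only one provider-side subscription.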

Member

@AuHau AuHau left a comment

Great (and big 😅) work! I still haven't seen everything, but I think I got the main approach you are taking for the implementation, and I see two issues.

The first is regarding Availabilities management. I see you left it decoupled from the SlotQueue, leaving it in places like the new Sales state "preparing" and the onRequestEvent handler. IMHO it is very tightly coupled to the decision-making of "whether a slot should be picked up or not", which I thought should be the sole domain of the SlotQueue, so I am wondering if this logic should not be moved to SlotQueue?
In the current form, I see a potential issue, where availability can be free when passed into SlotQueue, but then the situation can change as another slot might take priority and eliminate the availability which was originally "assigned" to the original slot.

My second concern is about the efficiency of this solution. Correct me if I am wrong, but this PR tries to bring cost optimization for picking up slots, right? If I understand the implementation correctly, then when there is availability (either newly added, or because some of the slots the host was hosting have finished), your implementation pretty much takes the first new Request (or its Slots) created since the availability was made.
The steps I see happening: NewRequestEvent is emitted, the availability is asserted as free, then it is added to the queue (which is currently "frozen", as all other Slots were already removed because of no availability); this starts the queue processing, and as the queue is empty it takes on the newly added slots. Is my understanding here correct? If so, then IMHO it is not really addressing the problem, no?

Maybe one solution could be to do rounds, where the Queue accumulates new Requests/Slots for some time and then takes the best ones... I guess the problem of that is that somebody else might already snatch the best if it waits too long.

Another solution could be to add Slots to the Queue even if there are no availabilities. Since the queue has a cap on its maximum number of items, the least interesting slots would be removed. And once an availability became available (there would have to be some signal coming into the SlotQueue for that), it would start processing until the availability is used up.
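One way to realize this suggestion is a bounded priority queue that accepts every slot and evicts the least interesting one when full. A rough Python sketch under that assumption (the scoring and all names are made up for illustration):

```python
import heapq

class BoundedSlotQueue:
    """Accepts all slots regardless of availability; keeps at most
    `maxsize` entries, evicting the least profitable when full."""
    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._heap = []  # min-heap on profitability: heap[0] is least interesting

    def push(self, profitability, slot_id):
        if len(self._heap) < self.maxsize:
            heapq.heappush(self._heap, (profitability, slot_id))
        elif profitability > self._heap[0][0]:
            # full: drop the least interesting slot in favour of this one
            heapq.heapreplace(self._heap, (profitability, slot_id))
        # else: the new slot is the least interesting and is silently dropped

    def pop_best(self):
        # O(n) max-extract; fine for a sketch with a small cap
        best = max(self._heap)
        self._heap.remove(best)
        return best

q = BoundedSlotQueue(maxsize=2)
q.push(5, "slot-a")
q.push(1, "slot-b")
q.push(9, "slot-c")   # queue is full, so "slot-b" (score 1) is evicted
print(q.pop_best())   # (9, 'slot-c')
```

Processing would then begin only when an availability signal arrives, popping best-first until the availability is used up.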

codex/sales.nim Outdated
Comment on lines 157 to 156
if availability =? waitFor reservations.find(ask.slotSize,
ask.duration,
ask.pricePerSlot,
ask.collateral,
used = false):
Member

I don't understand this. Shouldn't this happen inside the queue? I get that we don't want every request to be inserted into the queue, but you are asserting something here (e.g. that we have space for the request) which might then change, because some other request could take over the priority and fill the availability inside the queue.

IMHO it should be up to the queue to make this sort of decision, and there should be a signal here from the Queue indicating whether it accepts new requests (e.g. whether all availabilities are filled) or not.

Member

I would like for this to happen inside the state machine as a first step. Then we don't need the waitFor, which I'd really like to avoid.

Contributor Author

Shouldn't this happen inside the queue?

It's a good idea for it to be added as a check before pushing to the queue, if we still need that check (more on this in the main comments).

I get that we don't want every request to be inserted into the queue, but you are asserting something here (e.g. that we have space for the request) which might then change, because some other request could take over the priority and fill the availability inside the queue.

Yes, true. There are two availability checks. One is done before pushing to the queue. The second is done once the item is popped from the queue, before processing. The first check is, as you mentioned, to prevent putting every request onto the queue. The second check is done, as you also mentioned, to verify that there is still availability, which may have changed since the item was initially added. I thought I had put both the first AND second checks in the slot queue design, but after re-reading, the first check was not added. It might be better without the first check, as you've pointed out in your main comment.

I would like for this to happen inside the state machine as a first step.

The availability check is already happening inside of the state machine as a first step (states/preparing.nim)

Then we don't need the waitFor, which I'd really like to avoid.

Respectfully, I disagree that this is a good reason to move the availabilities check.

codex/sales.nim Outdated
Comment on lines 276 to 249
let sqi = SlotQueueItem.init(request, slotIndex.truncate(uint64))
if err =? queue.push(sqi).errorOption:
error "Error adding slot index to slot queue", error = err.msg
Member

I need help understanding this as well. SlotFreed is emitted when a host is kicked out of a Slot because they missed too many proofs and got slashed too many times, right? This is not about the slots this specific host is hosting, but about any slot out there, right?

Shouldn't there then be the same logic here as in onRequestEvent? E.g. matching availabilities?

Member

SlotFreed is emitted when a host is kicked out of a Slot because they missed too many proofs and got slashed too many times, right?

Correct.

Shouldn't there then be the same logic here as in onRequestEvent? E.g. matching availabilities?

Indeed, moving the availability matching inside the state machine would ensure that it's used consistently, and avoids the waitFor.

Contributor Author

Shouldn't there then be the same logic here as in onRequestEvent? E.g. matching availabilities?

Yes, agreed. If we go with the notion that an availability check should be done prior to pushing slots to the queue, then it should also be done when a slot is freed, as this is an indiscriminate event that does not necessarily pertain to slots previously hosted by this host. However, if we decide not to include a first availability check, I guess we can leave it as is.

Contributor Author

Indeed, moving the availability matching inside the state machine would ensure that it's used consistently, and avoids the waitFor.

There is already an availability check inside the state machine.

Comment on lines +53 to +67
without availability =? await reservations.find(
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
request.ask.collateral,
used = false):
info "no availability found for request, ignoring",
slotSize = request.ask.slotSize,
duration = request.ask.duration,
pricePerSlot = request.ask.pricePerSlot,
used = false
return some State(SaleIgnored())
Member

Shouldn't the SalesQueue handle this?


while self.running: # and self.workers.len.uint < self.maxWorkers:
  if self.workers.len.uint >= self.maxWorkers:
    await sleepAsync(1.millis)
Member

IMHO I would increase this to something like at least 100ms, maybe even more. This will "spam" the CPU...

Member

You can avoid the sleepAsync entirely by introducing an async queue of inactive workers. The workers can put themselves into the queue when they are done processing a slot. This loop could then simply async pop an item from the workers queue, without the need for a sleep.
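The idle-worker queue suggested above can be sketched with Python's asyncio (chronos' AsyncQueue plays the same role in Nim); all names below are illustrative:

```python
import asyncio

async def dispatch(slots, workers: asyncio.Queue):
    """Hand each slot to an idle worker. `workers.get()` suspends until a
    worker is free, so no sleep/poll loop is needed."""
    processed = []
    for slot in slots:
        worker = await workers.get()          # blocks without busy-waiting
        processed.append((worker, slot))      # "process" the slot
        await workers.put(worker)             # worker re-queues itself when done
    return processed

async def main():
    workers = asyncio.Queue()
    for name in ("w1", "w2", "w3"):           # pool of 3, mirroring the default
        await workers.put(name)
    return await dispatch(["s1", "s2", "s3", "s4"], workers)

out = asyncio.run(main())
print(out)  # [('w1', 's1'), ('w2', 's2'), ('w3', 's3'), ('w1', 's4')]
```

Note that in this toy version the worker is returned immediately; in the real module the worker would re-queue itself only after its slot-processing future completes.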

Contributor Author

Have swapped out the `seq[SlotQueueWorker]` for an `AsyncQueue[SlotQueueWorker]`, as Mark suggested, in 8afef33.

Contributor Author

@emizzle emizzle Jun 28, 2023

However, it does seem that awaiting `asyncqueue.popFirst` could consume the thread without any kind of throttling inside its `while aq.empty()` loop (https://github.com/status-im/nim-chronos/blob/6525f4ce1d1a7eba146e5f1a53f6f105077ae686/chronos/asyncsync.nim#L364)

codex/sales/slotqueue.nim Outdated Show resolved Hide resolved
codex/sales/slotqueue.nim Outdated Show resolved Hide resolved
Member

@markspanbroek markspanbroek left a comment

Nice work @emizzle! It's coming along nicely. I left some comments, of which the definition of `<` for the priority queue is probably the most relevant.

codex/sales.nim Outdated Show resolved Hide resolved
codex/sales.nim Outdated Show resolved Hide resolved
codex/sales.nim Outdated
return slotIndex.u256
proc remove(sales: Sales, agent: SalesAgent) {.async.} =
await agent.stop()
if not sales.stopping:
Member

Do we really need this optimization? I don't expect the amount of simultaneously running agents to be that large.
I'd rather avoid the added complexity.

Contributor Author

This had to be added because the tests would fail with an error like "the sequence was modified while iterating", usually followed by a SIGSEGV.

codex/sales.nim Outdated Show resolved Hide resolved
codex/sales.nim Outdated Show resolved Hide resolved
codex/rng.nim Outdated Show resolved Hide resolved
codex/sales.nim Outdated
Comment on lines 157 to 156
if availability =? waitFor reservations.find(ask.slotSize,
ask.duration,
ask.pricePerSlot,
ask.collateral,
used = false):
Member

I would like for this to happen inside the state machine as a first step. Then we don't need the waitFor, which I'd really like to avoid.

codex/sales.nim Outdated
Comment on lines 276 to 249
let sqi = SlotQueueItem.init(request, slotIndex.truncate(uint64))
if err =? queue.push(sqi).errorOption:
error "Error adding slot index to slot queue", error = err.msg
Member

SlotFreed is emitted when a host is kicked out of a Slot because they missed too many proofs and got slashed too many times, right?

Correct.

Shouldn't there then be the same logic here as in onRequestEvent? E.g. matching availabilities?

Indeed, moving the availability matching inside the state machine would ensure that it's used consistently, and avoids the waitFor.

codex/sales.nim Show resolved Hide resolved
codex/sales/slotqueue.nim Outdated Show resolved Hide resolved
@emizzle emizzle force-pushed the feat/marketplace/request-queue branch from 923c1d6 to b979ba5 Compare June 28, 2023 01:27
@emizzle
Contributor Author

emizzle commented Jun 28, 2023

In the current form, I see a potential issue, where availability can be free when passed into SlotQueue, but then the situation can change as another slot might take priority and eliminate the availability which was originally "assigned" to the original slot.

Hopefully, #455 (comment) cleared this up, showing that there is indeed a second availability check at the start of the processing of the slot.

IMHO it is very tightly coupled to the decision-making "if slot should be picked up or not" which I thought should be the sole domain of the SlotQueue, so I am wondering if this logic should not be moved to SlotQueue?

I definitely agree with this, however let's first agree on the design as to whether or not we should be filtering out requests/slots on push.

Is my understanding here correct? If so, then IMHO it is not really addressing the problem no?

Your understanding is correct. I believe it is addressing the problem of filtering and sorting slots that it is interested in, based on its availabilities and other factors like profitability. However, I do agree that it has the downside that it may receive requests that it cannot process at the time, but could potentially process in the future if the host adds additional availabilities. My line of thinking in this case was that if there are un-filled requests available in the network, then by the time the host adds additional availabilities, these requests would likely have been picked up by other hosts, assuming they have made their offer attractive.

Another solution could be to add Slots to the Queue even if there is no availabilities. As it has cap on max. items in the queue and the least interesting slots will be removed. And once there would be available availability (there would have to be some signal coming into the SlotQueue for that), then it would start processing, until the availability is used up.

I like this suggestion; however, once a new availability signal comes into the slot queue, assuming there are no availabilities matching the slots in the queue, what happens to the slots in the queue? Do they remain for some period of time (like a TTL, similar to your previous suggestion)?

Another potential solution could be that once a host adds a new availability, it could query the contract for the last x requests that are not yet filled or expired, and then add any that match the new availability to the queue. Thoughts?

@AuHau
Member

AuHau commented Jun 28, 2023

Hopefully, #455 (comment) cleared this up, showing that there is indeed a second availability check at the start of the processing of the slot.

Yup, makes sense 👍 Although the two checks somewhat do not seem like a "clean" solution... But let's see what we agree on.

Your understanding is correct. I believe it is addressing the problem of filtering and sorting slots that it is interested in, based on its availabilities and other factors like profitability.

Well, I don't think the current implementation actually addresses it. IMO it just picks the first slot(s) requested after the availability is made available (what a word twist 🤪🙈).

I like this suggestion, however once a new availabilities signal comes in to the slot queue, assuming there are no matching availabilities to the slots in the queue, what happens to the slots in the queue? Do they remain for some period of time (like a ttl, similar to your previous suggestion)?

From my point of view, there should be a subscription to the SlotFilled event, and once it is detected the slot should be removed from the queue, unless it has already been evicted because of the Queue size limit.

Another potential solution could be that once a host adds a new availability, it could query the contract for the last x requests that are not yet filled or expired, and then add any that match the new availability to the queue. Thoughts?

Hmmm, this would actually be a simpler solution, and potentially a good one as well. In this case we would not even have to have the async slot queue 😅

@emizzle
Contributor Author

emizzle commented Jun 29, 2023

Well, I don't think the current implementation actually addresses it. IMO it just picks the first slot(s) requested after the availability is made available (what a word twist 🤪🙈).

I believe there are two different problems: matching new requests AFTER availabilities were added to a host (the PR in its current form) and matching existing requests that were created BEFORE availabilities were added to a host (the point you've mentioned). Both of these are valid. I had opted to solve only the former (the AFTER problem) in this PR, however the more we discuss, the more it becomes clear that we likely need to solve the BEFORE problem as well.

From my point of view, there should be a subscription to SlotFilled event and once it is detected then the slot should be removed from the queue, unless it has been already evicted because of the Queue size limit.

This has been added already; specifically, line 228 removes the slot from the queue once filled: https://github.com/codex-storage/nim-codex/blob/8afef3324dda1bb9cd754c741d6ff0f70e00a9ef/codex/sales.nim#L222-L241

Hmmm, this would actually be a simpler solution, and potentially a good one as well. In this case we would not even have to have the async slot queue 😅

Yes, I believe it would solve the BEFORE problem^^, but not the AFTER problem, so we'd still need the queue for the AFTER problem. The reason is that this querying of the contract would be triggered only once availabilities are added to the host. It would be much less efficient to query the contract for the last x non-filled requests every time a new StorageRequested event is emitted; it is more efficient to use a queue to manage those requests as they come in.

Once new availability is added and available requests are queried, we could put the requests in the queue, which should sort and process the requests automatically. This could be the new work in this PR to handle the BEFORE problem.

If we do go with this approach, I believe we can keep the filtering of requests by availabilities on push, and move that functionality into the SlotQueue as suggested.

@emizzle emizzle force-pushed the feat/marketplace/request-queue branch from 13794ef to c9d4815 Compare June 29, 2023 06:49
@emizzle emizzle mentioned this pull request Jul 4, 2023
@emizzle emizzle force-pushed the feat/marketplace/request-queue branch 4 times, most recently from 5c77c6a to d5ace2b Compare July 14, 2023 06:03
codex/sales.nim Outdated Show resolved Hide resolved
@emizzle emizzle force-pushed the feat/marketplace/request-queue branch 4 times, most recently from b01f45d to 2e1f178 Compare July 19, 2023 05:51
Previously, when a storage request was handled by the sales module, a slot index was randomly assigned and then the slot was filled.

Now, the randomly chosen slot index is checked to be in state `SlotState.Free` before continuing with the download process.

An additional sales agent state, preparing, was added to handle this step of assigning a random slot index. All state machine setup, such as retrieving the request and subscribing to events, that was previously in the `SaleDownloading` state has been moved to `SalePreparing`. If all indices of the request have been filled, the state changes to `SaleIgnored`.
* check availability before adding request to queue
* rename callback `onRequestAvailable` to `onProcessRequest`
* add in profitability check to queue invariant
* WIP: current integration tests are failing due to the abstraction of the queue design — the request at the top continually gets processed
The sales module needed to subscribe to request events to ensure that the request queue was managed correctly on each event. In the process, the sales agents were updated to avoid subscribing to events in each agent; instead, events received by the sales module are dispatched to all created sales agents. This prevents memory leaks from having too many event emitters subscribed.
Once agents have parked in an end state (e.g. ignored, cancelled, finished, failed, errored), they were not getting cleaned up, and the sales module was keeping a handle on their reference. This could present a memory leak if the number of incoming requests is high.

Add an `onCleanUp` callback that stops the agents and removes them from the sales module's agent list. `onCleanUp` is called from the sales end states (e.g. ignored, cancelled, finished, failed, errored).
## Request Workers
Limit the concurrent requests being processed in the queue by using a limited pool of workers (default = 3).

## OnCleanUp statemachine callback
Once agents have parked in an end state (e.g. ignored, cancelled, finished, failed, errored), they were not getting cleaned up, and the sales module was keeping a handle on their reference. This could present a memory leak if the number of incoming requests is high.

Add an `onCleanUp` callback that stops the agents and removes them from the sales module's agent list. `onCleanUp` is called from the sales end states (e.g. ignored, cancelled, finished, failed, errored).
- change RequestQueue to SlotQueue, where any time storage is requested, all slots from that request are immediately added to the queue. Finished, Cancelled, and Failed requests remove all slots with that request id from the queue. SlotFreed events add a new slot to the queue and SlotFilled events remove the slot from the queue. This allows popping a slot each time one is processed, making things much simpler.
    - removed availableSlotIndices, as this is covered by including slots (rather than requests) in the queue
    - when an entire request's slots are added to the queue, the slot indices are shuffled randomly, to hopefully prevent nodes that pick up the same storage requested event from clashing on the first processed slot index. This allowed removal of assigning a random slot index in the SalePreparing state, and it also ensured that all SalesAgents have a slot index assigned to them from the start, allowing removal of the optional slotIndex.
    - no peek is required, as no slots need to remain in the queue; thus, peek and peekNoWait were removed from AsyncHeapQueue and its tests.

- remove slotId from the SlotFreed event, as it was not being used; RequestId and slotIndex were added to the SlotFreed event earlier and those are being used

- prevent removal of agents from the sales module while stopping; otherwise the agents seq len is modified while iterating
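The shuffled-slot-index expansion described in the first bullet can be sketched like this (a Python illustration with hypothetical names; the actual implementation is in Nim):

```python
import random

def slots_for_request(request_id, num_slots):
    """On StorageRequested, expand the request into one queue item per
    slot, shuffling the indices so hosts that see the same event are
    unlikely to race for the same first slot."""
    indices = list(range(num_slots))
    random.shuffle(indices)
    return [(request_id, i) for i in indices]

items = slots_for_request("req-1", 4)
print(sorted(i for _, i in items))  # [0, 1, 2, 3], pushed in shuffled order
```

Because every item carries its slot index up front, an agent popped from the queue always has a concrete index, which is what allowed the optional slotIndex to be removed.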
@emizzle emizzle force-pushed the feat/marketplace/request-queue branch from ca1b86a to 4672f5a Compare July 20, 2023 04:56
The short-circuiting approach of `or` was not satisfactory for weighting property values. Instead, use a scoring mechanism to give certain properties more weight than others.
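A weighted-score comparator avoids the short-circuiting problem: with chained `or`, the first differing property decides everything, whereas a score lets a strong lead in one property outweigh the others. A sketch with made-up weights and field names (not the values used in the PR):

```python
def score(item):
    # profitability dominates; the other properties act as weighted tie-breakers
    return (item["profit"] * 100
            + item["collateral_cheapness"] * 10
            + item["time_before_expiry"])

def less_than(a, b):
    """`<` for the priority queue: a sorts below b when it scores lower."""
    return score(a) < score(b)

a = {"profit": 2, "collateral_cheapness": 9, "time_before_expiry": 9}
b = {"profit": 3, "collateral_cheapness": 0, "time_before_expiry": 0}
print(less_than(a, b))  # True: b's extra profit outweighs a's tie-breakers
```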
Rename `TrackableFutures` > `TrackedFutures`
markspanbroek
markspanbroek previously approved these changes Jul 24, 2023
Member

@markspanbroek markspanbroek left a comment

Nice 👍

@emizzle emizzle merged commit 1d161d3 into master Jul 25, 2023
8 checks passed
@emizzle emizzle deleted the feat/marketplace/request-queue branch July 25, 2023 02:50
Successfully merging this pull request may close these issues.

Implement request queue design