fix: Simplified RequestQueueV2 implementation (#2775)
Conversation
drobnikj
left a comment
Nice 💪
I'll do some testing myself, but first: what about some unit tests, did you consider adding some? There are none -> https://github.com/apify/crawlee/blob/03951bdba8fb34f6bed00d1b68240ff7cd0bacbf/test/core/storages/request_queue.test.ts
Honestly, we have been dealing with various bugs over time and we still do not have any tests for these features.
The build did not finish, can you check @janbuchar?

I can, but only later this week - I have different stuff to finish first.

@drobnikj the unit tests are now passing, so you should be able to build. I'm still working on some e2e tests; if you have any ideas for scenarios to test (e2e, unit, doesn't matter), I'd love to hear them.
Looks good, I did not find any issue, even during testing.
I have a few more comments, can you check pls? @janbuchar
@@ -361,7 +430,8 @@ export class RequestQueue extends RequestProvider {
I cannot comment on it directly below, but during code review I noticed that we are removing locks one by one in `_clearPossibleLocks`. There is a 200 rps rate limit, so I would remove the locks in batches, maybe 10 at a time, to speed it up.
What do you mean? I don't think there is a batch unlock endpoint, and launching those requests in parallel surely won't help against rate limiting either.
I mean to unlock in batches, like:
```ts
protected async _clearPossibleLocks() {
    this.queuePausedForMigration = true;
    let requestId: string | null;
    const batchSize = 10;
    const deleteRequests: Promise<void>[] = [];

    // eslint-disable-next-line no-cond-assign
    while ((requestId = this.queueHeadIds.removeFirst()) !== null) {
        deleteRequests.push(
            this.client.deleteRequestLock(requestId).catch(() => {
                // We don't have the lock, or the request was never locked. Either way it's fine
            }),
        );

        if (deleteRequests.length >= batchSize) {
            // Process the current batch of unlock calls
            await Promise.all(deleteRequests);
            deleteRequests.length = 0; // Reset the array for the next batch
        }
    }

    // Process any remaining requests that didn't form a full batch
    if (deleteRequests.length > 0) {
        await Promise.all(deleteRequests);
    }
}
```
I see. However, I still doubt that there will be any measurable benefit - this code is only executed on migration and there shouldn't be more than ~25 requests in the queue head.
@barjin I gave the forefront handling a makeover. If you could check that out, I'd be super grateful.

Looking good to me 👍🏽 I remember reversing the forefront array somewhere already (likely
drobnikj
left a comment
There was a problem hiding this comment.
Looks like almost all my notes were addressed, and I commented on the rest.
Co-authored-by: Vlad Frangu <me@vladfrangu.dev>
vladfrangu
left a comment
lgtm once the format is fixed (woops, sorryy ;w;)
B4nan
left a comment
There was a problem hiding this comment.
A few comments from my end, nothing really blocking, so approving.
```diff
 // for request queue v2, we want to lock requests by the timeout that would also account for internals (plus 5 seconds padding), but
 // with a minimum of a minute
-this.requestQueue.requestLockSecs = Math.max(this.internalTimeoutMillis / 1000 + 5, 60);
+this.requestQueue.requestLockSecs = Math.max(this.requestHandlerTimeoutMillis / 1000 + 5, 60);
```
the comment still mentions the internal timeout
I'm honestly not sure what it was trying to say so I reworded it.
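For illustration only (this helper is not part of the PR), the lock duration formula from the diff above boils down to: the request handler timeout in seconds, plus 5 seconds of padding, with a floor of one minute.

```typescript
// Illustrative sketch of the lock-duration formula discussed above.
// Not part of the PR; the function name is made up for this example.
function requestLockSecs(requestHandlerTimeoutMillis: number): number {
    // Handler timeout in seconds + 5 s padding, but never less than 60 s.
    return Math.max(requestHandlerTimeoutMillis / 1000 + 5, 60);
}
```

So a 30-second handler timeout hits the one-minute floor, while a 120-second timeout yields a 125-second lock.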
```ts
this.inProgressRequestBatches.push(promise);
void promise.finally(() => {
    this.inProgressRequestBatches = this.inProgressRequestBatches.filter((it) => it !== promise);
});
```
How many items do we expect in that array in a high-concurrency run? This solution is not the best one, but if the size won't be large, we can keep it.
How is this different from a simple integer counter? That would be the most performant approach: just increment instead of push, and decrement in the finally block.
The integer counter was in fact the previous implementation. However, it could not work with multiple clients, and we cannot reliably detect that - the queueHadMultipleClients flag is set even if the other client was a pre-migration instance of the same run, if that makes sense.
You are right that each forefront request might make us lock 25 more requests, and that could unbalance parallel instances quite a bit. Maybe we should give up "excess" requests after we're done checking for forefront requests.
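To make the "give up excess requests" idea concrete, here is a hypothetical sketch (explicitly not the PR's implementation): after a forefront fetch locks a batch of requests, release the locks on the ones we won't consume so parallel clients can pick them up. The function name, the `client` shape, and the `needed` parameter are assumptions for illustration; `deleteRequestLock` mirrors the client call used elsewhere in this thread.

```typescript
// Hypothetical sketch: release locks on requests beyond what this client
// will actually process. Names and shapes are assumed for illustration.
async function releaseExcessLocks(
    client: { deleteRequestLock(id: string): Promise<void> },
    lockedIds: string[],
    needed: number,
): Promise<string[]> {
    const keep = lockedIds.slice(0, needed);
    const excess = lockedIds.slice(needed);

    await Promise.all(
        excess.map((id) =>
            client.deleteRequestLock(id).catch(() => {
                // The lock may already be gone; either way it's fine.
            }),
        ),
    );

    return keep; // ids this client still holds locks for
}
```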
Hmm, not sure I follow why the counter wouldn't be enough, how is this better? Each client will have its own local cache (this new var). You store values in an array and wipe them based on identity, but the promises are not really used anywhere. My suggestion is doing the same, just without the memory/perf overhead.
Just to be sure, this is what I meant, it still uses the promise.finally:
```ts
this.inProgressRequestBatches++;
void promise.finally(() => {
    this.inProgressRequestBatches--;
});
```
Oh damn, I'm sorry. I thought you were commenting on a different part of the code - the one that handles forefront requests. If it's any help, you pushed me to tie up a loose end that I forgot about.
Regarding the batches, you're probably right 😁
```ts
if (this.queueHeadIds.length() > 0) {
    return false;
}
```
I guess the duplication (the same check 5 lines later) is here for performance reasons?
Yup. If the queueHeadIds is non-empty, we return immediately, otherwise we try to fetch something from the upstream queue, which may take time. I'll add a comment.
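The fast-path/slow-path pattern being described can be sketched roughly like this (a minimal standalone model, not the actual crawlee code; the class and method names here are made up):

```typescript
// Minimal model of the double-check: a cheap local check avoids a network
// round trip, and the check is repeated after the (slow) upstream fetch,
// since the fetch may have refilled the local head cache.
class QueueSketch {
    constructor(private queueHeadIds: string[] = []) {}

    // Stand-in for the upstream queue fetch, which may populate the cache.
    private async fetchHeadFromUpstream(upstreamIds: string[]): Promise<void> {
        this.queueHeadIds.push(...upstreamIds);
    }

    async isFinished(upstreamIds: string[]): Promise<boolean> {
        // Fast path: anything cached locally means we're not finished.
        if (this.queueHeadIds.length > 0) {
            return false;
        }
        // Slow path: ask the upstream queue, which may take time.
        await this.fetchHeadFromUpstream(upstreamIds);
        // Re-check: the fetch may have populated the local cache.
        return this.queueHeadIds.length === 0;
    }
}
```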
This PR ports over the changes from apify/crawlee#2775. Key changes:
- tracking of "locked" or "in progress" requests was moved from `storages.RequestQueue` to the request storage client implementations
- the queue head cache gets invalidated after we enqueue a new forefront request (before that, it would only be processed after the current head cache was consumed)
- the `RequestQueue.is_finished` function has been rewritten to avoid race conditions
- I tried running the SDK integration tests with these changes and they passed
queueHasLockedRequests to simplify RequestQueue v2 #2767