Skip to content

Commit

Permalink
perf: optimize adding large amount of requests via `crawler.addReques…
Browse files Browse the repository at this point in the history
…ts()` (#2456)

This PR resolves three main issues with adding a large number of requests
to the queue:
- Every request added to the queue was automatically added to the LRU
requests cache, which has a size of 1 million items. This makes sense
when enqueuing a few items, but if we try to add more than the limit, we
end up overloading the LRU cache for no reason. Now we only add the
first 1000 requests to the cache (plus any requests added via separate
calls, e.g. when doing `enqueueLinks` from inside a request handler,
again limited to the first 1000 links).
- We used to validate the whole requests array via `ow`, and since the
shape can vary, it was very slow (e.g. 20s just for the `ow`
validation). Now we use a tailored validation for the array that does
the same but resolves within 100ms or so.
- We always created `Request` objects out of everything, which had a
significant impact on memory usage. Now we skip this completely and let
the objects be created lazily when needed (when calling
`RQ.addRequests()`, which only receives the actual batch and not the
whole array).

Related: https://apify.slack.com/archives/C0L33UM7Z/p1715109984834079
  • Loading branch information
B4nan committed May 13, 2024
1 parent 3a847f6 commit 6da86a8
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { inspect } from 'node:util';

import { ListDictionary, LruCache } from '@apify/datastructures';
import type { Log } from '@apify/log';
import { cryptoRandomObjectId } from '@apify/utilities';
Expand Down Expand Up @@ -180,9 +182,10 @@ export abstract class RequestProvider implements IStorage {
ow(requestsLike, ow.array);
ow(options, ow.object.exactShape({
forefront: ow.optional.boolean,
cache: ow.optional.boolean,
}));

const { forefront = false } = options;
const { forefront = false, cache = true } = options;

const uniqueKeyToCacheKey = new Map<string, string>();
const getCachedRequestId = (uniqueKey: string) => {
Expand Down Expand Up @@ -253,7 +256,10 @@ export abstract class RequestProvider implements IStorage {
const cacheKey = getCachedRequestId(newRequest.uniqueKey);

const { requestId, wasAlreadyPresent } = newRequest;
this._cacheRequest(cacheKey, newRequest);

if (cache) {
this._cacheRequest(cacheKey, newRequest);
}

if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) {
this.assumedTotalCount++;
Expand All @@ -278,55 +284,79 @@ export abstract class RequestProvider implements IStorage {
async addRequestsBatched(requests: (string | Source)[], options: AddRequestsBatchedOptions = {}): Promise<AddRequestsBatchedResult> {
checkStorageAccess();

ow(requests, ow.array.ofType(ow.any(
ow.string,
ow.object.partialShape({ url: ow.string, id: ow.undefined }),
ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.optional.regExp }),
)));
ow(options, ow.object.exactShape({
forefront: ow.optional.boolean,
waitForAllRequestsToBeAdded: ow.optional.boolean,
batchSize: ow.optional.number,
waitBetweenBatchesMillis: ow.optional.number,
}));

// The `requests` array can be huge, and `ow` is very slow for anything more complex.
// This explicit iteration takes a few milliseconds, while the ow check can take tens of seconds.

// ow(requests, ow.array.ofType(ow.any(
// ow.string,
// ow.object.partialShape({ url: ow.string, id: ow.undefined }),
// ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.optional.regExp }),
// )));

for (const request of requests) {
if (typeof request === 'string') {
continue;
}

if (typeof request === 'object' && request !== null) {
if (typeof request.url === 'string' && typeof request.id === 'undefined') {
continue;
}

if (typeof (request as any).requestsFromUrl === 'string') {
continue;
}
}

// eslint-disable-next-line max-len
throw new Error(`Request options are not valid, provide either a URL or an object with 'url' property (but without 'id' property), or an object with 'requestsFromUrl' property. Input: ${inspect(request)}`);
}

const {
batchSize = 1000,
waitBetweenBatchesMillis = 1000,
} = options;
const builtRequests: Request[] = [];
const sources: Source[] = [];

for (const opts of requests) {
if (opts && typeof opts === 'object' && 'requestsFromUrl' in opts) {
await this.addRequest(opts, { forefront: options.forefront });
} else {
builtRequests.push(new Request(typeof opts === 'string' ? { url: opts } : opts as RequestOptions));
sources.push(typeof opts === 'string' ? { url: opts } : opts as RequestOptions);
}
}

const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Request[]) => {
const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Source[], cache = true) => {
const resultsToReturn: ProcessedRequest[] = [];
const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront });
const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront, cache });
resultsToReturn.push(...apiResult.processedRequests);

if (apiResult.unprocessedRequests.length) {
await sleep(waitBetweenBatchesMillis);

resultsToReturn.push(...await attemptToAddToQueueAndAddAnyUnprocessed(
providedRequests.filter((r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey)),
false,
));
}

return resultsToReturn;
};

const initialChunk = builtRequests.splice(0, batchSize);
const initialChunk = sources.splice(0, batchSize);

// Add initial batch of `batchSize` to process them right away
const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk);

// If we have no more requests to add, return early
if (!builtRequests.length) {
if (!sources.length) {
return {
addedRequests,
waitForAllRequestsToBeAdded: Promise.resolve([]),
Expand All @@ -335,11 +365,11 @@ export abstract class RequestProvider implements IStorage {

// eslint-disable-next-line no-async-promise-executor
const promise = new Promise<ProcessedRequest[]>(async (resolve) => {
const chunks = chunk(builtRequests, batchSize);
const chunks = chunk(sources, batchSize);
const finalAddedRequests: ProcessedRequest[] = [];

for (const requestChunk of chunks) {
finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk));
finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk, false));

await sleep(waitBetweenBatchesMillis);
}
Expand Down Expand Up @@ -707,6 +737,12 @@ export interface RequestQueueOperationOptions {
* @default false
*/
forefront?: boolean;
/**
* Should the requests be added to the local LRU cache?
* @default false
* @internal
*/
cache?: boolean;
}

/**
Expand Down

0 comments on commit 6da86a8

Please sign in to comment.