Skip to content

Commit

Permalink
feat(core): add RequestQueue.addRequestsBatched() that is non-block…
Browse files Browse the repository at this point in the history
…ing (#1996)

Previously, the `BasicCrawler.addRequests()` was the only place where
we're handling adding the requests in non-blocking manner. This logic is
now moved to the `RequestQueue.addRequestsBatched()` method.

This method now also correctly handles the `requestsFromUrl` option,
meaning it can be also used with the `BasicCrawler.addRequests()`
shortcut. Moreover, the batching is now configurable via `batchSize` and
`waitBetweenBatchesMillis` options.

Closes #1995
  • Loading branch information
B4nan committed Jul 19, 2023
1 parent 766fa9b commit c85485d
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 98 deletions.
111 changes: 14 additions & 97 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Expand Up @@ -4,6 +4,8 @@ import { addTimeoutToPromise, tryCancel, TimeoutError } from '@apify/timeout';
import { cryptoRandomObjectId } from '@apify/utilities';
import type { SetRequired } from 'type-fest';
import type {
AddRequestsBatchedOptions,
AddRequestsBatchedResult,
AutoscaledPoolOptions,
CrawlingContext,
EnqueueLinksOptions,
Expand All @@ -14,17 +16,16 @@ import type {
Request,
RequestList,
RequestOptions,
RequestQueueOperationOptions,
RouterHandler,
RouterRoutes,
Session,
SessionPoolOptions,
Source,
} from '@crawlee/core';
import {
mergeCookies,
AutoscaledPool,
Configuration,
createRequests,
enqueueLinks,
EventType,
KeyValueStore,
Expand All @@ -41,8 +42,7 @@ import {
} from '@crawlee/core';
import type { Method, OptionsInit } from 'got-scraping';
import { gotScraping } from 'got-scraping';
import type { ProcessedRequest, Dictionary, Awaitable, BatchAddRequestsResult, SetStatusMessageOptions } from '@crawlee/types';
import { chunk, sleep } from '@crawlee/utils';
import type { Dictionary, Awaitable, BatchAddRequestsResult, SetStatusMessageOptions } from '@crawlee/types';
import ow, { ArgumentError } from 'ow';

export interface BasicCrawlingContext<
Expand Down Expand Up @@ -774,75 +774,17 @@ Please note that the 'retryOnBlocked' feature might not work as expected.`);
}

/**
* Adds requests to be processed by the crawler
* Adds requests to the queue in batches. By default, it will resolve after the initial batch is added, and continue
* adding the rest in background. You can configure the batch size via `batchSize` option and the sleep time in between
* the batches via `waitBetweenBatchesMillis`. If you want to wait for all batches to be added to the queue, you can use
* the `waitForAllRequestsToBeAdded` promise you get in the response object.
*
* @param requests The requests to add
* @param options Options for the request queue
*/
async addRequests(requests: (string | Request | RequestOptions)[], options: CrawlerAddRequestsOptions = {}): Promise<CrawlerAddRequestsResult> {
ow(requests, ow.array.ofType(ow.any(ow.string, ow.object.partialShape({
url: ow.string,
id: ow.undefined,
}))));
ow(options, ow.object.exactShape({
forefront: ow.optional.boolean,
waitForAllRequestsToBeAdded: ow.optional.boolean,
}));

async addRequests(requests: (string | Source)[], options: CrawlerAddRequestsOptions = {}): Promise<CrawlerAddRequestsResult> {
const requestQueue = await this.getRequestQueue();
const builtRequests = createRequests(requests);

const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Request[]) => {
const resultsToReturn: ProcessedRequest[] = [];
const apiResult = await requestQueue.addRequests(providedRequests, { forefront: options.forefront });
resultsToReturn.push(...apiResult.processedRequests);

if (apiResult.unprocessedRequests.length) {
await sleep(1000);

resultsToReturn.push(...await attemptToAddToQueueAndAddAnyUnprocessed(
providedRequests.filter((r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey)),
));
}

return resultsToReturn;
};

const initialChunk = builtRequests.splice(0, 1000);

// Add initial batch of 1000 to process them right away
const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk);

// If we have no more requests to add, return early
if (!builtRequests.length) {
return {
addedRequests,
waitForAllRequestsToBeAdded: Promise.resolve([]),
};
}

// eslint-disable-next-line no-async-promise-executor
const promise = new Promise<ProcessedRequest[]>(async (resolve) => {
const chunks = chunk(builtRequests, 1000);
const finalAddedRequests: ProcessedRequest[] = [];

for (const requestChunk of chunks) {
finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk));

await sleep(1000);
}

resolve(finalAddedRequests);
});

// If the user wants to wait for all the requests to be added, we wait for the promise to resolve for them
if (options.waitForAllRequestsToBeAdded) {
addedRequests.push(...await promise);
}

return {
addedRequests,
waitForAllRequestsToBeAdded: promise,
};
return requestQueue.addRequestsBatched(requests, options);
}

protected async _init(): Promise<void> {
Expand Down Expand Up @@ -1344,13 +1286,9 @@ export interface CreateContextOptions {
proxyInfo?: ProxyInfo;
}

export interface CrawlerAddRequestsOptions extends RequestQueueOperationOptions {
/**
* Whether to wait for all the provided requests to be added, instead of waiting just for the initial batch of up to 1000.
* @default false
*/
waitForAllRequestsToBeAdded?: boolean;
}
export interface CrawlerAddRequestsOptions extends AddRequestsBatchedOptions {}

export interface CrawlerAddRequestsResult extends AddRequestsBatchedResult {}

export interface CrawlerRunOptions extends CrawlerAddRequestsOptions {
/**
Expand All @@ -1361,27 +1299,6 @@ export interface CrawlerRunOptions extends CrawlerAddRequestsOptions {
purgeRequestQueue?: boolean;
}

export interface CrawlerAddRequestsResult {
addedRequests: ProcessedRequest[];
/**
* A promise which will resolve with the rest of the requests that were added to the queue.
*
* Alternatively, we can set {@apilink CrawlerAddRequestsOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`} to `true`
* in the {@apilink BasicCrawler.addRequests|`crawler.addRequests()`} options.
*
* **Example:**
*
* ```ts
* // Assuming `requests` is a list of requests.
* const result = await crawler.addRequests(requests);
*
* // If we want to wait for the rest of the requests to be added to the queue:
* await result.waitForAllRequestsToBeAdded;
* ```
*/
waitForAllRequestsToBeAdded: Promise<ProcessedRequest[]>;
}

interface HandlePropertyNameChangeData<New, Old> {
oldProperty?: Old;
newProperty?: New;
Expand Down
132 changes: 131 additions & 1 deletion packages/core/src/storages/request_queue.ts
Expand Up @@ -7,12 +7,13 @@ import ow from 'ow';
import type {
BatchAddRequestsResult,
Dictionary,
ProcessedRequest,
QueueOperationInfo,
RequestQueueClient,
RequestQueueInfo,
StorageClient,
} from '@crawlee/types';
import { downloadListOfUrls } from '@crawlee/utils';
import { chunk, downloadListOfUrls } from '@crawlee/utils';
import type { StorageManagerOptions } from './storage_manager';
import { StorageManager } from './storage_manager';
import { log } from '../log';
Expand Down Expand Up @@ -398,6 +399,96 @@ export class RequestQueue {
return results;
}

/**
* Adds requests to the queue in batches. By default, it will resolve after the initial batch is added, and continue
* adding the rest in background. You can configure the batch size via `batchSize` option and the sleep time in between
* the batches via `waitBetweenBatchesMillis`. If you want to wait for all batches to be added to the queue, you can use
* the `waitForAllRequestsToBeAdded` promise you get in the response object.
*
* @param requests The requests to add
* @param options Options for the request queue
*/
async addRequestsBatched(requests: (string | Source)[], options: AddRequestsBatchedOptions = {}): Promise<AddRequestsBatchedResult> {
ow(requests, ow.array.ofType(ow.any(
ow.string,
ow.object.partialShape({ url: ow.string, id: ow.undefined }),
ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.regExp }),
)));
ow(options, ow.object.exactShape({
forefront: ow.optional.boolean,
waitForAllRequestsToBeAdded: ow.optional.boolean,
batchSize: ow.optional.number,
waitBetweenBatchesMillis: ow.optional.number,
}));

const {
batchSize = 1000,
waitBetweenBatchesMillis = 1000,
} = options;
const builtRequests: Request[] = [];

for (const opts of requests) {
if (opts && typeof opts === 'object' && 'requestsFromUrl' in opts) {
await this.addRequest(opts, { forefront: options.forefront });
} else {
builtRequests.push(new Request(typeof opts === 'string' ? { url: opts } : opts as RequestOptions));
}
}

const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Request[]) => {
const resultsToReturn: ProcessedRequest[] = [];
const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront });
resultsToReturn.push(...apiResult.processedRequests);

if (apiResult.unprocessedRequests.length) {
await sleep(waitBetweenBatchesMillis);

resultsToReturn.push(...await attemptToAddToQueueAndAddAnyUnprocessed(
providedRequests.filter((r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey)),
));
}

return resultsToReturn;
};

const initialChunk = builtRequests.splice(0, batchSize);

// Add initial batch of `batchSize` to process them right away
const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk);

// If we have no more requests to add, return early
if (!builtRequests.length) {
return {
addedRequests,
waitForAllRequestsToBeAdded: Promise.resolve([]),
};
}

// eslint-disable-next-line no-async-promise-executor
const promise = new Promise<ProcessedRequest[]>(async (resolve) => {
const chunks = chunk(builtRequests, batchSize);
const finalAddedRequests: ProcessedRequest[] = [];

for (const requestChunk of chunks) {
finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk));

await sleep(waitBetweenBatchesMillis);
}

resolve(finalAddedRequests);
});

// If the user wants to wait for all the requests to be added, we wait for the promise to resolve for them
if (options.waitForAllRequestsToBeAdded) {
addedRequests.push(...await promise);
}

return {
addedRequests,
waitForAllRequestsToBeAdded: promise,
};
}

/**
* Gets the request from the queue specified by ID.
*
Expand Down Expand Up @@ -883,3 +974,42 @@ export interface RequestQueueOptions {
*/
proxyConfiguration?: ProxyConfiguration;
}

export interface AddRequestsBatchedOptions extends RequestQueueOperationOptions {
/**
* Whether to wait for all the provided requests to be added, instead of waiting just for the initial batch of up to `batchSize`.
* @default false
*/
waitForAllRequestsToBeAdded?: boolean;

/**
* @default 1000
*/
batchSize?: number;

/**
* @default 1000
*/
waitBetweenBatchesMillis?: number;
}

export interface AddRequestsBatchedResult {
addedRequests: ProcessedRequest[];
/**
* A promise which will resolve with the rest of the requests that were added to the queue.
*
* Alternatively, we can set {@apilink AddRequestsBatchedOptions.waitForAllRequestsToBeAdded|`waitForAllRequestsToBeAdded`} to `true`
* in the {@apilink BasicCrawler.addRequests|`crawler.addRequests()`} options.
*
* **Example:**
*
* ```ts
* // Assuming `requests` is a list of requests.
* const result = await crawler.addRequests(requests);
*
* // If we want to wait for the rest of the requests to be added to the queue:
* await result.waitForAllRequestsToBeAdded;
* ```
*/
waitForAllRequestsToBeAdded: Promise<ProcessedRequest[]>;
}

0 comments on commit c85485d

Please sign in to comment.