feat: make RequestQueue v2 the default queue, see more on [Apify blog](https://blog.apify.com/new-apify-request-queue/) (#2390)

Closes #2388

---------

Co-authored-by: drobnikj <drobnik.j@gmail.com>
Co-authored-by: Martin Adámek <banan23@gmail.com>
3 people committed May 14, 2024
1 parent 2d5d443 commit 41ae8ab
Showing 7 changed files with 114 additions and 39 deletions.
15 changes: 15 additions & 0 deletions docs/experiments/request_locking.mdx
@@ -6,6 +6,21 @@ description: Parallelize crawlers with ease using request locking

import ApiLink from '@site/src/components/ApiLink';

:::tip Release announcement

As of **May 2024** (`crawlee` version `3.10.0`), this experiment is now enabled by default! With that said, if you encounter issues you can:

- set `requestLocking` to `false` in the `experiments` object of your crawler options
- update all imports of `RequestQueue` to `RequestQueueV1`
- open an issue on our [GitHub repository](https://github.com/apify/crawlee)

The content below is kept for documentation purposes.
If you're interested in the changes, you can read the [blog post about the new Request Queue storage system on the Apify blog](https://blog.apify.com/new-apify-request-queue/).

:::

---

:::caution

This is an experimental feature. While we welcome testers, keep in mind that it is currently not recommended to use this in production.
26 changes: 9 additions & 17 deletions packages/basic-crawler/src/internals/basic-crawler.ts
@@ -39,8 +39,8 @@ import {
mergeCookies,
NonRetryableError,
purgeDefaultStorages,
RequestQueueV1,
RequestQueue,
RequestQueueV2,
RequestState,
RetryRequestError,
Router,
@@ -356,8 +356,10 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
*/
export interface CrawlerExperiments {
/**
* Enables the use of the new RequestQueue API, which allows multiple clients to use the same queue,
* by locking the requests they are processing for a period of time.
* @deprecated This experiment is now enabled by default, and this flag will be removed in a future release.
* If you encounter issues due to this change, please:
* - report it to us: https://github.com/apify/crawlee
* - set `requestLocking` to `false` in the `experiments` option of the crawler
*/
requestLocking?: boolean;
}
@@ -592,14 +594,6 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.domainAccessedTime = new Map();
this.experiments = experiments;

if (requestQueue && requestQueue instanceof RequestQueueV2 && !experiments.requestLocking) {
throw new Error([
'You provided the new RequestQueue v2 class into your crawler without enabling the experiment!',
"If you're sure you want to test out the new experimental RequestQueue v2, please provide `experiments: { requestLocking: true }` "
+ 'in your crawler options, and try again.',
].join('\n'));
}

this._handlePropertyNameChange({
newName: 'requestHandler',
oldName: 'handleRequestFunction',
@@ -1567,16 +1561,14 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
}

private async _getRequestQueue() {
if (this.experiments.requestLocking) {
// Check if it's explicitly disabled
if (this.experiments.requestLocking === false) {
if (!this._experimentWarnings.requestLocking) {
this.log.warning([
'The RequestQueue v2 is an experimental feature, and may have issues when used in a production environment.',
'Please report any issues you encounter on GitHub: https://github.com/apify/crawlee',
].join('\n'));
this.log.info('Using the old RequestQueue implementation without request locking.');
this._experimentWarnings.requestLocking = true;
}

return RequestQueueV2.open(null, { config: this.config });
return RequestQueueV1.open(null, { config: this.config });
}

return RequestQueue.open(null, { config: this.config });
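
The effect of the `_getRequestQueue` change above can be modelled in isolation. This is a minimal sketch, not the crawler's actual code: `pickQueueImplementation` and the string labels are hypothetical names introduced for illustration. Only the `requestLocking` flag and the class names come from the diff.

```typescript
// Shape of the experiments option, as declared in the diff above.
interface CrawlerExperiments {
    requestLocking?: boolean;
}

// Hypothetical labels standing in for the real classes.
type QueueImplementation = 'RequestQueueV1' | 'RequestQueue';

// Hypothetical helper mirroring the new branch in `_getRequestQueue`:
// only an explicit `false` selects the old queue without request locking.
function pickQueueImplementation(experiments: CrawlerExperiments = {}): QueueImplementation {
    if (experiments.requestLocking === false) {
        return 'RequestQueueV1';
    }
    // Flag omitted or `true`: the locking queue (formerly RequestQueueV2) is used.
    return 'RequestQueue';
}
```

Under this model, omitting the flag and setting it to `true` behave the same way, which appears to be why the flag can be deprecated without breaking existing opt-ins.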
5 changes: 3 additions & 2 deletions packages/core/src/storages/index.ts
@@ -2,8 +2,9 @@ export * from './dataset';
export * from './key_value_store';
export * from './request_list';
export * from './request_provider';
export * from './request_queue';
export * from './request_queue_v2';
export { RequestQueueV1 } from './request_queue';
export { RequestQueue } from './request_queue_v2';
export { RequestQueue as RequestQueueV2 } from './request_queue_v2';
export * from './storage_manager';
export * from './utils';
export * from './access_checking';
17 changes: 17 additions & 0 deletions packages/core/src/storages/request_provider.ts
@@ -405,6 +405,23 @@ export abstract class RequestProvider implements IStorage {
return new Request(requestOptions as unknown as RequestOptions);
}

/**
* Returns a next request in the queue to be processed, or `null` if there are no more pending requests.
*
* Once you successfully finish processing of the request, you need to call
* {@apilink RequestQueue.markRequestHandled}
* to mark the request as handled in the queue. If there was some error in processing the request,
* call {@apilink RequestQueue.reclaimRequest} instead,
* so that the queue will give the request to some other consumer in another call to the `fetchNextRequest` function.
*
* Note that the `null` return value doesn't mean the queue processing finished,
* it means there are currently no pending requests.
* To check whether all requests in queue were finished,
* use {@apilink RequestQueue.isFinished} instead.
*
* @returns
* Returns the request object or `null` if there are no more pending requests.
*/
abstract fetchNextRequest<T extends Dictionary = Dictionary>(options?: RequestOptions): Promise<Request<T> | null>;

/**
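
The contract described in the `fetchNextRequest` doc comment above (fetch, then either `markRequestHandled` or `reclaimRequest`, with `isFinished` as the real completion check) can be illustrated with a small model. This is a sketch under stated assumptions: `InMemoryQueueSketch`, `SketchRequest`, `drainQueue` and `maxRetries` are hypothetical names, and the model is synchronous and single-process, unlike the real asynchronous `RequestQueue`.

```typescript
// Hypothetical, simplified stand-in for a request record.
interface SketchRequest {
    url: string;
    retryCount: number;
}

// Hypothetical in-memory model of the documented contract; not Crawlee's implementation.
class InMemoryQueueSketch {
    private pending: SketchRequest[] = [];
    private inProgress = new Set<SketchRequest>();

    addRequest(url: string): void {
        this.pending.push({ url, retryCount: 0 });
    }

    // Returns the next pending request, or `null` if none is pending right now.
    fetchNextRequest(): SketchRequest | null {
        const request = this.pending.shift() ?? null;
        if (request) this.inProgress.add(request);
        return request;
    }

    // Call after the request was processed successfully.
    markRequestHandled(request: SketchRequest): void {
        this.inProgress.delete(request);
    }

    // Call after a failure: the request goes back to the queue for a later fetch.
    reclaimRequest(request: SketchRequest): void {
        this.inProgress.delete(request);
        this.pending.push(request);
    }

    // True only when nothing is pending and nothing is still being processed.
    isFinished(): boolean {
        return this.pending.length === 0 && this.inProgress.size === 0;
    }
}

// Processes the queue until it is finished; `handler` throws to signal a failure.
function drainQueue(
    queue: InMemoryQueueSketch,
    handler: (request: SketchRequest) => void,
    maxRetries = 3,
): string[] {
    const handled: string[] = [];
    while (!queue.isFinished()) {
        const request = queue.fetchNextRequest();
        if (request === null) break; // defensive; not expected with a single consumer
        try {
            handler(request);
            queue.markRequestHandled(request);
            handled.push(request.url);
        } catch {
            request.retryCount += 1;
            if (request.retryCount > maxRetries) {
                queue.markRequestHandled(request); // give up on this request
            } else {
                queue.reclaimRequest(request); // retry on a later fetch
            }
        }
    }
    return handled;
}
```

The model keeps fetched requests in an `inProgress` set so that a `null` from `fetchNextRequest` and a `true` from `isFinished` stay distinct, which is the distinction the doc comment draws.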
28 changes: 27 additions & 1 deletion packages/core/src/storages/request_queue.ts
@@ -72,8 +72,10 @@ const RECENTLY_HANDLED_CACHE_SIZE = 1000;
* await queue.addRequest({ url: 'http://example.com/foo/bar' }, { forefront: true });
* ```
* @category Sources
*
* @deprecated RequestQueue v1 is deprecated and will be removed in the future. Please use {@apilink RequestQueue} instead.
*/
export class RequestQueue extends RequestProvider {
class RequestQueue extends RequestProvider {
private queryQueueHeadPromise?: Promise<{
wasLimitReached: boolean;
prevLimit: number;
@@ -327,6 +329,12 @@ export class RequestQueue extends RequestProvider {
return super.markRequestHandled(...args);
}

/**
* Reclaims a failed request back to the queue, so that it can be returned for processing later again
* by another call to {@apilink RequestQueue.fetchNextRequest}.
* The request record in the queue is updated using the provided `request` parameter.
* For example, this lets you store the number of retries or error messages for the request.
*/
override async reclaimRequest(...args: Parameters<RequestProvider['reclaimRequest']>) {
checkStorageAccess();

@@ -359,7 +367,25 @@ export class RequestQueue extends RequestProvider {
this.lastActivity = new Date();
}

/**
* Opens a request queue and returns a promise resolving to an instance
* of the {@apilink RequestQueue} class.
*
* {@apilink RequestQueue} represents a queue of URLs to crawl, which is stored either on local filesystem or in the cloud.
* The queue is used for deep crawling of websites, where you start with several URLs and then
* recursively follow links to other pages. The data structure supports both breadth-first
* and depth-first crawling orders.
*
* For more details and code examples, see the {@apilink RequestQueue} class.
*
* @param [queueIdOrName]
* ID or name of the request queue to be opened. If `null` or `undefined`,
* the function returns the default request queue associated with the crawler run.
* @param [options] Open Request Queue options.
*/
static override async open(...args: Parameters<typeof RequestProvider.open>): Promise<RequestQueue> {
return super.open(...args) as Promise<RequestQueue>;
}
}

export { RequestQueue as RequestQueueV1 };
60 changes: 42 additions & 18 deletions packages/core/src/storages/request_queue_v2.ts
@@ -21,7 +21,41 @@ const MAX_CACHED_REQUESTS = 2_000_000;
*/
const RECENTLY_HANDLED_CACHE_SIZE = 1000;

class RequestQueue extends RequestProvider {
/**
* Represents a queue of URLs to crawl, which is used for deep crawling of websites
* where you start with several URLs and then recursively
* follow links to other pages. The data structure supports both breadth-first and depth-first crawling orders.
*
* Each URL is represented using an instance of the {@apilink Request} class.
* The queue can only contain unique URLs. More precisely, it can only contain {@apilink Request} instances
* with distinct `uniqueKey` properties. By default, `uniqueKey` is generated from the URL, but it can also be overridden.
* To add a single URL multiple times to the queue,
* corresponding {@apilink Request} objects will need to have different `uniqueKey` properties.
*
* Do not instantiate this class directly, use the {@apilink RequestQueue.open} function instead.
*
* `RequestQueue` is used by {@apilink BasicCrawler}, {@apilink CheerioCrawler}, {@apilink PuppeteerCrawler}
* and {@apilink PlaywrightCrawler} as a source of URLs to crawl.
* Unlike {@apilink RequestList}, `RequestQueue` supports dynamic adding and removing of requests.
* On the other hand, the queue is not optimized for operations that add or remove a large number of URLs in a batch.
*
* **Example usage:**
*
* ```javascript
* // Open the default request queue associated with the crawler run
* const queue = await RequestQueue.open();
*
* // Open a named request queue
* const queueWithName = await RequestQueue.open('some-name');
*
* // Enqueue few requests
* await queue.addRequest({ url: 'http://example.com/aaa' });
* await queue.addRequest({ url: 'http://example.com/bbb' });
* await queue.addRequest({ url: 'http://example.com/foo/bar' }, { forefront: true });
* ```
* @category Sources
*/
export class RequestQueue extends RequestProvider {
private _listHeadAndLockPromise: Promise<void> | null = null;

constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) {
@@ -63,21 +97,7 @@ class RequestQueue extends RequestProvider {
}

/**
* Returns a next request in the queue to be processed, or `null` if there are no more pending requests.
*
* Once you successfully finish processing of the request, you need to call
* {@apilink RequestQueue.markRequestHandled}
* to mark the request as handled in the queue. If there was some error in processing the request,
* call {@apilink RequestQueue.reclaimRequest} instead,
* so that the queue will give the request to some other consumer in another call to the `fetchNextRequest` function.
*
* Note that the `null` return value doesn't mean the queue processing finished,
* it means there are currently no pending requests.
* To check whether all requests in queue were finished,
* use {@apilink RequestQueue.isFinished} instead.
*
* @returns
* Returns the request object or `null` if there are no more pending requests.
* @inheritDoc
*/
override async fetchNextRequest<T extends Dictionary = Dictionary>(): Promise<Request<T> | null> {
checkStorageAccess();
@@ -143,6 +163,9 @@ class RequestQueue extends RequestProvider {
return request;
}

/**
* @inheritDoc
*/
override async reclaimRequest(...args: Parameters<RequestProvider['reclaimRequest']>): ReturnType<RequestProvider['reclaimRequest']> {
checkStorageAccess();

@@ -350,9 +373,10 @@ class RequestQueue extends RequestProvider {
}
}

/**
* @inheritDoc
*/
static override async open(...args: Parameters<typeof RequestProvider.open>): Promise<RequestQueue> {
return super.open(...args) as Promise<RequestQueue>;
}
}

export { RequestQueue as RequestQueueV2 };
2 changes: 1 addition & 1 deletion test/core/storages/request_queue.test.ts
@@ -4,7 +4,7 @@ import {
QUERY_HEAD_MIN_LENGTH,
API_PROCESSED_REQUESTS_DELAY_MILLIS,
STORAGE_CONSISTENCY_DELAY_MILLIS,
RequestQueue,
RequestQueueV1 as RequestQueue,
Request,
Configuration,
ProxyConfiguration,