chore: fix bugs, implement support for abort/migrating dropping locks
vladfrangu committed Aug 25, 2023
1 parent 419fcc4 commit fc0de1d
Showing 5 changed files with 66 additions and 51 deletions.
7 changes: 6 additions & 1 deletion packages/core/src/request.ts
@@ -35,6 +35,11 @@ const requestOptionalPredicates = {
state: ow.optional.number.greaterThanOrEqual(0).lessThanOrEqual(6),
};

const ignoredProperties = [
'lockByClient',
'lockExpiresAt',
];

export enum RequestState {
UNPROCESSED,
BEFORE_NAV,
@@ -146,7 +151,7 @@ export class Request<UserData extends Dictionary = Dictionary> {
if (predicate) {
ow(value, `RequestOptions.${prop}`, predicate as BasePredicate);
// 'url' is checked above because it's not optional, and lockExpiresAt is ignored
} else if (prop !== 'url' && prop !== 'lockExpiresAt') {
} else if (prop !== 'url' && !ignoredProperties.includes(prop)) {
const msg = `Did not expect property \`${prop}\` to exist, got \`${value}\` in object \`RequestOptions\``;
throw new ArgumentError(msg, this.constructor);
}
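
The validation loop shown above walks every property of the incoming RequestOptions and throws an ArgumentError for anything it does not recognise; lockByClient and lockExpiresAt are lock metadata attached by the request queue API, so they are now skipped via the ignoredProperties list instead of being special-cased inline. A minimal sketch of what this enables, assuming the Request and RequestOptions exports of @crawlee/core (the stored record shape is illustrative, not taken from this commit):

```ts
import { Request, type RequestOptions } from '@crawlee/core';

// Illustrative record, roughly what a locked request could look like when read back
// from the platform request queue (the lock fields are assumptions, not from the diff).
const stored = {
    url: 'https://example.com',
    uniqueKey: 'https://example.com',
    lockByClient: 'client-abc',
    lockExpiresAt: new Date(Date.now() + 60_000),
};

// Previously `lockByClient` tripped the "Did not expect property" check; with both
// lock fields listed in `ignoredProperties`, construction succeeds instead of throwing.
const request = new Request(stored as RequestOptions);
console.log(request.url);
```
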
28 changes: 27 additions & 1 deletion packages/core/src/storages/request_provider.ts
@@ -17,6 +17,7 @@ import type { IStorage, StorageManagerOptions } from './storage_manager';
import { StorageManager } from './storage_manager';
import { QUERY_HEAD_MIN_LENGTH, STORAGE_CONSISTENCY_DELAY_MILLIS, getRequestId, purgeDefaultStorages } from './utils';
import { Configuration } from '../configuration';
import { EventType } from '../events';
import { log } from '../log';
import type { ProxyConfiguration } from '../proxy_configuration';
import { Request } from '../request';
@@ -46,6 +47,7 @@ export abstract class RequestProvider implements IStorage {

// TODO: RQv1 logic for stuck queues, this might not be needed anymore
protected lastActivity = new Date();
protected queuePausedForMigration = false;

constructor(options: InternalRequestProviderOptions, readonly config = Configuration.getGlobalConfig()) {
this.id = options.id;
@@ -60,6 +62,16 @@ export abstract class RequestProvider implements IStorage {
this.requestCache = new LruCache({ maxLength: options.requestCacheMaxSize });
this.recentlyHandledRequestsCache = new LruCache({ maxLength: options.recentlyHandledRequestsMaxSize });
this.log = log.child({ prefix: options.logPrefix });

const eventManager = config.getEventManager();

eventManager.on(EventType.MIGRATING, async () => {
await this._clearPossibleLocks();
});

eventManager.on(EventType.ABORTING, async () => {
await this._clearPossibleLocks();
});
}

/**
@@ -490,7 +502,7 @@
/**
* Adds a request straight to the queueHeadDict, to improve performance.
*/
private _maybeAddRequestToQueueHead(requestId: string, forefront: boolean): void {
protected _maybeAddRequestToQueueHead(requestId: string, forefront: boolean): void {
if (forefront) {
this.queueHeadIds.add(requestId, requestId, true);
} else if (this.assumedTotalCount < QUERY_HEAD_MIN_LENGTH) {
@@ -600,6 +612,20 @@
return downloadListOfUrls(options);
}

protected async _clearPossibleLocks() {
this.queuePausedForMigration = true;
let requestId: string | null;

// eslint-disable-next-line no-cond-assign
while ((requestId = this.queueHeadIds.removeFirst()) !== null) {
try {
await this.client.deleteRequestLock(requestId);
} catch {
// We don't have the lock, or the request was never locked. Either way it's fine
}
}
}

/**
* Opens a request queue and returns a promise resolving to an instance
* of the {@apilink RequestQueue} class.
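
The listeners registered in the constructor are what make lock release automatic: when the platform signals MIGRATING or ABORTING, _clearPossibleLocks() marks the queue as paused (queuePausedForMigration) and deletes the lock for every request id still cached in the queue head, so a resurrected or migrated run can pick those requests up immediately rather than waiting for the locks to expire. A rough sketch of exercising this locally, assuming the event manager exposes emit() (the manual emit is purely for illustration; on the Apify platform these events come from the SDK):

```ts
import { Configuration, EventType, RequestQueue } from 'crawlee';

// Opening any queue runs the RequestProvider constructor, which wires up
// the MIGRATING and ABORTING listeners added in this commit.
const queue = await RequestQueue.open();
await queue.addRequest({ url: 'https://example.com' });

// Simulate the platform announcing a migration.
const events = Configuration.getGlobalConfig().getEventManager();
events.emit(EventType.MIGRATING);

// From here on the queue is paused: _clearPossibleLocks() has set
// queuePausedForMigration and released the lock on every cached head id,
// so the crawler winds down instead of locking new requests.
```
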
9 changes: 8 additions & 1 deletion packages/core/src/storages/request_queue.ts
@@ -189,8 +189,15 @@ export class RequestQueue extends RequestProvider {
limit = Math.max(this.inProgressCount() * QUERY_HEAD_BUFFER, QUERY_HEAD_MIN_LENGTH),
iteration = 0,
): Promise<boolean> {
// If we are paused for migration, resolve immediately.
if (this.queuePausedForMigration) {
return true;
}

// If is nonempty resolve immediately.
if (this.queueHeadIds.length() > 0) return true;
if (this.queueHeadIds.length() > 0) {
return true;
}

if (!this.queryQueueHeadPromise) {
const queryStartedAt = new Date();
64 changes: 19 additions & 45 deletions packages/core/src/storages/request_queue_v2.ts
@@ -13,9 +13,9 @@ import type { Request } from '../request';
const MAX_CACHED_REQUESTS = 2_000_000;

/**
* When prolonging a lock, we do it for a minute from Date.now()
* When prolonging a lock, we do it for 3 minutes from Date.now()
*/
const PROLONG_LOCK_BY_SECS = 60;
const PROLONG_LOCK_BY_SECS = 3 * 60;

/**
* This number must be large enough so that processing of all these requests cannot be done in
@@ -26,7 +26,6 @@ const RECENTLY_HANDLED_CACHE_SIZE = 1000;

class RequestQueue extends RequestProvider {
private _listHeadAndLockPromise: Promise<void> | null = null;
private _hydratingRequestPromise: Promise<any> | null = null;

constructor(options: RequestProviderOptions, config = Configuration.getGlobalConfig()) {
super({
@@ -71,8 +70,6 @@ class RequestQueue extends RequestProvider {
*/
override async fetchNextRequest<T extends Dictionary = Dictionary>(): Promise<Request<T> | null> {
await this.ensureHeadIsNonEmpty();
// Wait for the currently hydrating request to finish, as it might be the one we return
await this._hydratingRequestPromise;

const nextRequestId = this.queueHeadIds.removeFirst();

@@ -81,16 +78,6 @@ class RequestQueue extends RequestProvider {
return null;
}

// Schedule the next hydration in the background (if there is any request left in the queue)
// This should hopefully make the next request available faster.
const nextNextId = this.queueHeadIds.getFirst();

if (nextNextId) {
this._hydratingRequestPromise = this.getOrHydrateRequest(nextNextId).finally(() => {
this._hydratingRequestPromise = null;
});
}

// This should never happen, but...
if (this.requestIdsInProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) {
this.log.warning('Queue head returned a request that is already in progress?!', {
@@ -114,7 +101,7 @@ class RequestQueue extends RequestProvider {
// into the queueHeadDict straight again. After the interval expires, fetchNextRequest()
// will try to fetch this request again, until it eventually appears in the main table.
if (!request) {
this.log.debug('Cannot find a request from the beginning of queue, will be retried later', { nextRequestId });
this.log.debug('Cannot find a request from the beginning of queue or lost lock, will be retried later', { nextRequestId });

setTimeout(() => {
this.requestIdsInProgress.delete(nextRequestId);
@@ -137,7 +124,13 @@ class RequestQueue extends RequestProvider {
}

protected async ensureHeadIsNonEmpty() {
if (this.queueHeadIds.length() > 0) {
// Stop fetching if we are paused for migration
if (this.queuePausedForMigration) {
return;
}

// We want to fetch ahead of time to minimize dead time
if (this.queueHeadIds.length() > 1) {
return;
}

@@ -151,28 +144,6 @@ class RequestQueue extends RequestProvider {
private async _listHeadAndLock(): Promise<void> {
const headData = await this.client.listAndLockHead({ limit: 25, lockSecs: PROLONG_LOCK_BY_SECS });

// Cache the first request, if it exists, and trigger a hydration for it
const firstRequest = headData.items.shift();

if (firstRequest) {
this.queueHeadIds.add(firstRequest.id, firstRequest.id, false);
this._cacheRequest(getRequestId(firstRequest.uniqueKey), {
requestId: firstRequest.id,
uniqueKey: firstRequest.uniqueKey,
wasAlreadyPresent: true,
wasAlreadyHandled: false,
});

// Await current hydration, if any, to not lose it
if (this._hydratingRequestPromise) {
await this._hydratingRequestPromise;
}

this._hydratingRequestPromise = this.getOrHydrateRequest(firstRequest.id).finally(() => {
this._hydratingRequestPromise = null;
});
}

for (const { id, uniqueKey } of headData.items) {
// Queue head index might be behind the main table, so ensure we don't recycle requests
if (!id || !uniqueKey || this.requestIdsInProgress.has(id) || this.recentlyHandledRequestsCache.get(id)) {
@@ -210,7 +181,7 @@ class RequestQueue extends RequestProvider {
try {
await this.client.deleteRequestLock(requestId);
} catch {
// Ignore
// Ignore
}

return null;
@@ -229,7 +200,7 @@ class RequestQueue extends RequestProvider {

// 1.1. If hydrated, prolong the lock more and return it
if (cachedEntry.hydrated) {
// 1.1.1. If the lock expired on the hydrated requests, try to prolong. If we fail, we lost the request
// 1.1.1. If the lock expired on the hydrated requests, try to prolong. If we fail, we lost the request (or it was handled already)
if (cachedEntry.lockExpiresAt && cachedEntry.lockExpiresAt < Date.now()) {
const prolonged = await this._prolongRequestLock(cachedEntry.id);

@@ -275,11 +246,10 @@ class RequestQueue extends RequestProvider {
try {
const res = await this.client.prolongRequestLock(requestId, { lockSecs: PROLONG_LOCK_BY_SECS });
return res.lockExpiresAt;
} catch (err) {
} catch (err: any) {
// Most likely we do not own the lock anymore
this.log.warning(`Failed to prolong lock for cached request ${requestId}, possibly lost the lock`, {
error: err,
requestId,
this.log.warning(`Failed to prolong lock for cached request ${requestId}, either lost the lock or the request was already handled\n`, {
err,
});

return null;
@@ -290,6 +260,10 @@ class RequestQueue extends RequestProvider {
super._reset();
this._listHeadAndLockPromise = null;
}

protected override _maybeAddRequestToQueueHead() {
// Do nothing for request queue v2, as we are only able to lock requests when listing the head
}
}

export { RequestQueue as RequestQueueV2 };
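
Taken together, the v2 queue now relies entirely on the list-and-lock flow: _listHeadAndLock() locks a batch of head requests for PROLONG_LOCK_BY_SECS (raised here from 60 seconds to 3 minutes), _prolongRequestLock() extends the lock while a request is still being processed, and the lock is deleted once the request is handled, lost, or the run aborts or migrates. The background hydration promise and the forefront shortcut (_maybeAddRequestToQueueHead) are dropped because a request may only be worked on after its lock was obtained through the head listing. A condensed sketch of that lifecycle against the storage client interface; the wrapper function and the client variable are illustrative, while the three client calls mirror the ones used in the diff:

```ts
import type { RequestQueueClient } from '@crawlee/types';

const PROLONG_LOCK_BY_SECS = 3 * 60;

// Illustrative walk through the lock lifecycle RQv2 relies on.
async function lockLifecycle(client: RequestQueueClient): Promise<void> {
    // 1. Lock up to 25 head requests for three minutes and remember their ids.
    const { items } = await client.listAndLockHead({ limit: 25, lockSecs: PROLONG_LOCK_BY_SECS });

    for (const { id } of items) {
        try {
            // 2. While a request is still in progress, keep pushing its lock expiry forward.
            await client.prolongRequestLock(id, { lockSecs: PROLONG_LOCK_BY_SECS });
        } catch {
            // Another client took the lock, or the request was already handled; skip it.
            continue;
        }

        // ... process the request here ...

        // 3. Release the lock explicitly; this mirrors what _clearPossibleLocks() does
        //    for every cached id when the run is aborting or migrating.
        await client.deleteRequestLock(id);
    }
}
```
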
9 changes: 6 additions & 3 deletions test/e2e/cheerio-request-queue-v2/actor/main.js
@@ -1,4 +1,4 @@
import { Actor, LogLevel, log } from 'apify';
import { Actor, LogLevel, log as Logger } from 'apify';
import { CheerioCrawler, Dataset } from '@crawlee/cheerio';

const mainOptions = {
@@ -15,14 +15,17 @@ await Actor.main(async () => {
});

const pageTitle = $('title').first().text();
log.info(`URL: ${url} TITLE: ${pageTitle}`);
log.info(`REQUEST ID: ${request.id} URL: ${url} TITLE: ${pageTitle}`);

await Dataset.pushData({ url, pageTitle });
},
experiments: {
useRequestQueueV2: true,
},
log: log.child({ prefix: 'CheerioCrawler', level: LogLevel.DEBUG }),
log: Logger.child({
prefix: 'CheerioCrawler',
// level: LogLevel.DEBUG,
}),
});

try {
