diff --git a/packages/memory-storage/src/resource-clients/request-queue.ts b/packages/memory-storage/src/resource-clients/request-queue.ts index b3fd19db4794..1a8bbc907819 100644 --- a/packages/memory-storage/src/resource-clients/request-queue.ts +++ b/packages/memory-storage/src/resource-clients/request-queue.ts @@ -4,6 +4,7 @@ import { randomUUID } from 'node:crypto'; import { resolve } from 'node:path'; import { rm } from 'node:fs/promises'; import { move } from 'fs-extra'; +import { AsyncQueue } from '@sapphire/async-queue'; import type { MemoryStorage } from '../index'; import { StorageTypes } from '../consts'; import { purgeNullsFromObject, uniqueKeyToRequestId } from '../utils'; @@ -55,6 +56,7 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue handledRequestCount = 0; pendingRequestCount = 0; requestQueueDirectory: string; + private readonly mutex = new AsyncQueue(); private readonly requests = new Map>(); private readonly client: MemoryStorage; @@ -66,6 +68,18 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue this.client = options.client; } + private async getQueue() : Promise { + const existingQueueById = await findRequestQueueByPossibleId(this.client, this.name ?? this.id); + + if (!existingQueueById) { + this.throwOnNonExisting(StorageTypes.RequestQueue); + } + + existingQueueById.updateTimestamps(false); + + return existingQueueById; + } + async get(): Promise { const found = await findRequestQueueByPossibleId(this.client, this.name ?? this.id); @@ -163,6 +177,111 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue }; } + async listAndLockHead(options: storage.ListAndLockOptions): Promise { + const { limit, lockSecs } = s.object({ + limit: s.number.optional.default(100), + lockSecs: s.number, + }).parse(options); + + const queue = await this.getQueue(); + + const start = Date.now(); + const isLocked = (request: InternalRequest) => !request.orderNo || request.orderNo > start || request.orderNo < -start; + + const items = []; + + await this.mutex.wait(); + + try { + for (const storageEntry of queue.requests.values()) { + if (items.length === limit) { + break; + } + + const request = await storageEntry.get(); + + if (isLocked(request)) { + continue; + } + + request.orderNo = (start + lockSecs * 1000) * (request.orderNo! > 0 ? 1 : -1); + await storageEntry.update(request); + + items.push(request); + } + + return { + limit, + lockSecs, + hadMultipleClients: false, + queueModifiedAt: queue.modifiedAt, + items: items.map(({ json }) => this._jsonToRequest(json)!), + }; + } finally { + this.mutex.shift(); + } + } + + async prolongRequestLock(id: string, options: storage.ProlongRequestLockOptions) : Promise { + s.string.parse(id); + const { lockSecs, forefront } = s.object({ + lockSecs: s.number, + forefront: s.boolean.optional.default(false), + }).parse(options); + + const queue = await this.getQueue(); + const request = await queue.requests.get(id); + + const internalRequest = await request?.get(); + + if (!internalRequest) { + throw new Error(`Request with ID ${id} not found in queue ${queue.name ?? queue.id}`); + } + + const currentTimestamp = Date.now(); + const canProlong = (r: InternalRequest) => !r.orderNo || r.orderNo > currentTimestamp || r.orderNo < -currentTimestamp; + + if (!canProlong(internalRequest)) { + throw new Error(`Request with ID ${id} is not locked in queue ${queue.name ?? queue.id}`); + } + + const unlockTimestamp = Math.abs(internalRequest.orderNo!) + lockSecs * 1000; + internalRequest.orderNo = forefront ? -unlockTimestamp : unlockTimestamp; + + await request?.update(internalRequest); + + return { + lockExpiresAt: new Date(unlockTimestamp), + }; + } + + async deleteRequestLock(id: string, options: storage.DeleteRequestLockOptions = {}) : Promise { + s.string.parse(id); + const { forefront } = s.object({ + forefront: s.boolean.optional.default(false), + }).parse(options); + + const queue = await this.getQueue(); + const request = await queue.requests.get(id); + + const internalRequest = await request?.get(); + + if (!internalRequest) { + throw new Error(`Request with ID ${id} not found in queue ${queue.name ?? queue.id}`); + } + + const start = Date.now(); + + const isLocked = (r: InternalRequest) => !r.orderNo || r.orderNo > start || r.orderNo < -start; + if (!isLocked(internalRequest)) { + throw new Error(`Request with ID ${id} is not locked in queue ${queue.name ?? queue.id}`); + } + + internalRequest.orderNo = forefront ? -start : start; + + await request?.update(internalRequest); + } + async addRequest(request: storage.RequestSchema, options: storage.RequestOptions = {}): Promise { requestShapeWithoutId.parse(request); requestOptionsShape.parse(options); @@ -281,15 +400,8 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue async getRequest(id: string): Promise { s.string.parse(id); - const existingQueueById = await findRequestQueueByPossibleId(this.client, this.name ?? this.id); - - if (!existingQueueById) { - this.throwOnNonExisting(StorageTypes.RequestQueue); - } - - existingQueueById.updateTimestamps(false); - - const json = (await existingQueueById.requests.get(id)?.get())?.json; + const queue = await this.getQueue(); + const json = (await queue.requests.get(id)?.get())?.json; return this._jsonToRequest(json); } diff --git a/packages/types/src/storages.ts b/packages/types/src/storages.ts index 75ca6859f35c..73b06e466f23 100644 --- a/packages/types/src/storages.ts +++ b/packages/types/src/storages.ts @@ -223,6 +223,27 @@ export interface ListOptions { limit?: number; } +export interface ListAndLockOptions extends ListOptions { + lockSecs: number; +} + +export interface ListAndLockHeadResult extends QueueHead { + lockSecs: number; +} + +export interface ProlongRequestLockOptions { + lockSecs: number; + forefront?: boolean; +} + +export interface ProlongRequestLockResult { + lockExpiresAt: Date; +} + +export interface DeleteRequestLockOptions { + forefront?: boolean; +} + export interface RequestOptions { forefront?: boolean; [k: string]: unknown; @@ -275,6 +296,9 @@ export interface RequestQueueClient { getRequest(id: string): Promise; updateRequest(request: UpdateRequestSchema, options?: RequestOptions): Promise; deleteRequest(id: string): Promise; + listAndLockHead(options: ListAndLockOptions): Promise; + prolongRequestLock(id: string, options: ProlongRequestLockOptions): Promise; + deleteRequestLock(id: string, options?: DeleteRequestLockOptions): Promise; } export interface RequestQueueOptions { diff --git a/test/core/storages/request_queue.test.ts b/test/core/storages/request_queue.test.ts index 3ed6e7394552..2a3eddf6c925 100644 --- a/test/core/storages/request_queue.test.ts +++ b/test/core/storages/request_queue.test.ts @@ -685,3 +685,89 @@ describe('RequestQueue remote', () => { expect(r3.userData.__crawlee).toEqual({}); }); }); + +describe('RequestQueue v2', () => { + const totalRequestsPerTest = 100; + + function calculateHistogram(requests: { uniqueKey: string }[]) : number[] { + const histogram: number[] = []; + for (const item of requests) { + const key = item.uniqueKey; + const index = parseInt(key, 10); + histogram[index] = histogram[index] ? histogram[index] + 1 : 1; + } + + return histogram; + } + + async function getEmptyQueue(name: string) { + const queue = await RequestQueue.open(name); + await queue.drop(); + return RequestQueue.open(name); + } + + function getUniqueRequests(count: number) { + return new Array(count).fill(0).map((_, i) => new Request({ url: `http://example.com/${i}`, uniqueKey: String(i) })); + } + + test('listAndLockHead works as expected', async () => { + const queue = await getEmptyQueue('list-and-lock-head'); + await queue.addRequests(getUniqueRequests(totalRequestsPerTest)); + + const [{ items: firstFetch }, { items: secondFetch }] = await Promise.all([ + queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 }), + queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 }), + ]); + + const histogram = calculateHistogram([...firstFetch, ...secondFetch]); + expect(histogram).toEqual(Array(totalRequestsPerTest).fill(1)); + }); + + test('lock timers work as expected (timeout unlocks)', async () => { + jest.useFakeTimers(); + const queue = await getEmptyQueue('lock-timers'); + await queue.addRequests(getUniqueRequests(totalRequestsPerTest)); + + const { items: firstFetch } = await queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 }); + + jest.advanceTimersByTime(65000); + + const { items: secondFetch } = await queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 }); + + const histogram = calculateHistogram([...firstFetch, ...secondFetch]); + expect(histogram).toEqual(Array(totalRequestsPerTest / 2).fill(2)); + jest.useRealTimers(); + }); + + test('prolongRequestLock works as expected ', async () => { + jest.useFakeTimers(); + const queue = await getEmptyQueue('prolong-request-lock'); + await queue.addRequests(getUniqueRequests(1)); + + const { items: firstFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 }); + await queue.client.prolongRequestLock(firstFetch[0].id, { lockSecs: 60 }); + expect(firstFetch).toHaveLength(1); + + jest.advanceTimersByTime(65000); + const { items: secondFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 }); + expect(secondFetch).toHaveLength(0); + + jest.advanceTimersByTime(65000); + const { items: thirdFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 }); + + expect(thirdFetch).toHaveLength(1); + jest.useRealTimers(); + }); + + test('deleteRequestLock works as expected', async () => { + const queue = await getEmptyQueue('delete-request-lock'); + await queue.addRequests(getUniqueRequests(1)); + + const { items: firstFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 }); + await queue.client.deleteRequestLock(firstFetch[0].id); + + const { items: secondFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 }); + + expect(secondFetch[0]).toEqual(firstFetch[0]); + }); +});