Skip to content

Commit

Permalink
feat: RQv2 memory storage support (#1874)
Browse files Browse the repository at this point in the history
Co-authored-by: Vlad Frangu <kingdgrizzle@gmail.com>
  • Loading branch information
barjin and vladfrangu committed May 5, 2023
1 parent 44646d6 commit 049486b
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 9 deletions.
130 changes: 121 additions & 9 deletions packages/memory-storage/src/resource-clients/request-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { randomUUID } from 'node:crypto';
import { resolve } from 'node:path';
import { rm } from 'node:fs/promises';
import { move } from 'fs-extra';
import { AsyncQueue } from '@sapphire/async-queue';
import type { MemoryStorage } from '../index';
import { StorageTypes } from '../consts';
import { purgeNullsFromObject, uniqueKeyToRequestId } from '../utils';
Expand Down Expand Up @@ -55,6 +56,7 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
handledRequestCount = 0;
pendingRequestCount = 0;
requestQueueDirectory: string;
/** Async mutex that listAndLockHead waits on before scanning/locking requests and releases in a `finally` block. */
private readonly mutex = new AsyncQueue();

private readonly requests = new Map<string, StorageImplementation<InternalRequest>>();
private readonly client: MemoryStorage;
Expand All @@ -66,6 +68,18 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
this.client = options.client;
}

/**
 * Looks up the request queue this client points at (by name, falling back to id)
 * and returns the instance found by `findRequestQueueByPossibleId`.
 *
 * When the lookup yields nothing, control is handed to `throwOnNonExisting`.
 * The found queue gets `updateTimestamps(false)` called on it before being returned
 * (presumably "accessed, not modified" - confirm against `updateTimestamps`).
 */
private async getQueue(): Promise<RequestQueueClient> {
    const lookupKey = this.name ?? this.id;
    const queue = await findRequestQueueByPossibleId(this.client, lookupKey);

    if (!queue) {
        // Kept as a bare expression statement so the compiler can narrow `queue` afterwards.
        this.throwOnNonExisting(StorageTypes.RequestQueue);
    }

    queue.updateTimestamps(false);
    return queue;
}

async get(): Promise<storage.RequestQueueInfo | undefined> {
const found = await findRequestQueueByPossibleId(this.client, this.name ?? this.id);

Expand Down Expand Up @@ -163,6 +177,111 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue
};
}

/**
 * Returns up to `limit` currently available requests from the queue and locks each of them
 * for `lockSecs` seconds.
 *
 * A request is considered unavailable (and is skipped) when its `orderNo` is falsy or when the
 * absolute value of its `orderNo` lies in the future. Locking a request rewrites its `orderNo`
 * to the lock expiry timestamp, keeping the original sign.
 *
 * @param options `limit` (optional, defaults to 100) and `lockSecs` (required).
 * @returns The locked requests together with the echoed `limit`/`lockSecs` and the queue's `modifiedAt`.
 */
async listAndLockHead(options: storage.ListAndLockOptions): Promise<storage.ListAndLockHeadResult> {
    const { limit, lockSecs } = s.object({
        limit: s.number.optional.default(100),
        lockSecs: s.number,
    }).parse(options);

    const queue = await this.getQueue();

    // Take the mutex of the instance whose `requests` map is actually scanned and mutated.
    // `getQueue()` may hand back a different instance than `this`; guarding with `this.mutex`
    // would then let two client handles for the same queue lock the same request.
    await queue.mutex.wait();

    try {
        // Read the clock only once the mutex is held, so time spent waiting does not shorten
        // the lock or make the availability check work with a stale timestamp.
        const now = Date.now();
        const lockExpiresAt = now + lockSecs * 1000;
        const isUnavailable = (request: InternalRequest) => !request.orderNo || request.orderNo > now || request.orderNo < -now;

        const lockedRequests: InternalRequest[] = [];

        for (const storageEntry of queue.requests.values()) {
            // `>=` (rather than `===`) also terminates for fractional or negative limits.
            if (lockedRequests.length >= limit) {
                break;
            }

            const request = await storageEntry.get();

            if (isUnavailable(request)) {
                continue;
            }

            // `orderNo` is truthy here (checked above); its sign is preserved on the new value.
            request.orderNo = request.orderNo! > 0 ? lockExpiresAt : -lockExpiresAt;
            await storageEntry.update(request);

            lockedRequests.push(request);
        }

        return {
            limit,
            lockSecs,
            hadMultipleClients: false,
            queueModifiedAt: queue.modifiedAt,
            items: lockedRequests.map(({ json }) => this._jsonToRequest(json)!),
        };
    } finally {
        queue.mutex.shift();
    }
}

/**
 * Extends the lock of a request by `lockSecs` seconds, counted from its current lock expiry.
 *
 * @param id ID of the request whose lock should be prolonged.
 * @param options `lockSecs` (required) and `forefront` (optional, defaults to `false`); with
 * `forefront` the new expiry is stored as a negative `orderNo`.
 * @returns The new lock expiry as a `Date`.
 * @throws Error when the request is not found in the queue, or when it is not currently locked.
 */
async prolongRequestLock(id: string, options: storage.ProlongRequestLockOptions): Promise<storage.ProlongRequestLockResult> {
    s.string.parse(id);
    const { lockSecs, forefront } = s.object({
        lockSecs: s.number,
        forefront: s.boolean.optional.default(false),
    }).parse(options);

    const queue = await this.getQueue();
    // `Map#get` is synchronous - only the storage entry's own `get()` needs awaiting.
    const storageEntry = queue.requests.get(id);
    const internalRequest = await storageEntry?.get();

    if (!storageEntry || !internalRequest) {
        throw new Error(`Request with ID ${id} not found in queue ${queue.name ?? queue.id}`);
    }

    const currentTimestamp = Date.now();
    // NOTE(review): a falsy `orderNo` also passes this check; `Math.abs(null)` is 0, so the
    // resulting expiry would be `lockSecs` seconds after the epoch. Confirm whether requests
    // without an `orderNo` should be rejected here instead.
    const canProlong = !internalRequest.orderNo
        || internalRequest.orderNo > currentTimestamp
        || internalRequest.orderNo < -currentTimestamp;

    if (!canProlong) {
        throw new Error(`Request with ID ${id} is not locked in queue ${queue.name ?? queue.id}`);
    }

    // The extension is added to the existing expiry, not to the current time.
    const unlockTimestamp = Math.abs(internalRequest.orderNo!) + lockSecs * 1000;
    internalRequest.orderNo = forefront ? -unlockTimestamp : unlockTimestamp;

    await storageEntry.update(internalRequest);

    return {
        lockExpiresAt: new Date(unlockTimestamp),
    };
}

/**
 * Releases the lock of a request by resetting its `orderNo` to the current timestamp
 * (negated when `forefront` is set).
 *
 * @param id ID of the request whose lock should be removed.
 * @param options `forefront` (optional, defaults to `false`).
 * @throws Error when the request is not found in the queue, or when it is not currently locked.
 */
async deleteRequestLock(id: string, options: storage.DeleteRequestLockOptions = {}): Promise<void> {
    s.string.parse(id);
    const { forefront } = s.object({
        forefront: s.boolean.optional.default(false),
    }).parse(options);

    const queue = await this.getQueue();
    // `Map#get` is synchronous - only the storage entry's own `get()` needs awaiting.
    const storageEntry = queue.requests.get(id);
    const internalRequest = await storageEntry?.get();

    if (!storageEntry || !internalRequest) {
        throw new Error(`Request with ID ${id} not found in queue ${queue.name ?? queue.id}`);
    }

    const now = Date.now();

    // NOTE(review): a falsy `orderNo` also counts as "locked" here, so such a request would be
    // given a fresh `orderNo` below. Confirm whether that is intended.
    const isLocked = !internalRequest.orderNo
        || internalRequest.orderNo > now
        || internalRequest.orderNo < -now;

    if (!isLocked) {
        throw new Error(`Request with ID ${id} is not locked in queue ${queue.name ?? queue.id}`);
    }

    internalRequest.orderNo = forefront ? -now : now;

    await storageEntry.update(internalRequest);
}

async addRequest(request: storage.RequestSchema, options: storage.RequestOptions = {}): Promise<storage.QueueOperationInfo> {
requestShapeWithoutId.parse(request);
requestOptionsShape.parse(options);
Expand Down Expand Up @@ -281,15 +400,8 @@ export class RequestQueueClient extends BaseClient implements storage.RequestQue

/**
 * Fetches a single request from the queue by its ID.
 *
 * The queue is resolved through `getQueue()`; the stored JSON of the matching entry
 * (or `undefined` when there is no such entry) is passed through `_jsonToRequest`.
 *
 * @param id ID of the request to look up; must be a string.
 */
async getRequest(id: string): Promise<storage.RequestOptions | undefined> {
    s.string.parse(id);
    // Single lookup path: the inline find/throw/updateTimestamps sequence duplicated what
    // `getQueue()` already does and led to `json` being declared twice.
    const queue = await this.getQueue();
    const json = (await queue.requests.get(id)?.get())?.json;
    return this._jsonToRequest(json);
}

Expand Down
24 changes: 24 additions & 0 deletions packages/types/src/storages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,27 @@ export interface ListOptions {
limit?: number;
}

/**
 * Options accepted by `RequestQueueClient.listAndLockHead`; extends the plain list options
 * with the lock duration.
 */
export interface ListAndLockOptions extends ListOptions {
/** How long the returned requests should stay locked, in seconds. */
lockSecs: number;
}

/**
 * Result of `RequestQueueClient.listAndLockHead`: a queue head plus the lock duration.
 */
export interface ListAndLockHeadResult extends QueueHead {
/** Lock duration in seconds; the memory-storage client echoes the value it was called with. */
lockSecs: number;
}

/**
 * Options accepted by `RequestQueueClient.prolongRequestLock`.
 */
export interface ProlongRequestLockOptions {
/** Additional lock time in seconds (the memory-storage client adds it to the current lock expiry). */
lockSecs: number;
/**
 * Optional; the memory-storage client treats an omitted value as `false` and stores the new expiry
 * as a negative `orderNo` when `true` - presumably so the request ends up at the front of the queue; confirm.
 */
forefront?: boolean;
}

/**
 * Result of `RequestQueueClient.prolongRequestLock`.
 */
export interface ProlongRequestLockResult {
/** Point in time at which the prolonged lock expires. */
lockExpiresAt: Date;
}

/**
 * Options accepted by `RequestQueueClient.deleteRequestLock`.
 */
export interface DeleteRequestLockOptions {
/**
 * Optional; the memory-storage client treats an omitted value as `false` and resets the request's
 * `orderNo` to the negated current timestamp when `true` - presumably to put it at the front of the queue; confirm.
 */
forefront?: boolean;
}

export interface RequestOptions {
forefront?: boolean;
[k: string]: unknown;
Expand Down Expand Up @@ -275,6 +296,9 @@ export interface RequestQueueClient {
getRequest(id: string): Promise<RequestOptions | undefined>;
updateRequest(request: UpdateRequestSchema, options?: RequestOptions): Promise<QueueOperationInfo>;
deleteRequest(id: string): Promise<unknown>;
listAndLockHead(options: ListAndLockOptions): Promise<ListAndLockHeadResult>;
prolongRequestLock(id: string, options: ProlongRequestLockOptions): Promise<ProlongRequestLockResult>;
deleteRequestLock(id: string, options?: DeleteRequestLockOptions): Promise<void>;
}

export interface RequestQueueOptions {
Expand Down
86 changes: 86 additions & 0 deletions test/core/storages/request_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -685,3 +685,89 @@ describe('RequestQueue remote', () => {
expect(r3.userData.__crawlee).toEqual({});
});
});

describe('RequestQueue v2', () => {
const totalRequestsPerTest = 100;

/**
 * Counts how many times each request occurs, using the request's numeric `uniqueKey`
 * (parsed base 10) as the index into the returned array. Indices that never occur stay unset.
 */
function calculateHistogram(requests: { uniqueKey: string }[]): number[] {
    const counts: number[] = [];

    for (const { uniqueKey } of requests) {
        const bucket = parseInt(uniqueKey, 10);
        counts[bucket] = (counts[bucket] ?? 0) + 1;
    }

    return counts;
}

/**
 * Opens the named queue, drops it, and opens it again so the test starts from a fresh queue.
 */
async function getEmptyQueue(name: string) {
    const staleQueue = await RequestQueue.open(name);
    await staleQueue.drop();

    const freshQueue = RequestQueue.open(name);
    return freshQueue;
}

/**
 * Builds `count` requests whose URL and `uniqueKey` are derived from their index (0..count-1).
 */
function getUniqueRequests(count: number) {
    const indices = new Array(count).keys();

    return Array.from(indices, (i) => new Request({
        url: `http://example.com/${i}`,
        uniqueKey: String(i),
    }));
}

// Two concurrent listAndLockHead calls, each asking for half of the queue, must together
// return every request exactly once.
test('listAndLockHead works as expected', async () => {
    const queue = await getEmptyQueue('list-and-lock-head');
    await queue.addRequests(getUniqueRequests(totalRequestsPerTest));

    const lockHalfOfQueue = () => queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 });

    const [firstBatch, secondBatch] = await Promise.all([
        lockHalfOfQueue(),
        lockHalfOfQueue(),
    ]);

    const seenCounts = calculateHistogram([...firstBatch.items, ...secondBatch.items]);
    expect(seenCounts).toEqual(Array(totalRequestsPerTest).fill(1));
});

// After the lock period has elapsed, the same requests must be handed out again.
test('lock timers work as expected (timeout unlocks)', async () => {
    jest.useFakeTimers();

    // try/finally guarantees real timers are restored even when an assertion or await throws,
    // so a failure here cannot leak fake timers into the following tests.
    try {
        const queue = await getEmptyQueue('lock-timers');
        await queue.addRequests(getUniqueRequests(totalRequestsPerTest));

        const { items: firstFetch } = await queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 });

        // Move past the 60 s lock.
        jest.advanceTimersByTime(65000);

        const { items: secondFetch } = await queue.client.listAndLockHead({ limit: totalRequestsPerTest / 2, lockSecs: 60 });

        const histogram = calculateHistogram([...firstFetch, ...secondFetch]);
        expect(histogram).toEqual(Array(totalRequestsPerTest / 2).fill(2));
    } finally {
        jest.useRealTimers();
    }
});

// A 60 s lock prolonged by another 60 s must still hold after 65 s and be gone after 130 s.
test('prolongRequestLock works as expected ', async () => {
    jest.useFakeTimers();

    // try/finally guarantees real timers are restored even when an assertion or await throws,
    // so a failure here cannot leak fake timers into the following tests.
    try {
        const queue = await getEmptyQueue('prolong-request-lock');
        await queue.addRequests(getUniqueRequests(1));

        const { items: firstFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 });
        // Assert before dereferencing firstFetch[0] so an empty result fails with a clear message.
        expect(firstFetch).toHaveLength(1);
        await queue.client.prolongRequestLock(firstFetch[0].id, { lockSecs: 60 });

        jest.advanceTimersByTime(65000);
        const { items: secondFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 });
        expect(secondFetch).toHaveLength(0);

        jest.advanceTimersByTime(65000);
        const { items: thirdFetch } = await queue.client.listAndLockHead({ limit: 1, lockSecs: 60 });

        expect(thirdFetch).toHaveLength(1);
    } finally {
        jest.useRealTimers();
    }
});

// Once its lock is deleted, a request must be returned again by the next listAndLockHead call.
test('deleteRequestLock works as expected', async () => {
    const queue = await getEmptyQueue('delete-request-lock');
    await queue.addRequests(getUniqueRequests(1));

    const lockOne = () => queue.client.listAndLockHead({ limit: 1, lockSecs: 60 });

    const firstHead = await lockOne();
    await queue.client.deleteRequestLock(firstHead.items[0].id);

    const secondHead = await lockOne();

    expect(secondHead.items[0]).toEqual(firstHead.items[0]);
});
});

0 comments on commit 049486b

Please sign in to comment.