Skip to content

Commit

Permalink
Add tests and update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jasrusable committed Jun 29, 2020
1 parent 218c257 commit 9df99fe
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 26 deletions.
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,6 @@ const manager = createManager({
},
// Pass in a shared redis instance.
redisClient: sharedRedisInstance,
// Rate limit config. 1 task every 10 seconds.
queueRateLimitConfig: {
points: 1,
duration: 10,
},
});
```

Expand Down Expand Up @@ -774,7 +769,7 @@ await manager.resumeQueue();

#### manager.setQueueRateLimit

Sets the rate limit of a queue.
Sets the rate limit of a queue. (100 tasks every 60 seconds)

```js
await manager.setQueueRateLimit({ points: 100, duration: 60 });
Expand Down
17 changes: 6 additions & 11 deletions src/actions/create-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import { Manager } from '../domain/manager/manager';
import { TaskResponse } from '../domain/manager/task-response';
import { OnBeforeEnqueueTask, OnAfterEnqueueTask } from './enqueue-task';
import { setQueueRateLimit } from './set-queue-rate-limit';
import { QueueRateLimitConfig } from './get-queue-rate-limit-config';
import { getQueueRateLimitConfig } from './get-queue-rate-limit-config';

const debug = debugF('conveyor-mq:manager');

export interface ManagerInput {
queue: string;
redisConfig: RedisConfig;
redisClient?: Redis;
queueRateLimitConfig?: QueueRateLimitConfig;
hooks?: {
onBeforeEnqueueTask?: OnBeforeEnqueueTask;
onAfterEnqueueTask?: OnAfterEnqueueTask;
Expand Down Expand Up @@ -64,14 +63,14 @@ export interface ManagerInput {
* - .pauseQueue(): Promise<void> - Pauses the queue.
* - .resumeQueue(): Promise<void> - Resumes the queue.
* - .setQueueRateLimit({ points, duration }): Promise<void> - Sets the rate limit on the queue.
* - .getQueueRateLimit(): Promise<{ points, duration }> - Gets the rate limit on the queue.
* - .onReady(): Promise<void> - Returns a promise which resolves once the manager is ready.
* - .quit(): Promise<void> - Quits the manager, disconnects the redis clients.
*/
export const createManager = ({
queue,
redisConfig,
redisClient,
queueRateLimitConfig,
hooks,
}: ManagerInput): Manager => {
debug('Starting');
Expand Down Expand Up @@ -173,14 +172,6 @@ export const createManager = ({

const ready = async () => {
await Promise.all([listenerSetupPromise]);
if (queueRateLimitConfig) {
await setQueueRateLimit({
points: queueRateLimitConfig.points,
duration: queueRateLimitConfig.duration,
queue,
client,
});
}
debug('Ready');
};
const readyPromise = ready();
Expand Down Expand Up @@ -261,6 +252,10 @@ export const createManager = ({
await readyPromise;
await setQueueRateLimit({ points, duration, queue, client });
},
getQueueRateLimit: async () => {
await readyPromise;
return getQueueRateLimitConfig({ queue, client });
},
onReady: async () => {
await readyPromise;
debug(`onReady`);
Expand Down
21 changes: 14 additions & 7 deletions src/actions/create-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ import { serializeWorker } from '../domain/worker/serialize-worker';
import { Task } from '../domain/tasks/task';
import { Worker } from '../domain/worker/worker';
import { markTaskProcessing } from './mark-task-processing';
import { getQueueRateLimitConfig } from './get-queue-rate-limit-config';
import {
getQueueRateLimitConfig,
QueueRateLimitConfig,
} from './get-queue-rate-limit-config';
import { createListener } from './create-listener';

const debug = debugF('conveyor-mq:worker');
Expand Down Expand Up @@ -144,6 +147,7 @@ export const createWorker = ({
let isShuttingDown = false;
let isShutdown = false;
let upsertInterval: SetIntervalAsyncTimer;
let rateLimitConfig: QueueRateLimitConfig | undefined;
let rateLimiter: RateLimiterRedis | undefined;

const worker: WorkerInstance = {
Expand Down Expand Up @@ -295,6 +299,7 @@ export const createWorker = ({
points: number;
duration: number;
}) => {
rateLimitConfig = { points, duration };
rateLimiter = new RateLimiterRedis({
storeClient: workerClient,
points,
Expand All @@ -305,6 +310,7 @@ export const createWorker = ({
};

const clearQueueRateLimit = () => {
rateLimitConfig = undefined;
rateLimiter = undefined;
};

Expand Down Expand Up @@ -379,18 +385,18 @@ export const createWorker = ({
ensureConnected({ client }),
),
);
const rateLimitConfig = await getQueueRateLimitConfig({
const queueRateLimitConfig = await getQueueRateLimitConfig({
queue,
client: workerClient,
});
if (
rateLimitConfig &&
rateLimitConfig.duration &&
rateLimitConfig.points
queueRateLimitConfig &&
queueRateLimitConfig.duration &&
queueRateLimitConfig.points
) {
setQueueRateLimit({
points: rateLimitConfig.points,
duration: rateLimitConfig.duration,
points: queueRateLimitConfig.points,
duration: queueRateLimitConfig.duration,
});
}
forEach([workerQueue, takerQueue], (q) => q.start());
Expand Down Expand Up @@ -477,6 +483,7 @@ export const createWorker = ({
return pause({ killProcessingTasks });
},
start: () => start(),
getQueueRateLimitConfig: async () => rateLimitConfig,
shutdown: async (killProcessingTasks?: boolean) => {
await readyPromise;
return shutdown({ killProcessingTasks });
Expand Down
2 changes: 2 additions & 0 deletions src/domain/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { duration } from 'moment';
import { Task } from '../tasks/task';
import { TaskResponse } from './task-response';
import { WorkerInstance } from '../worker/worker-instance';
import { QueueRateLimitConfig } from '../../actions/get-queue-rate-limit-config';

export interface Manager {
enqueueTask: (task: Partial<Task>) => Promise<TaskResponse>;
Expand Down Expand Up @@ -30,6 +31,7 @@ export interface Manager {
points: number;
duration: number;
}) => Promise<void>;
getQueueRateLimit: () => Promise<QueueRateLimitConfig | null>;
onReady: () => Promise<void>;
quit: () => Promise<void>;
}
3 changes: 3 additions & 0 deletions src/domain/worker/worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { QueueRateLimitConfig } from '../../actions/get-queue-rate-limit-config';

export interface Worker {
id: string;
createdAt: Date;
onReady: () => Promise<void>;
pause: () => Promise<void>;
start: () => Promise<void>;
shutdown: () => Promise<void>;
getQueueRateLimitConfig: () => Promise<QueueRateLimitConfig | undefined>;
onIdle: () => Promise<void>;
}
11 changes: 11 additions & 0 deletions src/tests/actions/create-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,15 @@ describe('createManager', () => {
expect(fn).toHaveBeenCalledWith(expect.objectContaining({ task }));
await manager.quit();
});
it('createManager setQueueRateLimit sets queue rate limit', async () => {
const manager = createManager({
queue,
redisConfig,
});
await manager.setQueueRateLimit({ points: 100, duration: 60 });
const rateLimitConfig = await manager.getQueueRateLimit();
expect(rateLimitConfig?.points).toBe(100);
expect(rateLimitConfig?.duration).toBe(60);
await manager.quit();
});
});
24 changes: 22 additions & 2 deletions src/tests/actions/create-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ describe('createWorker', () => {
redisConfig,
});
await manager.enqueueTask({ id: 'a', data: 'hi' });
await sleep(20);
await sleep(50);
expect(onAfterTaskErrorFn).toHaveBeenCalledTimes(0);
expect(onAfterTaskSuccessFn).toHaveBeenCalledTimes(1);
expect(onAfterTaskSuccessFn).toHaveBeenCalledWith(
Expand Down Expand Up @@ -374,7 +374,7 @@ describe('createWorker', () => {
redisConfig,
});
await manager.enqueueTask({ id: 'a', data: 'hi' });
await sleep(20);
await sleep(50);
expect(onAfterTaskSuccessFn).toHaveBeenCalledTimes(0);
expect(onAfterTaskErrorFn).toHaveBeenCalledTimes(1);
expect(onAfterTaskErrorFn).toHaveBeenCalledWith(
Expand All @@ -387,4 +387,24 @@ describe('createWorker', () => {
await manager.quit();
await worker.shutdown();
});
it('createWorker queue rate limit gets set', async () => {
const worker = createWorker({
queue,
redisConfig,
handler: () => 'some-result',
});
const manager = createManager({
queue,
redisConfig,
});
const rateLimitConfig = await worker.getQueueRateLimitConfig();
expect(rateLimitConfig).toBe(undefined);
await manager.setQueueRateLimit({ points: 100, duration: 60 });
await sleep(50);
const rateLimitConfig2 = await worker.getQueueRateLimitConfig();
expect(rateLimitConfig2?.points).toBe(100);
expect(rateLimitConfig2?.duration).toBe(60);
await manager.quit();
await worker.shutdown();
});
});

0 comments on commit 9df99fe

Please sign in to comment.