Skip to content

Commit

Permalink
Implement queue rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
jasrusable committed Jun 28, 2020
1 parent cde896b commit 76ba3e2
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 17 deletions.
4 changes: 4 additions & 0 deletions src/actions/create-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
getWorkerShutdownChannel,
getQueueTaskScheduledChannel,
getQueueTaskProgressUpdatedChannel,
getQueueRateLimitUpdatedChannel,
} from '../utils/keys';
import { deSerializeEvent } from '../domain/events/deserialize-event';
import { Event } from '../domain/events/event';
Expand Down Expand Up @@ -72,6 +73,9 @@ export const createListener = ({
[EventType.WorkerStarted]: getWorkerStartedChannel({ queue }),
[EventType.WorkerPaused]: getWorkerPausedChannel({ queue }),
[EventType.WorkerShutdown]: getWorkerShutdownChannel({ queue }),
[EventType.QueueRateLimitUpdated]: getQueueRateLimitUpdatedChannel({
queue,
}),
};
client.on('message', (channel, eventString) => {
const event = deSerializeEvent(eventString);
Expand Down
23 changes: 23 additions & 0 deletions src/actions/create-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ import { resumeQueue } from './resume-queue';
import { Manager } from '../domain/manager/manager';
import { TaskResponse } from '../domain/manager/task-response';
import { OnBeforeEnqueueTask, OnAfterEnqueueTask } from './enqueue-task';
import { setQueueRateLimit } from './set-queue-rate-limit';
import { QueueRateLimitConfig } from './get-queue-rate-limit-config';

const debug = debugF('conveyor-mq:manager');

export interface ManagerInput {
queue: string;
redisConfig: RedisConfig;
redisClient?: Redis;
queueRateLimitConfig?: QueueRateLimitConfig;
hooks?: {
onBeforeEnqueueTask?: OnBeforeEnqueueTask;
onAfterEnqueueTask?: OnAfterEnqueueTask;
Expand Down Expand Up @@ -60,13 +63,15 @@ export interface ManagerInput {
* - .destroyQueue(): Promise<void> - Destroys the queue by removing all data & data structures.
* - .pauseQueue(): Promise<void> - Pauses the queue.
* - .resumeQueue(): Promise<void> - Resumes the queue.
* - .setQueueRateLimit({ points, duration }): Promise<void> - Sets the rate limit on the queue.
* - .onReady(): Promise<void> - Returns a promise which resolves once the manager is ready.
* - .quit(): Promise<void> - Quits the manager, disconnects the redis clients.
*/
export const createManager = ({
queue,
redisConfig,
redisClient,
queueRateLimitConfig,
hooks,
}: ManagerInput): Manager => {
debug('Starting');
Expand Down Expand Up @@ -168,6 +173,14 @@ export const createManager = ({

const ready = async () => {
await Promise.all([listenerSetupPromise]);
if (queueRateLimitConfig) {
await setQueueRateLimit({
points: queueRateLimitConfig.points,
duration: queueRateLimitConfig.duration,
queue,
client,
});
}
debug('Ready');
};
const readyPromise = ready();
Expand Down Expand Up @@ -238,6 +251,16 @@ export const createManager = ({
debug(`resumeQueue`);
return resumeQueue({ queue, client });
},
setQueueRateLimit: async ({
points,
duration,
}: {
points: number;
duration: number;
}) => {
await readyPromise;
await setQueueRateLimit({ points, duration, queue, client });
},
onReady: async () => {
await readyPromise;
debug(`onReady`);
Expand Down
58 changes: 52 additions & 6 deletions src/actions/create-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import { serializeWorker } from '../domain/worker/serialize-worker';
import { Task } from '../domain/tasks/task';
import { Worker } from '../domain/worker/worker';
import { markTaskProcessing } from './mark-task-processing';
import { getQueueRateLimitConfig } from './get-queue-rate-limit-config';
import { createListener } from './create-listener';

const debug = debugF('conveyor-mq:worker');

Expand Down Expand Up @@ -286,6 +288,42 @@ export const createWorker = ({
});
};

const setQueueRateLimit = ({
points,
duration,
}: {
points: number;
duration: number;
}) => {
rateLimiter = new RateLimiterRedis({
storeClient: workerClient,
points,
duration,
keyPrefix: queue,
inmemoryBlockOnConsumed: points + 1,
});
};

const clearQueueRateLimit = () => {
rateLimiter = undefined;
};

const setupListener = async () => {
const listener = createListener({
queue,
redisConfig,
events: [EventType.QueueRateLimitUpdated],
});
await listener.onReady();
listener.on(EventType.QueueRateLimitUpdated, ({ event }) => {
if (event.data?.rateLimitConfig) {
setQueueRateLimit(event.data.rateLimitConfig);
} else {
clearQueueRateLimit();
}
});
};

const pause = async ({
killProcessingTasks,
}: {
Expand Down Expand Up @@ -341,13 +379,20 @@ export const createWorker = ({
ensureConnected({ client }),
),
);
rateLimiter = new RateLimiterRedis({
storeClient: workerClient,
points: 1,
duration: 5,
keyPrefix: queue,
inmemoryBlockOnConsumed: 1,
const rateLimitConfig = await getQueueRateLimitConfig({
queue,
client: workerClient,
});
if (
rateLimitConfig &&
rateLimitConfig.duration &&
rateLimitConfig.points
) {
setQueueRateLimit({
points: rateLimitConfig.points,
duration: rateLimitConfig.duration,
});
}
forEach([workerQueue, takerQueue], (q) => q.start());
await upsertWorker();
upsertInterval = setIntervalAsync(upsertWorker, 15000);
Expand Down Expand Up @@ -413,6 +458,7 @@ export const createWorker = ({
};

const ready = async () => {
await setupListener();
if (autoStart) await start();
if (onReady) onReady();
debug('Ready');
Expand Down
25 changes: 25 additions & 0 deletions src/actions/get-queue-rate-limit-config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Redis } from 'ioredis';
import { get } from '../utils/redis';
import { getQueueRateLimitKey } from '../utils/keys';

export interface QueueRateLimitConfig {
points: number;
duration: number;
}

export const getQueueRateLimitConfig = async ({
queue,
client,
}: {
queue: string;
client: Redis;
}) => {
const rateLimitString = await get({
key: getQueueRateLimitKey({ queue }),
client,
});
if (!rateLimitString) {
return null;
}
return JSON.parse(rateLimitString) as QueueRateLimitConfig;
};
36 changes: 25 additions & 11 deletions src/actions/set-queue-rate-limit.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
import { Redis } from 'ioredis';
import { set } from '../utils/redis';
import { getQueueRateLimitKey } from '../utils/keys';
import { exec } from '../utils/redis';
import {
getQueueRateLimitKey,
getQueueRateLimitUpdatedChannel,
} from '../utils/keys';
import { EventType } from '../domain/events/event-type';
import { serializeEvent } from '../domain/events/serialize-event';

export const setQueueRateLimit = async ({
amount,
interval,
points,
duration,
queue,
client,
}: {
amount: number;
interval: number;
points: number;
duration: number;
queue: string;
client: Redis;
}) => {
await set({
key: getQueueRateLimitKey({ queue }),
value: JSON.stringify({ amount, interval }),
client,
});
const multi = client.multi();
multi.set(
getQueueRateLimitKey({ queue }),
JSON.stringify({ points, duration }),
);
multi.publish(
getQueueRateLimitUpdatedChannel({ queue }),
serializeEvent({
type: EventType.QueueRateLimitUpdated,
createdAt: new Date(),
data: { rateLimitConfig: { points, duration } },
}),
);
await exec(multi);
};
1 change: 1 addition & 0 deletions src/domain/events/event-from-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export const eventFromJson = (eventJson: any): Event => {
type: eventJson.type as EventType,
task: eventJson.task ? taskFromJson(eventJson.task) : undefined,
worker: eventJson.worker ? workerFromJson(eventJson.worker) : undefined,
data: eventJson.data,
};
};
1 change: 1 addition & 0 deletions src/domain/events/event-to-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ export const eventToJson = (event: Event) => {
type: event.type,
task: event.task ? taskToJson(event.task) : undefined,
worker: event.worker ? workerToJson(event.worker) : undefined,
data: event.data,
};
};
1 change: 1 addition & 0 deletions src/domain/events/event-type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ export enum EventType {
WorkerStarted = 'worker_started',
WorkerPaused = 'worker_paused',
WorkerShutdown = 'worker_shutdown',
QueueRateLimitUpdated = 'queue_rate_limit_updated',
}
4 changes: 4 additions & 0 deletions src/domain/events/event.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { Task } from '../tasks/task';
import { EventType } from './event-type';
import { WorkerInstance } from '../worker/worker-instance';
import { QueueRateLimitConfig } from '../../actions/get-queue-rate-limit-config';

export interface Event {
createdAt: Date;
type: EventType;
task?: Task;
worker?: WorkerInstance;
data?: {
rateLimitConfig?: QueueRateLimitConfig;
};
}
8 changes: 8 additions & 0 deletions src/domain/manager/manager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { duration } from 'moment';
import { Task } from '../tasks/task';
import { TaskResponse } from './task-response';
import { WorkerInstance } from '../worker/worker-instance';
Expand All @@ -22,6 +23,13 @@ export interface Manager {
destroyQueue: () => Promise<void>;
pauseQueue: () => Promise<void>;
resumeQueue: () => Promise<void>;
setQueueRateLimit: ({
points,
duration,
}: {
points: number;
duration: number;
}) => Promise<void>;
onReady: () => Promise<void>;
quit: () => Promise<void>;
}
3 changes: 3 additions & 0 deletions src/utils/keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ export const getWorkerStartedChannel = ({ queue }: { queue: string }) =>
export const getWorkerShutdownChannel = ({ queue }: { queue: string }) =>
`queue:${queue}:worker-shutdown`;

export const getQueueRateLimitUpdatedChannel = ({ queue }: { queue: string }) =>
`queue:${queue}:queue-rate-limit-updated`;

export const getQueueTaskQueuedChannel = ({ queue }: { queue: string }) =>
`queue:${queue}:task-queued`;

Expand Down
6 changes: 6 additions & 0 deletions src/utils/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,9 @@ export const mget = ({
keys: string[];
client: Redis;
}) => client.mget(...mgetKeys);

export const get = ({ key, client }: { key: string; client: Redis }) =>
client.get(key);

export const del = ({ key, client }: { key: string; client: Redis }) =>
client.del(key);

0 comments on commit 76ba3e2

Please sign in to comment.