Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: bull support #219

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
@taskforcesh:registry=https://npm.taskforce.sh/
//npm.taskforce.sh/:_authToken=${TASKFORCE_NPM_TOKEN}
158 changes: 158 additions & 0 deletions bull_standard_generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { faker } from '@faker-js/faker';
import Bull from 'bull';
//import * as Bull from 'bullmq';
import { cleanEnv, num, port, str } from 'envalid';

function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}

enum ACTIONS {
CREATE = 'create',
CREATEPROCESS = 'createprocess',
PROCESS = 'process',
REMOVE = 'remove',
}

const config = cleanEnv(process.env, {
QUEUE: str({ default: 'dummy_test' }),
ACTION: str({
default: ACTIONS.PROCESS,
choices: [
ACTIONS.PROCESS,
ACTIONS.REMOVE,
ACTIONS.CREATE,
ACTIONS.CREATEPROCESS,
],
}),
MAX_JOB_DELAY_MS: num({ default: 100 }),
MAX_JOB_ATTEMPTS: num({ default: 4 }),
PREFIX: str({ default: 'bull' }),
LIMITER_MAX: num({ default: 10 }),
LIMITER_DURATION_MS: num({ default: 1000 }),
REDIS_HOST: str({ default: '127.0.0.1' }),
REDIS_PASSWORD: str({ default: '' }),
REDIS_PORT: port({ default: 6001 }),
CREATE_DELAY_MS: num({ default: 0 }),
CONCURRENCY: num({ default: 1 }),
});

const log = (msg: string) => {
console.log(`[${new Date()}] [${config.PREFIX}:${config.QUEUE}] ${msg}`);
};

const main = async () => {
log('Creating queue');
const queue = new Bull(config.QUEUE, {
prefix: config.PREFIX,
redis: {
host: config.REDIS_HOST,
port: config.REDIS_PORT,
password: config.REDIS_PASSWORD,
},
});

/*
const scheduler = new Bull.QueueScheduler(config.QUEUE, {
prefix: config.PREFIX,
connection: {
host: config.REDIS_HOST,
port: config.REDIS_PORT,
password: config.REDIS_PASSWORD,
},
});
*/

if (
config.ACTION === ACTIONS.PROCESS ||
config.ACTION === ACTIONS.CREATEPROCESS
) {
log(
`Enabling processing (${config.LIMITER_MAX} jobs per ${config.LIMITER_DURATION_MS}ms)`,
);

queue.on('active', async (job) => {
const delay = Math.floor(Math.random() * config.MAX_JOB_DELAY_MS);
log(`[JOB: ${job.id}] Starting job with delay ${delay}`);
await job.log(`[JOB: ${job.id}] Starting job with delay ${delay}`);
await sleep(delay);
const fail = Math.round(Math.random()) === 1;

if (fail) {
log(`[JOB: ${job.id}] Job set to fail`);
await job.log(`[JOB: ${job.id}] Job set to fail`);
throw new Error(`[JOB: ${job.id}] Failing job for random reason`);
}

log(`[JOB: ${job.id}] Job is now complete after the delay`);
job.log(`[JOB: ${job.id}] Job is now complete after the delay`);
});

/*
new Bull.Worker(
config.QUEUE,
async (job) => {
const delay = Math.floor(Math.random() * config.MAX_JOB_DELAY_MS);
log(`[JOB: ${job.id}] Starting job with delay ${delay}`);
await job.log(`[JOB: ${job.id}] Starting job with delay ${delay}`);
await sleep(delay);
const fail = Math.round(Math.random()) === 1;

if (fail) {
log(`[JOB: ${job.id}] Job set to fail`);
await job.log(`[JOB: ${job.id}] Job set to fail`);
throw new Error(`[JOB: ${job.id}] Failing job for random reason`);
}

log(`[JOB: ${job.id}] Job is now complete after the delay`);
job.log(`[JOB: ${job.id}] Job is now complete after the delay`);
},
{
connection: {
host: config.REDIS_HOST,
port: config.REDIS_PORT,
password: config.REDIS_PASSWORD,
},
limiter: {
duration: config.LIMITER_DURATION_MS,
max: config.LIMITER_MAX,
},
concurrency: config.CONCURRENCY,
},
);
*/
}

if (
config.ACTION === ACTIONS.CREATE ||
config.ACTION === ACTIONS.CREATEPROCESS
) {
log(`Creating jobs every ${config.CREATE_DELAY_MS}ms...`);
setInterval(() => {
log('Adding dummy job');
queue.add(
faker.commerce.product(),
{
buzzword: faker.company.bsBuzz(),
job: faker.name.jobTitle(),
name: faker.name.firstName,
},
{ attempts: config.MAX_JOB_ATTEMPTS },
);
}, config.CREATE_DELAY_MS);
}

if (config.ACTION === ACTIONS.REMOVE) {
//scheduler.on('error', () => {
// do nothing
//});
log(`Removing queue`);
//await scheduler.disconnect();
await queue.obliterate({ force: true });
process.exit();
}
};

main();
171 changes: 135 additions & 36 deletions src/bull/bull-queues.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { ConfigService } from '@app/config/config.service';
import { InjectLogger, LoggerService } from '@app/logger';
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Mutex, withTimeout } from 'async-mutex';
import { Queue, QueueScheduler } from 'bullmq';
import { Queue, Queue3, QueueScheduler } from 'bullmq';
import { RedisService } from 'nestjs-redis';
import { TypedEmitter } from 'tiny-typed-emitter2';
import {
EVENT_TYPES,
QUEUE_TYPES,
REDIS_CLIENTS,
REDIS_EVENT_TYPES,
REDIS_KEYSPACE_EVENT_TYPES,
Expand Down Expand Up @@ -56,21 +57,25 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
queuePrefix: string,
queueName: string,
) {
return await this._redisMutex.runExclusive(async () => {
this.logger.debug(
`processMessage(${eventType}): ${queuePrefix}::${queueName}`,
);
switch (eventType) {
case REDIS_KEYSPACE_EVENT_TYPES.HSET:
await this.addQueue(queuePrefix, queueName);
break;
case REDIS_KEYSPACE_EVENT_TYPES.DELETE:
await this.removeQueue(queuePrefix, queueName);
break;
default:
this.logger.debug(`Ignoring event '${eventType}'`);
}
});
this.logger.debug(
`processMessage(${eventType}): ${queuePrefix}::${queueName}`,
);
switch (eventType) {
case REDIS_KEYSPACE_EVENT_TYPES.HSET:
await this.addQueue(queuePrefix, queueName, QUEUE_TYPES.BULLMQ);
break;
case REDIS_KEYSPACE_EVENT_TYPES.DELETE:
await this.removeQueue(queuePrefix, queueName);
break;
/**
* @url https://github.com/OptimalBits/bull/blob/edfbd163991c212dd6548875c22f2745f897ae28/lib/commands/addJob-6.lua#L92
*/
case REDIS_KEYSPACE_EVENT_TYPES.RPUSH:
// TODO: implement check to see if we need to add something or if it can be ignored
break;
default:
this.logger.debug(`Ignoring event '${eventType}'`);
}
}

getLoadedQueues(): string[] {
Expand All @@ -91,7 +96,11 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
};
}

private addQueue(queuePrefix: string, queueName: string) {
private addQueue(
queuePrefix: string,
queueName: string,
queueType: QUEUE_TYPES,
) {
return this._bullMutex.runExclusive(async () => {
const queueKey = this.generateQueueKey(queuePrefix, queueName);
this.logger.debug(`Attempting to add queue: ${queueKey}`);
Expand Down Expand Up @@ -184,19 +193,35 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
(item) => item.trim(),
);

// loop through each queue prefix and add anything
// we find
/**
* @url https://redis.io/docs/manual/keyspace-notifications/
* loop through each key prefix and add anything we find
*/
for (const queuePrefix of queuePrefixes) {
// subscribe to keyspace events
await subscriber.psubscribe(
`__keyspace@0__:${queuePrefix}:*:meta`,
`__keyspace@${this.configService.config.REDIS_DATABASE}__:${queuePrefix}:*:meta`,
(err, count) => {
if (err) {
this.logger.error(err.stack);
} else {
this.logger.log(
`[${QUEUE_TYPES.BULLMQ}, ${QUEUE_TYPES.BULLMQ_PRO}] Subscribed to ${count} keyspace event(s) for '${queuePrefix}'`,
);
}
},
);

await subscriber.psubscribe(
`__keyspace@${this.configService.config.REDIS_DATABASE}__:${queuePrefix}:*:wait`,
(err, count) => {
if (err) {
this.logger.error(err.stack);
} else {
this.logger.log(
`[${QUEUE_TYPES.BULL3}] Subscribed to ${count} keyspace event(s) for '${queuePrefix}'`,
);
}
this.logger.log(
`Subscribed to ${count} keyspace event(s) for '${queuePrefix}'`,
);
},
);
}
Expand Down Expand Up @@ -240,26 +265,65 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
this._initialized = false;
}

private async findAndPopulateQueues(match: string): Promise<string[]> {
/**
* The following logic is used to determine the type of bull queue encountered:
* - QUEUE_PREFIX:*:meta
* - If version key exists and is set to bullmq-pro, this is a bullmq pro queue
* - This is a regular bullmq queue
* - QUEUE_PREFIX:*:wait
* - If queue has not been detected already, this is a bull queue
*/
private async findAndPopulateQueues(
type: 'hash' | 'list',
match: string,
processFn: (
discoveredName: string,
queuePrefix: string,
queueName: string,
) => Promise<QUEUE_TYPES | void>,
): Promise<string[]> {
const client = await this.redisService.getClient(REDIS_CLIENTS.PUBLISH);
const loadedQueues = new Set([]);
const discoveredQueues = new Set([]);
let done = false;
return new Promise((resolve, reject) => {
client
.scanStream({ type: 'hash', match, count: 100 })
.on('data', (keys: string[]) => {
.scanStream({
type,
match,
count: 100,
})
.on('data', async (keys: string[]) => {
for (const key of keys) {
const queueMatch = parseBullQueue(key);
loadedQueues.add(
this.generateQueueKey(
const discoveredQueue = this.generateQueueKey(
queueMatch.queuePrefix,
queueMatch.queueName,
);
const addQueueType = await processFn(
discoveredQueue,
queueMatch.queuePrefix,
queueMatch.queueName,
);

if (addQueueType) {
this.logger.log(
`[${addQueueType}] attempting to add queue '${discoveredQueue}'`,
);
discoveredQueues.add(discoveredQueue);
await this.addQueue(
queueMatch.queuePrefix,
queueMatch.queueName,
),
);
this.addQueue(queueMatch.queuePrefix, queueMatch.queueName);
addQueueType,
);
}
}

if (done) {
resolve(Array.from(discoveredQueues));
}
})
.on('end', () => {
resolve(Array.from(loadedQueues));
done = true;
})
.on('error', (err) => {
this.logger.error(`${err.name}: ${err.message}`);
Expand Down Expand Up @@ -372,13 +436,48 @@ export class BullQueuesService implements OnModuleInit, OnModuleDestroy {
for (const queuePrefix of queuePrefixes) {
this.logger.log(`Loading queues from queuePrefix: '${queuePrefix}'`);

/*
newlyLoadedQueues = (
await Promise.all([
// this.findAndPopulateQueues(`${queuePrefix}:*:stalled-check`),
//this.findAndPopulateQueues(`${queuePrefix}:*:id`),
this.findAndPopulateQueues(`${queuePrefix}:*:meta`),
this.findAndPopulateQueues(`${queuePrefix}:*:meta`, (discoveredName, queuePrefix, queueName) => {
}),
])
).flat();
*/

newlyLoadedQueues = await this.findAndPopulateQueues(
'hash',
`${queuePrefix}:*:meta`,
async (discoveredName, queuePrefix, queueName) => {
const client = await this.redisService.getClient(
REDIS_CLIENTS.PUBLISH,
);
const version = await client.hget(
`${queuePrefix}:${queueName}:meta`,
'version',
);

if (version === 'bullmq-pro') {
return QUEUE_TYPES.BULLMQ_PRO;
}

return QUEUE_TYPES.BULLMQ;
},
).then(async (loadedQueues) => {
this.logger.debug('looking for bull3 queues');
const bull3Queues = await this.findAndPopulateQueues(
'list',
`${queuePrefix}:*:wait`,
async (discoveredName, queuePrefix, queueName) => {
if (loadedQueues.includes(discoveredName)) {
return;
}
return QUEUE_TYPES.BULL3;
},
);

return [loadedQueues, bull3Queues].flat();
});
}

/**
Expand Down
Loading