Description
Version
v5.7.4
Platform
NodeJS
What happened?
Hi,
I have a TypeScript application running on Bun that uses BullMQ to schedule long-running jobs submitted by users.
When I use a sandboxed processor, the job never reaches the completed state, even though the processor code has finished and either returned a value or thrown an exception.
The problem disappears if I remove the "await" statements from my processor code or the "async" keyword from the function.
Furthermore, the producer code (queue.ts in the example below) does not exit unless I add "await queue.disconnect();" at the end.
I registered the event handlers shown below and didn't notice any errors, but the completed handler is never called, and the process stays stuck (no other jobs can be processed) until I stop it manually with CTRL + C.
How to reproduce.
In my code, I have a queue.ts file to submit test jobs:
import { Queue } from "bullmq";
const queue = new Queue("jobs", {
  connection: {
    host: (process.env.REDIS_HOST as string) ?? "localhost",
    port: parseInt((process.env.REDIS_PORT as string) ?? "6379"),
    username: process.env.REDIS_USER as string,
    password: process.env.REDIS_PASS as string,
  },
  defaultJobOptions: {
    attempts: 3,
    removeOnComplete: process.env.NODE_ENV !== "development",
    removeOnFail: process.env.NODE_ENV !== "development",
  }
});
export default queue;
await queue.add("job1", { name: "Job 1" });
await queue.add("job2", { name: "Job 2" });
await queue.add("job3", { name: "Job 3" });
await queue.disconnect(); // I had to manually disconnect otherwise this code will not exit
a worker.ts file to run the worker:
import { Job, Worker } from "bullmq";
import path from "path";
const processorPath = path.join(__dirname, "processor.ts");
const worker = new Worker("jobs", processorPath, {
  connection: {
    host: (process.env.REDIS_HOST as string) ?? "localhost",
    port: parseInt((process.env.REDIS_PORT as string) ?? "6379"),
    username: process.env.REDIS_USER as string,
    password: process.env.REDIS_PASS as string,
  },
  autorun: true,
  skipStalledCheck: true,
  concurrency: 5,
});
worker.on("error", (error) => {
  console.error(error);
});
worker.on("active", (job, prev) => {
  console.log(`Job ${job.id} active from ${prev}`);
});
worker.on("completed", (job: Job, returnValue: any) => {
  console.log(`Job ${job.id} completed with return value: ${returnValue}`);
});
worker.on("failed", (job: Job | undefined, error: Error) => {
  console.error(`Job ${job?.id} failed with error: ${error}`);
});
worker.on("progress", (job: Job, progress: number | object) => {
  console.log(`Job ${job.id} is ${progress}% done`);
});
const gracefulShutdown = async (signal: string) => {
  console.log(`Received ${signal}, closing worker...`);
  await worker.close();
  process.exit(0);
};
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
and a processor.ts file:
import { SandboxedJob } from "bullmq";
export default async function processor(job: SandboxedJob) {
  job.log("Start processing job");
  await job.updateProgress(100);
  console.log("Doing something...", job.id);
  // sleep for 5 seconds
  await new Promise((resolve) => setTimeout(resolve, 5000));
  return "Job completed!";
}
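For comparison, the variant without "async" and "await" mentioned above would look roughly like the sketch below. JobLike is a stand-in type introduced only so the sketch compiles on its own; it is not a BullMQ type, and the real file would keep using SandboxedJob. The progress update and the sleep are left out because both rely on "await".

```typescript
// Stand-in for the parts of SandboxedJob used here (an assumption made so
// this sketch has no dependency on the bullmq package).
interface JobLike {
  id?: string;
  log(row: string): unknown;
}

// No `async` keyword and no `await`: the function returns a plain string.
export default function processor(job: JobLike): string {
  // The result of log() is intentionally not awaited in this variant.
  job.log("Start processing job");
  console.log("Doing something...", job.id);
  return "Job completed!";
}
```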
I run the scripts using bun queue.ts and bun worker.ts in two shells.
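Since queue.ts and worker.ts repeat the same connection block, anyone reproducing this may find it easier to keep the two in sync with a small shared helper. The sketch below is only an illustration: connectionFromEnv and RedisConnection are hypothetical names, not part of BullMQ, and the explicit radix and NaN fallback are additions that the original files do not have.

```typescript
// Hypothetical shape for the connection options used in queue.ts and worker.ts.
interface RedisConnection {
  host: string;
  port: number;
  username: string | undefined;
  password: string | undefined;
}

// Hypothetical helper: builds the connection options from an env-like object.
export function connectionFromEnv(
  env: Record<string, string | undefined>,
): RedisConnection {
  const port = Number.parseInt(env.REDIS_PORT ?? "6379", 10);
  return {
    host: env.REDIS_HOST ?? "localhost",
    // Fall back to the default Redis port if REDIS_PORT is not numeric.
    port: Number.isNaN(port) ? 6379 : port,
    username: env.REDIS_USER,
    password: env.REDIS_PASS,
  };
}
```

Both files could then pass connectionFromEnv(process.env) as the connection option.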
Relevant log output
> bun worker.ts
Job 178 active from waiting
Job 179 active from waiting
Job 180 active from waiting
Job 179 is 100% done
Job 180 is 100% done
Job 178 is 100% done
<-------------- The completed handler never executes, and the code does not exit until I close it manually with CTRL + C