Skip to content

Commit

Permalink
Add support for Multiworker
Browse files Browse the repository at this point in the history
  • Loading branch information
glensc committed Apr 28, 2020
1 parent 3fe92a5 commit 9190d2e
Showing 1 changed file with 73 additions and 31 deletions.
104 changes: 73 additions & 31 deletions examples/WorkerJobStatusPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { Worker } from "../src";
/* In your projects:
import { Worker } from "node-resque";
*/
import { Connection, MultiWorker, Worker } from "node-resque";
import { Job, JobEmit } from "node-resque/src/types/job";
import { ConnectionOptions } from "node-resque/src/types/options";

/**
* Add Job status update plugin (php-resque way).
*
* Emits status messages that php-resque is able to monitor:
* - https://github.com/resque/php-resque#tracking-job-statuses
*
* // Create worker object
* const worker = new Worker(...);
*
* // Attach WorkerJobStatusPlugin events to worker
* new WorkerJobStatusPlugin(worker);
* new WorkerJobStatusPlugin(worker, connectionDetails);
*
* @author Elan Ruusamäe <glen@pld-linux.org>
*/
Expand All @@ -21,52 +26,89 @@ export class WorkerJobStatusPlugin {
public STATUS_FAILED = 3;
public STATUS_COMPLETE = 4;

constructor(private readonly worker: Worker) {
worker.on("success", async (...args) => {
await this.onSuccess(...args);
private readonly connection: Connection;

constructor(worker: Worker | MultiWorker, connection: ConnectionOptions) {
if (worker instanceof Worker) {
this.subscribeWorker(worker);
} else if (worker instanceof MultiWorker) {
this.subscribeMultiWorker(worker);
} else {
throw new Error("Unsupported worker");
}

this.connection = new Connection(connection);
this.connection.on("error", (error) => {
console.log(error);
});
worker.on("failure", async (...args) => {
await this.onFailure(...args);
}

private subscribeWorker(worker: Worker) {
worker.on("success", async (queue: string, job: any, result: any) => {
await this.onSuccess(job, result);
});
worker.on("failure", async (queue: string, job: JobEmit, failure: any) => {
await this.onFailure(job, failure);
});
worker.on("job", async (...args) => {
await this.onJob(...args);
worker.on("job", async (queue: string, job: Job<any> | JobEmit) => {
await this.onJob(job);
});
}

private subscribeMultiWorker(worker: MultiWorker) {
worker.on(
"success",
async (workerId: number, queue: string, job: any, result: any) => {
await this.onSuccess(job, result);
}
);
worker.on(
"failure",
async (
workerId: number,
queue: string,
job: Job<any> | JobEmit,
failure: any
) => {
await this.onFailure(job, failure);
}
);
worker.on(
"job",
async (workerId: number, queue: string, job: Job<any> | JobEmit) => {
await this.onJob(job);
}
);
}

/**
* Called when job is created
*/
private async onJob(queue: string, job: any) {
private async onJob(job: any) {
await this.update(job, this.STATUS_RUNNING);
}

private async onFailure(
queue: string,
job: any,
failure: any,
duration: number
) {
await this.update(job, this.STATUS_FAILED);
private async onFailure(job: any, failure: Error) {
await this.update(job, this.STATUS_FAILED, failure);
}

private async onSuccess(
queue: string,
job: any,
result: any,
duration: number
) {
private async onSuccess(job: any, result: any) {
await this.update(job, this.STATUS_COMPLETE, result);
}

private async update(job: any, status: number, result: any = null) {
const packet = this.statusPacket(status, result);
const statusKey = this.statusKey(job);
const statusKey = await this.statusKey(job);

await this.redis().set(statusKey, JSON.stringify(packet));
await this.redisSet(statusKey, JSON.stringify(packet));
}

private redis() {
return this.worker.connection.redis;
private async redisSet(key: string, value: string) {
if (!this.connection.connected) {
await this.connection.connect();
}

await this.connection.redis.set(key, value);
}

/**
Expand All @@ -75,7 +117,7 @@ export class WorkerJobStatusPlugin {
private statusKey(job: any): string {
const key = `job:${job.prefix}${job.id}:status`;

return this.worker.connection.key(key);
return this.connection.key(key);
}

private statusPacket(status: number, result: any) {
Expand Down

0 comments on commit 9190d2e

Please sign in to comment.