Question: Processing jobs in microservice setup #1481

Closed
ganesh-sg opened this issue Sep 22, 2019 · 5 comments

ganesh-sg commented Sep 22, 2019

Description

I would like to process scheduled jobs using Bull for Node.js. I have two processors that handle two types of jobs, and one configurator that schedules the jobs on the Bull queue using cron.

The scheduler will run in one microservice, and each processor will run in its own microservice, so I will have three microservices in total.

My question: am I using the correct pattern with Bull?

Minimal, Working Test code to reproduce the issue.

index.js

const Queue = require('bull');

const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});

processor-configurator.js

const Queue = require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");

fetcher-configurator.js

const Queue = require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname + "/fetcher");

fetcher.js

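const Queue = require('bull');
const moment = require('moment');

// Create the queue once per process instead of once per job,
// so that each job does not open new Redis connections.
const scheduler = new Queue('MyScheduler');

module.exports = async function (job) {
    console.log("Inside fetcher ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    // Wait until the follow-up job is enqueued before this job completes.
    await scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
};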

alert-processor.js

const moment = require('moment');

// Handles the 'processor' jobs enqueued by fetcher.js.
module.exports = async (job) => {
    console.log("Alert Processor", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
};

There will be three microservices:

  1. node index.js
  2. node fetcher-configurator.js
  3. node processor-configurator.js

With this setup, I sometimes get the error "Missing process handler for job type".

Bull version

3.10.0

Additional information

Note that I have also posted this question on Stack Overflow, but there are not many Bull questions there, so I am reposting it here.

Member

manast commented Sep 22, 2019

The setup looks good, but you cannot use named processors like that: you will need to define the 2 processors in every processor instance. I think you would be better off using 2 queues, one for the alarms and one for the fetcher.
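
As a rough sketch of the first option (defining both processors in every worker), each worker service could register a handler for every job name used on the shared queue. The helper name `registerAllProcessors` and the `baseDir` parameter are illustrative assumptions, not part of Bull; the `Queue` constructor is passed in so that the snippet does not open a Redis connection when it is loaded.

```javascript
// Registers a handler for every job name used on the shared queue.
// `Queue` is expected to be the constructor exported by the bull package.
// `baseDir` is the directory holding fetcher.js and alert-processor.js.
function registerAllProcessors(Queue, baseDir) {
    const scheduler = new Queue('MyScheduler');
    // Whichever job this worker picks up next, it now has a matching handler.
    scheduler.process('fetcher', baseDir + '/fetcher');
    scheduler.process('processor', baseDir + '/alert-processor');
    return scheduler;
}
```

Each worker service would then call `registerAllProcessors(require('bull'), __dirname)`. With this approach the two services are no longer specialised, since both handle both job types, which is why separate queues are usually the cleaner choice.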

Author

ganesh-sg commented Sep 22, 2019

Thanks @manast for your response.

"you will need to define the 2 processors in every processor instance"

I am not clear on the above.
Also, what is the reason for the error "Missing process handler for job type"?

Contributor

stansv commented Sep 23, 2019

Hi! This happens because both workers use the same queue. A worker takes the next job from the queue, receives a job of the wrong type (e.g. "fetcher" instead of "processor"), and fails because it knows how to handle "processor" but not "fetcher". Bull doesn't let a worker take only compatible jobs from a queue, so every worker must be able to process all job types. The simplest solution is to use two different queues, one for processors and one for fetchers. You can then remove the names from jobs and processors; they are no longer needed, since each queue carries a single kind of job.
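
A minimal sketch of the two-queue layout described above, assuming one function per microservice. The queue names `FetchQueue` and `AlertQueue` and the three function names are hypothetical. The `Queue` constructor is passed in rather than required at the top, so the sketch can be loaded without a Redis connection, and inline handler functions are used here instead of the sandboxed processor files from the original post.

```javascript
// Hypothetical queue names; any two distinct names would do.
const FETCH_QUEUE = 'FetchQueue';
const ALERT_QUEUE = 'AlertQueue';

// Service 1: schedules the repeatable fetch jobs. No job name is needed,
// because everything on FetchQueue is a fetch job.
function startScheduler(Queue) {
    const fetchQueue = new Queue(FETCH_QUEUE);
    return Promise.all([
        fetchQueue.add({name: 'earthQuakeAlert'}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true}),
        fetchQueue.add({name: 'weatherAlert'}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true}),
    ]);
}

// Service 2: consumes fetch jobs and enqueues one alert job per fetch.
function startFetcher(Queue) {
    const fetchQueue = new Queue(FETCH_QUEUE);
    const alertQueue = new Queue(ALERT_QUEUE);
    fetchQueue.process(async (job) => {
        await alertQueue.add({name: 'Email needs to be sent'}, {removeOnComplete: true});
    });
}

// Service 3: consumes alert jobs only; it never sees a fetch job.
function startAlertProcessor(Queue) {
    const alertQueue = new Queue(ALERT_QUEUE);
    alertQueue.process(async (job) => {
        console.log('Alert Processor', job.data);
    });
}
```

Each microservice would call exactly one of these functions, for example `startFetcher(require('bull'))`. Because a worker only ever receives jobs from its own queue, the "Missing process handler for job type" error should no longer occur.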

@ganesh-sg
Author

Thanks @stansv. Understood the queue mechanism now.

HugoPoi commented May 20, 2021

Hi! This happens because both workers use the same queue. A worker takes the next job from the queue, receives a job of the wrong type (e.g. "fetcher" instead of "processor"), and fails because it knows how to handle "processor" but not "fetcher". Bull doesn't let a worker take only compatible jobs from a queue, so every worker must be able to process all job types. The simplest solution is to use two different queues, one for processors and one for fetchers. You can then remove the names from jobs and processors; they are no longer needed, since each queue carries a single kind of job.

This should be in upper case with some blinking in the documentation.
