
Same job.id being processed by multiple cluster workers. #375

Closed
kimballfrank opened this issue Jul 28, 2014 · 12 comments

@kimballfrank commented Jul 28, 2014

When using cluster to process jobs, I am seeing this issue intermittently. The same job is processed simultaneously by 2 or 3 different workers.

I have attached an image showing the job log printing out multiple pids and duplicate line items as the job is processed twice.

[screenshot: screen shot 2014-07-25 at 8.11.42 pm, showing the job log with multiple pids and duplicate line items]

My code has the following directory structure:
/jobs
---> sendEmailToGroup.js
---> sendEmail.js
server.js

server.js

var debug = require('debug')('cluster')
  , cluster = require('cluster')
  , config = require('./config.js');

//--------------------------------------------------------------------------------------------
// cluster
//--------------------------------------------------------------------------------------------

if (cluster.isMaster) {
  var clusterWorkerSize = 4; // set manually; I am running on a quad-core Mac mini
  console.log("master pid %s", process.pid);
  for (var i = 0; i < clusterWorkerSize; i++) {
    cluster.fork();
  }
}
else {
  process._debugPort = 5858 + cluster.worker.id;
  debug("worker %s pid %s debugPort %s", cluster.worker && cluster.worker.id, process.pid, process._debugPort);

  // I run the server only on worker processes. It is my understanding that these worker
  // processes share nothing, and that I should be able to create a Kue server on each worker.

  // I am wrapping Kue in express for future feature additions to this system.

  //--------------------------------------------------------------------------------------------
  // Express
  //--------------------------------------------------------------------------------------------
  var express = require('express');
  var app = express();

  app.set('port', config.PORT);

  //--------------------------------------------------------------------------------------------
  // Kue
  //--------------------------------------------------------------------------------------------
  var kue = require('kue');
  var jobs = kue.createQueue({
      prefix: config.KUE_TYPE
    , redis: {
        port: config.REDIS_PORT || 6379
      , host: config.REDIS_HOST || '127.0.0.1'
    }
  });

  // make sure each worker promotes jobs
  // if this is my issue... I'm still confused as to how it would break things
  var promote_interval = 100;
  jobs.promote(promote_interval);

  jobs.process('sendEmailToGroup', 1, require('./jobs/sendEmailToGroup'));
  jobs.process('sendEmail', 20, require('./jobs/sendEmail'));

  // simple express middleware
  app.use(function(req, res, next){
    kue.app.set('title', req.host);
    next();
  });

  // mount the Kue UI on this worker's express app
  app.use(kue.app);
  app.listen(app.get('port'));
}

sendEmailToGroup.js

var cluster = require('cluster')
  , config = require('../config.js')
  , dpd = require('../dpd-init.js')(config.DPD_ROOT)
  , _ = require('lodash')
  , moment = require('moment')
  , kue = require('kue');

var jobs = kue.createQueue({
    prefix: config.KUE_TYPE
  , redis: {
      port: config.REDIS_PORT || 6379
    , host: config.REDIS_HOST || '127.0.0.1'
  }
});

require('../parse-init.js'); // assumed to define Parse, User, and promiseWhile

module.exports = function(job, done){
  var jobKey = "job_" + job.id;
  job.log('start ' + jobKey + ' pid ' + process.pid);
  var queryCompletion = 0;
  var limit = 100;
  var upperLimit = 10001;
  var completion = upperLimit + limit;
  var skip = 0;
  var lastResultCount = limit;
  var totalResultCount = 0;

  var lastSent = moment().toDate();
  if (job.data.lastSent) {
    lastSent = moment(job.data.lastSent).toDate();
  }

  var query = new Parse.Query(User);
  query.limit(limit);
  query.equalTo('emailGroup', job.data.emailGroup);
  query.exists("email");
  query.notEqualTo('doNotSendEmail', true);
  query.descending("createdAt");

  // promiseWhile repeatedly invokes the body while the predicate returns true
  promiseWhile(function() {
    return lastResultCount >= limit && lastResultCount != 0 && skip < upperLimit;
  },
  function() {
    // page through users older than the last createdAt we saw
    query.lessThan("createdAt", lastSent);
    return query.find().then(function(results){
      lastResultCount = results.length;
      totalResultCount += lastResultCount;
      skip += limit;

      if (lastResultCount < limit && upperLimit > totalResultCount){
        completion = totalResultCount;
      }
      queryCompletion = totalResultCount; // advance progress (was never updated before)
      job.progress(queryCompletion, completion);

      var promises = [];
      _.each(results, function(user) {
        if (user.get('email') === undefined) return; // skip, and don't leave an unresolved promise behind
        var promise = new Parse.Promise();
        promises.push(promise);
        var userId = user.id;
        var userCreatedAt = user.createdAt;
        lastSent = moment(userCreatedAt).toDate();
        var toName = user.get('kFullNameKey') || user.get('username');
        var toEmail = user.get('email');
        var emailGroup = user.get('emailGroup');
        jobs.create('sendEmail', {
          title: '' + job.data.emailType + ': ' + job.data.typeObjectId,
          emailType: job.data.emailType,
          typeObjectId: job.data.typeObjectId,
          template: job.data.template,
          userId: userId,
          toName: toName,
          toEmail: toEmail,
          fromName: job.data.fromName,
          fromEmail: job.data.fromEmail,
          subject: job.data.subject,
          emailGroup: emailGroup,
          emailScheduleId: job.data.emailScheduleId,
          userCreatedAt: userCreatedAt,
          user: user
        }).attempts(2).save();
        promise.resolve();
      });
      job.log("Retrieved " + results.length + " Users / " + totalResultCount + " Total");

      return Parse.Promise.when(promises);
    }, function(error) {
      job.log("" + error.message);
      // propagate the rejection so the outer handler fails the job exactly once
      return Parse.Promise.error(error);
    });
  }).then(function() {
    job.log('job done');
    done();
  }, function(error) {
    // fail the job on error
    done(error);
  });
};
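For reference, a hypothetical producer for this processor might look like the following. This is a sketch only: every field name is inferred from the job.data reads above, and all values are made up.

var kue = require('kue');
var jobs = kue.createQueue();

// Hypothetical enqueue call for the sendEmailToGroup processor above.
jobs.create('sendEmailToGroup', {
  title: 'newsletter: abc123',
  emailType: 'newsletter',
  typeObjectId: 'abc123',
  template: 'weekly-digest',
  emailGroup: 'newsletter',
  fromName: 'Example Sender',
  fromEmail: 'sender@example.com',
  subject: 'Weekly digest',
  emailScheduleId: 'sched-1',
  lastSent: new Date().toISOString() // optional; the processor defaults to "now"
}).attempts(2).save();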

Any ideas (other than "don't use Parse") would be extremely helpful.

@behrad (Collaborator) commented Jul 28, 2014

Actually we depend on Redis's BLPOP to atomically fetch a job and hand it to a worker. Can you provide more details on your deployment? It looks like a local Redis. Which Redis version? Which Kue version?
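A minimal sketch of that atomicity, assuming the classic callback-style node_redis client and a local Redis: two clients block on the same list, and each pushed element is delivered to exactly one of them, never both.

var redis = require('redis');

// Two clients block on the same list via BLPOP.
var workerA = redis.createClient();
var workerB = redis.createClient();
var producer = redis.createClient();

workerA.blpop('demo:jobs', 0, function (err, reply) {
  console.log('A got', reply); // reply is [listName, value]
});

workerB.blpop('demo:jobs', 0, function (err, reply) {
  console.log('B got', reply);
});

// Only one of A or B will receive this element.
producer.lpush('demo:jobs', 'job-1');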

@behrad (Collaborator) commented Jul 28, 2014

And it shouldn't be anything related to .promote(); you can even omit calling it entirely if you are not using delayed jobs.

@kimballfrank (Author) commented Jul 28, 2014

@behrad see this: https://github.com/kimballfrank/kue-cluster-f

Thanks for the help btw. :)

@behrad (Collaborator) commented Jul 28, 2014

I'll try to play with it to see what happens...


@licq commented Jul 30, 2014

I have the same problem. I start the server with pm2.

@behrad (Collaborator) commented Jul 30, 2014

I've found these issues in your code:

You should define the express Kue app just once, in your master, not in each worker: https://github.com/kimballfrank/kue-cluster-f/blob/master/server.js#L21

And also call jobs.promote(promote_interval); just once, in your master.

So your workers should only have the process method.
Would you tell me the results after these changes?
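A minimal sketch of that split, with the promoter and the dashboard living only in the master while workers only process (the 5000 ms interval and port 3000 are illustrative, not anything this repo uses):

var cluster = require('cluster');
var kue = require('kue');

var jobs = kue.createQueue();

if (cluster.isMaster) {
  // master only: one promoter and one Kue dashboard for the whole cluster
  jobs.promote(5000);
  kue.app.listen(3000);

  for (var i = 0; i < 4; i++) {
    cluster.fork();
  }
} else {
  // workers only: just the process handlers
  jobs.process('sendEmailToGroup', 1, require('./jobs/sendEmailToGroup'));
  jobs.process('sendEmail', 20, require('./jobs/sendEmail'));
}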

@behrad (Collaborator) commented Jul 30, 2014

Multiple concurrent calls to .promote() with very short polling intervals are, to my mind, a likely source of inconsistent job states.

@licq commented Aug 6, 2014

Is there another way to handle this? I have 4 load-balanced servers, and I don't want to change the code so that only one server calls the promote method.

@behrad (Collaborator) commented Aug 6, 2014

You should balance your workload across distributed workers, and that's all.
.promote() can handle a large number of jobs fine in a single master process.
It could later be reimplemented as a server-side Lua script so that it runs behind the scenes for developers.
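One way to keep identical code on all 4 servers while still having a single promoter is to gate the promote call behind an environment flag set on only one box. KUE_PROMOTER below is a made-up variable for this sketch, not a Kue feature:

var kue = require('kue');
var jobs = kue.createQueue();

// Identical code is deployed everywhere; only the box started with
// KUE_PROMOTER=1 actually runs the promoter loop.
if (process.env.KUE_PROMOTER === '1') {
  jobs.promote(5000);
}

jobs.process('sendEmail', 20, require('./jobs/sendEmail'));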

behrad added the Question label Aug 8, 2014

@behrad (Collaborator) commented Aug 8, 2014

@kimballfrank any updates on this?

@victornikitin commented Nov 1, 2018

Same problem. Several cluster workers pick up the same job.

Code (simplified):

const cluster = require("cluster");
const kue = require("kue");

if (cluster.isMaster) {
  for (let i = 0; i < 10; i++) {
    cluster.fork();
  }
} else {
  const queue = kue.createQueue();

  queue.process("q", 1, (job, done) => {
    // .. code here
    done();
  });
}
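A small sketch to confirm the overlap, logging the worker pid next to the job id inside the processor (as the original report did): if the same job id appears under two different pids, two workers really did pick up the same job.

queue.process("q", 1, (job, done) => {
  // Duplicate processing shows up as the same job.id with different pids.
  console.log("pid %d processing job %s", process.pid, job.id);
  done();
});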

Is there any solution?

@victornikitin commented Nov 16, 2018

@kimballfrank have you found a solution?
