Priority doesn't work? #733
I know it is quite lame, but we only have one test for priorities: https://github.com/OptimalBits/bull/blob/master/test/test_queue.js#L436

Yeah sure, tomorrow I'm going to work on that!
Mmm, it's a pretty good challenge to write a test for this kind of stuff. Since it's not deterministic, I think a "probabilistic" approach is needed. I didn't search much, but could you help me find a way to test it? Right now I've done this:

it('should process jobs by priority (non-deterministic)', function(done) {
  this.timeout(12000);

  var numJobsPerPriority = 500;
  var jobsPriorityInCompletedOrder = [];

  queue.process(function(job, jobDone) {
    expect(job.id).to.be.ok;
    // Record each job's priority in the order it completes.
    // (Optionally wrap this in a setTimeout(..., 10) to simulate work.)
    jobsPriorityInCompletedOrder.push(job.data.p);
    jobDone();
  });

  // Poll until all jobs are completed.
  var intervalId = setInterval(function() { // not sure about this implementation..
    if (jobsPriorityInCompletedOrder.length === numJobsPerPriority * 2) {
      clearInterval(intervalId);
      // TODO: assert that "most" priority-1 jobs completed before
      // priority-2 jobs, then call done() <----------
    }
  }, 10);

  // Make sure we remove the interval if there is a problem.
  queue.on('failed', function() {
    clearInterval(intervalId);
    assert.isOk(false, 'a job failed');
  });

  // Add jobs to the queue ([p1, p2, p1, p2, ...]).
  for (var i = 0; i < numJobsPerPriority; i++) {
    queue.add({ p: 1 }, { priority: 1 });
    queue.add({ p: 2 }, { priority: 2 });
  }
});

So I have an array where each value is the priority of a job; the order can be used to determine whether "most" priority-1 jobs were done before priority-2 jobs. I'm just not sure how to do this :/. What do you think? Maybe a simple correlation could work, but I'm not sure. I'm going to try it and give you feedback. Also, right now it doesn't test with more than one worker, which might not produce the same result.
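One way that TODO could be filled in, as a minimal sketch (not from the thread; it assumes the jobsPriorityInCompletedOrder array from the test above, and the 0.4/0.6 thresholds are arbitrary): compare the normalized mean completion index of each priority. With perfect ordering the priority-1 mean is about 0.25 and the priority-2 mean about 0.75; with no ordering both sit near 0.5.

// Sketch of an assertion for "mostly in priority order".
function normalizedMeanIndex(order, priority) {
  var sum = 0, count = 0;
  order.forEach(function(p, i) {
    if (p === priority) { sum += i; count++; }
  });
  // Mean position of this priority, scaled to [0, 1].
  return (sum / count) / order.length;
}

var m1 = normalizedMeanIndex(jobsPriorityInCompletedOrder, 1);
var m2 = normalizedMeanIndex(jobsPriorityInCompletedOrder, 2);
expect(m1).to.be.below(0.4); // priority 1 should finish mostly early
expect(m2).to.be.above(0.6); // priority 2 should finish mostly late
done();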
Why does it need to be non-deterministic? It is important that the unit test is deterministic, in fact :). Can't you just use the existing test for priorities and modify it so that it reproduces your issue?

I feel like the existing test for priorities doesn't match reality. You wait for all jobs to be added before starting the "work", whereas in reality jobs are added on the fly. In my example it's not deterministic, since jobs start being processed while others are still being added.
Then you can set the process function first and add the jobs evenly, but make the completion time follow an array of delays on the order of magnitude of 100ms, since it normally takes less than 10ms to process a single job without a delay.
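One reading of that suggestion as a sketch (the delay value and job counts are assumptions, not the actual bull test suite): start the processor first, then add jobs on the fly, and make each job take ~100ms so the wait list always has a backlog for priorities to act on.

var completedPriorities = [];

queue.process(function(job, jobDone) {
  setTimeout(function() {
    // ~100ms per job dwarfs the <10ms add/fetch overhead,
    // so the ordering becomes repeatable.
    completedPriorities.push(job.data.p);
    jobDone();
  }, 100);
});

// Add jobs while the processor is already running.
for (var i = 0; i < 20; i++) {
  queue.add({ p: 1 }, { priority: 1 });
  queue.add({ p: 2 }, { priority: 2 });
}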
You can go and see how I did it, it's pretty simple. So I'm going to try to use the same technique on my code and see if there is a difference.

Yeah, good luck. In any case these are good tests to complete the priority queue :)
So... I've done some quick tests in my code, and this is the correlation that I've got (25 jobs per priority, only 2 priorities): [1, 1, ..., 2, 2] -> -0.36. I just don't understand.. But at least it's consistent.
I feel like my problem looks a bit like #228, but the implementation changed a lot, so I don't know.. I think it's more related to my environment. I'm not able to reproduce the bug outside of my server; if I try to test it in a new project, it works as expected..

That was a little premature. I took some time to evaluate kue more in depth, and it doesn't fit the current architecture well, so for now we decided to stay on bull and not use priorities.. Maybe in my free time I'm going to debug that, but for now I need to go forward.. Sorry.
Yeah, bee was heavily based on an early bull version, so it should be easy to go from bee to bull, but not the opposite, since the feature set has increased considerably.

Yeah, like I said, I didn't find the source of the bug. I need to take a lot more time with this, maybe this weekend!
Alright, I took my Friday night to look at this. I think I found where the problem is.. For some reason there is a problem with addJob-6.lua. This is my debugging code:

...
-- Priority add
redis.call("ZADD", KEYS[6], priority, jobId)
-- Count how many jobs (including this one) have priority <= the new one.
local count = redis.call("ZCOUNT", KEYS[6], 0, priority)
redis.call("RPUSH", "addJob:count", count) -- DEBUG
local len = redis.call("LLEN", target)
redis.call("RPUSH", "addJob:len", len) -- DEBUG
-- The wait list is consumed from the tail, so pick the element
-- the new job should be inserted before.
local id = redis.call("LINDEX", target, len - (count - 1))
redis.call("RPUSH", "addJob:id", tostring(id)) -- DEBUG (LINDEX may return false)
if id then
    local l = redis.call("LINSERT", target, "BEFORE", id, jobId)
    redis.call("RPUSH", "addJob:lenAfter", l) -- DEBUG
else
    redis.call("RPUSH", target, jobId)
end
...

When I add 12 jobs ([p1, p1, ..., p2, p2]) I'm getting this output: the left one is what is expected, the right one is when I run my app.. If we look closely at ...
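To unpack the arithmetic in that script, here is a plain-JS simulation (my illustration, not from the thread; it assumes the priority zset and the wait list are in sync, and that bull consumes the list from the tail via BRPOPLPUSH, so higher priority means closer to the tail):

// Simulates the ZADD/ZCOUNT/LINDEX/LINSERT steps of addJob-6.lua on arrays.
// `list` is head -> tail; jobs are consumed from the tail.
function insertByPriority(list, priorities, jobId, priority) {
  priorities[jobId] = priority; // ZADD
  var count = Object.keys(priorities) // ZCOUNT 0..priority
    .filter(function(id) { return priorities[id] <= priority; }).length;
  var pos = list.length - (count - 1); // element to insert BEFORE
  if (list[pos] !== undefined) {
    list.splice(pos, 0, jobId); // LINSERT target BEFORE id jobId
  } else {
    list.push(jobId); // RPUSH target jobId
  }
}

// Example: wait list already holds j3 (p3), j2 (p2), j1 (p1).
var list = ['j3', 'j2', 'j1'];
var priorities = { j3: 3, j2: 2, j1: 1 };
insertByPriority(list, priorities, 'j4', 2);
console.log(list); // ['j3', 'j4', 'j2', 'j1'] -> consumed: j1, j2, j4, j3

Note that if the zset still contains an entry for a job that has already left the wait list, count comes out too high and the insert lands in the wrong slot, which matches the out-of-sync suspicion discussed below.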
Thanks for your effort, I am analysing this data to see if I can understand what is going on.

I assume the example data above has been generated with a client adding jobs, but also with a processor consuming jobs, right? That could explain the zeroes in the addJob:len list. It would mean that the priority zset and the queue list have got out of sync.

I have discovered that there is a slight hazard where the zset with the priorities and the queue list get out of sync. Of course I cannot know if this causes your issue; it seems like this small hazard should not be so destructive. It happens because BRPOPLPUSH, which atomically moves a job from the queue to its active state, needs a separate call to remove the item from the priority set. If a call to addJob gets interleaved between the BRPOPLPUSH and the ZREM, then the item will not be placed in its perfect position in the queue. However, this should not happen very often, and when it happens it should not completely mess up the priorities, just make a small "mistake".
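A minimal sketch of that window (my illustration, assuming ioredis, a local redis, and hypothetical key names; not bull's actual code):

const Redis = require('ioredis');
const redis = new Redis();

async function fetchNextJob(waitKey, activeKey, priorityKey) {
  // Step 1: atomically move the next job from wait to active.
  const jobId = await redis.brpoplpush(waitKey, activeKey, 5);
  // <-- an addJob call landing here still sees jobId in the priority zset,
  //     so its ZCOUNT is inflated and the insert position is off by one.
  if (jobId) {
    // Step 2: a separate round trip removes it from the priority zset.
    await redis.zrem(priorityKey, jobId);
  }
  return jobId;
}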
It's hard to tell if this is the issue.. (Sorry for the close and open, I was on my phone and accidentally touched "comment and close".)

Well, since what I discovered is in fact a hazard that needs to be fixed, I will make a release; after that you can test again, and if it still fails I can continue investigating.
I just realized: this is a hazard, so it should happen at random moments, no? In my case the pattern is always the same; there is no randomness.. I'm thinking of removing the processing from my micro-service (app) and making a dedicated Node.js server for the processing, which could be on its own VM. Maybe the problem is coming from there... I'm interested in the difference between clustering the queue vs. having more workers (process(nbOfWorker, ...))?

Instead of using cluster, use the sandbox feature; it is easier to use, with similar results. You should use the CPU as much as possible before scaling horizontally to more workers.
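For reference, a minimal sketch of bull's sandboxed processors (the file names and concurrency are assumptions): pass a module path and a concurrency to queue.process, and bull runs the processor in separate child processes, saturating the CPUs without node's cluster module.

// main.js
const Queue = require('bull');
const imageQueue = new Queue('image');
// Run up to 4 jobs at a time, each in a sandboxed child process.
imageQueue.process(4, __dirname + '/imageProcessor.js');

// imageProcessor.js (hypothetical file)
module.exports = function(job) {
  // CPU-heavy resize/compress work goes here.
  return Promise.resolve({ processed: job.id });
};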
Hi!
I'm having a hard time understanding how priorities work with the queue. I'm making a multimedia processing server, basically compression and resize stuff. So I have 3 queues: image, audio and video, but I'm focusing my tests only on the image one for now.
The image queue has 2 types of jobs (for now): ResizeImage (priority: 1, high priority) and CompressImage (priority: 10, processed "after" ResizeImage). I'm doing batch jobs, so for each image those 2 jobs are added to the queue. I changed the order in which jobs are added to the queue so that it's now by priority; in this case, all ResizeImage jobs are added before all CompressImage jobs.
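A minimal sketch of that setup (my reconstruction from the description above; the job shapes and helper name are assumptions). Note that in bull a lower priority number means higher priority:

const Queue = require('bull');
const imageQueue = new Queue('image');

function addImageBatch(images) {
  // All ResizeImage jobs first (priority 1 = highest)...
  images.forEach(function(img) {
    imageQueue.add({ type: 'ResizeImage', src: img }, { priority: 1 });
  });
  // ...then all CompressImage jobs (priority 10 = lower).
  images.forEach(function(img) {
    imageQueue.add({ type: 'CompressImage', src: img }, { priority: 10 });
  });
}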
So what I'm expecting to happen is that most ResizeImage jobs are done first, and afterwards the CompressImage jobs should start. But it just doesn't work like that at all:
This is based on the queue.on("completed", ...) event and with 4 workers, so I don't know how reliable that log is.
I don't think it matters, but that is a small sample size for fast testing; we will probably get 1000+ images per batch.. Also, my Node.js server is on a VM with Vagrant/VirtualBox on Ubuntu 16.04.