
async.queue max length? #1022

Closed
jmt0806 opened this issue Feb 9, 2016 · 9 comments

jmt0806 commented Feb 9, 2016

Is there any way to enforce a maximum async.queue length? I realize it's possible to set the concurrency level, but I want to prevent more than a certain number of elements from being added to the queue.

I tried using an if statement with queue.length(), but more elements are added than wanted, due to race conditions.

if(queue.length() < maxLength){
      // add an item
}

More info: queue.length() is checked after every task finishes, to see if more work can be added. If two or more tasks finish at the same time, each one checks queue.length() and adds work, so queue.length() can go beyond maxLength.

@aearly aearly added the bug label Feb 9, 2016
aearly (Collaborator) commented Feb 9, 2016

There are no limits on the queue length -- only the practical limits of a JS array.

I think you've uncovered an inconsistency in how queue.length() is reported, though. 🐛

jmt0806 (Author) commented Feb 10, 2016

Is there any way to "lock" a separate variable in NodeJS to prevent this situation?

aearly (Collaborator) commented Feb 10, 2016

You shouldn't need to "lock" a variable in JS since it is single-threaded -- there's only a single thread writing and accessing variables. You might have a tricky order-of-execution problem. Try placing a breakpoint on that line.

jmt0806 (Author) commented Feb 10, 2016

True, but if I add 5 tasks to async.queue and they all finish at the same time and try to increment/decrement a var on callback, isn't that a race condition?

aearly (Collaborator) commented Feb 10, 2016

It depends. There is nothing that should cause a race condition here -- execution can't be preempted:

// queue is length 9, max is 10
if(queue.length() < maxLength){
    // length is still 9
    queue.push(item) // add an item
    // length is now 10
}
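To make that concrete, here is a runnable sketch (a plain array standing in for async.queue's internal task list) showing that even many "simultaneous" completions can't push past the cap, because each callback runs to completion before the next one starts:

```javascript
// Minimal sketch: a plain array stands in for async.queue's task list.
var maxLength = 10;
var pending = [];

// Simulates the refill logic run from each task's completion callback.
function onTaskFinished() {
    if (pending.length < maxLength) {
        // No other callback can run between this check and the push:
        // JS callbacks execute to completion, one at a time.
        pending.push('item');
    }
}

// 50 "simultaneous" completions still execute one after another.
for (var i = 0; i < 50; i++) {
    onTaskFinished();
}
console.log(pending.length); // 10 -- the cap holds
```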

Out of curiosity, what version of async are you using?

aearly (Collaborator) commented Mar 21, 2016

A more concrete test case would be useful here.

@aearly aearly closed this as completed Mar 21, 2016
umeshanthem commented Dec 15, 2017

Code:

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
var Q = async.queue(function(line, callback) {
    // I have an async call here. Eg- CallDbAndGetData(line);
    callback();
}, 10);

var s = fs.createReadStream('C:/idcards/dev/scripts/XXXXXXXXXX.txt').pipe(es.split()).pipe(es.mapSync(function(line) {
    Q.push(line, function(err) {
        // console.log('finished pushing ' + line);
    });
}));

Q.drain = function() {
    console.log("All CallDbAndGetData calls were done");
};

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Problem statement:
- My database has tens of millions of rows, and each CallDbAndGetData call takes 2-3 seconds to get a response.
- Q.push is just an I/O operation that reads a line from the file and adds it to Q, which happens in around a millisecond.
- Problem: the in-memory size of Q grows too big, because popping from the queue takes around 2 seconds per item.
Can we restrict the size of Q, e.g. by defining a maxSize for Q?

aearly (Collaborator) commented Dec 15, 2017

A Transform stream or through stream is probably a better fit for what you're doing. It won't buffer the file's entire contents.

umeshanthem commented Dec 19, 2017

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
var Q = async.queue(function(line, callback) {
    // I have an async call here. Eg- CallDbAndGetData(line);
    if (Q._tasks.length < 100)
        s.resume(); // queue has drained below the cap, read more lines
    callback();
}, 10);

var s = fs.createReadStream('C:/idcards/dev/scripts/XXXXXXXXXX.txt').pipe(es.split()).pipe(es.mapSync(function(line) {
    Q.push(line, function(err) {
        // console.log('finished pushing ' + line);
        if (Q._tasks.length >= 100) // restricting queue length
            s.pause(); // stop reading until the queue drains
    });
}));

Q.drain = function() {
    console.log("All CallDbAndGetData calls were done");
};

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////

This is working fine for my case: Q's in-memory size no longer grows huge and slows the system down.
P.S.: look at the sizeof node module (require('sizeof')) to get the memory size of an object in Node.js.
