
AutoscaledPool cannot grow anymore #39

Closed
gsouf opened this issue Mar 16, 2018 · 4 comments


gsouf commented Mar 16, 2018

Hi,

I ran some experiments with the AutoscaledPool and found that it has issues when the worker function depends on an external data source.

There are two main issues:

  • First issue: when the number of workers in the pool decreases because a worker function returned null, the pool is unable to grow again.

The following example illustrates it: you initially have 2 workers running; while they are running, more words are added, but one of the workers terminates before the new data is available, so all the remaining data is processed by only 1 worker when we would expect the pool to grow again.

            // words to be processed by the worker
            let words = ['foo', 'bar'];
            // words to be added later for processing
            let moreWords = ['baz', 'qux', 'quux', 'corge', 'grault'];

            // add a new word at a given interval
            const addWords = setInterval(() => {
                words.push(moreWords.shift());
                if (moreWords.length === 0) {
                    clearInterval(addWords);
                }
            }, 900);

            // print words
            const pool = new Apify.AutoscaledPool({
                maxConcurrency: 6,
                minConcurrency: 4,
                workerFunction: () => {

                    let word = words.shift();

                    if (word) {
                        console.log('init ' + word);
                        return new Promise(resolve => {
                            console.log('start ' + word);
                            setTimeout(() => {
                                console.log(word);
                                resolve();
                            }, 1000);
                        });
                    } else {
                        return null;
                    }

                },
            });

            await pool.run();

To address this issue, the simplest solution I can imagine would be the ability to notify the pool that new data is available for the worker function, so that the pool tries to grow again. That could be achieved either by a simple method call, e.g. pool.tryAddWorker(), or by a callback function that is polled while workers are waiting: when it returns true, a new worker is added (unless maxConcurrency is reached), like this:

            new Apify.AutoscaledPool({
                maxConcurrency: 6,
                minConcurrency: 4,
                workerFunction: () => { /* ... */ },
                hasData: () => { return words.length > 0; },
            });

  • Second issue: if the pool completes but the external data source still has data to process, then the pool will stop and the remaining data will never be processed.

To reproduce this issue, use the same code as above, but make the interval passed to setInterval greater than the timeout used in the promise.
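
For example, only the delay passed to setInterval changes compared to the snippet above; 1500 ms here is just an arbitrary value larger than the 1000 ms task duration:

            // New words now arrive slower than tasks finish, so the pool drains,
            // considers itself done, and the late words are never processed.
            const addWords = setInterval(() => {
                words.push(moreWords.shift());
                if (moreWords.length === 0) {
                    clearInterval(addWords);
                }
            }, 1500); // greater than the 1000 ms setTimeout in workerFunction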

To address this issue, I could imagine having the ability to flag the pool as "not ready to stop", in which case it would stay alive and wait for an opportunity to grow until it is no longer flagged as "not ready to stop".
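
A rough sketch of how such a flag could be used together with the tweaked snippet above; the readyToStop property is purely hypothetical and only meant to illustrate the idea:

            // Hypothetical sketch: 'readyToStop' is an assumed property name, not an
            // existing AutoscaledPool option.
            pool.readyToStop = false; // keep the pool alive even when workerFunction returns null

            const addWords = setInterval(() => {
                words.push(moreWords.shift());
                if (moreWords.length === 0) {
                    clearInterval(addWords);
                    pool.readyToStop = true; // source exhausted, the pool may now finish
                }
            }, 1500);

            await pool.run();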

mtrunkat (Member) commented:

Thank you for reporting. We will investigate the issue and fix it shortly.

mtrunkat (Member) commented Mar 19, 2018

Regarding the second issue: this happens because the AutoscaledPool is configured to finish when there is no task in progress and workerFunction returns null. We will add a configuration parameter to leave the pool running in this case and a new method to finish it manually.
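
A rough sketch of how that could look from the caller's side; the option and method names below (keepRunningOnEmpty, pool.finish()) are assumptions for illustration only, not the final API:

            // Hypothetical sketch: the option and method names are assumed, not the final API.
            const pool = new Apify.AutoscaledPool({
                maxConcurrency: 6,
                minConcurrency: 4,
                workerFunction: () => { /* ... */ },
                keepRunningOnEmpty: true, // don't finish when workerFunction returns null
            });

            const finishedPromise = pool.run();

            // ... later, once the external source is known to be exhausted:
            pool.finish(); // lets run() resolve after in-progress tasks complete
            await finishedPromise;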

Regarding the first issue: I did some tests and it seems to be working correctly. The pool looks for spare capacity and tries to run new worker functions every time one of them finishes. There is also a 1 s interval that looks for spare capacity and possibly starts new worker functions.

In your example you are adding a new task every 900 ms and each task takes 1000 ms, so there should be about 1 task running most of the time.
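
In other words, in that reproduction the producer is slower than a single consumer, so the pool never has enough pending work to scale up. A sketch of how the snippet from the first comment could be changed so the external source outpaces one worker and the pool has a reason to grow (the batch size of 3 is arbitrary):

            // Push several words per tick so more than one task is pending at a time,
            // giving the pool spare work to scale onto.
            const addWords = setInterval(() => {
                words.push(...moreWords.splice(0, 3)); // 3 new words every 900 ms vs ~1000 ms per task
                if (moreWords.length === 0) {
                    clearInterval(addWords);
                }
            }, 900);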

gsouf (Author) commented Mar 19, 2018

Thanks for the details.

For the first issue, I now understand the reason. It would be worth mentioning in the docs!

mtrunkat (Member) commented Mar 19, 2018 via email
