Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need a synchronization method for batches of tasks #3553

Open
biddisco opened this issue Nov 16, 2018 · 17 comments
Open

Need a synchronization method for batches of tasks #3553

biddisco opened this issue Nov 16, 2018 · 17 comments

Comments

@biddisco
Copy link
Contributor

It frequently occurs that we have loops of the kind

for (int i=0; i<large_number; ++i) {
    hpx::async(task, stuff);
}
wait_here_for_tasks_to_complete();
// access some shared structure that is being modified by the tasks

Examples of this can be seen in tests like the future overhead performance test, the network storage tests and various other places in the code.
https://github.com/STEllAR-GROUP/hpx/blob/6df436bf5b260716aad5663c943c2f7f5d76857e/tests/performance/local/future_overhead.cpp

In some ways this is related to fork_join parallelism since we have to do N lots of work and block on the result. In the future_overhead test we have a wait_each, wait_all and a thread counter implementation. The wait_all version is most well suited to the hpx way of doing things, but is expensive since we must store futures and wait on them. the thread counter implementation is intrusive since it queries the pool at start and at end to ensure no other tasks are running. Another version in my own local tree uses a sliding semaphore, but this requires the user to decorate their task with a decrement of the counter. In other tests, we have used a simepl atomic counter and this also requires modification of the user's task to update the value.

Ideally, we should be able to add tasks (to a thread pool for example) and supply a latch so that we can be signalled when the last one has completed.

const in num_tasks = 1000000;
hpx::lco::latch my_latch(num_tasks);
for  (int i=0; i<num_tasks; ++i) {
    hpx::async(my_latch, task, args);
}
my_latch.wait();

An alternative that uses the existing executor style API

const in num_tasks = 1000000;
hpx::lco::latch my_latch(num_tasks);
for  (int i=0; i<num_tasks; ++i) {
    hpx::async(an_executor.with(my_latch);, task, args);
}
my_latch.wait();

When tasks complete, the existing internal plumbing would decrement the latch.

I believe something of this kind would be a very useful mechanism to have in our code. I welcome comments on what the best API might be. It could be used on some of the parallel::algorithms to remove some wait_all uses.

@biddisco
Copy link
Contributor Author

This is also connected to the idea of N-ary tasks. In that case, promoting the callable object (as suggested in #3348) to be created earlier and storing things like counters in a task structure. I didn't want to write this in the issue descriptions, but just tag it here as a random thought.

@biddisco
Copy link
Contributor Author

Note that in the examples I pasted in above, it would actually be nice to have this functionality with hpx::apply since we don't need to return the futures and wait on them.

@msimberg
Copy link
Contributor

I'm not against this idea at all, but I think for example the future_overhead test is somewhat artificial because there we're really just interested in testing task spawning. In practice one wants to avoid spawning that many tasks and would instead use for_loop or for_each:

for_loop(par, 0, num_tasks, [](const int /*don't care about the index*/) { task(args); });

This essentially does what you want but with chunking. It removes the syntactic overhead of (e.g.) having to use a latch but adds some with the lambda (is there a cleaner way to write that if task doesn't take an index as it's first argument?). Is there an argument that for_loop is not the semantically correct thing to use?

@biddisco
Copy link
Contributor Author

Internally for_loop is creating tasks and then waiting on the futures. I could use the for_loop in my example (and I will for the time being), but I wanted to address the general case where you create a large number of tasks and want to wait on them without storing a vector of futures and using wait_all or something similar.
the idea of the latch is simply to simplify the user experience and provide a hook that links the scheduler back to the user's code.

A for loop becomes much harder when we wish to mix some futures and throw away tasks as follows (which is my main intention in the long run).

for (int i=0; i<large_number; ++i) {
    if (i%10==0) {
        my_futures.emplace_back(std::move(hpx::async(latch, blah)));
    }
    else {
        hpx::async(latch, task, stuff);
    }
    // also might spawn tasks without a latch in same loop as we don't need them to be counted
}
latch.wait_here_for_tasks_to_complete();
for (auto && f : my_futures) {
    // do things
}

Using a latch here for some tasks, and no latch for others, saving some futures and not saving others. This pattern keeps cropping up.

@hkaiser
Copy link
Member

hkaiser commented Nov 16, 2018

@biddisco FWIW, we do have a latch (see here: https://github.com/STEllAR-GROUP/hpx/blob/master/hpx/lcos/local/latch.hpp). Also, if you don't need the future returned by hpx::async, just use hpx::apply (i.e. fire and forget) - that saves you creating the future in the first place.

@biddisco
Copy link
Contributor Author

Yes. I am aware of the existence of the latch - this is what I want to use. My question is about how we could nicely fire off tasks and use a latch to keep track of them, without the user having to do anything to their actual tasks.

@hkaiser
Copy link
Member

hkaiser commented Nov 16, 2018

My question is about how we could nicely fire off tasks and use a latch to keep track of them, without the user having to do anything to their actual tasks.

I surmise that hpx::wait_all is not what you want? Would a task_block work for you?

@biddisco
Copy link
Contributor Author

As I wrote above. I would like to mix tasks that are 'important' and should be counted in the latch, with tasks that are not important and can just run as they want, but synchronize cheaply on the completion.
A task_block would not allow me to mix between the two.

@msimberg
Copy link
Contributor

I think your key point is this:

synchronize cheaply on the completion

no? Is the code you posted above with the latch your ideal way of writing that or would go further if you had anything available? Also, I'm really curious about the actual code where you need this pattern. I think this is an interesting problem, but it's not clear yet how general this pattern might be. You can always write your own for_loop like thing that uses a latch or semaphore internally to keep track of the tasks instead of having the user do that. It's also a question of how much performance you actually lose by using the parallel for_loop or even just wait_all/when_all.

@biddisco
Copy link
Contributor Author

It seems I didn't really explain the problem very well - suppose I do the following

auto f = hpx::parallel::for_loop(
         hpx::parallel::execution::par_task, 0, N,
          [&](int i) {
            hpx::async(task, stuff);
        });
}
f.get();

This only waits on the loop, it does not wait on the tasks generated inside the loop. The loop will be chunked and there will be C chunks and C futures waited on, but the N async tasks will not complete until later. What I want to do be using a construct of this kind

const in N = 1000000;
hpx::lco::latch my_latch(N);
for (int i=0; i<N; ++i) {
    hpx::async(my_latch, task, args); 
}
my_latch.wait();

is wait on the N tasks so that I know when they have all completed without having to have a vector<future> of size N that I then wait on. The specific reason I want it is to implement one of the loops in DCA++, but I have had to use vectors of futures in numerous tests previously.

Is the code you posted above with the latch your ideal way of writing that or would go further if you had anything available?

Well it seems like a very simple way of doing it and could be implemented using a custom applier that is instantiated by an executor that is 'aware' of it. The other obvious ways involve modifying the task itself to signal, but that is intrusive. It is also related to the use of a sliding semaphore in other tests to limit the number of tasks, in those, we have to modify the tasks to signal, but with this approach, it would be possible to do it via the executor/scheduler and leave the tasks untouched - I believe it would be a cleaner way of handling it.

@msimberg
Copy link
Contributor

With the risk of getting sidetracked, why would you call async inside the parallel for loop (if all you want to do is call task(stuff) for each index)?

@biddisco
Copy link
Contributor Author

If you were copying 100 arrays of small data to the GPU for example, you would asynchronously call copystuff in a loop - but you want to know when they complete. Saving 100 futures is ok, but if you could do it without saving any futures and having an executor that signals you when they are complete, then your code is simpler and there are less overheads.

@sithhell
Copy link
Member

the most consistent and, IMHO, leanest way to accomplish this is via an executor. On the executor side of things, you can block until all tasks are completed, the implementation can then be completely hidden inside that executor

@hkaiser
Copy link
Member

hkaiser commented Nov 19, 2018

I agree. This new executor could even be a wrapper around any other (already existing) executor, just adding the ability to wait for all executed tasks. Also, some of our executors already block on destruction. That could be a good starting point.

@biddisco
Copy link
Contributor Author

biddisco commented Nov 19, 2018

I am imagining that we have an executor that creates an applier - with an additional function object that is called when the task completes - instead of (or as well as) setting a future, it can decrement a counter/latch/semaphore or suchlike. This could be used for apply or for async - (I didn't look at the code for apply recently, so I'm not sure if it uses an applier, or something else).

We would need to add a template param to applier, and have the usual executors pass a default param to these that would be the function that sets the future. The latching_executor or whatever, would decorate this function with it's extra features, or replace the future set with other operations.

@stale
Copy link

stale bot commented Jul 4, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@biddisco
Copy link
Contributor Author

Issues #3348 and #3553 should not be closed as they are under consideration at the time of writing.

@stale stale bot removed the tag: wontfix label Jul 10, 2019
@hkaiser hkaiser added the tag: pinned Never close as stale label Jul 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants