Configure pmap behavior via keyword args #15975

Merged — 2 commits into master on May 19, 2016
Conversation

@amitmurthy (Contributor)

This PR enables customization of the new pmap implementation via keyword args.

distributed=true

By default pmap is a parallel map using all available workers. With distributed=false, the parallelization is limited to using tasks in the current process and can be used in place of @sync / @async blocks. For example, we may want to retrieve data from other processes in parallel using a construction like

results=Array(Any, nworkers())
@sync begin
    for (idx,p) in enumerate(workers())
        @async results[idx] = remotecall_fetch(foo, p, args...)
    end
end

which can now be written as

results=pmap(p->remotecall_fetch(foo, p, args...), workers(); distributed=false) 

batch_size=1

By default, pmap will not perform any batching. You may specify an explicit integer value for batch_size, or pass batch_size=:auto, in which case pmap will create batches based on the number of available workers.
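For illustration, a hedged sketch of how this option might be passed (compute and inputs are placeholder names, not identifiers from this PR):

# Hypothetical sketch; `compute` and `inputs` are placeholders.
# Ship work to the workers in batches of 10 elements per remote call.
results = pmap(compute, inputs; batch_size=10)

# Or let pmap choose a batch size based on the number of available workers.
results = pmap(compute, inputs; batch_size=:auto)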

on_error = e->rethrow(e)

By default, pmap will stop on any error. Users may override this behavior by supplying an on_error function. For example, on_error = e -> e will simply return the exception inline in the results, while on_error = e -> SOME_ERROR_CODE will return an error code value inline instead.
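A hedged sketch of the two styles described above (risky and inputs are placeholder names):

# Hypothetical sketch; `risky` and `inputs` are placeholders.
# Keep going on failure and store the exception itself in the result slot:
results = pmap(risky, inputs; on_error = e -> e)

# Or map any failure to a sentinel value (here -1) in the results:
results = pmap(risky, inputs; on_error = e -> -1)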

@amitmurthy (Contributor Author)

This is a WIP; I have just outlined the approach. Please don't review the code in detail yet. I have put it out to start a discussion on the feature/interface.

@bjarthur (Contributor)

the flexibility to specify whether it's distributed or not and the number of tasks on each worker would be great! two questions / suggestions:

  1. will there still be a way to return an iterator? i would suggest that pmap always return an iterator, and to instruct the user to collect it if needed. there is a precedent for this API in Range, keys, values, etc.
  2. i know that addprocs can be made asynchronous by preceding it with @schedule. but it doesn't seem to add workers to the pool as soon as each one becomes available. rather it waits until they all are available. am i wrong about this? is it possible??

my use case is a shared cluster with a high load. if one addprocs_sge(N), where N exceeds what is currently free, the wait time can be very long. it would be nice if (a) the pool was dynamically expanded as the cluster became free, (b) a currently running pmap could utilize the newly available workers, and (c) each result could be processed as soon as it was ready.

so in code, something like this:

@schedule addprocs_sge(1<<32)  # no cluster is this big, but give me what you've got now,
                               # and add to my pool as other people's job finish

for result in pmap(f,...  # start the pmap right away,
                          # with perhaps not even a single worker in the pool.
                          # give tasks to new workers as they become available.
                          # process each result as it finishes, either b/c
                          # the current process would otherwise be idle, or it
                          # doesn't have enough memory to hold all the results at once
    do_something(result)
end

while true
    pool.count == 0 && throw(ErrorException("No active worker available in pool"))
    worker = take!(pool.channel)
    if worker in procs()
Contributor

How can it happen that worker is not in procs()?

Contributor Author

Between the time a worker is added back to the pool and it is picked up again, the worker may die, or the host may die, or the network to that particular host may fail. And retry will just end up retrying to a failed worker and adding it back to the pool again and again.

If the connection to a worker is lost, it is removed from procs().

Contributor

Would it be better to remove the worker from default_worker_pool at the same time that it is removed from procs()?

Contributor Author

Yes, we should do that too.

The check here is still required as the worker pool used may not always be the default_worker_pool

Contributor

I guess that makes sense.
Perhaps there should be some kind of worker death notification mechanism or callback so that custom worker pool implementations can register and deal with cleaning up dead workers for themselves.

I think the thing that is making me nervous is having code that detects an inconsistency and cleans it up. It makes it seem like there is a source of inconsistency that is not fully understood.

@samoconnor (Contributor)

will there still be a way to return an iterator? i would suggest that pmap always return an iterator, and to instruct the user to collect it if needed.

@bjarthur pgenerate is like pmap but returns an iterator. I agree that it would be better if pmap returned an iterator by default.

@samoconnor (Contributor)

A few questions...

What are the pros and cons of the alternate interfaces ?

distributed=false vs asyncmap ... ?

results = pmap(p -> remotecall_fetch(foo, p, args...), workers(); distributed=false) 
results = asyncmap(p -> remotecall_fetch(foo, p, args...), workers())

on_error vs try ... ?

results = pmap(f, c, on_error = ex -> ex)
results = pmap(x -> try f(x) catch ex end, c)

If on_error is added to pmap should it also be added to map for consistency?

In what pmap use case is it preferable to have batch_size=1 compared to the existing automatic batch size?

batch_size=1 vs asyncmap(remote ... ?

results = pmap(f, c, batch_size = 1)
results = asyncmap(remote(f), c)

@amitmurthy (Contributor Author)

Would prefer pmap to behave like map as much as possible and hence not return an iterator.

@amitmurthy (Contributor Author)

on_error vs try - The latter does not capture errors that may happen outside of the user function. For example, worker death or network connection lost.

@amitmurthy (Contributor Author) commented Apr 27, 2016

In what pmap use case is it preferable to have batch_size=1 compared to the existing automatic batch size?

If the computation time is much higher than the communication time, a batch_size=1 will be more efficient. Also, the default behavior shouldn't do anything surprising. In case the user wants to optimize for the other case (where batching makes sense), he can either opt for :auto (which we need to explain) or specify a batch size.

@samoconnor (Contributor)

Would prefer pmap to behave like map as much as possible and hence not return an iterator.

Yes, I agree, they should be consistent as much as possible. pmap should only be changed to return an iterator if map is changed first.

@amitmurthy (Contributor Author) commented Apr 27, 2016

distributed=false vs asyncmap ... ?

My thinking is along the lines that pmap, i.e., a parallel map should be a single interface for parallelizing over tasks, processes, combination of the two (via batching) and threads in the future.

@samoconnor (Contributor)

on_error vs try - The latter does not capture errors that may happen outside of the user function. For example, worker death or network connection lost.

asyncmap(x->try remote(f)(x) catch ex end, c), or asyncmap(retry(remote(f)), c) could be used in that case.

However, if there is a reasonable algorithm for handling network / cluster errors in a remote call then perhaps it should be handled by remote and not by next(itr::AsyncCollector).

I'm not necessarily against adding kw args to manipulate this kind of behaviour, but it seems that it would be better to start with a simple set of composable functions and add the kw args later if needed after there has been time to see some real-world usage, i.e. if there is a verbose pattern that gets repeated a lot.

I like the idea that pmap has no difference in interface at all compared to map.
The documentation can say something like "Note that pmap(f,c) is equivalent to asyncmap(remote(f),c). You can easily achieve variations on the default behaviour of pmap by using asyncmap e.g. asyncmap(retry(remote(f)), c)."

@samoconnor (Contributor)

My thinking is along the lines that pmap, i.e., a parallel map should be a single interface for parallelizing over tasks, processes, combination of the two (via batching) and threads in the future.

I would argue that pmap and asyncmap are different enough to warrant two different function names. My feeling is that pmap should be more prominent in the documentation. New users who just want some parallelism should use pmap. pmap should make a best effort to use tasks and workers to get things done in a way that is ok for most use cases. asyncmap should be seen as a slightly more advanced interface. If a user understands the difference between tasks and workers, then they can handle knowing about the extra asyncmap function. Users who become familiar with asyncmap will realise that pmap(f,c) is just a convenience function for asyncmap(remote(f), c). I think that the result would be a good balance between: ease of use for new users; flexibility; and transparency of implementation.

@samoconnor (Contributor)

In what pmap use case is it preferable to have batch_size=1 compared to the existing automatic batch size?

If the computation time is much higher than the communication time, a batch_size=1 will be more efficient.

If computation time is much higher than the communication time, won't the overhead of batching be negligible anyway? Perhaps I'm missing a nuance here...

Also, the default behavior shouldn't do anything surprising. In case the user wants to optimize for the other case (where batching makes sense), he can either opt for :auto (which we need to explain) or specify a batch size.

It seems to me that the current behaviour (auto batching) won't be surprising because it should be completely invisible to the user. The results are always returned in order in a single collection. (If there is a use-case where the auto batching has surprising results, we should adjust the auto-batching algorithm.)

The batching behaviour was added based on your suggestion that "having pmap processes chunks of the list at a time will reduce network roundtrips". My feeling is that if you are not comfortable with having batching as a default pmap behaviour, perhaps we should just ask the users who do want batching to do flatten(pmap(b->asyncmap(f, b), batchsplit(c))).

However, I think the batching is probably a win in many cases and probably has no significant overhead in most others. I would suggest waiting until there is a real-world performance problem before trying to make it user-tunable through kw args. If the auto-tuning can be improved over time, then everyone benefits. If there are a bunch of kw args for fine-tuning then only those users who take the time to tune them will benefit.

@samoconnor (Contributor) commented Apr 27, 2016

It seems that there is a trade-off decision here between "Do we need two functions for something quite similar?" and "Do we need all these kw options?".

What if the remote call functionality was removed from pmap?

i.e. pmap(f, c) would only do task parallelism and to get remote parallelism you would have to do pmap(remote(f), c).

This seems to achieve both goals: only one function; and not needing the "distributed" kw option.

The documentation could tell a story something like this: "You can use pmap(f,c) to evaluate a function over a collection in parallel. Note that pmap uses light-weight tasks to provide parallelism within the current process. To take advantage of multiple worker processes you can do pmap(remote(f), c)."
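A sketch of the proposed API shape only (not the behavior that was merged); f and c are as in the earlier examples, and the result names are placeholders:

results_local  = pmap(f, c)           # task parallelism within the current process only
results_remote = pmap(remote(f), c)   # each call dispatched to a worker process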

@tkelman (Contributor) commented Apr 27, 2016

The majority of users would use that API incorrectly and be surprised when pmap isn't really parallel.

@amitmurthy (Contributor Author)

Keyword args make it easier to customize program behavior via config files / environment variables.

We could have both. Expose the composable functions as well as kw args.

While batching is efficient when the ratio of jobs to workers is high, for example, 100 jobs over 10 workers, consider the case where 10 jobs are split into 5 batches over 5 workers and each job takes a varied amount of processing time. Say the slowest job takes 10 times as long as the fastest. Now, if a worker ends up with 2 of the slowest jobs in a single batch, it will just slow down the overall time taken. This is why I would prefer the default batch_size=1, i.e., no batching: the API behavior is predictable, and only if the user needs batching can it be specified via batch_size, including an :auto option.
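A toy illustration of the concern, with hypothetical job times (not data from this PR):

# Two 10-unit jobs and eight 1-unit jobs spread over 5 workers.
job_times = [10, 10, 1, 1, 1, 1, 1, 1, 1, 1]

# Static batches of 2: one batch is [10, 10], so that worker needs 20 time units
# while every other worker finishes in 2.
batches = [job_times[i:i+1] for i in 1:2:9]
batched_makespan = maximum(sum(b) for b in batches)   # 20

# With batch_size=1 jobs are handed out one at a time, so the two slow jobs land
# on different workers and the overall time is roughly 10 units instead of 20.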

@samoconnor (Contributor)

consider the case where 10 jobs are split into 5 batches

The current code already handles that and uses batch_size = 1 automatically.
In pgenerate...

    batches = batchsplit(c, min_batch_count = length(p) * 3)

For 5 workers min_batch_count is set to 15.
Therefore, given that there are less than 15 jobs, in batchsplit the batch size is overwritten to 1 by this code:

    # If there are not enough batches, use a smaller batch size...
    if length(head) < min_batch_count
        batch_size = max(1, div(sum(length, head), min_batch_count))
        return partition(collect(flatten(head)), batch_size)
    end

e.g.

# 5 workers...
julia> p = 1:5
1:5

# 10 jobs...
julia> c = 1:10
1:10

julia> collect(batchsplit(c, min_batch_count = length(p) * 3))
10-element Array{Array{Int64,1},1}:
 [1]
 [2]
 [3]
 [4]
 [5]
 [6]
 [7]
 [8]
 [9]
 [10]

In fact, batch size 1 is used all the way up to 29 jobs (for 5 workers)...

julia> c = 1:29
1:29

julia> collect(batchsplit(c, min_batch_count = length(p) * 3))
29-element Array{Array{Int64,1},1}:
 [1]
 [2]
 [3]
 [4]
 [5]
 [6]
 [7]
 [8]
 [9]
 [10]
 [11]
 [12]
 [13]
 [14]
 [15]
 [16]
 [17]
 [18]
 [19]
 [20]
 [21]
 [22]
 [23]
 [24]
 [25]
 [26]
 [27]
 [28]
 [29]

For 30 jobs, batch size is set to 2:

julia> c = 1:30
1:30

julia> collect(batchsplit(c, min_batch_count = length(p) * 3))
15-element Array{Array{Int64,1},1}:
 [1,2]
 [3,4]
 [5,6]
 [7,8]
 [9,10]
 [11,12]
 [13,14]
 [15,16]
 [17,18]
 [19,20]
 [21,22]
 [23,24]
 [25,26]
 [27,28]
 [29,30]

... obviously this could be tuned so that batching only takes effect for larger numbers of jobs.

@samoconnor (Contributor)

BUG in current code:

The example above caused: ERROR: MethodError: no method matching length(::Base.Flatten{Array{Array{Int64,1},1}})

It seems that Jeff's new flatten iterator does not have a length method.

This patch works around that by doing collect(flatten(head)). The overhead of this is negligible because head is only the first little bit of the collection of input data...

diff --git a/base/pmap.jl b/base/pmap.jl
index acf48fa..e40192e 100644
--- a/base/pmap.jl
+++ b/base/pmap.jl
@@ -73,7 +73,7 @@ function batchsplit(c; min_batch_count=1, max_batch_size=100)
     # If there are not enough batches, use a smaller batch size...
     if length(head) < min_batch_count
         batch_size = max(1, div(sum(length, head), min_batch_count))
-        return partition(flatten(head), batch_size)
+        return partition(collect(flatten(head)), batch_size)
     end

     return flatten((head, tail))

@amitmurthy (Contributor Author)

OK. Can set default to :auto and document the behavior. Also agree that cleanup across all WorkerPools should be done automatically.

@bjarthur (Contributor)

how about adding another keyword argument to return an iterator? iterator=false.

another argument against a batch_size > 1 is when each task consumes all of RAM. that's my use case.

i think the main source of tension here in the API is between the use case of asynchronous tasks and the use case of distributed workers. having a single function with keyword args that can flexibly specify any combination of the two serves both needs.

lastly, can someone please answer my second question in my first post here about workers being dynamically added to the pool as they become available? thanks.

@amitmurthy (Contributor Author)

lastly, can someone please answer my second question in my first post here about workers being dynamically added to the pool as they become available? thanks.

@schedule addprocs(N) should have the desired effect. I remember testing it, but it is not working now. Can you open an issue for it?

@amitmurthy (Contributor Author)

Ready for review. I have kept the default batch size at 1 for now as I am not very comfortable with the idea of automatic batching yet.

I'll submit a separate PR for automatic removal of failed workers from worker pools. Exposing / exporting sub-functionalities of pmap independently can be in a separate PR too.

@amitmurthy changed the title from "RFC/WIP configure pmap behavior via keyword args [ci skip]" to "Configure pmap behavior via keyword args" on Apr 28, 2016
:batch_size => 1,
:on_error => nothing)

const PMAP_KW_NAMES = [:distributed, :batch_size, :on_error]
Contributor

it would be less redundant to say `const PMAP_KW_NAMES = keys(DEFAULT_PMAP_ARGS)`

Sponsor Member

collect(keys(DEFAULT_PMAP_ARGS)) then, right?

Contributor Author

Yes.

@bjarthur (Contributor)

at the very beginning of this pmap refactoring someone made the suggestion to limit the number of tasks to 100 because otherwise problems would arise. i can't find the original discussion or the issue that was linked to then. could someone please remind me? i ask because i'm having problems adding more procs than 100 and think it might be related. thanks.

@samoconnor (Contributor)

... suggestion to limit the number of tasks to 100 ...

The AsyncGenerator constructor accepts a ntasks= option (100 by default) that sets the maximum number of concurrent tasks.

Note: asyncmap(f, c...) = collect(AsyncGenerator(f, c...))
and : pmap(f, c...) ~= collect(AsyncGenerator(remote(f), c...))

... i'm having problems adding more procs than 100 ...

Can you describe what the problems are?


if (distributed == false) ||
   (length(p) == 0) ||
   (length(p) == 1 && fetch(p.channel) == myid())
Contributor

How can this happen?

Contributor Author

When the pmap is run from a different worker.

Also, we should add the master process to default_worker_pool and remove it once the first worker is added since in the absence of any workers, all remote calls should execute on the master itself. In the current implementation remote with nprocs() == 1 would wait for a worker to be added. I'll change it.

@amitmurthy (Contributor Author)

I am going to merge this after a green CI. I have documented the minimum 50 ms delay for retry, which is also the default max_delay, and changed the error handling in AsyncCollector as suggested.

And will open a new issue to discuss changing defaults once this is merged.

Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``condition`` is a ``Type`` then retry only for exceptions of that type. If ``condition`` is a function ``cond(::Exception) -> Bool`` then retry only if it is true.
Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``retry_on`` is a ``Type`` then retry only for exceptions of that type. If ``retry_on`` is a function ``test_error(::Exception) -> Bool`` then retry only if it is true.

The first retry happens after a gap of 50 milliseconds or ``max_delay``\ , whichever is lower. Subsequently, the delays between retries are exponentially increased with a random factor up to ``max_delay``\ .
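For illustration, a hedged usage sketch based on this docstring and the retry(f, retry_on; n=..., max_delay=...) form discussed later in the review; flaky_fetch is a placeholder function:

# Retry only on libuv (I/O) errors, up to 3 times, capping the delay at 10 seconds.
robust_fetch = retry(flaky_fetch, Base.UVError; n=3, max_delay=10.0)
result = robust_fetch()   # arguments are forwarded to flaky_fetch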
Contributor

It seems odd to me to describe the exponential back-off but have the default max_delay set so that there is no exponential back-off.
Would you consider leaving max_delay at 10 in this PR so that it can be discussed further before it is changed?
After all, now that the default n is 1, max_delay has no effect anyway unless the user changes n.

Contributor Author

OK.

@samoconnor (Contributor)

Hi @amitmurthy,
I haven't yet had time to read the new code around process_batch_errors!, wrap_batch, wrap_on_error etc.
I'm happy to do that but it might take me another day or two to get the time.

@samoconnor (Contributor)

Hence opting for conservative defaults that do not surprise, i.e., no batching and no retry with pmap.

Is it an error that the manual entry says the default is retry_on=e->true, i.e. retry on any exception?
58785fc#diff-9fb1a61a41996cbee3b485880044acf9R276
58785fc#diff-2b2ebeef35e91b1fff15d49dc1d526b5R54

@amitmurthy (Contributor Author)

OK. Will hold off the merge till the weekend.

Since retry_n=0 for pmap, failed elements are not retried. Could change the order of the keyword args so that there is no confusion.

@amitmurthy (Contributor Author)

@tkelman is the AV error a known issue?

@tkelman (Contributor) commented May 19, 2016

It's awfully frequent lately but I don't know what's causing it. One of the workers may have frozen?

is used.

By default, `pmap` distributes the computation over all specified workers. To use only the
local process and distribute over tasks, specifiy `distributed=false`. This is equivalent to `asyncmap`.
Contributor

specify

@amitmurthy (Contributor Author)

OK. Will go ahead and merge then.

@amitmurthy merged commit 55e3a39 into master on May 19, 2016
@amitmurthy deleted the amitm/pmap_error branch on May 19, 2016 09:04
end

if retry_n > 0
    f = wrap_retry(f, retry_on, retry_n, retry_max_delay)
Contributor

Why not just retry(f, retry_on; n=n, max_delay=retry_max_delay)?

Contributor Author

This results in a stack overflow

f = () -> 1
f = () -> f() + 1
f()

This works

f = ()->1
function wrap(f)
  () -> f() + 1
end
f = wrap(f)
f()

I don't know if this is by design. Should open an issue if not.

Different variable names for f at each step would have removed the need for the wrapping function.
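A minimal sketch of that alternative, where each step binds a new name so the closure captures the previous function rather than itself:

f0 = () -> 1
f1 = () -> f0() + 1   # captures f0, not itself, so no infinite recursion
f1()                  # returns 2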

@samoconnor (Contributor)

There are a few longish lines of code and doc where the surrounding code was previously 80-cols.
This should probably be wrapped to match, or at least adhere to the 92 char limit.



"""
@catch(f) -> Function
Contributor

There is no mention of deleting @catch in the PR description or the commit logs.

Contributor Author

This PR is a continuation of the discussion in #15409. But yes, mentioning it here / in the commit log would have helped.

@amitmurthy (Contributor Author)

Yes, we should refactor this to have pgenerate only return an iterator as before.

@samoconnor (Contributor)

Perhaps you should just back out the whole batching thing for now and give a simple example of how to use flatten + batchsplit in the doc.
It seems like the clean refactoring of pmap just got really messy because of batching.
I think we should build batch ap

@samoconnor (Contributor)

(Fat phone fingers pressed Comment prematurely)
I think we should build batchmap separately on a branch or in a package and get the design thoroughly refined before merging it into pmap.

@amitmurthy (Contributor Author)

We can keep the other iterators simple and move more logic only into pmap. Addressing issues like #16345 with cache management for serialized functions ought to be in pmap. What pmap internally builds on can be simpler building blocks.

Labels: domain:parallelism (Parallel or distributed computation)

6 participants