[RFC] Initial prototype of a bulk_sender concept #134

Merged
merged 5 commits into master from bulk_schedule on Jun 20, 2020

Conversation

lewissbaker (Contributor)

Adds a typed_bulk_sender concept which checks for the
'next_types' type-alias on senders.

Adds a few basic bulk algorithms:

  • bulk_schedule()
  • bulk_transform()
  • bulk_join()

The bulk_schedule() operation currently just takes a count and
calls set_next() on the receiver with integral values from [0, count).
The default implementation dispatches a single task via schedule()
and calls set_next() in a loop. It supports vectorised execution for
'unsequenced' execution policies.

Adds some execution policies that mirror the policies available in C++20.
They are defined locally in libunifex for now because not all standard
libraries provide the <execution> header yet.

Adds get_execution_policy() CPO for receivers and/or functions to customise
so that they specify more relaxed constraints on how bulk-senders can call
the receiver's set_next() methods. The default is sequenced execution.
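
For illustration, a minimal end-to-end sketch combining the three algorithms
(the thread-pool scheduler and the par_unseq policy argument are assumptions
for this example, not requirements of the design):

#include <unifex/bulk_join.hpp>
#include <unifex/bulk_schedule.hpp>
#include <unifex/bulk_transform.hpp>
#include <unifex/static_thread_pool.hpp>
#include <unifex/sync_wait.hpp>

#include <vector>

int main() {
  unifex::static_thread_pool pool;
  auto sched = pool.get_scheduler();

  const int count = 100;
  std::vector<int> output(count);

  // bulk_schedule() emits indices [0, count) via set_next(),
  // bulk_transform() maps each index, and bulk_join() turns the
  // many-sender back into a single sender that sync_wait() can await.
  unifex::sync_wait(
      unifex::bulk_join(
          unifex::bulk_transform(
              unifex::bulk_schedule(sched, count),
              [&](int index) noexcept { output[index] = index * 2; },
              unifex::par_unseq)));
}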

@facebook-github-bot added the 'CLA Signed' label May 29, 2020
@lewissbaker (author) left a comment

Some other questions I have with regards to this as a design direction:

  • How do we communicate the size of the bulk operation to consumers? e.g. so that a reduce-operation can ensure it allocates sufficient storage for all of the results.
  • How do we allow bulk operations to allocate per-execution-agent storage?
    e.g. so a reduce operation can do a local reduce into some local state, avoiding synchronisation, and then only synchronise when reducing partial results produced by each execution agent
  • How do nested bulk operations compose?
    e.g. we want a child bulk-operation for each element of a parent bulk operation
  • Should bulk_schedule() be taking an arbitrary range rather than a count?
    e.g. bulk_schedule(sched, std::views::iota(0, 1000)) instead of bulk_schedule(sched, 1000)
  • Should bulk_schedule() support a pack of ranges and produce the cross-product of those?
    e.g. bulk_schedule(sched, std::views::iota(0, 1000), std::views::iota(0, 1000))
to produce the (i, j) indices of a 1000x1000 matrix

Also need to consider how to query a many_sender for whether or not it is able to execute things concurrently or in an unsequenced fashion.
This would allow an algorithm to provide a different receiver that avoided synchronisation if the execution agent was single-threaded.

Comment on lines +47 to +71
void set_value()
    noexcept(is_nothrow_value_receiver_v<Receiver> &&
             is_nothrow_next_receiver_v<Receiver, Integral>) {
  using policy_t = decltype(get_execution_policy(receiver_));
  if constexpr (is_one_of_v<policy_t, unsequenced_policy,
                            parallel_unsequenced_policy>) {
    // Vectorisable version
#if defined(__clang__)
#pragma clang loop vectorize(enable) interleave(enable)
#elif defined(__GNUC__)
#pragma GCC ivdep
#elif defined(_MSC_VER)
#pragma loop(ivdep)
#endif
    for (Integral i(0); i < count_; ++i) {
      unifex::set_next(receiver_, Integral(i));
    }
  } else {
    // Sequenced version
    for (Integral i(0); i < count_; ++i) {
      unifex::set_next(receiver_, Integral(i));
    }
  }

  unifex::set_value(std::move(receiver_));
}
@lewissbaker (author) commented:

This ideally also needs to deal with requests to stop/cancel the work.
i.e. call get_stop_token() and periodically check for stop_requested().

Ideally the underlying schedule() operation itself would have checked for cancellation before starting the work so we don't need to handle that case.
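
A rough sketch of what the sequenced path could look like with such a check
(hypothetical; the PR does not implement this):

// Hypothetical cancellation-aware variant of the sequenced loop.
// A real implementation might poll the stop-token less often to
// keep per-iteration overhead down.
auto stopToken = unifex::get_stop_token(receiver_);
for (Integral i(0); i < count_; ++i) {
  if (stopToken.stop_requested()) {
    unifex::set_done(std::move(receiver_));
    return;
  }
  unifex::set_next(receiver_, Integral(i));
}
unifex::set_value(std::move(receiver_));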

@dhollman left a comment

Except for a few things that I don't quite follow in the implementation and the one comment about wanting a customization point for shape, this looks good to me, and much of it looks familiar (when compared to the stuff I sketched up pre-Cologne), which is a good sign.

### `bulk_join(ManySender source) -> Sender`

Joins a bulk operation on a ManySender and turns it into a SingleSender
operation that completes once all of the `set_next()` calls have completed.

How do we know that all of the set_next() calls have completed?

@lewissbaker (author) replied:

It is up to the implementation of the many_sender to ensure that the returns from all set_next() calls "strongly happen before" the call to set_value()/set_done()/set_error().


Note that customisations must still adhere to the constraints placed on
valid executions of `set_next()` according to the execution policy returned
from `get_execution_policy()`.

I mostly like where you're going here (it's basically the direction I went with this, way back before Cologne), but I think that this sort of shape information is of more general applicability than this, and confining all customizations of scheduling with a given shape to go through this one customization point is probably not sufficient. I think that this information also needs to be a bit stickier—probably via a customization point like get_shape(ManySender) that is customized if and only if the shape is known lazily, as is the case with the result of bulk_schedule. Does that make sense?
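
A hypothetical spelling of that suggestion (no get_shape() exists in
libunifex; all names here are illustrative only):

// Hypothetical get_shape() CPO, customised if and only if the
// many-sender knows its shape lazily, as bulk_schedule()'s result would.
inline constexpr struct get_shape_fn {
  template <typename ManySender>
  auto operator()(const ManySender& sender) const
      -> decltype(sender.get_shape()) {
    return sender.get_shape();
  }
} get_shape{};

// e.g. a reduce algorithm could size its result storage up front:
//   auto n = get_shape(bulk_schedule(sched, 1000));  // -> 1000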

: receiver_((Receiver2&&)r)
{}

void set_next() & noexcept {}

I'm not sure I follow how an empty body here constitutes tracking all of the set_next calls and completing once all of the set_next() calls have completed, as your spec says above. What am I missing?

@lewissbaker (author) replied:

The calls to set_next() return before set_done()/set_value()/set_error() is called.
So we just discard the (empty) "next value" here and then rely on the normal sender signals forwarding through to the single-sender receiver to signal that the bulk operation has completed.

@BillyONeal left a comment

I'm not really capable of reviewing this from an 'is this the programming model we want' view, but I think this at least would get us to something the parallel algorithms library could be built on top of (assuming that the compositional problem of the execution policies is understood).

The other algorithms that would be 'interesting' to demonstrate with this model based on what we saw in MSVC's implementation are:

(We can build all of the above out of bulk_schedule but I want to make sure the executors proposers are OK with that)

    noexcept(is_nothrow_value_receiver_v<Receiver> &&
             is_nothrow_next_receiver_v<Receiver, Integral>) {
  using policy_t = decltype(get_execution_policy(receiver_));
  if constexpr (is_one_of_v<policy_t, unsequenced_policy,
                            parallel_unsequenced_policy>) {


Note that algorithms will, in places, likely need stronger guarantees than the user supplies. I think I see how that would work but I just want to make sure that has been considered. (For example, the user shows up with a receiver that wants unsequenced but the library doesn't have an implementation for anything other than sequenced.)


(If we didn't have that 'property system' thing I wouldn't be concerned)

Collaborator

(If we didn't have that 'property system' thing I wouldn't be concerned)

Can you elaborate?


If an algorithm must declare that it needs stronger concurrency guarantees to do its job than the underlying executor provides, it has to do that by creating its own executor and returning different values from properties (if I understand correctly). But we don't know all properties that must be edited to say "no I really need sequenced here." In current P0443 that happens between blocking_t and bulk_guarantee_t -- if bulk_execute hasn't been customized and blocking_t is not 'always', then the only possible bulk_guarantee_t the fallback can provide is 'unsequenced'. With the parallel algorithm execution policies I could see a similar problem/conflict between properties being possible, leading to a case where the algorithm says it wants ordered/sequenced behavior but doesn't override every property other parts of the system use when making such determinations.

That is to say, the current algorithm execution policies don't say anything about the executor. They describe guarantees the user is making about their element access functions at the call site. In that sense involving that in any way as a property of the executor might be backwards.... just because the user's element access functions tolerate parallel_unsequenced_t doesn't mean the algorithm can tolerate an executor that provides such weak guarantees. So the algorithm needs to be able to adapt the executor into one providing stronger guarantees (e.g. with require()). That means we can't ever add a property that some other part of the system might interpret in a way that breaks other properties' guarantees, so if we had already shipped P0443 we would not be able to add this execution policy property because it conflicts with bulk_guarantee_t.

@lewissbaker (author) replied:

The get_execution_policy() query is there to allow a many-sender to query the constraints on how it may call the set_next() method of the receiver. The builtin policies provide constraints along two orthogonal axes: 'allow concurrent calls' (par, par_unseq) and 'allow interleaved calls' (unseq, par_unseq).

With these policies a receiver is not able to dictate that a sender must call it concurrently or interleaved. It is always valid for a given sender to call it sequentially from a single thread.

One current gap is the ability for an algorithm to query whether a given sender may call the receiver concurrently or interleaved. An algorithm may choose to provide a receiver that takes a more efficient strategy if it knows it will not be called concurrently (eg. by avoiding use of atomics).

The algorithm/many-receiver and the executor/many-sender can negotiate an execution strategy between them: the algorithm queries the capabilities of the many-sender and chooses the most appropriate receiver to pass to the connect() call, while the many-sender queries the constraints that the many-receiver places on calls to set_next() and chooses an appropriate execution strategy accordingly.

There will likely also be other queries that are possible. eg. the 'adjacency' property proposed in P2155R0 could be implemented as a get_adjacency_hint() CPO customisable on a many-receiver that a many-sender can query to refine its strategy for calling set_next().
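
For concreteness, a sketch of a many-receiver advertising a relaxed policy
(the tag_invoke spelling and header names are assumptions modelled on how
other libunifex CPOs are customised):

#include <atomic>
#include <unifex/execution_policy.hpp>  // assumed header for the policies
#include <unifex/tag_invoke.hpp>

// Sketch: permits concurrent, but not interleaved, calls to set_next().
struct sum_receiver {
  std::atomic<long>& total;

  // Queried by the many-sender when choosing an execution strategy;
  // without this customisation the default is sequenced execution.
  friend constexpr auto tag_invoke(
      unifex::tag_t<unifex::get_execution_policy>,
      const sum_receiver&) noexcept {
    return unifex::par;  // concurrent calls OK, interleaving not
  }

  void set_next(int i) & noexcept {
    total.fetch_add(i, std::memory_order_relaxed);
  }
  void set_value() && noexcept {}
  void set_done() && noexcept {}
  template <typename Error>
  void set_error(Error&&) && noexcept {}
};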


With these policies a receiver is not able to dictate that a sender must call it concurrently or interleaved

I'm saying that algorithms need the opposite: they need to be able to dictate that they must not be called interleaved in places, even if the user passed an executor with the unseq or par_unseq property when the algorithm was called.

For example our implementation does not have a parallel merge so we need to say "too bad, you get serial".

@lewissbaker (author) replied:

I'm saying that algorithms need the opposite: they need to be able to dictate that they must not be called interleaved in places, even if the user passed an executor with the unseq or par_unseq property when the algorithm was called.

Yes. This is what we have currently implemented and what get_execution_policy(receiver) gives you.

It lets the receiver return, say, std::par, which indicates it is ok with being called concurrently or sequentially, but not interleaved. An executor that has the ability to call a receiver interleaved will need to fall back to a non-interleaved execution strategy. ie. the receiver has put a constraint on the valid execution strategies that the sender can use.


Right, that's certainly something we can do, but there are already properties in P0443 (like blocking_t) that could contradict that setting. That's what I meant by:

(If we didn't have that 'property system' thing I wouldn't be concerned)

e.g. if you say execution policy seq but blocking_t says nonblocking or bulk_guarantee_t says unsequenced the result is semantically broken. So the algorithm needs to override all potential properties that might indicate some degree of parallelism if the algorithm can't tolerate that, and I'm not sure that's reasonable.

@lewissbaker (author) replied:

I'm not sure I agree with the current design of the 'blocking_t' property as it stands in P0443.

In libunifex we have a blocking() query on a sender that allows you to query the disposition of that sender's completion: namely whether it will complete inline before start() returns, complete on some other thread before start() returns, or never complete inline on the current thread. These can be used to apply certain optimisations in algorithms (like sync_wait(), or submit()) to avoid heap-allocation or synchronisation.

But I'm not sure it makes sense to be able to require() that an executor be blocking::always. It may have been a necessity in a world where you only had an eager execute() function, but once you have lazy sender-based operations and schedulers with a schedule() basis operation, you can just call sync_wait() to block waiting for any async operation to complete.

For the bulk_guarantee_t property: a given many-sender, eg. one that is backed by a GPU, may only support unsequenced execution. In this case it would be perfectly fine for that sender's connect() to be constrained such that it requires receivers passed to it to have a get_execution_policy() compatible with those requirements. If you try to give it a receiver that does not allow unsequenced execution then this would manifest as a compile-error.

This would limit the composability of these many-senders with some kinds of algorithms, though, so I'm not yet sold on whether this is "A Good Thing" to be adding.


For the bulk_guarantee_t property: a given many-sender, eg. that is backed by a GPU, may only support unsequenced execution.

Then there will be algorithms that can't be used with that executor. The current parallel algorithms design is 'add a parallel overload unless it is blindingly obvious that no parallel version is possible', on the grounds that implementations where a parallel version is impossible can fall back to providing a serial version. There are still a lot of algorithms for which we don't know a parallel version faster than the serial version, including some (like generate and partial_sort) for which we think no parallel version is possible and the parallel overloads were added in error. See https://github.com/microsoft/STL/blob/0cdf5fbfac83650bf6b404a754be415493466643/stl/inc/yvals_core.h#L285-L311

@LeeHowes (Contributor) commented Jun 18, 2020

Some other questions I have with regards to this as a design direction:

  • How do we communicate the size of the bulk operation to consumers? e.g. so that a reduce-operation can ensure it allocates sufficient storage for all of the results.

Do we need to? The algorithm knows what size it is asking for from the bulk_schedule call. In your example code you capture count in the transform. Any more complex algorithm that uses bulk_schedule inside can do the same.

Or is your concern that the second bulk_transform in your example doesn't know? Can we not require that that info be propagated?

Alternatively I'm happy to take the approach that the shape type should be a bit more complicated and carry the size as well as the current index. If we are going to do this we should pass through a subscheduler carrying that information (see the nesting discussion below).

  • How do we allow bulk operations to allocate per-execution-agent storage?
    e.g. so a reduce operation can do a local reduce into some local state, avoiding synchronisation, and then only synchronise when reducing partial results produced by each execution agent

I don't think we need anything special here. We can inject values with just() and customised versions of it, maybe a with_local_storage that acts on the executor type. With nested calls we should assume that whatever is allocated that way is shared across nested calls.
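
A purely hypothetical sketch of that shape (neither with_local_storage nor
this bulk_transform overload exists; illustrative only):

// Hypothetical: each execution agent gets its own accumulator, so
// set_next() needs no synchronisation; the partial results would be
// combined once, after the bulk operation completes.
auto work = bulk_transform(
    with_local_storage(
        bulk_schedule(sched, count),
        [] { return long{0}; }),               // per-agent state factory
    [&](long& local, int i) { local += input[i]; });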

  • How do nested bulk operations compose?
    e.g. we want a child bulk-operation for each element of a parent bulk operation

If we pass a subexecutor through, either always or via a with_subexecutor algorithm, then nesting is a bulk_let:

sync_wait(bulk_join(bulk_let(
    with_subexecutor(bulk_schedule(sched, count)),
    [](subexecutor& subsched, idx i) {
      return bulk_let(
          with_subexecutor(bulk_schedule(subsched, subcount)),
          [](subexecutor& subsched, idx j) {
            return /* ... */;
          });
    })));

Open to discussion whether joins are implied in the nested cases.

  • Should bulk_schedule() be taking an arbitrary range rather than a count?
    e.g. bulk_schedule(sched, std::views::iota(0, 1000)) instead of bulk_schedule(sched, 1000)

I still think so, but not strongly. I do think it should be able to take an n-dimensional parameter, though, because 1D->nD is inefficient to calculate.

  • Should bulk_schedule() support a pack of ranges and produce the cross-product of those?
    e.g. bulk_schedule(sched, std::views::iota(0, 1000), std::views::iota(0, 1000))
to produce the (i, j) indices of a 1000x1000 matrix

Do you have a good use case? I would rather have a clear std::views::iota2d((0, 0), (1000, 1000)) (or whatever similar multi-dimensional iteration space we end up with) to achieve that.

Also need to consider how to query a many_sender for whether or not it is able to execute things concurrently or in an unsequenced fashion.
This would allow an algorithm to provide a different receiver that avoided synchronisation if the execution agent was single-threaded.

That is something we would want to query on a scheduler. Presumably we should propagate that query all the way through - though some senders might break it.

@lewissbaker merged commit 96a62a6 into master Jun 20, 2020
@lewissbaker deleted the bulk_schedule branch June 22, 2020 16:44