[RFC] Initial prototype of a bulk_sender concept #134

Merged 5 commits on Jun 20, 2020
89 changes: 89 additions & 0 deletions doc/api_reference.md
* `get_stop_token()`
* `get_scheduler()`
* `get_allocator()`
* `get_execution_policy()`
* Sender Algorithms
* `transform()`
* `transform_done()`
* `async_trace_sender`
* Sender Queries
* `blocking()`
* Many Sender Algorithms
* `bulk_transform()`
* `bulk_join()`
* `bulk_schedule()`
* Stream Algorithms
* `adapt_stream()`
* `next_adapt_stream()`
If a receiver has not customised this it will default to return `unstoppable_token`.

See the [Cancellation](cancellation.md) section for more details on cancellation.

### `get_execution_policy(manyReceiver)`

For a ManyReceiver, obtains the execution policy object that specifies the constraints
on how a ManySender is allowed to call `set_next()`.

The following execution policies are built-in and understood by the many-sender
algorithms in libunifex.

* `unifex::sequenced_policy` - Calls to `set_next()` on the receiver must be sequenced
and may not be executed concurrently on different threads or have their executions
interleaved on a single thread.

* `unifex::unsequenced_policy` - Calls to `set_next()` are safe to be interleaved
with each other on the same thread but are not safe to be executed concurrently
on different threads. This typically allows vectorised execution of the calls using
SIMD instructions.

* `unifex::parallel_policy` - Calls to `set_next()` are safe to be executed
concurrently on different threads, but are not safe to be interleaved on
a given thread. Use this if the forward-progress of one call to `set_next()`
may be dependent on another call to `set_next()` making forward progress.
e.g. if multiple calls attempt to acquire a lock on the same mutex.

* `unifex::parallel_unsequenced_policy` - Calls to `set_next()` are safe to
be executed concurrently on different threads and are also safe to have
their executions interleaved on a given thread.

Note that, while it is possible to extend the set of execution policies with
application-specific policies, built-in implementations of bulk algorithms
will not necessarily understand them and will treat them as if they were
the `sequenced_policy`.

If a receiver does not customise the `get_execution_policy()` CPO then it
will default to returning the `sequenced_policy`.
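
The fallback behaviour can be sketched in plain C++17. The dispatch below uses a simple member-function detection trait rather than unifex's `tag_invoke` mechanism, and the receiver types are hypothetical:

```c++
#include <cassert>
#include <type_traits>
#include <utility>

// Minimal stand-ins for the built-in policy types.
struct sequenced_policy {};
struct unsequenced_policy {};
struct parallel_policy {};
struct parallel_unsequenced_policy {};

// Detect whether a receiver customises get_execution_policy().
template <typename R, typename = void>
struct has_policy : std::false_type {};

template <typename R>
struct has_policy<
    R, std::void_t<decltype(std::declval<const R&>().get_execution_policy())>>
    : std::true_type {};

// The query: use the receiver's customisation when present, otherwise
// fall back to sequenced_policy, mirroring the documented default.
template <typename Receiver>
auto get_execution_policy(const Receiver& r) {
  if constexpr (has_policy<Receiver>::value) {
    return r.get_execution_policy();
  } else {
    return sequenced_policy{};
  }
}

struct plain_receiver {};  // no customisation: defaults to sequenced

struct concurrent_receiver {
  parallel_policy get_execution_policy() const { return {}; }
};
```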

# Sender Algorithms

### `transform(Sender predecessor, Func func) -> Sender`
Otherwise returns `blocking_kind::maybe`.
Senders can customise this algorithm by providing an overload of
`tag_invoke(tag_t<blocking>, const your_sender_type&)`.

## Many Sender Algorithms

### `bulk_transform(ManySender sender, Func func, FuncPolicy policy) -> ManySender`

For each `set_next(values...)` result produced by `sender`, invokes
`func(values...)` and produces the result of that call as its `set_next()`
result.

The `policy` argument is optional and if absent, defaults to `get_execution_policy(func)`.

The resulting execution policy incorporates the union of the constraints
placed on the execution of the function and the execution of the
downstream receiver's `set_next()` method.

That is, both the downstream ManyReceiver's execution policy and the function's
execution policy must allow parallel execution for the `bulk_transform()`
operation to permit parallel execution; the same applies to unsequenced execution.

This algorithm is transparent to `set_value()`, `set_error()` and `set_done()`
completion signals.
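
The combination rule can be sketched as a compile-time combine of two policies; the `policy` struct and `combine()` helper below are illustrative, not part of libunifex:

```c++
#include <cassert>

// Hypothetical policy descriptors: each records what it permits.
struct policy {
  bool allows_parallel;
  bool allows_unsequenced;
};

constexpr policy seq{false, false};
constexpr policy unseq{false, true};
constexpr policy par{true, false};
constexpr policy par_unseq{true, true};

// bulk_transform's resulting policy permits a given execution mode
// only if BOTH the function's policy and the downstream receiver's
// policy permit it.
constexpr policy combine(policy func, policy receiver) {
  return policy{func.allows_parallel && receiver.allows_parallel,
                func.allows_unsequenced && receiver.allows_unsequenced};
}
```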

### `bulk_join(ManySender source) -> Sender`

Joins a bulk operation on a ManySender and turns it into a SingleSender
operation that completes once all of the `set_next()` calls have completed.

> **Review comment:** How do we know that all of the `set_next()` calls have completed?
>
> **Author reply:** That is up to the implementation of the many_sender to ensure that the return from all `set_next()` calls "strongly happens before" the call to `set_value()`/`set_done()`/`set_error()`.

The input `source` sender must be a ManySender of `void` (ie. no values passed
to `set_next()`).

The returned single-sender is transparent to the `set_value()`, `set_error()`
and `set_done()` signals.
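
The observable contract can be sketched with a toy adapter; all types here are illustrative stand-ins rather than libunifex's implementation:

```c++
#include <cassert>
#include <utility>

// The joined single-receiver only sees terminal signals.
struct single_receiver {
  bool value_received = false;
  void set_value() && { value_received = true; }
};

// Sketch of the adapter bulk_join would wrap around the downstream
// receiver: set_next() takes no values, and the terminal set_value()
// is only forwarded once every set_next() call has returned.
struct join_receiver {
  single_receiver downstream;
  int in_flight = 0;  // diagnostic: calls currently executing

  void set_next() & {
    ++in_flight;
    // ... per-element work happens here ...
    --in_flight;
  }

  void set_value() && {
    // The sender guarantees all set_next() returns "strongly happen
    // before" this terminal signal, so no call is still in flight.
    assert(in_flight == 0);
    std::move(downstream).set_value();
  }
};
```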

### `bulk_schedule(Scheduler sched, Count n) -> ManySender`

Returns a ManySender of type `Count` that sends the values `0 .. n-1`
to the receiver's `set_next()` channel.

The default implementation of this algorithm schedules a single
task onto the specified scheduler using `schedule()` and then calls
`set_next()` in a loop.

Scheduler types are permitted to customise the `bulk_schedule()` operation
to allow more efficient implementations. e.g. a thread-pool may choose to
split the work up into M pieces to execute across M different threads.

Note that customisations must still adhere to the constraints placed on
valid executions of `set_next()` according to the execution policy returned
from `get_execution_policy()`.
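
The default strategy can be sketched with a toy receiver; the names are illustrative, and the `schedule()` hop onto the scheduler's context is omitted:

```c++
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// A toy many-receiver that records each set_next() value and the
// terminal signal.
struct collecting_receiver {
  std::vector<std::size_t> values;
  bool completed = false;
  void set_next(std::size_t i) & { values.push_back(i); }  // non-terminal
  void set_value() && { completed = true; }                // terminal
};

// The documented default: a single task calls set_next(0..n-1) in a
// loop and then delivers the terminal set_value() signal.
void bulk_schedule_default(std::size_t n, collecting_receiver& r) {
  for (std::size_t i = 0; i < n; ++i) {
    r.set_next(i);
  }
  std::move(r).set_value();
}
```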

> **Review comment:** I mostly like where you're going here (it's basically the direction I went with this, way back before Cologne), but I think that this sort of shape information is of more general applicability than this, and confining all customizations of scheduling with a given shape to go through this one customization point is probably not sufficient. I think that this information also needs to be a bit stickier—probably via a customization point like `get_shape(ManySender)` that is customized if and only if the shape is known lazily, as is the case with the result of `bulk_schedule()`. Does that make sense?


## Stream Algorithms

### `adapt_stream(Stream stream, Func adaptor) -> Stream`
158 changes: 100 additions & 58 deletions doc/concepts.md
# ManySender/ManyReceiver

A **ManySender** represents an operation that asynchronously produces zero or
more values, produced by a call to `set_next()` for each value, terminated
by a call to either `set_value()`, `set_done()` or `set_error()`.

This is a general concept that encapsulates both sequences of values (where the calls to
`set_next()` are non-overlapping) and parallel/bulk operations (where there may
be concurrent/overlapping calls to `set_next()` on different threads
and/or SIMD lanes).

A **ManySender** does not have a back-pressure mechanism. Once started, the delivery
of values is driven by the sender; the receiver can, however, request that
the sender stop sending values, e.g. by causing the StopToken to enter the
`stop_requested()` state.
Contrast this with the **Stream** concept (see below) that lazily produces the next
value only when the consumer asks for it, providing a natural backpressure mechanism.

> NOTE: Unifex does not currently contain any implementations of ManySender.
> This section describes the current thinking about what shape a
> ManySender concept would take.

## Sender vs ManySender

Whereas a **Sender** produces a single result, i.e. a single call to one of either
`set_value()`, `set_done()` or `set_error()`, a **ManySender** produces multiple values
via zero or more calls to `set_next()` followed by a call to either `set_value()`,
`set_done()` or `set_error()` to terminate the sequence.

A **Sender** is a kind of **ManySender**, just a degenerate ManySender that never
sends any elements via `set_next()`.

Also, a **ManyReceiver** is a kind of **Receiver**. You can pass a **ManyReceiver**
to a **Sender**; it will just never have its `set_next()` method called on it.

Note that terminal calls to a receiver (i.e. `set_value()`, `set_done()` or `set_error()`)
must be passed an rvalue-reference to the receiver, while non-terminal calls to a receiver
(i.e. `set_next()`) must be passed an lvalue-reference to the receiver.

The sender is responsible for ensuring that the return from any call to `set_next()`
**strongly happens before** the call to deliver a terminal signal is made,
i.e. that any effects of calls to `set_next()` are visible within the terminal signal call.

A terminal call to `set_value()` indicates that the full-set of `set_next()` calls were
successfully delivered and that the operation as a whole completed successfully.

Note that `set_value()` can be thought of as the sentinel that marks the end of the
`set_next()` calls. Often it will be invoked with an empty pack of values, but it is
also valid to pass values to this `set_value()` call, e.g. to deliver the result of
a reduce operation.
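
That reduce-style usage can be sketched with toy types, following the documented lvalue/rvalue calling conventions; all names here are illustrative:

```c++
#include <cassert>
#include <utility>
#include <vector>

// Toy receiver for a bulk reduce: set_next() observes each element
// (lvalue, non-terminal) and set_value(sum) delivers the final result
// (rvalue, terminal).
struct reduce_receiver {
  int result = 0;
  bool done = false;
  void set_next(int) & { /* per-element side effects would go here */ }
  void set_value(int sum) && {
    result = sum;
    done = true;
  }
};

// A toy many-sender operation: sends each element via set_next() and
// then passes the accumulated sum as the set_value() "sentinel".
void run_bulk_reduce(const std::vector<int>& v, reduce_receiver& r) {
  int sum = 0;
  for (int x : v) {
    r.set_next(x);
    sum += x;
  }
  std::move(r).set_value(sum);
}
```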

A terminal call to `set_done()` or `set_error()` indicates that the operation may have
completed early, either because the operation was asked to stop early (as in `set_done`)
or because the operation was unable to satisfy its post-conditions due to some failure
(as in `set_error`). In this case it is not guaranteed that the full set of values were
delivered via `set_next()` calls.

As with a **Sender**, a **ManySender** must be connected to a receiver by calling
`connect()`. This returns an **OperationState** that holds the state of the many-sender
operation.

The **ManySender** will not make any calls to `set_next()`, `set_value()`, `set_done()`
or `set_error()` before calling `start()` on the operation-state returned from
`connect()`.
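
A toy model of the `connect()`/`start()` contract, using hypothetical types and plain member functions in place of the `tag_invoke`-based CPOs:

```c++
#include <cassert>
#include <type_traits>
#include <utility>

// Toy receiver that counts the signals it receives.
struct counting_receiver {
  int nexts = 0;
  bool finished = false;
  void set_next(int) & { ++nexts; }
  void set_value() && { finished = true; }
};

// The operation-state returned by connect(): no signals are delivered
// to the receiver until start() is invoked.
template <typename Receiver>
struct iota_operation {
  Receiver receiver;
  int count;

  void start() {
    for (int i = 0; i < count; ++i) {
      receiver.set_next(i);
    }
    std::move(receiver).set_value();
  }
};

// A hypothetical many-sender of the values 0..count-1.
struct iota_many_sender {
  int count;

  template <typename Receiver>
  iota_operation<std::decay_t<Receiver>> connect(Receiver&& r) && {
    return {std::forward<Receiver>(r), count};
  }
};
```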

Thus, a **Sender** should usually constrain its `connect()` operation as follows:
```c++
struct some_sender_of_int {
  template<typename Receiver>
    requires
      value_receiver<std::decay_t<Receiver>, int> &&
      done_receiver<std::decay_t<Receiver>>
  friend operation<std::decay_t<Receiver>> tag_invoke(
      tag_t<connect>, some_sender_of_int&& s, Receiver&& r);
};
```

While a **ManySender** should constrain its `connect()` operation like this:
```c++
struct some_many_sender_of_ints {
  template<typename Receiver>
    requires
      next_receiver<std::decay_t<Receiver>, int> &&
      value_receiver<std::decay_t<Receiver>> &&
      done_receiver<std::decay_t<Receiver>>
  friend operation<std::decay_t<Receiver>> tag_invoke(
      tag_t<connect>, some_many_sender_of_ints&& s, Receiver&& r);
};
```

## Sequential vs Parallel Execution

Expand All @@ -423,19 +433,20 @@ For other use-cases we want to process these values in parallel, allowing
multiple threads, SIMD lanes, or GPU cores to process the values more
quickly than would be possible normally.

In both cases, we have a number of calls to `set_next`, followed by a
call to `set_value`, `set_error` or `set_done`.
So what is the difference between these cases?

Firstly, the **ManySender** implementation needs to be _capable_ of making
overlapping calls to `set_next()` - it needs to have the necessary
execution resources available to be able to do this.
Some senders may only have access to a single execution agent and so
are only able to send a single value at a time.

Secondly, the receiver needs to be prepared to handle overlapping calls
to `set_next()`. Some receiver implementations may update shared state
with each value without synchronisation, and so it would be undefined
behaviour to make concurrent calls to `set_next()`. While other
receivers may have either implemented the required synchronisation or
just not require synchronisation e.g. because they do not modify
any shared state.
Note that the constraints that the receiver places on the valid
execution patterns are analogous to the "execution policy" parameter
of the standard library parallel algorithms.

With existing parallel algorithms in the standard library, when you
pass an execution policy, such as `std::execution::par`, you are telling
the implementation of that algorithm the constraints of how it is
allowed to call the callback you passed to it.

For example:
```c++
std::vector<int> v = ...;

int max = std::reduce(std::execution::par_unseq,
                      v.begin(), v.end(),
                      std::numeric_limits<int>::min(),
                      [](int a, int b) { return std::max(a, b); });
```

Passing `std::execution::par` is not saying that the algorithm
implementation _must_ call the lambda concurrently, only that it _may_
do so. It is always valid for the algorithm to call the lambda sequentially.

We want to take the same approach with the **ManySender** / **ManyReceiver**
contract to allow a **ManySender** to query from the **ManyReceiver**
what the execution constraints for calling its `set_next()` method
are. Then the sender can make a decision about the best strategy to
use when calling `set_next()`.

To do this, we define a `get_execution_policy()` CPO that can be invoked,
passing the receiver as the argument, and have it return the execution
policy that specifies how the receiver's `set_next()` method is allowed
to be called.

For example, a receiver that supports concurrent calls to `set_next()`
would customise `get_execution_policy()` for its type to return
either `unifex::par` or `unifex::par_unseq`.

A sender that has multiple threads available can then call
`get_execution_policy(receiver)`, see that it allows concurrent execution,
and distribute the calls to `set_next()` across the available threads.
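
That decision can be sketched with hypothetical policy tags and a plain member function standing in for the `get_execution_policy()` CPO:

```c++
#include <atomic>
#include <cassert>
#include <thread>
#include <type_traits>
#include <vector>

// Hypothetical policy tags (stand-ins for unifex::seq / unifex::par).
struct sequenced_policy {};
struct parallel_policy {};

// A receiver whose set_next() is safe to call concurrently, so it can
// advertise a parallel policy.
struct atomic_sum_receiver {
  std::atomic<int> sum{0};
  void set_next(int i) { sum += i; }
  parallel_policy get_execution_policy() const { return {}; }
};

// Sketch of a sender's delivery loop: it inspects the policy and
// either fans out across threads or falls back to a sequential loop.
template <typename Receiver, typename Policy>
void deliver(Receiver& r, int n, Policy) {
  if constexpr (std::is_same_v<Policy, parallel_policy>) {
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
      workers.emplace_back([&r, i] { r.set_next(i); });
    }
    for (auto& t : workers) t.join();
  } else {
    for (int i = 0; i < n; ++i) {
      r.set_next(i);
    }
  }
}
```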

## TypedManySender

With the **TypedSender** concept, the type exposes type-aliases that allow
the consumer of the sender to query what types it is going to invoke a
receiver's `set_value()` and `set_error()` methods with.

A **TypedManySender** concept similarly extends the **ManySender**
concept, requiring the sender to describe the types it will invoke `set_next()`
with, via a `next_types` type-alias, in addition to the `value_types` and
`error_types` type-aliases required by **TypedSender**.

Note that this requirement for a **TypedManySender** to provide the `next_types`
type-alias means that the **TypedSender** concept, which only needs to provide the
`value_types` and `error_types` type-aliases, does not subsume the **TypedManySender**
concept, even though **Sender** logically subsumes the **ManySender** concept.
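
The alias surface such a sender might expose can be sketched as follows; the sender type itself is hypothetical, and the `Variant`/`Tuple` template-template parameters follow the usual **TypedSender** pattern:

```c++
#include <cassert>
#include <exception>
#include <tuple>
#include <type_traits>
#include <variant>

struct some_typed_many_sender {
  // Types delivered to the terminal set_value(): one empty pack.
  template <template <typename...> class Variant,
            template <typename...> class Tuple>
  using value_types = Variant<Tuple<>>;

  // Types delivered to set_error().
  template <template <typename...> class Variant>
  using error_types = Variant<std::exception_ptr>;

  // The TypedManySender addition: types delivered to set_next().
  template <template <typename...> class Variant,
            template <typename...> class Tuple>
  using next_types = Variant<Tuple<int>>;
};
```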

# Streams

Expand Down Expand Up @@ -517,8 +559,8 @@ This has a number of differences compared with a **ManySender**.
* The consumer of a stream may process the result asynchronously and can
defer asking for the next value until it has finished processing the
previous value.
* A **ManySender** can continue calling `set_next()` as soon as the
previous call to `set_next()` returns.
* A **ManySender** has no mechanism for flow-control. The **ManyReceiver**
must be prepared to accept as many values as the **ManySender** sends
to it.