
[RFC] Initial prototype of a bulk_sender concept #134

Merged: 5 commits, Jun 20, 2020
89 changes: 89 additions & 0 deletions doc/api_reference.md
@@ -4,6 +4,7 @@
* `get_stop_token()`
* `get_scheduler()`
* `get_allocator()`
* `get_execution_policy()`
* Sender Algorithms
* `transform()`
* `transform_done()`
@@ -28,6 +29,10 @@
* `async_trace_sender`
* Sender Queries
* `blocking()`
* Many Sender Algorithms
* `bulk_transform()`
* `bulk_join()`
* `bulk_schedule()`
* Stream Algorithms
* `adapt_stream()`
* `next_adapt_stream()`
@@ -93,6 +98,41 @@ If a receiver has not customised this it will default to return `unstoppable_token`.

See the [Cancellation](cancellation.md) section for more details on cancellation.

### `get_execution_policy(manyReceiver)`

For a ManyReceiver, obtains the execution policy object that specifies the constraints
on how a ManySender is allowed to call `set_next()`.

The following execution policies are built-in and understood by the many-sender
algorithms in libunifex.

* `unifex::sequenced_policy` - Calls to `set_next()` on the receiver must be sequenced
and may not be executed concurrently on different threads or have their executions
interleaved on a single thread.

* `unifex::unsequenced_policy` - Calls to `set_next()` are safe to be interleaved
with each other on the same thread but are not safe to be executed concurrently
on different threads. This typically allows vectorised execution of the calls using
SIMD instructions.

* `unifex::parallel_policy` - Calls to `set_next()` are safe to be executed
concurrently on different threads, but are not safe to be interleaved on
a given thread. Use this if the forward-progress of one call to `set_next()`
may be dependent on another call to `set_next()` making forward progress.
e.g. if multiple calls attempt to acquire a lock on the same mutex.

* `unifex::parallel_unsequenced_policy` - Calls to `set_next()` are safe to
be executed concurrently on different threads and are also safe to have
their executions interleaved on a given thread.

Note that, while it is possible to extend the set of execution policies with
application-specific policies, built-in implementations of bulk algorithms
will not necessarily understand them and will treat them as if they were
the `sequenced_policy`.

If a receiver does not customise the `get_execution_policy()` CPO then it
will default to returning the `sequenced_policy`.

# Sender Algorithms

### `transform(Sender predecessor, Func func) -> Sender`
@@ -465,6 +505,55 @@ Otherwise returns `blocking_kind::maybe`.
Senders can customise this algorithm by providing an overload of
`tag_invoke(tag_t<blocking>, const your_sender_type&)`.

## Many Sender Algorithms

### `bulk_transform(ManySender sender, Func func, FuncPolicy policy) -> ManySender`

For each `set_next(values...)` result produced by `sender`, invokes
`func(values...)` and produces the result of that call as its `set_next()`
result.

The `policy` argument is optional; if absent, it defaults to `get_execution_policy(func)`.

The resulting execution policy incorporates the union of the constraints
placed on the execution of the function and the execution of the
downstream receiver's `set_next()` method.

That is, both the downstream ManyReceiver's execution policy and the
function's execution policy must allow parallel execution for the
`bulk_transform()` operation to permit parallel execution; the same applies
to unsequenced execution.

This algorithm is transparent to `set_value()`, `set_error()` and `set_done()`
completion signals.

### `bulk_join(ManySender source) -> Sender`

Joins a bulk operation on a ManySender and turns it into a SingleSender
operation that completes once all of the `set_next()` calls have completed.
> **Reviewer:** How do we know that all of the `set_next()` calls have completed?
>
> **Author:** That is up to the implementation of the many-sender: it must ensure that the return from all `set_next()` calls "strongly happens before" the call to `set_value()`/`set_done()`/`set_error()`.

The input `source` sender must be a ManySender of `void` (i.e. no values are
passed to `set_next()`).

The returned single-sender is transparent to the `set_value()`, `set_error()`
and `set_done()` signals.

### `bulk_schedule(Scheduler sched, Count n) -> ManySender`

Returns a ManySender of type `Count` that sends the values `0 .. n-1`
to the receiver's `set_next()` channel.

The default implementation of this algorithm schedules a single
task onto the specified scheduler using `schedule()` and then calls
`set_next()` in a loop.

Scheduler types are permitted to customise the `bulk_schedule()` operation
to allow more efficient implementations. e.g. a thread-pool may choose to
split the work up into M pieces to execute across M different threads.

Note that customisations must still adhere to the constraints placed on
valid executions of `set_next()` according to the execution policy returned
from `get_execution_policy()`.
> **Reviewer:** I mostly like where you're going here (it's basically the direction I went with this, way back before Cologne), but I think that this sort of shape information has more general applicability than this, and confining all customisations of scheduling with a given shape to go through this one customisation point is probably not sufficient. I think this information also needs to be a bit stickier, probably via a customisation point like `get_shape(ManySender)` that is customised if and only if the shape is known lazily, as is the case with the result of `bulk_schedule`. Does that make sense?


## Stream Algorithms

### `adapt_stream(Stream stream, Func adaptor) -> Stream`
158 changes: 158 additions & 0 deletions include/unifex/bulk_join.hpp
@@ -0,0 +1,158 @@
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/tag_invoke.hpp>
#include <unifex/execution_policy.hpp>
#include <unifex/get_execution_policy.hpp>

#include <unifex/detail/prologue.hpp>

namespace unifex {

namespace _bulk_join {

template<typename Receiver>
struct _join_receiver {
class type;
};

template<typename Receiver>
using join_receiver = typename _join_receiver<Receiver>::type;

template<typename Receiver>
class _join_receiver<Receiver>::type {
public:
template<typename Receiver2>
explicit type(Receiver2&& r) noexcept(std::is_nothrow_constructible_v<Receiver, Receiver2>)
: receiver_((Receiver2&&)r)
{}

void set_next() & noexcept {}
> **Reviewer:** I'm not sure I follow how an empty body here constitutes tracking all of the `set_next()` calls and completing once all of them have completed, as the spec above says. What am I missing?
>
> **Author:** The calls to `set_next()` return before `set_done()`/`set_value()`/`set_error()` is called, so we just discard the (empty) "next value" here and then rely on the normal sender signals forwarding through to the single-sender receiver to signal that the bulk operation has completed.


template(typename... Values)
(requires is_value_receiver_v<Receiver, Values...>)
void set_value(Values&&... values) noexcept(is_nothrow_value_receiver_v<Receiver, Values...>) {
unifex::set_value(std::move(receiver_), (Values&&)values...);
}

template(typename Error)
(requires is_error_receiver_v<Receiver, Error>)
void set_error(Error&& error) noexcept {
unifex::set_error(std::move(receiver_), (Error&&)error);
}

template(typename R = Receiver)
(requires is_done_receiver_v<Receiver>)
void set_done() noexcept {
unifex::set_done(std::move(receiver_));
}

friend constexpr unifex::parallel_unsequenced_policy tag_invoke(
tag_t<get_execution_policy>, [[maybe_unused]] const type& r) noexcept {
return {};
}

template(typename CPO, typename Self)
(requires
is_receiver_query_cpo_v<CPO> AND
same_as<Self, type>)
friend auto tag_invoke(CPO cpo, const Self& self)
noexcept(is_nothrow_callable_v<CPO, const Receiver&>)
-> callable_result_t<CPO, const Receiver&> {
return cpo(self.receiver_);
}

private:
Receiver receiver_;
};

template<typename Source>
struct _join_sender {
class type;
};

template<typename Source>
using join_sender = typename _join_sender<Source>::type;

template<typename Source>
class _join_sender<Source>::type {
public:
template<template<typename...> class Variant, template<typename...> class Tuple>
using value_types = typename Source::template value_types<Variant, Tuple>;

template<template<typename...> class Variant>
using error_types = typename Source::template error_types<Variant>;

static constexpr bool sends_done = Source::sends_done;

template<typename Source2>
explicit type(Source2&& s)
noexcept(std::is_nothrow_constructible_v<Source, Source2>)
: source_((Source2&&)s)
{}

template(typename Self, typename Receiver)
(requires
same_as<remove_cvref_t<Self>, type> AND
sender_to<member_t<Self, Source>, join_receiver<remove_cvref_t<Receiver>>>)
friend auto tag_invoke(tag_t<unifex::connect>, Self&& self, Receiver&& r)
noexcept(
std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver> &&
is_nothrow_connectable_v<member_t<Self, Source>, join_receiver<remove_cvref_t<Receiver>>>)
-> connect_result_t<member_t<Self, Source>, join_receiver<remove_cvref_t<Receiver>>>
{
return unifex::connect(
static_cast<Self&&>(self).source_,
join_receiver<remove_cvref_t<Receiver>>{static_cast<Receiver&&>(r)});
}

private:
Source source_;
};

struct _fn {
template(typename Source)
(requires
typed_bulk_sender<Source> AND
tag_invocable<_fn, Source>)
auto operator()(Source&& source) const
noexcept(is_nothrow_tag_invocable_v<_fn, Source>)
-> tag_invoke_result_t<_fn, Source> {
return tag_invoke(_fn{}, (Source&&)source);
}

template(typename Source)
(requires
typed_bulk_sender<Source> AND
(!tag_invocable<_fn, Source>))
auto operator()(Source&& source) const
noexcept(std::is_nothrow_constructible_v<remove_cvref_t<Source>, Source>)
-> join_sender<remove_cvref_t<Source>> {
return join_sender<remove_cvref_t<Source>>{
(Source&&)source};
}
};

} // namespace _bulk_join

inline constexpr _bulk_join::_fn bulk_join{};

} // namespace unifex

#include <unifex/detail/epilogue.hpp>