Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename on execution::schedule_from, rename just_on to just_transfer, and add transfer #5656

Merged
merged 1 commit into from Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 14 additions & 14 deletions libs/core/async_cuda/tests/unit/transform_stream.cu
Expand Up @@ -117,7 +117,7 @@ int hpx_main()
auto s2 = cu::transform_stream(std::move(s1), dummy{});
// NOTE: transform_stream calls triggers the receiver on a plain
// std::thread. We explicitly change the context back to an hpx::thread.
ex::sync_wait(ex::on(std::move(s2), ex::thread_pool_scheduler{}));
ex::sync_wait(ex::transfer(std::move(s2), ex::thread_pool_scheduler{}));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(1));
HPX_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0));
Expand All @@ -132,7 +132,7 @@ int hpx_main()
auto s2 = cu::transform_stream(std::move(s1), dummy{});
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
ex::sync_wait(ex::on(std::move(s4), ex::thread_pool_scheduler{}));
ex::sync_wait(ex::transfer(std::move(s4), ex::thread_pool_scheduler{}));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(3));
HPX_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0));
Expand All @@ -146,10 +146,10 @@ int hpx_main()
dummy::reset_counts();
auto s1 = ex::just();
auto s2 = cu::transform_stream(std::move(s1), dummy{});
auto s3 = ex::on(std::move(s2), ex::thread_pool_scheduler{});
auto s3 = ex::transfer(std::move(s2), ex::thread_pool_scheduler{});
auto s4 = ex::then(std::move(s3), dummy{});
auto s5 = cu::transform_stream(std::move(s4), dummy{});
ex::sync_wait(ex::on(std::move(s5), ex::thread_pool_scheduler{}));
ex::sync_wait(ex::transfer(std::move(s5), ex::thread_pool_scheduler{}));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(1));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(2));
HPX_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0));
Expand All @@ -163,7 +163,7 @@ int hpx_main()
auto s1 = ex::schedule(ex::thread_pool_scheduler{});
auto s2 = ex::then(std::move(s1), dummy{});
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = ex::on(std::move(s3), ex::thread_pool_scheduler{});
auto s4 = ex::transfer(std::move(s3), ex::thread_pool_scheduler{});
auto s5 = ex::then(std::move(s4), dummy{});
ex::sync_wait(std::move(s5));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(2));
Expand All @@ -180,7 +180,7 @@ int hpx_main()
auto s1 = ex::just(1);
auto s2 = cu::transform_stream(std::move(s1), dummy{});
HPX_TEST_EQ(
ex::sync_wait(ex::on(std::move(s2), ex::thread_pool_scheduler{})),
ex::sync_wait(ex::transfer(std::move(s2), ex::thread_pool_scheduler{})),
2.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0));
Expand All @@ -197,7 +197,7 @@ int hpx_main()
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
HPX_TEST_EQ(
ex::sync_wait(ex::on(std::move(s4), ex::thread_pool_scheduler{})),
ex::sync_wait(ex::transfer(std::move(s4), ex::thread_pool_scheduler{})),
4.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0));
Expand All @@ -212,11 +212,11 @@ int hpx_main()
dummy::reset_counts();
auto s1 = ex::just(1);
auto s2 = cu::transform_stream(std::move(s1), dummy{});
auto s3 = ex::on(std::move(s2), ex::thread_pool_scheduler{});
auto s3 = ex::transfer(std::move(s2), ex::thread_pool_scheduler{});
auto s4 = ex::then(std::move(s3), dummy{});
auto s5 = cu::transform_stream(std::move(s4), dummy{});
HPX_TEST_EQ(
ex::sync_wait(ex::on(std::move(s5), ex::thread_pool_scheduler{})),
ex::sync_wait(ex::transfer(std::move(s5), ex::thread_pool_scheduler{})),
4.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0));
Expand All @@ -229,10 +229,10 @@ int hpx_main()
{
dummy::reset_counts();
auto s1 = ex::just(1);
auto s2 = ex::on(std::move(s1), ex::thread_pool_scheduler{});
auto s2 = ex::transfer(std::move(s1), ex::thread_pool_scheduler{});
auto s3 = ex::then(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
auto s5 = ex::on(std::move(s4), ex::thread_pool_scheduler{});
auto s5 = ex::transfer(std::move(s4), ex::thread_pool_scheduler{});
auto s6 = ex::then(std::move(s5), dummy{});
HPX_TEST_EQ(ex::sync_wait(std::move(s6)), 4.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
Expand All @@ -245,11 +245,11 @@ int hpx_main()

{
dummy::reset_counts();
auto s1 = ex::just_on(ex::thread_pool_scheduler{}, 1);
auto s1 = ex::transfer_just(ex::thread_pool_scheduler{}, 1);
auto s2 = ex::then(std::move(s1), dummy{});
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
auto s5 = ex::on(std::move(s4), ex::thread_pool_scheduler{});
auto s5 = ex::transfer(std::move(s4), ex::thread_pool_scheduler{});
auto s6 = ex::then(std::move(s5), dummy{});
HPX_TEST_EQ(ex::sync_wait(std::move(s6)), 5.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
Expand Down Expand Up @@ -280,7 +280,7 @@ int hpx_main()
cu::transform_stream(cuda_memcpy_async{}) |
ex::then(&cu::check_cuda_error) |
ex::then([&p_h] { HPX_TEST_EQ(p_h, 3); }) |
ex::on(ex::thread_pool_scheduler{}) | ex::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}) | ex::sync_wait();

cu::check_cuda_error(cudaFree(p));
}
Expand Down
5 changes: 3 additions & 2 deletions libs/core/execution/CMakeLists.txt
Expand Up @@ -15,16 +15,17 @@ set(execution_headers
hpx/execution/algorithms/ensure_started.hpp
hpx/execution/algorithms/execute.hpp
hpx/execution/algorithms/just.hpp
hpx/execution/algorithms/just_on.hpp
hpx/execution/algorithms/keep_future.hpp
hpx/execution/algorithms/let_error.hpp
hpx/execution/algorithms/let_value.hpp
hpx/execution/algorithms/make_future.hpp
hpx/execution/algorithms/on.hpp
hpx/execution/algorithms/schedule_from.hpp
hpx/execution/algorithms/split.hpp
hpx/execution/algorithms/start_detached.hpp
hpx/execution/algorithms/sync_wait.hpp
hpx/execution/algorithms/then.hpp
hpx/execution/algorithms/transfer.hpp
hpx/execution/algorithms/transfer_just.hpp
hpx/execution/algorithms/when_all.hpp
hpx/execution/detail/async_launch_policy_dispatch.hpp
hpx/execution/detail/execution_parameter_callbacks.hpp
Expand Down
Expand Up @@ -11,7 +11,6 @@
#include <hpx/datastructures/optional.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/datastructures/variant.hpp>
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/execution_base/completion_scheduler.hpp>
#include <hpx/execution_base/receiver.hpp>
#include <hpx/execution_base/sender.hpp>
Expand All @@ -30,7 +29,7 @@
namespace hpx { namespace execution { namespace experimental {
namespace detail {
template <typename Sender, typename Scheduler>
struct on_sender
struct schedule_from_sender
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> predecessor_sender;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
Expand Down Expand Up @@ -75,7 +74,7 @@ namespace hpx { namespace execution { namespace experimental {
>
friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<CPO>,
on_sender const& sender)
schedule_from_sender const& sender)
{
if constexpr (std::is_same_v<std::decay_t<CPO>,
hpx::execution::experimental::set_value_t>)
Expand Down Expand Up @@ -292,46 +291,38 @@ namespace hpx { namespace execution { namespace experimental {

template <typename Receiver>
friend operation_state<Receiver> tag_invoke(
connect_t, on_sender&& s, Receiver&& receiver)
connect_t, schedule_from_sender&& s, Receiver&& receiver)
{
return {HPX_MOVE(s.predecessor_sender), HPX_MOVE(s.scheduler),
HPX_FORWARD(Receiver, receiver)};
}

template <typename Receiver>
friend operation_state<Receiver> tag_invoke(
connect_t, on_sender& s, Receiver&& receiver)
connect_t, schedule_from_sender& s, Receiver&& receiver)
{
return {s.predecessor_sender, s.scheduler,
HPX_FORWARD(Receiver, receiver)};
}
};
} // namespace detail

inline constexpr struct on_t final
: hpx::functional::detail::tag_fallback<on_t>
HPX_HOST_DEVICE_INLINE_CONSTEXPR_VARIABLE struct schedule_from_t final
msimberg marked this conversation as resolved.
Show resolved Hide resolved
: hpx::functional::detail::tag_fallback<schedule_from_t>
{
private:
// clang-format off
template <typename Sender, typename Scheduler,
template <typename Scheduler, typename Sender,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
on_t, Sender&& predecessor_sender, Scheduler&& scheduler)
schedule_from_t, Scheduler&& scheduler, Sender&& predecessor_sender)
{
return detail::on_sender<Sender, Scheduler>{
return detail::schedule_from_sender<Sender, Scheduler>{
HPX_FORWARD(Sender, predecessor_sender),
HPX_FORWARD(Scheduler, scheduler)};
}

template <typename Scheduler>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
on_t, Scheduler&& scheduler)
{
return detail::partial_algorithm<on_t, Scheduler>{
HPX_FORWARD(Scheduler, scheduler)};
}
} on{};
} schedule_from{};
}}} // namespace hpx::execution::experimental
@@ -0,0 +1,67 @@
// Copyright (c) 2020 ETH Zurich
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/concepts/concepts.hpp>
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/execution_base/completion_scheduler.hpp>
#include <hpx/execution_base/receiver.hpp>
#include <hpx/execution_base/sender.hpp>
#include <hpx/functional/detail/tag_priority_invoke.hpp>

#include <utility>

namespace hpx { namespace execution { namespace experimental {
inline constexpr struct transfer_t final
: hpx::functional::detail::tag_priority<transfer_t>
{
private:
// clang-format off
template <typename Sender, typename Scheduler,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
is_scheduler_v<Scheduler> &&
hpx::execution::experimental::detail::
is_completion_scheduler_tag_invocable_v<
hpx::execution::experimental::set_value_t, Sender,
transfer_t, Scheduler>)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
bulk_t, Sender&& sender, Scheduler&& scheduler)
{
auto completion_scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(sender);
return hpx::functional::tag_invoke(transfer_t{},
HPX_MOVE(completion_scheduler), HPX_FORWARD(Sender, sender),
HPX_FORWARD(Scheduler, scheduler));
}

// clang-format off
template <typename Sender, typename Scheduler,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
is_scheduler_v<Scheduler>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
transfer_t, Sender&& predecessor_sender, Scheduler&& scheduler)
{
return schedule_from(HPX_FORWARD(Scheduler, scheduler),
HPX_FORWARD(Sender, predecessor_sender));
}

template <typename Scheduler>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
transfer_t, Scheduler&& scheduler)
{
return detail::partial_algorithm<transfer_t, Scheduler>{
HPX_FORWARD(Scheduler, scheduler)};
}
} transfer{};
}}} // namespace hpx::execution::experimental
Expand Up @@ -8,22 +8,22 @@

#include <hpx/config.hpp>
#include <hpx/execution/algorithms/just.hpp>
#include <hpx/execution/algorithms/on.hpp>
#include <hpx/execution/algorithms/transfer.hpp>
#include <hpx/functional/detail/tag_fallback_invoke.hpp>

#include <utility>

namespace hpx { namespace execution { namespace experimental {
inline constexpr struct just_on_t final
: hpx::functional::detail::tag_fallback<just_on_t>
inline constexpr struct transfer_just_t final
: hpx::functional::detail::tag_fallback<transfer_just_t>
{
private:
template <typename Scheduler, typename... Ts>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
just_on_t, Scheduler&& scheduler, Ts&&... ts)
transfer_just_t, Scheduler&& scheduler, Ts&&... ts)
{
return on(just(HPX_FORWARD(Ts, ts)...),
return transfer(just(HPX_FORWARD(Ts, ts)...),
HPX_FORWARD(Scheduler, scheduler));
}
} just_on{};
} transfer_just{};
}}} // namespace hpx::execution::experimental