Skip to content

Commit

Permalink
Rename on schedule_from, rename just_on to just_transfer, and add tra…
Browse files Browse the repository at this point in the history
…nsfer
  • Loading branch information
msimberg committed Nov 18, 2021
1 parent af33761 commit 16424ac
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 213 deletions.
28 changes: 14 additions & 14 deletions libs/core/async_cuda/tests/unit/transform_stream.cu
Expand Up @@ -117,7 +117,7 @@ int hpx_main()
auto s2 = cu::transform_stream(std::move(s1), dummy{});
// NOTE: transform_stream calls triggers the receiver on a plain
// std::thread. We explicitly change the context back to an hpx::thread.
ex::sync_wait(ex::on(std::move(s2), ex::thread_pool_scheduler{}));
ex::sync_wait(ex::transfer(std::move(s2), ex::thread_pool_scheduler{}));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(1));
HPX_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0));
Expand All @@ -132,7 +132,7 @@ int hpx_main()
auto s2 = cu::transform_stream(std::move(s1), dummy{});
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
ex::sync_wait(ex::on(std::move(s4), ex::thread_pool_scheduler{}));
ex::sync_wait(ex::transfer(std::move(s4), ex::thread_pool_scheduler{}));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(3));
HPX_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0));
Expand All @@ -146,10 +146,10 @@ int hpx_main()
dummy::reset_counts();
auto s1 = ex::just();
auto s2 = cu::transform_stream(std::move(s1), dummy{});
auto s3 = ex::on(std::move(s2), ex::thread_pool_scheduler{});
auto s3 = ex::transfer(std::move(s2), ex::thread_pool_scheduler{});
auto s4 = ex::then(std::move(s3), dummy{});
auto s5 = cu::transform_stream(std::move(s4), dummy{});
ex::sync_wait(ex::on(std::move(s5), ex::thread_pool_scheduler{}));
ex::sync_wait(ex::transfer(std::move(s5), ex::thread_pool_scheduler{}));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(1));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(2));
HPX_TEST_EQ(dummy::host_int_calls.load(), std::size_t(0));
Expand All @@ -163,7 +163,7 @@ int hpx_main()
auto s1 = ex::schedule(ex::thread_pool_scheduler{});
auto s2 = ex::then(std::move(s1), dummy{});
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = ex::on(std::move(s3), ex::thread_pool_scheduler{});
auto s4 = ex::transfer(std::move(s3), ex::thread_pool_scheduler{});
auto s5 = ex::then(std::move(s4), dummy{});
ex::sync_wait(std::move(s5));
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(2));
Expand All @@ -180,7 +180,7 @@ int hpx_main()
auto s1 = ex::just(1);
auto s2 = cu::transform_stream(std::move(s1), dummy{});
HPX_TEST_EQ(
ex::sync_wait(ex::on(std::move(s2), ex::thread_pool_scheduler{})),
ex::sync_wait(ex::transfer(std::move(s2), ex::thread_pool_scheduler{})),
2.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0));
Expand All @@ -197,7 +197,7 @@ int hpx_main()
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
HPX_TEST_EQ(
ex::sync_wait(ex::on(std::move(s4), ex::thread_pool_scheduler{})),
ex::sync_wait(ex::transfer(std::move(s4), ex::thread_pool_scheduler{})),
4.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0));
Expand All @@ -212,11 +212,11 @@ int hpx_main()
dummy::reset_counts();
auto s1 = ex::just(1);
auto s2 = cu::transform_stream(std::move(s1), dummy{});
auto s3 = ex::on(std::move(s2), ex::thread_pool_scheduler{});
auto s3 = ex::transfer(std::move(s2), ex::thread_pool_scheduler{});
auto s4 = ex::then(std::move(s3), dummy{});
auto s5 = cu::transform_stream(std::move(s4), dummy{});
HPX_TEST_EQ(
ex::sync_wait(ex::on(std::move(s5), ex::thread_pool_scheduler{})),
ex::sync_wait(ex::transfer(std::move(s5), ex::thread_pool_scheduler{})),
4.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
HPX_TEST_EQ(dummy::stream_void_calls.load(), std::size_t(0));
Expand All @@ -229,10 +229,10 @@ int hpx_main()
{
dummy::reset_counts();
auto s1 = ex::just(1);
auto s2 = ex::on(std::move(s1), ex::thread_pool_scheduler{});
auto s2 = ex::transfer(std::move(s1), ex::thread_pool_scheduler{});
auto s3 = ex::then(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
auto s5 = ex::on(std::move(s4), ex::thread_pool_scheduler{});
auto s5 = ex::transfer(std::move(s4), ex::thread_pool_scheduler{});
auto s6 = ex::then(std::move(s5), dummy{});
HPX_TEST_EQ(ex::sync_wait(std::move(s6)), 4.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
Expand All @@ -245,11 +245,11 @@ int hpx_main()

{
dummy::reset_counts();
auto s1 = ex::just_on(ex::thread_pool_scheduler{}, 1);
auto s1 = ex::transfer_just(ex::thread_pool_scheduler{}, 1);
auto s2 = ex::then(std::move(s1), dummy{});
auto s3 = cu::transform_stream(std::move(s2), dummy{});
auto s4 = cu::transform_stream(std::move(s3), dummy{});
auto s5 = ex::on(std::move(s4), ex::thread_pool_scheduler{});
auto s5 = ex::transfer(std::move(s4), ex::thread_pool_scheduler{});
auto s6 = ex::then(std::move(s5), dummy{});
HPX_TEST_EQ(ex::sync_wait(std::move(s6)), 5.0);
HPX_TEST_EQ(dummy::host_void_calls.load(), std::size_t(0));
Expand Down Expand Up @@ -280,7 +280,7 @@ int hpx_main()
cu::transform_stream(cuda_memcpy_async{}) |
ex::then(&cu::check_cuda_error) |
ex::then([&p_h] { HPX_TEST_EQ(p_h, 3); }) |
ex::on(ex::thread_pool_scheduler{}) | ex::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}) | ex::sync_wait();

cu::check_cuda_error(cudaFree(p));
}
Expand Down
5 changes: 3 additions & 2 deletions libs/core/execution/CMakeLists.txt
Expand Up @@ -15,16 +15,17 @@ set(execution_headers
hpx/execution/algorithms/ensure_started.hpp
hpx/execution/algorithms/execute.hpp
hpx/execution/algorithms/just.hpp
hpx/execution/algorithms/just_on.hpp
hpx/execution/algorithms/keep_future.hpp
hpx/execution/algorithms/let_error.hpp
hpx/execution/algorithms/let_value.hpp
hpx/execution/algorithms/make_future.hpp
hpx/execution/algorithms/on.hpp
hpx/execution/algorithms/schedule_from.hpp
hpx/execution/algorithms/split.hpp
hpx/execution/algorithms/start_detached.hpp
hpx/execution/algorithms/sync_wait.hpp
hpx/execution/algorithms/then.hpp
hpx/execution/algorithms/transfer.hpp
hpx/execution/algorithms/transfer_just.hpp
hpx/execution/algorithms/when_all.hpp
hpx/execution/detail/async_launch_policy_dispatch.hpp
hpx/execution/detail/execution_parameter_callbacks.hpp
Expand Down
Expand Up @@ -11,7 +11,6 @@
#include <hpx/datastructures/optional.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/datastructures/variant.hpp>
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/execution_base/completion_scheduler.hpp>
#include <hpx/execution_base/receiver.hpp>
#include <hpx/execution_base/sender.hpp>
Expand All @@ -30,7 +29,7 @@
namespace hpx { namespace execution { namespace experimental {
namespace detail {
template <typename Sender, typename Scheduler>
struct on_sender
struct schedule_from_sender
{
HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> predecessor_sender;
HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
Expand Down Expand Up @@ -75,7 +74,7 @@ namespace hpx { namespace execution { namespace experimental {
>
friend constexpr auto tag_invoke(
hpx::execution::experimental::get_completion_scheduler_t<CPO>,
on_sender const& sender)
schedule_from_sender const& sender)
{
if constexpr (std::is_same_v<std::decay_t<CPO>,
hpx::execution::experimental::set_value_t>)
Expand Down Expand Up @@ -292,46 +291,38 @@ namespace hpx { namespace execution { namespace experimental {

template <typename Receiver>
friend operation_state<Receiver> tag_invoke(
connect_t, on_sender&& s, Receiver&& receiver)
connect_t, schedule_from_sender&& s, Receiver&& receiver)
{
return {HPX_MOVE(s.predecessor_sender), HPX_MOVE(s.scheduler),
HPX_FORWARD(Receiver, receiver)};
}

template <typename Receiver>
friend operation_state<Receiver> tag_invoke(
connect_t, on_sender& s, Receiver&& receiver)
connect_t, schedule_from_sender& s, Receiver&& receiver)
{
return {s.predecessor_sender, s.scheduler,
HPX_FORWARD(Receiver, receiver)};
}
};
} // namespace detail

HPX_INLINE_CONSTEXPR_VARIABLE struct on_t final
: hpx::functional::detail::tag_fallback<on_t>
HPX_INLINE_CONSTEXPR_VARIABLE struct schedule_from_t final
: hpx::functional::detail::tag_fallback<schedule_from_t>
{
private:
// clang-format off
template <typename Sender, typename Scheduler,
template <typename Scheduler, typename Sender,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
on_t, Sender&& predecessor_sender, Scheduler&& scheduler)
schedule_from_t, Scheduler&& scheduler, Sender&& predecessor_sender)
{
return detail::on_sender<Sender, Scheduler>{
return detail::schedule_from_sender<Sender, Scheduler>{
HPX_FORWARD(Sender, predecessor_sender),
HPX_FORWARD(Scheduler, scheduler)};
}

template <typename Scheduler>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
on_t, Scheduler&& scheduler)
{
return detail::partial_algorithm<on_t, Scheduler>{
HPX_FORWARD(Scheduler, scheduler)};
}
} on{};
} schedule_from{};
}}} // namespace hpx::execution::experimental
67 changes: 67 additions & 0 deletions libs/core/execution/include/hpx/execution/algorithms/transfer.hpp
@@ -0,0 +1,67 @@
// Copyright (c) 2020 ETH Zurich
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/concepts/concepts.hpp>
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/execution_base/completion_scheduler.hpp>
#include <hpx/execution_base/receiver.hpp>
#include <hpx/execution_base/sender.hpp>
#include <hpx/functional/detail/tag_priority_invoke.hpp>

#include <utility>

namespace hpx { namespace execution { namespace experimental {
HPX_INLINE_CONSTEXPR_VARIABLE struct transfer_t final
: hpx::functional::detail::tag_priority<transfer_t>
{
private:
// clang-format off
template <typename Sender, typename Scheduler,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
is_scheduler_v<Scheduler> &&
hpx::execution::experimental::detail::
is_completion_scheduler_tag_invocable_v<
hpx::execution::experimental::set_value_t, Sender,
transfer_t, Scheduler>)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
bulk_t, Sender&& sender, Scheduler&& scheduler)
{
auto completion_scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(sender);
return hpx::functional::tag_invoke(transfer_t{},
HPX_MOVE(completion_scheduler), HPX_FORWARD(Sender, sender),
HPX_FORWARD(Scheduler, scheduler));
}

// clang-format off
template <typename Sender, typename Scheduler,
HPX_CONCEPT_REQUIRES_(
is_sender_v<Sender> &&
is_scheduler_v<Scheduler>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
transfer_t, Sender&& predecessor_sender, Scheduler&& scheduler)
{
return schedule_from(HPX_FORWARD(Scheduler, scheduler),
HPX_FORWARD(Sender, predecessor_sender));
}

template <typename Scheduler>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
transfer_t, Scheduler&& scheduler)
{
return detail::partial_algorithm<transfer_t, Scheduler>{
HPX_FORWARD(Scheduler, scheduler)};
}
} transfer{};
}}} // namespace hpx::execution::experimental
Expand Up @@ -8,22 +8,22 @@

#include <hpx/config.hpp>
#include <hpx/execution/algorithms/just.hpp>
#include <hpx/execution/algorithms/on.hpp>
#include <hpx/execution/algorithms/transfer.hpp>
#include <hpx/functional/detail/tag_fallback_invoke.hpp>

#include <utility>

namespace hpx { namespace execution { namespace experimental {
HPX_INLINE_CONSTEXPR_VARIABLE struct just_on_t final
: hpx::functional::detail::tag_fallback<just_on_t>
HPX_INLINE_CONSTEXPR_VARIABLE struct transfer_just_t final
: hpx::functional::detail::tag_fallback<transfer_just_t>
{
private:
template <typename Scheduler, typename... Ts>
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
just_on_t, Scheduler&& scheduler, Ts&&... ts)
transfer_just_t, Scheduler&& scheduler, Ts&&... ts)
{
return on(just(HPX_FORWARD(Ts, ts)...),
return transfer(just(HPX_FORWARD(Ts, ts)...),
HPX_FORWARD(Scheduler, scheduler));
}
} just_on{};
} transfer_just{};
}}} // namespace hpx::execution::experimental

0 comments on commit 16424ac

Please sign in to comment.