[WIP] moar reduce
ericniebler committed Sep 25, 2023
1 parent b240e36 commit 66cbfbf
Showing 24 changed files with 389 additions and 208 deletions.
11 changes: 11 additions & 0 deletions .vscode/launch.json
@@ -47,6 +47,17 @@
"initCommands": ["settings set target.disable-aslr false"],
"args": "${input:CXX_PROGRAM_ARGS}",
},
{
"name": "CUDA Current Target (cuda-gdb)",
"type": "cuda-gdb",
"request": "launch",
"stopAtEntry": false,
"breakOnLaunch": false,
"internalConsoleOptions": "neverOpen",
"program": "${command:cmake.launchTargetPath}",
"cwd": "${command:cmake.launchTargetDirectory}",
"args": "${input:CXX_PROGRAM_ARGS}",
},
],
"inputs": [
{
6 changes: 3 additions & 3 deletions examples/nvexec/maxwell/cuda.cuh
@@ -48,13 +48,13 @@ void run_cuda(
cudaStream_t stream{};
cudaStreamCreate(&stream);

kernel<block_threads><<<grid_blocks, block_threads, 0, stream>>>(cells, initializer);
::kernel<block_threads><<<grid_blocks, block_threads, 0, stream>>>(cells, initializer);
STDEXEC_DBG_ERR(cudaStreamSynchronize(stream));

report_performance(grid.cells, n_iterations, method, [&]() {
for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) {
kernel<block_threads><<<grid_blocks, block_threads, 0, stream>>>(cells, h_updater);
kernel<block_threads><<<grid_blocks, block_threads, 0, stream>>>(cells, e_updater);
::kernel<block_threads><<<grid_blocks, block_threads, 0, stream>>>(cells, h_updater);
::kernel<block_threads><<<grid_blocks, block_threads, 0, stream>>>(cells, e_updater);
}
writer(false);
STDEXEC_DBG_ERR(cudaStreamSynchronize(stream));
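The change above only adds a global-scope qualification to the kernel launches. A minimal standalone sketch (hypothetical names, not part of this commit) of the kind of name-lookup clash that the explicit :: qualification avoids:

#include <cstdio>
#include <cuda_runtime.h>

__global__ void kernel(int n) {          // a global-scope kernel, like the ones above
  printf("n = %d\n", n);
}

namespace demo {
  void kernel(int) {}                    // a same-named helper that hides ::kernel here

  void launch(cudaStream_t stream) {
    // kernel<<<1, 1, 0, stream>>>(42);  // unqualified lookup finds demo::kernel instead
    ::kernel<<<1, 1, 0, stream>>>(42);   // qualified: launches the global __global__ kernel
  }
}

int main() {
  demo::launch(cudaStream_t{});          // default stream
  cudaDeviceSynchronize();               // flush device-side printf output
}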
28 changes: 17 additions & 11 deletions examples/nvexec/reduce.cpp
@@ -22,20 +22,26 @@
#include <cstdio>
#include <span>

namespace ex = stdexec;

int main() {
// const int n = 2 * 1024;
// thrust::device_vector<float> input(n, 1.0f);
// float* first = thrust::raw_pointer_cast(input.data());
// float* last = thrust::raw_pointer_cast(input.data()) + input.size();
const int n = 2 * 1024;
thrust::device_vector<float> input(n, 1.0f);
float* first = thrust::raw_pointer_cast(input.data());
float* last = thrust::raw_pointer_cast(input.data()) + input.size();

nvexec::stream_context stream_ctx{};
stdexec::scheduler auto stream_sched = stream_ctx.get_scheduler();

// nvexec::stream_context stream_ctx{};
auto snd = //
stdexec::v2::on(
stream_sched,
stdexec::just(std::span{first, last}) | nvexec::reduce(42.0f));

// auto snd = ex::transfer_just(stream_ctx.get_scheduler(), std::span{first, last})
// | nvexec::reduce(42.0f);
// // BUGBUG this doesn't work:
// auto snd = //
// stdexec::just(std::span{first, last})
// | stdexec::v2::continue_on(stream_sched, nvexec::reduce(42.0f));

// auto [result] = stdexec::sync_wait(std::move(snd)).value();
auto [result] = stdexec::sync_wait(std::move(snd)).value();

// std::cout << "result: " << result << std::endl;
std::cout << "result: " << result << std::endl;
}
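For reference, a host-only sketch (not part of the commit, and assuming nvexec::reduce's default binary operation is a sum) of the value the pipeline above should produce when reducing 2 * 1024 values of 1.0f with an initial value of 42.0f:

#include <cstdio>
#include <numeric>
#include <vector>

int main() {
  const int n = 2 * 1024;
  std::vector<float> input(n, 1.0f);
  // 42.0f + 2048 * 1.0f == 2090.0f
  float result = std::accumulate(input.begin(), input.end(), 42.0f);
  std::printf("result: %f\n", result);
}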
49 changes: 1 addition & 48 deletions include/exec/inline_scheduler.hpp
@@ -22,52 +22,5 @@
namespace exec {
// A simple scheduler that executes its continuation inline, on the
// thread of the caller of start().
struct inline_scheduler {
template <class R_>
struct __op {
using R = stdexec::__t<R_>;
STDEXEC_NO_UNIQUE_ADDRESS R rec_;

friend void tag_invoke(stdexec::start_t, __op& op) noexcept {
stdexec::set_value((R&&) op.rec_);
}
};

struct __sender {
using is_sender = void;
using completion_signatures = stdexec::completion_signatures<stdexec::set_value_t()>;

template <class R>
friend auto tag_invoke(stdexec::connect_t, __sender, R&& rec) //
noexcept(stdexec::__nothrow_constructible_from<stdexec::__decay_t<R>, R>)
-> __op<stdexec::__x<stdexec::__decay_t<R>>> {
return {(R&&) rec};
}

struct __env {
friend inline_scheduler
tag_invoke(stdexec::get_completion_scheduler_t<stdexec::set_value_t>, const __env&) //
noexcept {
return {};
}
};

friend __env tag_invoke(stdexec::get_env_t, const __sender&) noexcept {
return {};
}
};

STDEXEC_DETAIL_CUDACC_HOST_DEVICE //
friend __sender
tag_invoke(stdexec::schedule_t, const inline_scheduler&) noexcept {
return {};
}

friend stdexec::forward_progress_guarantee
tag_invoke(stdexec::get_forward_progress_guarantee_t, const inline_scheduler&) noexcept {
return stdexec::forward_progress_guarantee::weakly_parallel;
}

bool operator==(const inline_scheduler&) const noexcept = default;
};
using inline_scheduler = stdexec::__inln::__scheduler;
}
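The hand-rolled scheduler above is replaced by an alias to stdexec's built-in inline scheduler. A minimal usage sketch (not part of the commit) showing that exec::inline_scheduler still models stdexec::scheduler and runs its continuation on the calling thread:

#include <exec/inline_scheduler.hpp>
#include <stdexec/execution.hpp>

#include <cassert>

int main() {
  static_assert(stdexec::scheduler<exec::inline_scheduler>);

  exec::inline_scheduler sched;
  auto snd = stdexec::schedule(sched) | stdexec::then([] { return 42; });
  auto [v] = stdexec::sync_wait(std::move(snd)).value();
  assert(v == 42);  // computed inline, on the calling thread
}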
5 changes: 3 additions & 2 deletions include/nvexec/multi_gpu_context.cuh
@@ -66,9 +66,10 @@ namespace nvexec {
}
} else {
if (op.status_ == cudaSuccess) {
continuation_kernel<<<1, 1, 0, op.stream_>>>(std::move(op.rec_), stdexec::set_value);
STDEXEC_STREAM_DETAIL_NS::continuation_kernel<<<1, 1, 0, op.stream_>>>(
std::move(op.rec_), stdexec::set_value);
} else {
continuation_kernel<<<1, 1, 0, op.stream_>>>(
STDEXEC_STREAM_DETAIL_NS::continuation_kernel<<<1, 1, 0, op.stream_>>>(
std::move(op.rec_), stdexec::set_error, std::move(op.status_));
}
}
6 changes: 3 additions & 3 deletions include/nvexec/stream/algorithm_base.cuh
@@ -31,10 +31,10 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun {
using binary_invoke_result_t = ::cuda::std::decay_t<
::cuda::std::invoke_result_t<Fun, stdexec::ranges::range_reference_t<Range>, InitT>>;

template <class SenderId, class ReceiverId, class InitT, class Fun, class DerivedReceiver>
template <class CvrefSenderId, class ReceiverId, class InitT, class Fun, class DerivedReceiver>
struct receiver_t {
struct __t : public stream_receiver_base {
using Sender = stdexec::__t<SenderId>;
using CvrefSender = stdexec::__cvref_t<CvrefSenderId>;
using Receiver = stdexec::__t<ReceiverId>;

template <class... Range>
@@ -54,7 +54,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS::__algo_range_init_fun {
static constexpr ::std::size_t value = //
__v< __gather_completions_for<
set_value_t,
Sender,
CvrefSender,
env_of_t<Receiver>,
__q<result_size_for_t>,
__q<max_in_pack>>>;
18 changes: 15 additions & 3 deletions include/nvexec/stream/bulk.cuh
@@ -58,7 +58,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
constexpr int block_threads = 256;
const int grid_blocks = (static_cast<int>(self.shape_) + block_threads - 1)
/ block_threads;
kernel<block_threads, As&...>
_bulk::kernel<block_threads, As&...>
<<<grid_blocks, block_threads, 0, stream>>>(self.shape_, std::move(self.f_), as...);
}

@@ -203,7 +203,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
if (begin < end) {
cudaSetDevice(dev);
cudaStreamWaitEvent(stream, op_state.ready_to_launch_, 0);
kernel<block_threads, As&...>
multi_gpu_bulk::kernel<block_threads, As&...>
<<<grid_blocks, block_threads, 0, stream>>>(begin, end, self.f_, as...);
cudaEventRecord(op_state.ready_to_complete_[dev], op_state.streams_[dev]);
}
@@ -218,7 +218,7 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
const int grid_blocks = (shape + block_threads - 1) / block_threads;

if (begin < end) {
kernel<block_threads, As&...>
multi_gpu_bulk::kernel<block_threads, As&...>
<<<grid_blocks, block_threads, 0, baseline_stream>>>(begin, end, self.f_, as...);
}
}
@@ -371,3 +371,15 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
};
};
}

namespace stdexec::__detail {
template <class SenderId, class Shape, class Fun>
inline constexpr __mconst<
nvexec::STDEXEC_STREAM_DETAIL_NS::bulk_sender_t<__name_of<__t<SenderId>>, Shape, Fun>>
__name_of_v<nvexec::STDEXEC_STREAM_DETAIL_NS::bulk_sender_t<SenderId, Shape, Fun>>{};

template <class SenderId, class Shape, class Fun>
inline constexpr __mconst<
nvexec::STDEXEC_STREAM_DETAIL_NS::multi_gpu_bulk_sender_t<__name_of<__t<SenderId>>, Shape, Fun>>
__name_of_v<nvexec::STDEXEC_STREAM_DETAIL_NS::multi_gpu_bulk_sender_t<SenderId, Shape, Fun>>{};
}
52 changes: 40 additions & 12 deletions include/nvexec/stream/common.cuh
@@ -165,6 +165,8 @@ namespace nvexec {
}
};

struct stream_scheduler;

struct context_state_t {
std::pmr::memory_resource* pinned_resource_{nullptr};
std::pmr::memory_resource* managed_resource_{nullptr};
@@ -195,9 +197,9 @@
void return_stream(cudaStream_t stream) {
stream_pools_->return_stream(stream, priority_);
}
};

struct stream_scheduler;
stream_scheduler make_stream_scheduler() const noexcept;
};

struct stream_sender_base {
using is_sender = void;
@@ -265,6 +267,10 @@
stream_provider_t* operator()(const Env& env) const noexcept {
return tag_invoke(get_stream_provider_t{}, env);
}

friend constexpr bool tag_invoke(forwarding_query_t, const get_stream_provider_t&) noexcept {
return true;
}
};

template <class... Ts>
@@ -308,7 +314,10 @@
using variant_storage_t = //
__minvoke< __minvoke<
__mfold_right<
__mbind_front_q<stream_storage_impl::variant, ::cuda::std::tuple<set_noop>>,
__mbind_front_q<
stream_storage_impl::variant,
::cuda::std::tuple<set_noop>,
::cuda::std::tuple<set_error_t, cudaError_t>>,
__mbind_front_q<stream_storage_impl::__bind_completions_t, _Sender, _Env>>,
set_value_t,
set_error_t,
@@ -330,7 +339,21 @@

template <class BaseEnv>
auto make_stream_env(BaseEnv&& base_env, stream_provider_t* stream_provider) noexcept {
return __join_env(__mkprop(get_stream_provider, stream_provider), (BaseEnv&&) base_env);
return __join_env(
__env::__env_fn{
[stream_provider]<__one_of<get_stream_provider_t, get_scheduler_t, get_domain_t> Tag>(
Tag) noexcept {
__mfront<stream_provider_t, Tag>* str_provider = stream_provider;
if constexpr (same_as<Tag, get_stream_provider_t>) {
return str_provider;
} else if constexpr (same_as<Tag, get_scheduler_t>) {
return str_provider->context_.make_stream_scheduler();
} else {
return get_domain(str_provider->context_.make_stream_scheduler());
}
STDEXEC_UNREACHABLE();
}},
(BaseEnv&&) base_env);
}

template <class BaseEnv>
@@ -370,6 +393,10 @@
stream_sender_base,
__decay_t<transform_sender_result_t<__env_domain_of_t<E>, S, E>>);

struct stream_scheduler;
template <class = stream_scheduler>
struct stream_domain;

template <class R>
concept stream_receiver = //
receiver<R> && //
@@ -427,8 +454,8 @@
};
};

template <class Receiver, class... As, class Tag>
__launch_bounds__(1) __global__ void continuation_kernel(Receiver rcvr, Tag, As... as) {
template <class Receiver, class Tag, class... As>
__launch_bounds__(1) __global__ void continuation_kernel(Receiver rcvr, As... as) {
static_assert(trivially_copyable<Receiver, Tag, As...>);
Tag()(::cuda::std::move(rcvr), static_cast<As&&>(as)...);
}
@@ -552,7 +579,7 @@
if (cudaCpuDeviceId == device_id) {
ptr->~T();
} else {
destructor_kernel<<<1, 1, 0, stream>>>(ptr);
STDEXEC_STREAM_DETAIL_NS::destructor_kernel<<<1, 1, 0, stream>>>(ptr);

// TODO Bury all the memory associated with the stream provider and then
// deallocate the memory
@@ -573,9 +600,9 @@
if constexpr (stream_receiver<outer_receiver_t>) {
set_error((outer_receiver_t&&) rcvr_, (cudaError_t&&) status);
} else {
// pass a cudaError_t by value:
continuation_kernel<outer_receiver_t, Error>
<<<1, 1, 0, get_stream()>>>((outer_receiver_t&&) rcvr_, set_error_t(), status);
STDEXEC_STREAM_DETAIL_NS::
continuation_kernel<outer_receiver_t, set_error_t, cudaError_t> // by value
<<<1, 1, 0, get_stream()>>>((outer_receiver_t&&) rcvr_, status);
}
}

@@ -584,8 +611,9 @@
if constexpr (stream_receiver<outer_receiver_t>) {
Tag()((outer_receiver_t&&) rcvr_, (As&&) as...);
} else {
continuation_kernel<outer_receiver_t, As&&...> // by reference
<<<1, 1, 0, get_stream()>>>((outer_receiver_t&&) rcvr_, Tag(), (As&&) as...);
STDEXEC_STREAM_DETAIL_NS::
continuation_kernel<outer_receiver_t, Tag, As&&...> // by reference
<<<1, 1, 0, get_stream()>>>((outer_receiver_t&&) rcvr_, (As&&) as...);
}
}
};
3 changes: 2 additions & 1 deletion include/nvexec/stream/ensure_started.cuh
@@ -66,7 +66,8 @@ namespace nvexec::STDEXEC_STREAM_DETAIL_NS {
cudaStream_t stream = state.stream_provider_.own_stream_.value();
using tuple_t = decayed_tuple<Tag, As...>;
state.index_ = SharedState::variant_t::template index_of<tuple_t>::value;
copy_kernel<Tag, As&&...><<<1, 1, 0, stream>>>(state.data_, (As&&) as...);
_ensure_started::copy_kernel<Tag, As&&...>
<<<1, 1, 0, stream>>>(state.data_, (As&&) as...);
state.stream_provider_.status_ = STDEXEC_DBG_ERR(cudaEventRecord(state.event_, stream));
} else {
using tuple_t = decayed_tuple<Tag, As...>;