Skip to content

Commit

Permalink
Fixing loop-end condition
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Apr 28, 2024
1 parent 802eb72 commit 60e255e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
6 changes: 3 additions & 3 deletions libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@ namespace hpx::threads {
/// local thread number associated with this hint. Local thread numbers
/// are indexed from zero. It is up to the scheduler to decide how to
/// interpret thread numbers that are larger than the number of threads
/// available to the scheduler. Typically thread numbers will wrap
/// available to the scheduler. Typically, thread numbers will wrap
/// around when too large.
thread = 1,

/// A hint that tells the scheduler to prefer scheduling a task on the
/// NUMA domain associated with this hint. NUMA domains are indexed from
/// zero. It is up to the scheduler to decide how to interpret NUMA
/// domain indices that are larger than the number of available NUMA
/// domains to the scheduler. Typically indices will wrap around when
/// domains to the scheduler. Typically, indices will wrap around when
/// too large.
numa = 2,
};
Expand Down Expand Up @@ -295,7 +295,7 @@ namespace hpx::threads {
}

///////////////////////////////////////////////////////////////////////////
/// \enum thread_placement_hint
/// \enum thread_execution_hint
///
/// The type of hint given to the scheduler related running a thread as a
/// child directly in the context of the parent thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ namespace hpx::parallel::execution::detail {
template <typename F, typename Ts>
void do_work_chunk(F&& f, Ts&& ts, std::uint32_t const index) const
{
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 && \
!defined(HPX_HAVE_APEX)
static hpx::util::itt::event notify_event(
"set_value_loop_visitor_static::do_work_chunk(chunking)");

Expand Down Expand Up @@ -150,7 +151,7 @@ namespace hpx::parallel::execution::detail {
// Finish the work for one worker thread. If this is not the last worker
// thread to finish, it will only decrement the counter. If it is the
// last thread it will call set_exception if there is an exception.
// Otherwise it will call set_value on the shared state.
// Otherwise, it will call set_value on the shared state.
void finish() const
{
if (--(state->tasks_remaining.data_) == 0)
Expand Down Expand Up @@ -438,17 +439,21 @@ namespace hpx::parallel::execution::detail {

// Initialize the queues for all worker threads so that worker
// threads can start stealing immediately when they start.
for (std::uint32_t worker_thread = 0; worker_thread != num_threads;
++worker_thread)
if (hint.placement_mode() == placement::breadth_first ||
hint.placement_mode() == placement::breadth_first_reverse)
{
if (hint.placement_mode() == placement::breadth_first ||
hint.placement_mode() == placement::breadth_first_reverse)
for (std::uint32_t worker_thread = 0;
worker_thread != num_threads; ++worker_thread)
{
init_queue_breadth_first(worker_thread, num_chunks);
}
else
}
else
{
// the default for this executor is depth-first placement
for (std::uint32_t worker_thread = 0;
worker_thread != num_threads; ++worker_thread)
{
// the default for this executor is depth-first placement
init_queue_depth_first(worker_thread, num_chunks);
}
}
Expand Down Expand Up @@ -546,8 +551,8 @@ namespace hpx::parallel::execution::detail {
auto launch_data = generate_launch_data();
std::size_t const size = launch_data.size();

// Do straight spawning if hierarchical spawning was disabled or we
// have less chunks than our threshold.
// Do straight spawning if hierarchical spawning was disabled or if
// we have less chunks than our threshold.
if (hierarchical_threshold == 0 || hierarchical_threshold >= size)
{
for (std::size_t i = 0; i != size; ++i)
Expand All @@ -558,36 +563,50 @@ namespace hpx::parallel::execution::detail {
return;
}

auto task = [desc, pool, launch_data](auto b, auto e) {
for (std::size_t i = b; i != e - 1; ++i)
auto task = [desc, pool, launch_data = HPX_MOVE(launch_data)](
auto b, auto e) mutable {
HPX_ASSERT(b != e);
for (std::size_t i = b + 1; i != e; ++i)
{
auto state = launch_data[i].func.state;
state->template do_work_task<false>(desc, pool,
launch_data[i].bind_to_core, launch_data[i].func);
launch_data[i].bind_to_core,
HPX_MOVE(launch_data[i].func));
}

// directly execute last task
auto state = launch_data[e - 1].func.state;
// directly execute first task
auto state = launch_data[b].func.state;
state->template do_work_task<true>(
desc, pool, false, launch_data[e - 1].func);
desc, pool, false, HPX_MOVE(launch_data[b].func));
};

// run task on small stack
auto post_policy = hpx::execution::experimental::with_stacksize(
policy, threads::thread_stacksize::small_);
auto post_policy_hint =
hpx::execution::experimental::get_hint(post_policy);
post_policy_hint.mode =
hpx::threads::thread_schedule_hint_mode::thread;

std::size_t start = 0;
while (true)
while (start < size)
{
// place the helper thread on the first core of the thread block
post_policy_hint.hint =
first_thread + static_cast<std::uint16_t>(start);
auto core_policy = hpx::execution::experimental::with_hint(
post_policy, post_policy_hint);

auto const stop = start + hierarchical_threshold;
if (stop > size)
{
hpx::detail::post_policy_dispatch<Launch>::call(
post_policy, desc, pool, HPX_MOVE(task), start, size);
core_policy, desc, pool, HPX_MOVE(task), start, size);
break;
}

hpx::detail::post_policy_dispatch<Launch>::call(
post_policy, desc, pool, task, start, stop);
core_policy, desc, pool, task, start, stop);
start = stop;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ namespace hpx::execution::experimental::detail {
template <typename Ts>
void do_work_chunk(Ts& ts, std::uint32_t const index) const
{
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 && \
!defined(HPX_HAVE_APEX)
static hpx::util::itt::event notify_event(
"set_value_loop_visitor_static::do_work_chunk(chunking)");

Expand All @@ -145,7 +146,7 @@ namespace hpx::execution::experimental::detail {
(std::min)(i_begin + task_f->chunk_size, task_f->size);

auto it = std::next(hpx::util::begin(op_state->shape), i_begin);
for (std::uint32_t i = i_begin; i != i_end; (void) ++it, ++i)
for (std::size_t i = i_begin; i != i_end; (void) ++it, ++i)
{
bulk_scheduler_invoke_helper(
index_pack_type{}, op_state->f, *it, ts);
Expand Down Expand Up @@ -274,7 +275,7 @@ namespace hpx::execution::experimental::detail {
// Finish the work for one worker thread. If this is not the last worker
// thread to finish, it will only decrement the counter. If it is the
// last thread it will call set_error if there is an exception.
// Otherwise it will call set_value on the connected receiver.
// Otherwise, it will call set_value on the connected receiver.
void finish() const
{
if (--(op_state->tasks_remaining.data_) == 0)
Expand Down

0 comments on commit 60e255e

Please sign in to comment.