Skip to content

Commit

Permalink
Disable run_as_child mode for some of the tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jul 28, 2023
1 parent 9d46648 commit d3f50ae
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ namespace hpx::detail {
HPX_FORCEINLINE static std::enable_if_t<
traits::detail::is_deferred_invocable_v<F, Ts...>,
hpx::future<util::detail::invoke_deferred_result_t<F, Ts...>>>
call(Policy&& policy, hpx::threads::thread_description const& desc,
call(Policy policy, hpx::threads::thread_description const& desc,
threads::thread_pool_base* pool, F&& f, Ts&&... ts)
{
HPX_ASSERT(pool);
Expand All @@ -172,10 +172,8 @@ namespace hpx::detail {
lcos::local::futures_factory<result_type()> p(
util::deferred_call(HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...));

threads::thread_id_ref_type tid =
p.post(pool, desc.get_description(), HPX_MOVE(policy));

if (tid)
if (threads::thread_id_ref_type tid =
p.post(pool, desc.get_description(), HPX_MOVE(policy)))
{
auto runs_as_child = hint.runs_as_child_mode();
if (runs_as_child ==
Expand Down Expand Up @@ -230,7 +228,7 @@ namespace hpx::detail {
HPX_FORCEINLINE static std::enable_if_t<
traits::detail::is_deferred_invocable_v<F, Ts...>,
hpx::future<util::detail::invoke_deferred_result_t<F, Ts...>>>
call(Policy&& policy, hpx::threads::thread_description const& desc,
call(Policy policy, hpx::threads::thread_description const& desc,
threads::thread_pool_base* pool, F&& f, Ts&&... ts)
{
HPX_ASSERT(pool != nullptr);
Expand Down Expand Up @@ -258,7 +256,7 @@ namespace hpx::detail {
p.post(pool, desc.get_description(), policy);

// make sure this thread is executed last
threads::thread_id_type tid_self = threads::get_self_id();
threads::thread_id_type const tid_self = threads::get_self_id();
if (tid && tid_self &&
get_thread_id_data(tid)->get_scheduler_base() ==
get_thread_id_data(tid_self)->get_scheduler_base())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ namespace hpx::detail {
hpx::threads::thread_description const& desc,
threads::thread_pool_base* pool, F&& f, Ts&&... ts)
{
// run_as_child doesn't make sense if we _post_ a tasks
auto hint = policy.hint();
if (hint.runs_as_child_mode() ==
hpx::threads::thread_execution_hint::run_as_child)
{
if (!pool->get_scheduler()->supports_direct_execution())
{
hint.runs_as_child_mode(
hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);
}
hint.runs_as_child_mode(
hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);
}

threads::thread_init_data data(
Expand Down Expand Up @@ -76,32 +74,23 @@ namespace hpx::detail {
hpx::threads::thread_description const& desc,
threads::thread_pool_base* pool, F&& f, Ts&&... ts)
{
// run_as_child doesn't make sense if we _post_ a tasks
auto hint = policy.hint();
if (hint.runs_as_child_mode() ==
hpx::threads::thread_execution_hint::run_as_child)
{
if (!pool->get_scheduler()->supports_direct_execution())
{
hint.runs_as_child_mode(
hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);
}
}

threads::thread_init_data data(
threads::make_thread_function_nullary(hpx::util::deferred_call(
HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...)),
desc, policy.priority(),
threads::thread_schedule_hint(
threads::thread_schedule_hint_mode::thread,
static_cast<std::int16_t>(get_worker_thread_num()),
hint.placement_mode(), hint.runs_as_child_mode()),
hint.placement_mode(),
hpx::threads::thread_execution_hint::none),
policy.stacksize(),
threads::thread_schedule_state::pending_do_not_schedule, true);

threads::thread_id_ref_type tid =
threads::thread_id_ref_type const tid =
threads::register_thread(data, pool);
threads::thread_id_type tid_self = threads::get_self_id();
threads::thread_id_type const tid_self = threads::get_self_id();

// make sure this thread is executed last
if (tid && tid_self &&
Expand Down Expand Up @@ -181,52 +170,29 @@ namespace hpx::detail {
{
HPX_ASSERT(pool != nullptr);

auto hint = policy.hint();
if (hint.runs_as_child_mode() ==
hpx::threads::thread_execution_hint::run_as_child)
{
if (!pool->get_scheduler()->supports_direct_execution())
{
hint.runs_as_child_mode(
hpx::threads::thread_execution_hint::none);
policy.set_hint(hint);
}
}

// run_as_child doesn't make sense if we _post_ a tasks
if (policy == launch::sync)
{
auto mod_policy = launch::sync_policy(
policy.priority(), policy.stacksize(), hint);

post_policy_dispatch<launch::sync_policy>::call(
HPX_MOVE(mod_policy), desc, pool, HPX_FORWARD(F, f),
HPX_MOVE(policy), desc, pool, HPX_FORWARD(F, f),
HPX_FORWARD(Ts, ts)...);
}
else if (policy == launch::deferred)
{
auto mod_policy = launch::deferred_policy(
policy.priority(), policy.stacksize(), hint);

post_policy_dispatch<launch::deferred_policy>::call(
HPX_MOVE(mod_policy), desc, pool, HPX_FORWARD(F, f),
HPX_MOVE(policy), desc, pool, HPX_FORWARD(F, f),
HPX_FORWARD(Ts, ts)...);
}
else if (policy == launch::fork)
{
auto mod_policy = launch::fork_policy(
policy.priority(), policy.stacksize(), hint);

post_policy_dispatch<launch::fork_policy>::call(
HPX_MOVE(mod_policy), desc, pool, HPX_FORWARD(F, f),
HPX_MOVE(policy), desc, pool, HPX_FORWARD(F, f),
HPX_FORWARD(Ts, ts)...);
}
else
{
auto mod_policy = launch::async_policy(
policy.priority(), policy.stacksize(), hint);

post_policy_dispatch<launch::async_policy>::call(
HPX_MOVE(mod_policy), desc, pool, HPX_FORWARD(F, f),
HPX_MOVE(policy), desc, pool, HPX_FORWARD(F, f),
HPX_FORWARD(Ts, ts)...);
}
}
Expand Down
7 changes: 6 additions & 1 deletion libs/core/resource_partitioner/tests/unit/suspend_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@ int hpx_main()

hpx::threads::thread_pool_base& worker_pool =
hpx::resource::get_thread_pool("worker");
hpx::execution::parallel_executor worker_exec(
hpx::execution::parallel_executor exec(
&hpx::resource::get_thread_pool("worker"));
std::size_t const worker_pool_threads =
hpx::resource::get_num_threads("worker");

hpx::threads::thread_schedule_hint hint;
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

auto worker_exec = hpx::execution::experimental::with_hint(exec, hint);

{
// Suspend and resume pool with future
hpx::chrono::high_resolution_timer const t;
Expand Down
34 changes: 21 additions & 13 deletions libs/core/resource_partitioner/tests/unit/suspend_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,37 @@
#include <utility>
#include <vector>

std::size_t const max_threads = (std::min)(
std::size_t(4), std::size_t(hpx::threads::hardware_concurrency()));
std::size_t const max_threads = (std::min)(static_cast<std::size_t>(4),
static_cast<std::size_t>(hpx::threads::hardware_concurrency()));

int hpx_main()
{
std::size_t const num_threads = hpx::resource::get_num_threads("default");

HPX_TEST_EQ(std::size_t(max_threads), num_threads);
HPX_TEST_EQ(static_cast<std::size_t>(max_threads), num_threads);

hpx::threads::thread_pool_base& tp =
hpx::resource::get_thread_pool("default");

HPX_TEST_EQ(tp.get_active_os_thread_count(), std::size_t(max_threads));
HPX_TEST_EQ(
tp.get_active_os_thread_count(), static_cast<std::size_t>(max_threads));

{
// Check number of used resources
for (std::size_t thread_num = 0; thread_num < num_threads - 1;
++thread_num)
{
hpx::threads::suspend_processing_unit(tp, thread_num).get();
HPX_TEST_EQ(std::size_t(num_threads - thread_num - 1),
HPX_TEST_EQ(static_cast<std::size_t>(num_threads - thread_num - 1),
tp.get_active_os_thread_count());
}

for (std::size_t thread_num = 0; thread_num < num_threads - 1;
++thread_num)
{
hpx::threads::resume_processing_unit(tp, thread_num).get();
HPX_TEST_EQ(
std::size_t(thread_num + 2), tp.get_active_os_thread_count());
HPX_TEST_EQ(static_cast<std::size_t>(thread_num + 2),
tp.get_active_os_thread_count());
}
}

Expand All @@ -61,7 +62,7 @@ int hpx_main()

// NOTE: This only works as long as there is another OS thread which has
// no work and is able to steal.
std::size_t worker_thread_num = hpx::get_worker_thread_num();
std::size_t const worker_thread_num = hpx::get_worker_thread_num();
hpx::threads::suspend_processing_unit(tp, worker_thread_num).get();
hpx::threads::resume_processing_unit(tp, worker_thread_num).get();
}
Expand Down Expand Up @@ -150,13 +151,20 @@ int hpx_main()
std::size_t thread_num = 0;
bool up = true;
std::vector<hpx::future<void>> fs;
hpx::chrono::high_resolution_timer t;

hpx::threads::thread_schedule_hint hint;
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

hpx::launch::async_policy policy;
policy.set_hint(hint);

hpx::chrono::high_resolution_timer const t;
while (t.elapsed() < 2)
{
for (std::size_t i = 0;
i < hpx::resource::get_num_threads("default") * 10; ++i)
{
fs.push_back(hpx::async([]() {}));
fs.push_back(hpx::async(policy, []() {}));
}

if (up)
Expand Down Expand Up @@ -189,7 +197,7 @@ int hpx_main()
}
}

hpx::when_all(std::move(fs)).get();
hpx::wait_all(std::move(fs));

// Don't exit with suspended pus
for (std::size_t thread_num_resume = 0; thread_num_resume < thread_num;
Expand Down Expand Up @@ -226,7 +234,7 @@ int main(int argc, char* argv[])

{
// These schedulers should succeed
std::vector<hpx::resource::scheduling_policy> schedulers = {
std::vector<hpx::resource::scheduling_policy> const schedulers = {
hpx::resource::scheduling_policy::local,
hpx::resource::scheduling_policy::local_priority_fifo,
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
Expand All @@ -247,7 +255,7 @@ int main(int argc, char* argv[])

{
// These schedulers should fail
std::vector<hpx::resource::scheduling_policy> schedulers = {
std::vector<hpx::resource::scheduling_policy> const schedulers = {
hpx::resource::scheduling_policy::static_,
hpx::resource::scheduling_policy::static_priority,
};
Expand Down
3 changes: 0 additions & 3 deletions libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1176,9 +1176,6 @@ namespace hpx::threads::policies {
parameters_.small_stacksize_, thread_id_addref::no);
HPX_ASSERT(p);

// We initialize the stack eagerly
p->init();

// Finally, store the thread for later use
thread_heap_small_.emplace_back(p);
}
Expand Down

0 comments on commit d3f50ae

Please sign in to comment.