Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always return outermost thread id #6465

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 5 additions & 6 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
ar << valid;
if (valid)
{
ar& data_;
ar & data_;
}
}

Expand All @@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
ar >> valid;
if (valid)
{
ar& data_;
ar & data_;
}
}
} // namespace hpx::iostreams::detail
Expand Down Expand Up @@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -63,6 +63,9 @@ namespace examples::server {
}
~reset_id()
{
auto const mtx = outer_.mtx_;
std::lock_guard<hpx::mutex> l(*mtx);

[[maybe_unused]] hpx::thread::id const old_value = outer_.id_;
outer_.id_ = hpx::thread::id();
HPX_ASSERT(old_value != hpx::thread::id());
Expand Down Expand Up @@ -104,9 +107,15 @@ namespace examples::server {
});

auto const mtx = mtx_;
std::lock_guard<hpx::mutex> l(*mtx);
HPX_ASSERT(id_ != hpx::thread::id());
hpx::thread::interrupt(id_);

std::unique_lock<hpx::mutex> l(*mtx);
auto const id = id_;

if (id != hpx::thread::id())
{
l.unlock();
hpx::thread::interrupt(id);
}
}

HPX_DEFINE_COMPONENT_ACTION(cancelable_action, do_it, do_it_action)
Expand Down
20 changes: 11 additions & 9 deletions libs/core/config/include/hpx/config/threads_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@
#endif

#if !defined(HPX_SMALL_STACK_SIZE)
# if defined(HPX_WINDOWS) && !defined(HPX_HAVE_GENERIC_CONTEXT_COROUTINES)
# define HPX_SMALL_STACK_SIZE_TARGET 0x4000 // 16kByte
# else
# if defined(HPX_DEBUG)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# if !defined(HPX_SMALL_STACK_SIZE_TARGET)
# if defined(HPX_WINDOWS) && !defined(HPX_HAVE_GENERIC_CONTEXT_COROUTINES)
# define HPX_SMALL_STACK_SIZE_TARGET 0x4000 // 16kByte
# else
# if defined(__powerpc__) || defined(__INTEL_COMPILER)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# if defined(HPX_DEBUG)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# else
# define HPX_SMALL_STACK_SIZE_TARGET 0x10000 // 64kByte
# if defined(__powerpc__) || defined(__INTEL_COMPILER)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# else
# define HPX_SMALL_STACK_SIZE_TARGET 0x10000 // 64kByte
# endif
# endif
# endif
# endif

#
# if HPX_SMALL_STACK_SIZE_TARGET < (2 * HPX_THREADS_STACK_OVERHEAD)
# define HPX_SMALL_STACK_SIZE (2 * HPX_THREADS_STACK_OVERHEAD)
# else
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -425,6 +425,12 @@ namespace hpx::threads {
runs_as_child_mode_bits = static_cast<std::uint8_t>(bits);
}

void schedule_hint(std::int16_t core) noexcept
{
mode = thread_schedule_hint_mode::thread;
hint = core;
}

/// The hint associated with the mode. The interpretation of this hint
/// depends on the given mode.
std::int16_t hint = -1;
Expand Down
35 changes: 24 additions & 11 deletions libs/core/execution/tests/unit/bulk_async.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015 Daniel Bourgeois
// Copyright (c) 2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -16,16 +17,16 @@
#include <vector>

////////////////////////////////////////////////////////////////////////////////
int bulk_test(
hpx::thread::id tid, int value, bool is_par, int passed_through) //-V813
int bulk_test(hpx::thread::id const& tid, int value, bool is_par,
int passed_through) //-V813
{
HPX_TEST_EQ(is_par, (tid != hpx::this_thread::get_id()));
HPX_TEST_EQ(passed_through, 42);
return value;
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -35,14 +36,15 @@ void test_bulk_sync(Executor& exec)
using hpx::placeholders::_1;
using hpx::placeholders::_2;

std::vector<int> results = hpx::parallel::execution::bulk_sync_execute(
exec, hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
std::vector<int> results =
hpx::parallel::execution::bulk_sync_execute(HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v)));
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -54,31 +56,42 @@ void test_bulk_async(Executor& exec)

std::vector<hpx::future<int>> results =
hpx::parallel::execution::bulk_async_execute(
exec, hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
[](hpx::future<int>& lhs, const int& rhs) {
return lhs.get() == rhs;
}));
}

template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

////////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
hpx::execution::sequenced_executor seq_exec;
test_bulk_sync(seq_exec);
test_bulk_sync(disable_run_as_child(seq_exec));

hpx::execution::parallel_executor par_exec;
hpx::execution::parallel_executor par_fork_exec(hpx::launch::fork);
test_bulk_async(par_exec);
test_bulk_async(par_fork_exec);
test_bulk_async(disable_run_as_child(par_exec));
test_bulk_async(disable_run_as_child(par_fork_exec));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down