Skip to content

Commit

Permalink
Always return outermost thread id
Browse files Browse the repository at this point in the history
-flyby: deprecate get_outer_self_id
-flyby: ignoring locks during termination detection
  • Loading branch information
hkaiser committed Apr 28, 2024
1 parent e977ecc commit b05a03e
Show file tree
Hide file tree
Showing 38 changed files with 590 additions and 438 deletions.
11 changes: 5 additions & 6 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
ar << valid;
if (valid)
{
ar& data_;
ar & data_;
}
}

Expand All @@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
ar >> valid;
if (valid)
{
ar& data_;
ar & data_;
}
}
} // namespace hpx::iostreams::detail
Expand Down Expand Up @@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -63,6 +63,9 @@ namespace examples::server {
}
~reset_id()
{
auto const mtx = outer_.mtx_;
std::lock_guard<hpx::mutex> l(*mtx);

[[maybe_unused]] hpx::thread::id const old_value = outer_.id_;
outer_.id_ = hpx::thread::id();
HPX_ASSERT(old_value != hpx::thread::id());
Expand Down Expand Up @@ -104,9 +107,15 @@ namespace examples::server {
});

auto const mtx = mtx_;
std::lock_guard<hpx::mutex> l(*mtx);
HPX_ASSERT(id_ != hpx::thread::id());
hpx::thread::interrupt(id_);

std::unique_lock<hpx::mutex> l(*mtx);
auto const id = id_;

if (id != hpx::thread::id())
{
l.unlock();
hpx::thread::interrupt(id);
}
}

HPX_DEFINE_COMPONENT_ACTION(cancelable_action, do_it, do_it_action)
Expand Down
20 changes: 11 additions & 9 deletions libs/core/config/include/hpx/config/threads_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@
#endif

#if !defined(HPX_SMALL_STACK_SIZE)
# if defined(HPX_WINDOWS) && !defined(HPX_HAVE_GENERIC_CONTEXT_COROUTINES)
# define HPX_SMALL_STACK_SIZE_TARGET 0x4000 // 16kByte
# else
# if defined(HPX_DEBUG)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# if !defined(HPX_SMALL_STACK_SIZE_TARGET)
# if defined(HPX_WINDOWS) && !defined(HPX_HAVE_GENERIC_CONTEXT_COROUTINES)
# define HPX_SMALL_STACK_SIZE_TARGET 0x4000 // 16kByte
# else
# if defined(__powerpc__) || defined(__INTEL_COMPILER)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# if defined(HPX_DEBUG)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# else
# define HPX_SMALL_STACK_SIZE_TARGET 0x10000 // 64kByte
# if defined(__powerpc__) || defined(__INTEL_COMPILER)
# define HPX_SMALL_STACK_SIZE_TARGET 0x20000 // 128kByte
# else
# define HPX_SMALL_STACK_SIZE_TARGET 0x10000 // 64kByte
# endif
# endif
# endif
# endif

#
# if HPX_SMALL_STACK_SIZE_TARGET < (2 * HPX_THREADS_STACK_OVERHEAD)
# define HPX_SMALL_STACK_SIZE (2 * HPX_THREADS_STACK_OVERHEAD)
# else
Expand Down
8 changes: 7 additions & 1 deletion libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -425,6 +425,12 @@ namespace hpx::threads {
runs_as_child_mode_bits = static_cast<std::uint8_t>(bits);
}

void schedule_hint(std::int16_t core) noexcept
{
mode = thread_schedule_hint_mode::thread;
hint = core;
}

/// The hint associated with the mode. The interpretation of this hint
/// depends on the given mode.
std::int16_t hint = -1;
Expand Down
35 changes: 24 additions & 11 deletions libs/core/execution/tests/unit/bulk_async.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015 Daniel Bourgeois
// Copyright (c) 2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -16,16 +17,16 @@
#include <vector>

////////////////////////////////////////////////////////////////////////////////
int bulk_test(
hpx::thread::id tid, int value, bool is_par, int passed_through) //-V813
int bulk_test(hpx::thread::id const& tid, int value, bool is_par,
int passed_through) //-V813
{
HPX_TEST_EQ(is_par, (tid != hpx::this_thread::get_id()));
HPX_TEST_EQ(passed_through, 42);
return value;
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -35,14 +36,15 @@ void test_bulk_sync(Executor& exec)
using hpx::placeholders::_1;
using hpx::placeholders::_2;

std::vector<int> results = hpx::parallel::execution::bulk_sync_execute(
exec, hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
std::vector<int> results =
hpx::parallel::execution::bulk_sync_execute(HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v)));
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -54,31 +56,42 @@ void test_bulk_async(Executor& exec)

std::vector<hpx::future<int>> results =
hpx::parallel::execution::bulk_async_execute(
exec, hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
[](hpx::future<int>& lhs, const int& rhs) {
return lhs.get() == rhs;
}));
}

template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

////////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
hpx::execution::sequenced_executor seq_exec;
test_bulk_sync(seq_exec);
test_bulk_sync(disable_run_as_child(seq_exec));

hpx::execution::parallel_executor par_exec;
hpx::execution::parallel_executor par_fork_exec(hpx::launch::fork);
test_bulk_async(par_exec);
test_bulk_async(par_fork_exec);
test_bulk_async(disable_run_as_child(par_exec));
test_bulk_async(disable_run_as_child(par_fork_exec));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down

0 comments on commit b05a03e

Please sign in to comment.