Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve libCDS integration #4857

Merged
merged 6 commits into from Jul 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion CMakeLists.txt
Expand Up @@ -932,8 +932,20 @@ if(HPX_WITH_APEX)
endif()

# LibCDS option
hpx_option(HPX_WITH_LIBCDS BOOL "Enable LibCDS support." OFF)
hpx_option(
HPX_WITH_LIBCDS BOOL "Enable LibCDS support (experimental)." OFF
CATEGORY "Thread Manager" ADVANCED
)
if(HPX_WITH_LIBCDS)
hpx_option(
HPX_WITH_LIBCDS_GIT_REPOSITORY STRING
"Define the LibCDS git repository to use."
https://github.com/STEllAR-GROUP/libcds CATEGORY "Thread Manager" ADVANCED
)
hpx_option(
HPX_WITH_LIBCDS_GIT_TAG STRING "Define the LibCDS git tag to use."
hpx-thread CATEGORY "Thread Manager" ADVANCED
)
include(HPX_SetupLibCDS)
if(NOT libcds_POPULATED)
hpx_error("HPX_WITH_LIBCDS was set to ON, but HPX failed to fetch LibCDS")
Expand Down
13 changes: 10 additions & 3 deletions cmake/HPX_SetupLibCDS.cmake
Expand Up @@ -17,6 +17,7 @@

if(HPX_WITH_LIBCDS AND NOT TARGET LibCDS::cds)
include(FetchContent)
include(HPX_Message)

set(LIBCDS_WITH_HPX
ON
Expand All @@ -27,11 +28,14 @@ if(HPX_WITH_LIBCDS AND NOT TARGET LibCDS::cds)
CACHE INTERNAL ""
)

hpx_info(
"Fetching libCDS from repository: ${HPX_WITH_LIBCDS_GIT_REPOSITORY}, "
"tag: ${HPX_WITH_LIBCDS_GIT_TAG}"
)
fetchcontent_declare(
libcds
# GIT_REPOSITORY https://github.com/khizmax/libcds
GIT_REPOSITORY https://github.com/weilewei/libcds
GIT_TAG hpx-thread
GIT_REPOSITORY ${HPX_WITH_LIBCDS_GIT_REPOSITORY}
GIT_TAG ${HPX_WITH_LIBCDS_GIT_TAG}
GIT_SHALLOW TRUE
)
fetchcontent_getproperties(libcds)
Expand All @@ -40,6 +44,9 @@ if(HPX_WITH_LIBCDS AND NOT TARGET LibCDS::cds)
fetchcontent_populate(libcds)
set(LIBCDS_CXX_STANDARD ${HPX_CXX_STANDARD})
add_subdirectory(${libcds_SOURCE_DIR} ${libcds_BINARY_DIR})

set_target_properties(cds PROPERTIES FOLDER "Core")
set_target_properties(cds-s PROPERTIES FOLDER "Core")
endif()

endif()
87 changes: 55 additions & 32 deletions tests/performance/local/libcds_hazard_pointer_overhead.cpp
Expand Up @@ -10,15 +10,13 @@
#include <hpx/hpx_init.hpp>
#include <hpx/include/apply.hpp>
#include <hpx/include/async.hpp>
#include <hpx/include/iostreams.hpp>
#include <hpx/include/parallel_executors.hpp>
#include <hpx/include/parallel_for_loop.hpp>
#include <hpx/include/threads.hpp>
#include <hpx/iostream.hpp>
#include <hpx/modules/format.hpp>
#include <hpx/modules/testing.hpp>
#include <hpx/modules/timing.hpp>
#include <hpx/runtime/actions/continuation.hpp>
#include <hpx/runtime/actions/plain_action.hpp>
#include <hpx/threading_base/annotated_function.hpp>

#include <hpx/include/parallel_execution.hpp>
Expand Down Expand Up @@ -115,10 +113,27 @@ double global_scratch = 0;
std::uint64_t num_iterations = 0;

///////////////////////////////////////////////////////////////////////////////
struct libcds_thread_manager_wrapper
{
explicit libcds_thread_manager_wrapper(bool uselibcds)
: uselibcds_(uselibcds)
{
if (uselibcds_)
cds::gc::hp::smr::attach_thread();
}
~libcds_thread_manager_wrapper()
{
if (uselibcds_)
cds::gc::hp::smr::detach_thread();
}

bool uselibcds_;
};

double null_function(bool uselibcds) noexcept
{
if (uselibcds)
cds::threading::Manager::attachThread();
libcds_thread_manager_wrapper wrap(uselibcds);

if (num_iterations > 0)
{
const int array_size = 4096;
Expand All @@ -132,8 +147,6 @@ double null_function(bool uselibcds) noexcept
}
return dummy[0];
}
if (uselibcds)
cds::threading::Manager::detachThread();
return 0.0;
}

Expand Down Expand Up @@ -210,10 +223,10 @@ void measure_function_futures_create_thread_hierarchical_placement(
};
auto const thread_func =
hpx::threads::detail::thread_function_nullary<decltype(func)>{func};
auto const desc = hpx::util::thread_description();
auto desc = hpx::util::thread_description();
auto prio = hpx::threads::thread_priority_normal;
auto const stack_size = hpx::threads::thread_stacksize_small;
auto const num_threads = hpx::get_num_worker_threads();
auto stack_size = hpx::threads::thread_stacksize_small;
auto num_threads = hpx::get_num_worker_threads();
hpx::error_code ec;

// start the clock
Expand All @@ -223,7 +236,7 @@ void measure_function_futures_create_thread_hierarchical_placement(
auto const hint =
hpx::threads::thread_schedule_hint(static_cast<std::int16_t>(t));
auto spawn_func = [&thread_func, sched, hint, t, count, num_threads,
desc, prio]() {
desc, prio, stack_size]() {
std::uint64_t const count_start = t * count / num_threads;
std::uint64_t const count_end = (t + 1) * count / num_threads;
hpx::error_code ec;
Expand Down Expand Up @@ -253,15 +266,27 @@ void measure_function_futures_create_thread_hierarchical_placement(
}

///////////////////////////////////////////////////////////////////////////////
struct libcds_wrapper
{
libcds_wrapper()
{
// Initialize libcds
cds::Initialize();
}

~libcds_wrapper()
{
// Terminate libcds
cds::Terminate();
}
};

int hpx_main(variables_map& vm)
{
// Initialize libcds
cds::Initialize();
libcds_wrapper wrapper;

{
// Initialize Hazard Pointer singleton
cds::gc::HP hpGC;

if (vm.count("hpx:queuing"))
queuing = vm["hpx:queuing"].as<std::string>();

Expand All @@ -284,12 +309,11 @@ int hpx_main(variables_map& vm)
if (HPX_UNLIKELY(0 == count))
throw std::logic_error("error: count of 0 futures specified\n");

cds::gc::HP hpGC;

hpx::parallel::execution::parallel_executor par;
hpx::parallel::execution::parallel_executor_aggregated par_agg;
hpx::parallel::execution::thread_pool_executor tpe;
hpx::parallel::execution::thread_pool_executor tpe_nostack(
hpx::threads::thread_priority_default,
hpx::threads::thread_stacksize_nostack);

for (int i = 0; i < repetitions; i++)
{
Expand All @@ -299,15 +323,14 @@ int hpx_main(variables_map& vm)
count, csv, bool(cds));
measure_function_futures_thread_count(
count, csv, par, bool(cds));
measure_function_futures_thread_count(
count, csv, par_agg, bool(cds));
measure_function_futures_thread_count(
count, csv, tpe, bool(cds));
}
}
}

// Terminate libcds
cds::Terminate();

return hpx::finalize();
}

Expand All @@ -318,20 +341,20 @@ int main(int argc, char* argv[])
options_description cmdline("usage: " HPX_APPLICATION_STRING " [options]");

// clang-format off
cmdline.add_options()("futures",
value<std::uint64_t>()->default_value(500000),
"number of futures to invoke")
cmdline.add_options()
("futures", value<std::uint64_t>()->default_value(500000),
"number of futures to invoke")

("delay-iterations", value<std::uint64_t>()->default_value(0),
"number of iterations in the delay loop")
("delay-iterations", value<std::uint64_t>()->default_value(0),
"number of iterations in the delay loop")

("csv", "output results as csv (format: count,duration)")
("test-all", "run all benchmarks")
("repetitions", value<int>()->default_value(1),
"number of repetitions of the full benchmark")
("csv", "output results as csv (format: count,duration)")
("test-all", "run all benchmarks")
("repetitions", value<int>()->default_value(1),
"number of repetitions of the full benchmark")

("info", value<std::string>()->default_value("no-info"),
"extra info for plot output (e.g. branch name)");
("info", value<std::string>()->default_value("no-info"),
"extra info for plot output (e.g. branch name)");
// clang-format on

// Initialize and run HPX.
Expand Down