Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hpx threading support for libcds #4

Merged
merged 2 commits into from Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 31 additions & 5 deletions CMakeLists.txt
Expand Up @@ -5,7 +5,7 @@ if(POLICY CMP0042)
cmake_policy(SET CMP0042 NEW)
endif()

set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/build/cmake ${CMAKE_MODULE_PATH})
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/build/cmake" CACHE STRING "Modules for CMake" FORCE)
include(TargetArch)
include(CheckIncludeFileCXX)

Expand All @@ -21,9 +21,23 @@ option(LIBCDS_WITH_ASAN "Build ASan+UBSan instrumented code" OFF)
option(LIBCDS_WITH_TSAN "Build TSan instrumented code" OFF)
option(LIBCDS_ENABLE_UNIT_TEST "Enable unit test" ON)
option(LIBCDS_ENABLE_STRESS_TEST "Enable stress test" ON)
set(CMAKE_TARGET_ARCHITECTURE "" CACHE string "Target build architecture")

find_package(Threads)
set(CMAKE_TARGET_ARCHITECTURE "" CACHE STRING "Target build architecture")

if(NOT LIBCDS_WITH_HPX)
find_package(Threads)
else()
if(NOT LIBCDS_INSIDE_HPX)
find_package(HPX REQUIRED)
if (HPX_FOUND)
message(STATUS "HPX Found")
endif()
endif()
message(STATUS "HPX Found")
set(CDS_HAVE_HPX TRUE)
add_definitions(-DCDS_THREADING_HPX)
include_directories(${HPX_INCLUDE_DIRS})
endif()

if(NOT CMAKE_TARGET_ARCHITECTURE)
target_architecture(CMAKE_TARGET_ARCHITECTURE)
Expand Down Expand Up @@ -185,8 +199,20 @@ if(MINGW)
target_compile_definitions(${CDS_STATIC_LIBRARY} PRIVATE CDS_BUILD_STATIC_LIB)
endif()

target_link_libraries(${CDS_SHARED_LIBRARY} PRIVATE ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(${CDS_STATIC_LIBRARY} PRIVATE ${CMAKE_THREAD_LIBS_INIT})
if(NOT LIBCDS_WITH_HPX)
target_link_libraries(${CDS_SHARED_LIBRARY} PRIVATE ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(${CDS_STATIC_LIBRARY} PRIVATE ${CMAKE_THREAD_LIBS_INIT})
else()
target_compile_definitions(${CDS_SHARED_LIBRARY} PUBLIC CDS_THREADING_HPX)
target_compile_definitions(${CDS_STATIC_LIBRARY} PUBLIC CDS_THREADING_HPX)
if(LIBCDS_INSIDE_HPX)
target_link_libraries(${CDS_SHARED_LIBRARY} PUBLIC hpx_core)
target_link_libraries(${CDS_STATIC_LIBRARY} PUBLIC hpx_core)
else()
target_link_libraries(${CDS_SHARED_LIBRARY} PUBLIC HPX::hpx)
target_link_libraries(${CDS_STATIC_LIBRARY} PUBLIC HPX::hpx)
endif()
endif()
target_include_directories(${CDS_SHARED_LIBRARY} INTERFACE "$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>"
$<INSTALL_INTERFACE:include>)
target_include_directories(${CDS_STATIC_LIBRARY} INTERFACE "$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>"
Expand Down
4 changes: 4 additions & 0 deletions cds/algo/atomic.h
Expand Up @@ -37,6 +37,10 @@ namespace cds {
namespace cxx11_atomic {
}} // namespace cds::cxx11_atomic

#if CDS_THREADING_HPX
# include <hpx/config.hpp>
#endif

//@cond
#if defined(CDS_USE_BOOST_ATOMIC)
// boost atomic
Expand Down
8 changes: 8 additions & 0 deletions cds/algo/flat_combining/kernel.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
#define CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H

#if CDS_THREADING_HPX
#include <hpx/thread_support/thread_specific_ptr.hpp>
#endif

#include <cds/algo/flat_combining/defs.h>
#include <cds/algo/flat_combining/wait_strategy.h>

Expand Down Expand Up @@ -232,7 +236,11 @@ namespace cds { namespace algo {
atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
publication_record_type* m_pHead; ///< Head of active publication list
publication_record_type* m_pAllocatedHead; ///< Head of allocated publication list
#if CDS_THREADING_HPX
hpx::threads::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
#else
boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
#endif
mutable global_lock_type m_Mutex; ///< Global mutex
mutable stat m_Stat; ///< Internal statistics
unsigned int const m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
Expand Down
42 changes: 30 additions & 12 deletions cds/algo/flat_combining/wait_strategy.h
Expand Up @@ -6,6 +6,12 @@
#ifndef CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H
#define CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H

#ifdef CDS_THREADING_HPX
#include <hpx/config.hpp>
#include <hpx/synchronization/spinlock.hpp>
#include <hpx/synchronization/condition_variable.hpp>
#endif

#include <cds/algo/flat_combining/defs.h>
#include <cds/algo/backoff_strategy.h>
#include <mutex>
Expand Down Expand Up @@ -37,6 +43,18 @@ namespace cds { namespace algo { namespace flat_combining {
*/
namespace wait_strategy {

struct thread_traits
{
#ifdef CDS_THREADING_HPX
using mutex_type = hpx::lcos::local::mutex;
using condition_variable_type = hpx::lcos::local::condition_variable;
using cv_status = hpx::lcos::local::cv_status;
#else
using mutex_type = std::mutex;
using condition_variable_type = std::condition_variable;
using cv_status = std::cv_status;
#endif
};
/// Empty wait strategy
/**
Empty wait strategy is just spinning on request field.
Expand Down Expand Up @@ -187,11 +205,11 @@ namespace cds { namespace algo { namespace flat_combining {
class single_mutex_single_condvar
{
//@cond
std::mutex m_mutex;
std::condition_variable m_condvar;
thread_traits::mutex_type m_mutex;
thread_traits::condition_variable_type m_condvar;
bool m_wakeup;

typedef std::unique_lock< std::mutex > unique_lock;
typedef std::unique_lock< thread_traits::mutex_type > unique_lock;
//@endcond

public:
Expand Down Expand Up @@ -227,7 +245,7 @@ namespace cds { namespace algo { namespace flat_combining {
return true;
}

bool ret = m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
bool ret = m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == thread_traits::cv_status::no_timeout;
m_wakeup = false;
return ret;
}
Expand Down Expand Up @@ -263,10 +281,10 @@ namespace cds { namespace algo { namespace flat_combining {
class single_mutex_multi_condvar
{
//@cond
std::mutex m_mutex;
thread_traits::mutex_type m_mutex;
bool m_wakeup;

typedef std::unique_lock< std::mutex > unique_lock;
typedef std::unique_lock< thread_traits::mutex_type > unique_lock;
//@endcond

public:
Expand All @@ -281,7 +299,7 @@ namespace cds { namespace algo { namespace flat_combining {
struct type: public PublicationRecord
{
//@cond
std::condition_variable m_condvar;
thread_traits::condition_variable_type m_condvar;
//@endcond
};
};
Expand Down Expand Up @@ -309,7 +327,7 @@ namespace cds { namespace algo { namespace flat_combining {
return true;
}

bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == thread_traits::cv_status::no_timeout;
m_wakeup = false;
return ret;
}
Expand Down Expand Up @@ -343,7 +361,7 @@ namespace cds { namespace algo { namespace flat_combining {
class multi_mutex_multi_condvar
{
//@cond
typedef std::unique_lock< std::mutex > unique_lock;
typedef std::unique_lock< thread_traits::mutex_type > unique_lock;
//@endcond
public:
enum {
Expand All @@ -357,8 +375,8 @@ namespace cds { namespace algo { namespace flat_combining {
struct type: public PublicationRecord
{
//@cond
std::mutex m_mutex;
std::condition_variable m_condvar;
thread_traits::mutex_type m_mutex;
thread_traits::condition_variable_type m_condvar;
bool m_wakeup;

type()
Expand Down Expand Up @@ -386,7 +404,7 @@ namespace cds { namespace algo { namespace flat_combining {
return true;
}

bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == thread_traits::cv_status::no_timeout;
rec.m_wakeup = false;
return ret;
}
Expand Down
3 changes: 3 additions & 0 deletions cds/container/striped_map/boost_flat_map.h
Expand Up @@ -6,6 +6,9 @@
#ifndef CDSLIB_CONTAINER_STRIPED_MAP_BOOST_FLAT_MAP_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_MAP_BOOST_FLAT_MAP_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif
#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::flat_map you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_map/boost_list.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_MAP_BOOST_LIST_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_MAP_BOOST_LIST_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::list you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_map/boost_map.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_MAP_BOOST_MAP_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_MAP_BOOST_MAP_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::map you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_map/boost_slist.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_MAP_BOOST_SLIST_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_MAP_BOOST_SLIST_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::slist you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_map/boost_unordered_map.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_MAP_BOOST_UNORDERED_MAP_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_MAP_BOOST_UNORDERED_MAP_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <cds/container/striped_set/adapter.h>
#include <boost/unordered_map.hpp>

Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_flat_set.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_FLAT_SET_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_FLAT_SET_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::flat_set you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_list.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_LIST_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_LIST_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::list you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_set.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_SET_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_SET_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::set you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_slist.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_SLIST_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_SLIST_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <functional> // ref
#include <cds/container/striped_set/adapter.h>
#include <boost/container/slist.hpp>
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_stable_vector.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_STABLE_VECTOR_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_STABLE_VECTOR_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::stable_vector you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_unordered_set.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_UNORDERED_SET_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_UNORDERED_SET_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <cds/container/striped_set/adapter.h>
#include <boost/unordered_set.hpp>

Expand Down
4 changes: 4 additions & 0 deletions cds/container/striped_set/boost_vector.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_CONTAINER_STRIPED_SET_BOOST_VECTOR_ADAPTER_H
#define CDSLIB_CONTAINER_STRIPED_SET_BOOST_VECTOR_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/version.hpp>
#if BOOST_VERSION < 104800
# error "For boost::container::vector you must use boost 1.48 or above"
Expand Down
4 changes: 4 additions & 0 deletions cds/intrusive/fcqueue.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
#define CDSLIB_INTRUSIVE_FCQUEUE_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <cds/algo/flat_combining.h>
#include <cds/algo/elimination_opt.h>
#include <cds/intrusive/options.h>
Expand Down
4 changes: 4 additions & 0 deletions cds/intrusive/fcstack.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_INTRUSIVE_FCSTACK_H
#define CDSLIB_INTRUSIVE_FCSTACK_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <cds/algo/flat_combining.h>
#include <cds/algo/elimination_opt.h>
#include <cds/intrusive/options.h>
Expand Down
4 changes: 4 additions & 0 deletions cds/intrusive/segmented_queue.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H
#define CDSLIB_INTRUSIVE_SEGMENTED_QUEUE_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <mutex>
#include <cds/intrusive/details/base.h>
#include <cds/details/marked_ptr.h>
Expand Down
4 changes: 4 additions & 0 deletions cds/intrusive/striped_set/boost_avl_set.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_INTRUSIVE_STRIPED_SET_BOOST_AVL_SET_ADAPTER_H
#define CDSLIB_INTRUSIVE_STRIPED_SET_BOOST_AVL_SET_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/intrusive/avl_set.hpp>
#include <cds/intrusive/striped_set/adapter.h>

Expand Down
4 changes: 4 additions & 0 deletions cds/intrusive/striped_set/boost_list.h
Expand Up @@ -6,6 +6,10 @@
#ifndef CDSLIB_INTRUSIVE_STRIPED_SET_BOOST_LIST_ADAPTER_H
#define CDSLIB_INTRUSIVE_STRIPED_SET_BOOST_LIST_ADAPTER_H

#if CDS_THREADING_HPX
#include <hpx/config.hpp>
#endif

#include <boost/intrusive/list.hpp>
#include <cds/intrusive/striped_set/adapter.h>

Expand Down