From 9e0e4d5d6da9f8a66c60e169c6b28b7359d907d2 Mon Sep 17 00:00:00 2001 From: Amanieu d'Antras Date: Sat, 7 Feb 2015 16:09:53 +0000 Subject: [PATCH] Avoid the need for CMPXCHG16B by using a tagged pointer instead --- CMakeLists.txt | 15 +- README.md | 17 +- include/async++/continuation_vector.h | 232 ++++++++++++-------------- 3 files changed, 122 insertions(+), 142 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d034ad5..5e414b3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,6 @@ project(Async++ C CXX) option(BUILD_SHARED_LIBS "Build Async++ as a shared library" ON) option(USE_CXX_EXCEPTIONS "Enable C++ exception support" ON) -option(USE_AMD64_CMPXCHG16B "Use the CMPXCHG16B instruction on x86_64" ON) if (APPLE) option(BUILD_FRAMEWORK "Build a Mac OS X framework instead of a library" OFF) if (BUILD_FRAMEWORK AND NOT BUILD_SHARED_LIBS) @@ -50,7 +49,7 @@ set(ASYNCXX_INCLUDE ${PROJECT_SOURCE_DIR}/include/async++/when_all_any.h ) set(ASYNCXX_SRC - ${PROJECT_SOURCE_DIR}/src/common.h + ${PROJECT_SOURCE_DIR}/src/internal.h ${PROJECT_SOURCE_DIR}/src/fifo_queue.h ${PROJECT_SOURCE_DIR}/src/scheduler.cpp ${PROJECT_SOURCE_DIR}/src/singleton.h @@ -74,7 +73,7 @@ if (APPLE) endif() set(THREADS_PREFER_PTHREAD_FLAG ON) find_package(Threads REQUIRED) -target_link_libraries(Async++ PUBLIC ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(Async++ PUBLIC Threads::Threads) # Set up preprocessor definitions target_include_directories(Async++ PRIVATE ${PROJECT_SOURCE_DIR}/include) @@ -107,16 +106,6 @@ if (NOT USE_CXX_EXCEPTIONS) endif() endif() -# Allow disabling the CMPXCHG16B instruction which is not available on some -# early AMD64 processors. This will result in 128-bit atomics being implemented -# using the default implementation in std::atomic, which uses a lock. -if (NOT USE_AMD64_CMPXCHG16B) - message(WARNING "The CMPXCHG16B instruction has been disabled. 
Make sure "
-		"to define LIBASYNC_NO_CMPXCHG16B in all files that include "
-		"async++.h because this affects the library ABI.")
-	target_compile_definitions(Async++ PUBLIC LIBASYNC_NO_CMPXCHG16B)
-endif()
-
 # Install the library and produce a CMake export script
 install(TARGETS Async++
         EXPORT Async++
diff --git a/README.md b/README.md
index 8cbb55e..48ac3c2 100644
--- a/README.md
+++ b/README.md
@@ -21,15 +21,18 @@ int main()
 		return 42;
 	});
 	auto task3 = task2.then([](int value) -> int {
-		std::cout << "Task 3 executes after task 2, which returned " << value << std::endl;
+		std::cout << "Task 3 executes after task 2, which returned "
+		          << value << std::endl;
 		return value * 3;
 	});
-	auto task4 = async::when_all(task1, task3).then([](std::tuple<int, int> results) {
-		std::cout << "Task 4 executes after tasks 1 and 3. Task 3 returned " << std::get<1>(results) << std::endl;
+	auto task4 = async::when_all(task1, task3);
+	auto task5 = task4.then([](std::tuple<int, int> results) {
+		std::cout << "Task 5 executes after tasks 1 and 3. Task 3 returned "
+		          << std::get<1>(results) << std::endl;
 	});
-	task4.get();
-	std::cout << "Task 4 has completed" << std::endl;
+	task5.get();
+	std::cout << "Task 5 has completed" << std::endl;
 
 	async::parallel_invoke([] {
 		std::cout << "This is executed in parallel..." << std::endl;
// with this
// 01234
diff --git a/include/async++/continuation_vector.h b/include/async++/continuation_vector.h
index 2f0dee5..4b945ba 100644
--- a/include/async++/continuation_vector.h
+++ b/include/async++/continuation_vector.h
@@ -25,6 +25,66 @@
 namespace async {
 namespace detail {
 
+// Compress the flags in the low bits of the pointer if the structures are
+// suitably aligned. Fall back to a separate flags variable otherwise.
+template<std::uintptr_t Mask, bool Enable>
+class compressed_ptr {
+	void* ptr;
+	std::uintptr_t flags;
+
+public:
+	compressed_ptr() = default;
+	compressed_ptr(void* ptr, std::uintptr_t flags)
+		: ptr(ptr), flags(flags) {}
+
+	template<typename T>
+	T* get_ptr() const
+	{
+		return static_cast<T*>(ptr);
+	}
+	std::uintptr_t get_flags() const
+	{
+		return flags;
+	}
+
+	void set_ptr(void* p)
+	{
+		ptr = p;
+	}
+	void set_flags(std::uintptr_t f)
+	{
+		flags = f;
+	}
+};
+template<std::uintptr_t Mask>
+class compressed_ptr<Mask, true> {
+	std::uintptr_t data;
+
+public:
+	compressed_ptr() = default;
+	compressed_ptr(void* ptr, std::uintptr_t flags)
+		: data(reinterpret_cast<std::uintptr_t>(ptr) | flags) {}
+
+	template<typename T>
+	T* get_ptr() const
+	{
+		return reinterpret_cast<T*>(data & ~Mask);
+	}
+	std::uintptr_t get_flags() const
+	{
+		return data & Mask;
+	}
+
+	void set_ptr(void* p)
+	{
+		data = reinterpret_cast<std::uintptr_t>(p) | (data & Mask);
+	}
+	void set_flags(std::uintptr_t f)
+	{
+		data = (data & ~Mask) | f;
+	}
+};
+
 // Thread-safe vector of task_ptr which is optimized for the common case of
 // only having a single continuation.
class continuation_vector { @@ -34,117 +94,46 @@ class continuation_vector { std::mutex lock; }; - // Internal data of the vector - struct internal_data { - // If this is true then no more changes are allowed - bool is_locked; + // Flags to describe the state of the vector + enum flags { + // If set, no more changes are allowed to internal_data + is_locked = 1, - // Indicates which element of the union is currently active - bool is_vector; - - union { - // Fast path: This represents zero (nullptr) or one elements - task_base* inline_ptr; - - // Slow path: This is used for two or more elements - vector_data* vector_ptr; - }; + // If set, the pointer is a vector_data* instead of a task_base*. If + // there are 0 or 1 elements in the vector, the task_base* form is used. + is_vector = 2 }; + static const std::uintptr_t flags_mask = 3; - // On x86_64, some compilers will not use CMPXCHG16B because some early AMD - // processors do not implement that instruction. Since these are quite rare, - // we force the use of CMPXCHG16B unless the user has explicitly asked - // otherwise using LIBASYNC_NO_CMPXCHG16B. -#if !defined(LIBASYNC_NO_CMPXCHG16B) && \ - ((defined(__GNUC__) && defined(__x86_64__) && !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_16)) || \ - (defined(_MSC_VER) && defined(_M_AMD64))) - class atomic_data_type { - // Internal storage for the atomic data -# ifdef __GNUC__ - __uint128_t storage; -# else -# pragma intrinsic(_InterlockedCompareExchange128) - std::int64_t storage[2]; -# endif - static_assert(sizeof(internal_data) == 16, "Wrong size for internal_data"); - - public: - // These functions use memcpy to convert between internal_data and - // integer types to avoid any potential issues with strict aliasing. - // Tests have shown that compilers are smart enough to optimize the - // memcpy calls away and produce optimal code. 
- atomic_data_type(internal_data data) - { - std::memcpy(&storage, &data, sizeof(internal_data)); - } - internal_data load(std::memory_order) - { -# ifdef __GNUC__ - std::int64_t value[2]; - __asm__ __volatile__ ( - "movq %%rbx, %%rax\n\t" - "movq %%rcx, %%rdx\n\t" - "lock; cmpxchg16b %[storage]" - : "=&a" (value[0]), "=&d" (value[1]) - : [storage] "m" (storage) - : "cc", "memory", "rbx", "rcx" - ); -# else - std::int64_t value[2] = {}; - _InterlockedCompareExchange128(storage, value[0], value[1], value); -# endif - internal_data result; - std::memcpy(&result, value, sizeof(internal_data)); - return result; - } - bool compare_exchange_weak(internal_data& expected, internal_data desired, std::memory_order, std::memory_order) - { - std::int64_t desired_value[2]; - std::memcpy(desired_value, &desired, sizeof(internal_data)); - std::int64_t expected_value[2]; - std::memcpy(expected_value, &expected, sizeof(internal_data)); - bool success; -# ifdef __GNUC__ - __asm__ __volatile__ ( - "lock; cmpxchg16b %[storage]\n\t" - "sete %[success]" - : "+a,a" (expected_value[0]), "+d,d" (expected_value[1]), [storage] "+m,m" (storage), [success] "=q,m" (success) - : "b,b" (desired_value[0]), "c,c" (desired_value[1]) - : "cc", "memory" - ); -# else - success = _InterlockedCompareExchange128(storage, desired_value[0], desired_value[1], expected_value) != 0; -# endif - std::memcpy(&expected, expected_value, sizeof(internal_data)); - return success; - } - }; -#else - typedef std::atomic atomic_data_type; -#endif + // Embed the two bits in the data if they are suitably aligned. We only + // check the alignment of vector_data here because task_base isn't defined + // yet. Since we align task_base to LIBASYNC_CACHELINE_SIZE just use that. 
+	typedef compressed_ptr<flags_mask, (LIBASYNC_CACHELINE_SIZE & flags_mask) == 0 && (std::alignment_of<vector_data>::value & flags_mask) == 0> internal_data;
 
 	// All changes to the internal data are atomic
-	atomic_data_type atomic_data;
+	std::atomic<internal_data> atomic_data;
 
 public:
 	// Start unlocked with zero elements in the fast path
 	continuation_vector()
-		: atomic_data(internal_data{false, false, {}}) {}
+		: atomic_data(internal_data(nullptr, 0)) {}
 
 	// Free any left over data
 	~continuation_vector()
 	{
-		// Converting task_ptr instead of using remove_ref because task_base
+		// Converting to task_ptr instead of using remove_ref because task_base
 		// isn't defined yet at this point.
 		internal_data data = atomic_data.load(std::memory_order_relaxed);
-		if (!data.is_vector) {
-			// If the data is locked then the inline pointer is already gone
-			if (!data.is_locked)
-				task_ptr tmp(data.inline_ptr);
-		} else {
-			for (task_base* i: data.vector_ptr->vector)
+		if (data.get_flags() & flags::is_vector) {
+			// No need to lock the mutex, we are the only thread at this point
+			for (task_base* i: data.get_ptr<vector_data>()->vector)
 				task_ptr tmp(i);
-			delete data.vector_ptr;
+			delete data.get_ptr<vector_data>();
+		} else {
+			// If the data is locked then the inline pointer is already gone
+			if (!(data.get_flags() & flags::is_locked))
+				task_ptr tmp(data.get_ptr<task_base>());
 		}
 	}
 
@@ -159,37 +148,36 @@ class continuation_vector {
 		// Compare-exchange loop on atomic_data
 		internal_data data = atomic_data.load(std::memory_order_relaxed);
 		internal_data new_data;
-		new_data.is_locked = false;
 		do {
 			// Return immediately if the vector is locked
-			if (data.is_locked)
+			if (data.get_flags() & flags::is_locked)
 				return false;
 
-			if (!data.is_vector) {
-				if (!data.inline_ptr) {
-					// Going from 0 to 1 elements
-					new_data.inline_ptr = t.get();
-					new_data.is_vector = false;
-				} else {
-					// Going from 1 to 2 elements, allocate a vector_data
-					if (!vector)
-						vector.reset(new vector_data{{data.inline_ptr, t.get()}, {}});
-					new_data.vector_ptr = vector.get();
-					new_data.is_vector = true;
-				}
-			} else {
+			if (data.get_flags() & flags::is_vector) {
 				// Larger vectors use a mutex, so grab the lock
 				std::atomic_thread_fence(std::memory_order_acquire);
-				std::lock_guard<std::mutex> locked(data.vector_ptr->lock);
+				std::lock_guard<std::mutex> locked(data.get_ptr<vector_data>()->lock);
 
 				// We need to check again if the vector has been locked here
 				// to avoid a race condition with flush_and_lock
-				if (atomic_data.load(std::memory_order_relaxed).is_locked)
+				if (atomic_data.load(std::memory_order_relaxed).get_flags() & flags::is_locked)
 					return false;
 
 				// Add the element to the vector and return
-				data.vector_ptr->vector.push_back(t.release());
+				data.get_ptr<vector_data>()->vector.push_back(t.release());
 				return true;
+			} else {
+				if (data.get_ptr<task_base>()) {
+					// Going from 1 to 2 elements, allocate a vector_data
+					if (!vector)
+						vector.reset(new vector_data{{data.get_ptr<task_base>(), t.get()}, {}});
+					new_data.set_ptr(vector.get());
+					new_data.set_flags(flags::is_vector);
+				} else {
+					// Going from 0 to 1 elements
+					new_data.set_ptr(t.get());
+					new_data.set_flags(0);
+				}
 			}
 		} while (!atomic_data.compare_exchange_weak(data, new_data, std::memory_order_release, std::memory_order_relaxed));
@@ -209,24 +197,24 @@
 		internal_data new_data;
 		do {
 			new_data = data;
-			new_data.is_locked = true;
+			new_data.set_flags(data.get_flags() | flags::is_locked);
 		} while (!atomic_data.compare_exchange_weak(data, new_data, std::memory_order_acquire, std::memory_order_relaxed));
 
-		if (!data.is_vector) {
-			// If there is an inline element, just pass it on
-			if (data.inline_ptr)
-				func(task_ptr(data.inline_ptr));
-		} else {
+		if (data.get_flags() & flags::is_vector) {
 			// If we are using vector_data, lock it and flush all elements
-			std::lock_guard<std::mutex> locked(data.vector_ptr->lock);
-			for (auto i: data.vector_ptr->vector)
+			std::lock_guard<std::mutex> locked(data.get_ptr<vector_data>()->lock);
+			for (auto i: data.get_ptr<vector_data>()->vector)
 				func(task_ptr(i));
 
 			// Clear the vector to save memory. Note that we don't actually free
 			// the vector_data here because other threads may still be using it.
 			// This isn't a very significant cost since multiple continuations
 			// are relatively rare.
-			data.vector_ptr->vector.clear();
+			data.get_ptr<vector_data>()->vector.clear();
+		} else {
+			// If there is an inline element, just pass it on
+			if (data.get_ptr<task_base>())
+				func(task_ptr(data.get_ptr<task_base>()));
 		}
 	}
 };